12 Commits
v0.6.1 ... main

Author SHA1 Message Date
Siavash Sameni
3afbfb42cf bench: add criterion benchmarks for protocol, bandwidth, TCP RX scan, and EC-SRP5
Some checks failed
CI / test (push) Failing after 1m42s
Adds four Criterion.rs benchmark suites to measure hot-path performance
and demonstrate the impact of Sprints 1–3 optimizations:

- benches/protocol.rs    — Command & StatusMessage serialize/deserialize
- benches/bandwidth.rs   — BandwidthState atomics, budget, interval math
- benches/tcp_rx_scan.rs — memchr SIMD scan vs naive O(n) loop (55× faster
                           on 256KB buffers with status at end)
- benches/ecsrp5.rs      — WCurve::new() heavy math vs cached LazyLock
                           (~123,000× faster access)

Also adds BENCHMARKS.md with usage instructions and example results.

Visibility changes (bench-only):
- scan_status_message is now pub (was #[cfg(test)] only)
- WCurve and WCURVE are now pub in ecsrp5.rs

dev-dependencies: criterion + pprof (optional flamegraph support)
2026-04-30 21:01:38 +04:00
Siavash Sameni
bba9b0512c perf: replace O(n) TCP RX buffer scan with SIMD memchr + carry buffer (Sprint 3)
All checks were successful
CI / test (push) Successful in 2m14s
This commit fixes the most significant hot-path bottleneck in the
client: the tcp_client_rx_loop was scanning up to 256KB byte-by-byte
on every read() call looking for interleaved 12-byte status messages.

Changes:
- client.rs (tcp_client_rx_loop): Replace the O(n) for-loop scan
  with a three-stage approach:

  1. Split-message check: An 11-byte carry buffer stores trailing
     bytes from the previous read. We check every possible alignment
     where a status message (0x07 + cpu_byte) could span the carry
     and the start of the current buffer. This fixes a latent bug
     where the old code would miss status messages split across TCP
     read boundaries.

  2. Fast scan: memchr::memchr (AVX2/NEON SIMD) finds 0x07 bytes
     in the 256KB buffer. On all-zero data packets this exits in
     ~4096 SIMD-width operations instead of 262,144 byte compares.
     ~64x faster scan path.

  3. Carry save: Save up to 11 trailing bytes for the next read.

- client.rs (unit tests): Add scan_status_message() helper and
  five unit tests covering:
  - Status message fully within buffer
  - Status message split across reads (5+7 bytes)
  - Status message split at boundary (1+11 bytes)
  - All-zero buffer (no false positive)
  - Short buffer (no panic)

- Cargo.toml / Cargo.lock: Add memchr as an explicit dependency.

Verified against live MikroTik RouterOS (TCP both + receive modes
with EC-SRP5 auth). Status messages detected correctly. No wire
protocol changes — 100% MikroTik compatible.
2026-04-30 20:46:34 +04:00
Siavash Sameni
205030ce33 perf: reduce async runtime and platform overhead (Sprint 2)
This commit applies three concurrency and platform-specific
optimizations. No wire protocol changes — 100% MikroTik compatible.

Changes:
- cpu.rs (FreeBSD): Replace fork+exec of 'sysctl -n kern.cp_time'
  with a direct libc::sysctl FFI call. Eliminates one process spawn
  per second on FreeBSD. Uses CTL_KERN / KERN_CP_TIME mib to read
  the 5-element cp_time array directly into a [c_ulong; 5].

- server.rs (multi-conn TCP): Replace the 100ms busy-poll loop in
  tcp_client_rx_loop with tokio::sync::Notify. When a secondary
  TCP connection joins a multi-connection session, it calls
  notify_one() to wake the primary connection immediately instead
  of waiting up to 100ms. Adds an Arc<Notify> to TcpSession and
  updates all secondary connection push sites to signal it.

- client.rs + server.rs (UDP RX): Replace per-recv
  tokio::time::timeout(Duration::from_secs(5), socket.recv(...))
  with a pinned tokio::time::sleep future inside tokio::select!.
  This eliminates timer wheel registration/cancel overhead on every
  UDP packet receive, which is significant at high packet rates.
  The timeout still fires correctly when no packets arrive for 5s.

No new dependencies.
2026-04-30 20:46:13 +04:00
Siavash Sameni
b3c12b7f8b perf: eliminate redundant allocations and computations (Sprint 1)
This commit applies eight low-risk internal optimizations identified
in the performance audit. No wire protocol changes — 100% MikroTik
compatible.

Changes:
- ecsrp5.rs: Cache WCurve in a global LazyLock, eliminating the
  expensive BigUint modular square root recomputation on every
  EC-SRP5 authentication. Also optimize the local hex::encode
  module to use a single pre-allocated String instead of N format!
  allocations.

- server.rs: Deduplicate Instant::now() calls in the TCP TX hot
  loop, caching the result at the top of each iteration.

- csv_output.rs: Hold the CSV file handle open in a static
  Mutex<Option<(String, File)>> instead of reopening the file on
  every write_result call. Add explicit flush after each write.

- server_pro/user_db.rs: Replace hand-rolled Gregorian calendar
  math (30+ lines looping from 1970) with chrono::Local::now().
  Optimize hash_password() to write username:password directly
  into the SHA256 hasher and hex-encode with a pre-allocated
  String.

- server_pro/enforcer.rs: Replace allocating error string matching
  (format!({}, e).as_str().contains(...)) with direct
  QuotaError variant matching. Pass ip_str into flush_to_db()
  to avoid a per-call ip.to_string().

- syslog_logger.rs: Move timestamp formatting outside the global
  std::sync::Mutex to reduce lock hold time. Replace manual
  calendar arithmetic with chrono::Local::now().format().

New dependency: chrono (already pulled in transitively by rusqlite).
2026-04-30 20:45:56 +04:00
Siavash Sameni
a655d3bbe8 Re-enable release CI workflow trigger
All checks were successful
CI / test (push) Successful in 2m12s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 11:54:04 +04:00
Siavash Sameni
82ea10f2d5 Bump version to 0.6.3, temporarily disable release CI
Some checks failed
CI / test (push) Failing after 1m35s
Disable release workflow trigger to prevent duplicate builds
when tagging manually with pre-built binaries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 11:51:34 +04:00
Siavash Sameni
e6cecc7bd8 Perf: cache EC-SRP5 constants, optimize TCP I/O, fix LDAP security
- Cache Curve25519 constants (P, CURVE_ORDER, WEIERSTRASS_A) with LazyLock
  eliminating ~768 BigUint heap allocations per auth handshake
- Optimize scalar_mul to use bit() instead of clone+shift
- Set TCP socket buffers to 4MB via socket2 (matching UDP path)
- Increase TCP RX buffers from 64KB to 256KB
- Use 256KB writes at unlimited rate (vs 32KB) reducing syscall overhead
- Fix LDAP filter injection with RFC 4515 escaping
- Fix unwrap panic on empty LDAP search results

Benchmarked on WiFi against MikroTik:
  TCP Download: +67% (19.7 → 32.9 Mbps avg)
  TCP Upload:   +87% (3.6 → 6.7 Mbps avg)
  Local CPU:    lower across all tests (29-36% vs 32-58%)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 10:06:21 +04:00
Siavash Sameni
da76c76c93 Update architecture docs: server-pro, Android, CPU platforms, byte budget
All checks were successful
CI / test (push) Successful in 2m27s
Complete rewrite reflecting current state: server-pro module structure,
BandwidthState fields, all 6 build targets, CPU sampling on 5 platforms,
web dashboard API endpoints, test counts, and key design decisions
including inline byte budget and TCP status message scanning.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:51:07 +04:00
Siavash Sameni
27c69d8982 Fix unused variable warning in test
Some checks failed
Build & Release / release (push) Has been cancelled
CI / test (push) Successful in 2m35s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:40:40 +04:00
Siavash Sameni
2cb8519c95 Suppress non_snake_case warning for Win32 FILETIME struct
Some checks failed
CI / test (push) Failing after 1m40s
Build & Release / release (push) Successful in 4m46s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:34:21 +04:00
Siavash Sameni
9ca124cb76 Fix CPU reporting: Android support, TCP remote CPU parsing
All checks were successful
CI / test (push) Successful in 2m33s
Build & Release / release (push) Successful in 5m11s
- Add target_os = "android" to CPU sampler (reads /proc/stat like Linux)
- Parse remote CPU from interleaved TCP status messages in BOTH mode
- Add dedicated status reader for TX-only mode (reads server's 12-byte
  status messages to get remote CPU and enable speed adaptation)
- Add 3 CPU integration tests: local CPU, TCP BOTH remote, TCP TX-only

Fixes: Android always showing cpu: 0%/0%, TCP remote CPU always 0%
on all platforms (btest-to-btest and btest-to-MikroTik).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:28:45 +04:00
Siavash Sameni
c06a4d0c9a Add public server links to README, fix dead_code warnings
All checks were successful
CI / test (push) Successful in 2m12s
- Add Free Public Servers section with US/EU endpoints and usage examples
- Add Server Pro section documenting the optional pro build
- Add Android/Termux to supported platforms and installation guide
- Gate pro-only public functions with #[cfg(feature = "pro")] to eliminate
  6 dead_code warnings in the standard build

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 19:57:18 +04:00
22 changed files with 2690 additions and 484 deletions

BENCHMARKS.md (new file)
@@ -0,0 +1,54 @@
# Benchmarks
This project uses [Criterion.rs](https://bheisler.github.io/criterion.rs/book/) for performance benchmarking and regression detection.
## Running Benchmarks
Run all benchmarks:
```bash
cargo bench
```
Run a specific benchmark suite:
```bash
cargo bench --bench protocol
cargo bench --bench bandwidth
cargo bench --bench tcp_rx_scan
cargo bench --bench ecsrp5
```
Run in "quick" mode (fewer iterations, useful for development):
```bash
cargo bench --bench tcp_rx_scan -- --quick
```
## Benchmark Suites
### `protocol` — Protocol Serialization
Measures the zero-allocation serialization/deserialization of `Command` (16 bytes) and `StatusMessage` (12 bytes) structs.
### `bandwidth` — Bandwidth State Atomics
Measures `BandwidthState` hot-path operations: `fetch_add`, `spend_budget`, `calc_send_interval`, `advance_next_send`, and `summary`.
### `tcp_rx_scan` — TCP RX Status Message Scan
Compares the optimized `memchr`-based scan against the old naive O(n) byte-by-byte loop on 256KB buffers. Key scenarios:
- **All zeros** (common case — data packets contain no status)
- **Status at start**
- **Status at end** (worst case for naive scan)
- **Split messages** (status spans two TCP reads)
### `ecsrp5` — EC-SRP5 Curve Construction
Compares `WCurve::new()` (heavy `BigUint` modular arithmetic) against the cached `&*WCURVE` access to demonstrate the Sprint 1 optimization.
## Interpreting Results
Criterion generates HTML reports in `target/criterion/`. Open `target/criterion/report/index.html` after running benchmarks to view interactive charts.
Example results (Apple M3 Pro, release profile):
| Benchmark | Naive/Uncached | Optimized/Cached | Speedup |
|-----------|---------------|------------------|---------|
| TCP RX scan 256KB (status at end) | 251 µs | 4.5 µs | **~55×** |
| WCurve construction | 126 µs | 1.0 ns | **~123,000×** |
| Command serialize | — | 7.7 ns | — |
| Bandwidth `fetch_add` | — | ~1 ns | — |

Cargo.lock (generated, 741 lines)

File diff suppressed because it is too large.

Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "btest-rs"
-version = "0.6.0"
+version = "0.6.3"
 edition = "2021"
 description = "MikroTik Bandwidth Test (btest) server and client with EC-SRP5 auth — a Rust reimplementation"
 license = "MIT AND Apache-2.0"
@@ -49,6 +49,8 @@ num-traits = "0.2.19"
 num-integer = "0.1.46"
 sha2 = "0.11.0"
 hostname = "0.4.2"
+chrono = "0.4"
+memchr = "2"
 rusqlite = { version = "0.39.0", features = ["bundled"], optional = true }
 ldap3 = { version = "0.12.1", optional = true }
 axum = { version = "0.8.8", features = ["tokio"], optional = true }
@@ -68,3 +70,25 @@ codegen-units = 1
 inherits = "release"
 opt-level = "z"
 panic = "abort"
+
+# --- Benchmarks ---
+[dev-dependencies]
+criterion = { version = "0.5", features = ["html_reports"] }
+pprof = { version = "0.14", features = ["criterion", "flamegraph"] }
+
+[[bench]]
+name = "protocol"
+harness = false
+
+[[bench]]
+name = "bandwidth"
+harness = false
+
+[[bench]]
+name = "tcp_rx_scan"
+harness = false
+
+[[bench]]
+name = "ecsrp5"
+harness = false

PERFORMANCE_AUDIT.md (new file)
@@ -0,0 +1,302 @@
# btest-rs Performance Audit
**Date:** 2026-04-30
**Scope:** Full codebase (`src/`, `tests/`, `Cargo.toml`)
**Methodology:** Static code analysis, hot-path tracing, lock/contention review, algorithmic complexity analysis
---
## Executive Summary
The codebase is generally well-structured for a network I/O tool, with good use of atomics in the per-packet hot path and zero-allocation protocol serialization. However, **three critical bottlenecks** significantly limit throughput and scalability:
1. **O(n) buffer scan on every TCP read** in the client RX loop (up to 256KB scanned per `read()` call)
2. **Expensive EC curve reconstruction on every authentication** (heavy `BigUint` modular arithmetic)
3. **Single SQLite connection mutex** serializing all DB operations in `server_pro`
Additionally, there is **no benchmark or profiling infrastructure** in the project, making it impossible to measure improvements or catch regressions.
---
## Severity Legend
| Icon | Severity | Impact |
|------|----------|--------|
| 🔴 | **Critical** | Direct throughput/latency hit in hot path; fix immediately |
| 🟠 | **High** | Significant overhead under load; fix in next sprint |
| 🟡 | **Medium** | Noticeable at scale or under specific conditions |
| 🟢 | **Low** | Cosmetic / easy wins; batch with other work |
---
## 🔴 Critical Bottlenecks
### 1. O(n) Linear Buffer Scan in `tcp_client_rx_loop` (`src/client.rs:210-216`)
**Problem:** On every TCP `read()` call (up to 256KB), the client performs a byte-by-byte scan looking for interleaved 12-byte status messages:
```rust
for i in 0..=(n - STATUS_MSG_SIZE) {
    if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
        // ...
    }
}
```
Since data packets are all zeros and status bytes are extremely rare, this **almost always scans the entire 256KB buffer** uselessly. At high bandwidth (many reads per second), this wastes substantial CPU time and pollutes cache lines.
**Impact:** CPU-bound slowdown on the client RX side during bidirectional TCP tests. The compiler *may* auto-vectorize the simple loop, but it still processes ~256K bytes per read.
**Fix Options (pick one):**
- **Best:** Use `memchr` (crate or `std::slice::memchr` on nightly) to find `0x07` bytes. On all-zero buffers this exits after a few SIMD-width checks.
- **Alternative:** Since status messages are injected at `write_all` boundaries and data is all zeros, maintain a small 12-byte sliding ring buffer across reads. Process the stream with a tiny state machine instead of scanning the whole buffer.
- **Alternative:** Track read bytes modulo expected packet size. Status messages are injected between full packets, so they will appear at predictable offsets *if* the client knows the server's `effective_size`. This requires protocol coordination.
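The first option combined with a small carry buffer (the approach ultimately shipped in Sprint 3) can be sketched as follows. This is a simplified, stdlib-only illustration: `scan_with_carry` is a hypothetical helper, and `.position()` stands in for the SIMD-accelerated `memchr::memchr` used in the real code.

```rust
const STATUS_MSG_SIZE: usize = 12;
const STATUS_MSG_TYPE: u8 = 0x07;

/// Find 12-byte status messages (0x07 followed by a CPU byte >= 0x80) in
/// `carry + buf`. Returns the messages found plus the trailing bytes
/// (always fewer than 12) to prepend to the next read, so messages split
/// across TCP read boundaries are not lost.
fn scan_with_carry(carry: &[u8], buf: &[u8]) -> (Vec<[u8; STATUS_MSG_SIZE]>, Vec<u8>) {
    let mut stream = [carry, buf].concat();
    let mut found = Vec::new();
    let mut pos = 0;
    loop {
        // Real code: memchr::memchr(STATUS_MSG_TYPE, &stream[pos..]) — SIMD.
        // `position` is the scalar stdlib equivalent.
        let Some(off) = stream[pos..].iter().position(|&b| b == STATUS_MSG_TYPE) else {
            return (found, Vec::new()); // no 0x07 left: nothing can span a boundary
        };
        let i = pos + off;
        if i + STATUS_MSG_SIZE > stream.len() {
            // Candidate starts too close to the end: carry the tail (<= 11 bytes).
            return (found, stream.split_off(i));
        }
        if stream[i + 1] >= 0x80 {
            let mut msg = [0u8; STATUS_MSG_SIZE];
            msg.copy_from_slice(&stream[i..i + STATUS_MSG_SIZE]);
            found.push(msg);
            pos = i + STATUS_MSG_SIZE;
        } else {
            pos = i + 1; // the 0x07 was ordinary payload; keep scanning
        }
    }
}

fn main() {
    // Status message split 5 + 7 across two reads, surrounded by zero payload.
    let msg: [u8; 12] = [0x07, 0x90, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let read1 = [&[0u8; 20][..], &msg[..5]].concat();
    let read2 = [&msg[5..], &[0u8; 20][..]].concat();

    let (found1, carry) = scan_with_carry(&[], &read1);
    assert!(found1.is_empty());
    assert_eq!(carry, &msg[..5]); // partial message carried over

    let (found2, carry2) = scan_with_carry(&carry, &read2);
    assert_eq!(found2, vec![msg]); // reassembled across the read boundary
    assert!(carry2.is_empty());
    println!("split status message recovered");
}
```

The real implementation keeps a fixed 11-byte carry array and checks alignments explicitly; the concatenation here trades a small copy for clarity.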
---
### 2. `WCurve::new()` Recomputes Generator on Every Auth (`src/ecsrp5.rs:363,499`)
**Problem:** Every EC-SRP5 authentication (client and server) calls `WCurve::new()`, which performs `lift_x(9)` → `prime_mod_sqrt()` — heavy `BigUint` modular arithmetic to derive the curve generator point.
```rust
pub async fn client_authenticate<S: ...>(stream: &mut S, username: &str, password: &str) -> Result<()> {
    let w = WCurve::new(); // <-- expensive, same result every time
    // ...
}
```
The curve constants (`P`, `CURVE_ORDER`, `WEIERSTRASS_A`) are already cached as `LazyLock` statics, but the generator point is not.
**Impact:** Auth latency spikes, especially on the server under many concurrent connections. Each auth does redundant `BigUint` allocations and modular square roots.
**Fix:** Cache `WCurve` (or at least the generator point) in a global `LazyLock`:
```rust
static WCURVE: std::sync::LazyLock<WCurve> = std::sync::LazyLock::new(WCurve::new);
```
Then use `&*WCURVE` in both `client_authenticate` and `server_authenticate`.
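A self-contained demonstration of the caching pattern (the stand-in computation and call counter are illustrative, not from `ecsrp5.rs`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::LazyLock;

static CALLS: AtomicUsize = AtomicUsize::new(0);

// Stand-in for WCurve::new(): deterministic, immutable, expensive to build.
fn expensive_setup() -> u64 {
    CALLS.fetch_add(1, Ordering::SeqCst);
    (0..10_000u64).sum()
}

static CURVE: LazyLock<u64> = LazyLock::new(expensive_setup);

fn main() {
    // Simulate many concurrent authentications touching the cached value.
    let handles: Vec<_> = (0..8).map(|_| std::thread::spawn(|| *CURVE)).collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 49_995_000);
    }
    // The expensive setup ran exactly once, however many auths occurred.
    assert_eq!(CALLS.load(Ordering::SeqCst), 1);
    println!("setup calls: {}", CALLS.load(Ordering::SeqCst));
}
```

`LazyLock` guarantees the initializer runs once even under concurrent first access, which is exactly the property the auth path needs.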
---
### 3. Single SQLite Mutex Serializes All DB Operations (`src/server_pro/user_db.rs:15-18`)
**Problem:** The entire `server_pro` database layer uses a single shared `Connection` behind a `std::sync::Mutex`:
```rust
pub struct UserDb {
    conn: Arc<Mutex<Connection>>,
}
```
While SQLite WAL mode is enabled (allowing readers to proceed during writes), **the Rust mutex still serializes all access to the connection object**. Under concurrent load with many tests starting/finishing, this becomes the primary bottleneck.
**Critical sub-issue:** `QuotaManager::remaining_budget()` (`src/server_pro/quota.rs:387`) performs **up to 15 separate SQLite queries** in sequence, locking the mutex 15+ times per pre-test check.
**Impact:** Connection setup/teardown latency increases linearly with concurrency. Quota checks and usage recording block each other.
**Fix Options:**
- **Connection pooling:** Use `r2d2_sqlite` or `deadpool-sqlite` to maintain a small pool of connections (SQLite handles this well in WAL mode).
- **Separate read/write paths:** Open a read-only connection for quota checks (`remaining_budget`) and a dedicated write connection for usage recording. SQLite WAL allows this.
- **Batch quota checks:** Cache quota results for a few seconds per user/IP to avoid redundant queries.
- **Channel-based writer:** Use a single dedicated DB writer task with an `mpsc` channel so only one task ever touches the connection, eliminating lock contention entirely.
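The channel-based writer option can be sketched with a plain `std::sync::mpsc` channel and thread (the `DbWrite` enum and byte counter are hypothetical; real code would carry rusqlite parameters and execute statements on the writer's own `Connection`):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical write commands; the real code would wrap rusqlite calls.
enum DbWrite {
    RecordUsage { user: String, bytes: u64 },
    Shutdown,
}

/// Spawn the single writer task. Only this thread ever touches the SQLite
/// connection, so no mutex is needed and writers never block each other —
/// they just enqueue.
fn spawn_db_writer() -> (mpsc::Sender<DbWrite>, thread::JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut total = 0u64; // stand-in for conn.execute(...) side effects
        for cmd in rx {
            match cmd {
                DbWrite::RecordUsage { user: _, bytes } => total += bytes,
                DbWrite::Shutdown => break,
            }
        }
        total
    });
    (tx, handle)
}

fn main() {
    let (tx, writer) = spawn_db_writer();
    tx.send(DbWrite::RecordUsage { user: "alice".into(), bytes: 1_000 }).unwrap();
    tx.send(DbWrite::RecordUsage { user: "bob".into(), bytes: 500 }).unwrap();
    tx.send(DbWrite::Shutdown).unwrap();
    assert_eq!(writer.join().unwrap(), 1_500);
    println!("writer recorded all usage");
}
```

In the async server this would be a `tokio::sync::mpsc` channel with a dedicated task, but the ownership idea is the same: one owner, zero lock contention.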
---
## 🟠 High Severity Issues
### 4. 100ms Busy-Poll Wait in Multi-Connection TCP (`src/server.rs:313-332`)
**Problem:** When waiting for secondary TCP connections to join a multi-connection session, the primary connection busy-polls the session map every 100ms:
```rust
loop {
    let count = { let map = sessions.lock().await; ... };
    if count + 1 >= conn_count as usize { break; }
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```
This adds up to **100ms of unnecessary latency** to every multi-connection test startup. It also hammers the async mutex needlessly.
**Fix:** Replace with `tokio::sync::Notify`. When a secondary connection registers itself, it calls `notify_one()`. The primary waits on `notified().await` with a timeout, waking instantly when ready.
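The same wake-on-join idea in synchronous form, using the stdlib `Condvar` (the actual fix uses `tokio::sync::Notify`, its async counterpart; the connection count of 3 is illustrative):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Block until `needed` connections have registered, waking immediately on
/// each join instead of polling every 100ms. Returns the final count.
fn wait_for_joins(needed: usize) -> usize {
    let state = Arc::new((Mutex::new(1usize), Condvar::new())); // primary counts as 1

    // Secondary connections register and signal — in the real fix this is
    // notify_one() on an Arc<tokio::sync::Notify> stored in the session.
    for _ in 0..needed - 1 {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let (count, cvar) = &*state;
            *count.lock().unwrap() += 1;
            cvar.notify_one();
        });
    }

    let (count, cvar) = &*state;
    let mut n = count.lock().unwrap();
    while *n < needed {
        // wait_timeout keeps a safety-net timeout but wakes instantly on notify
        let (guard, _) = cvar.wait_timeout(n, Duration::from_secs(5)).unwrap();
        n = guard;
    }
    *n
}

fn main() {
    assert_eq!(wait_for_joins(3), 3);
    println!("all connections joined without polling");
}
```

The predicate loop around `wait_timeout` handles spurious wakeups and the race where a secondary notifies before the primary starts waiting.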
---
### 5. FreeBSD CPU Sampling Spawns Process Every Second (`src/cpu.rs:142`)
**Problem:** On FreeBSD, `get_cpu_times()` spawns `sysctl -n kern.cp_time` via `std::process::Command` every second:
```rust
fn get_cpu_times() -> (u64, u64) {
    if let Ok(output) = std::process::Command::new("sysctl")
        .arg("-n").arg("kern.cp_time").output()
    { ... }
}
```
`fork()` + `exec()` is extremely expensive relative to the work being done (reading 5 integers).
**Fix:** Use `libc::sysctl()` via FFI, matching the macOS implementation style. Cache the `mib` array and call the syscall directly.
---
### 6. Per-Call Timer Registration in UDP RX Loops (`src/client.rs:393`, `src/server.rs:925`)
**Problem:** Both UDP RX loops create a new `tokio::time::timeout` timer on **every single `recv`/`recv_from` call**:
```rust
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await
```
At high packet rates (e.g., 100K pps), registering and canceling timers on the Tokio timer wheel adds measurable overhead.
**Fix:** Use `tokio::select!` with a long-lived `tokio::time::sleep` future that is reset, or use the socket's built-in SO_RCVTIMEO if available via `socket2`. Alternatively, since UDP is connectionless, consider whether a 5-second timeout is needed on every call or if the outer test duration timer is sufficient.
---
## 🟡 Medium Severity Issues
### 7. String Error Matching with Allocation (`src/server_pro/enforcer.rs:157-161`)
```rust
match format!("{}", e).as_str() {
    s if s.contains("daily") => ...
}
```
`format!("{}", e)` allocates a `String` from the error just to do substring matching. Use `e.to_string().contains(...)` or match on error types directly if possible.
---
### 8. `ip.to_string()` Called Repeatedly in Quota Checks (`src/server_pro/quota.rs:389`)
```rust
let ip_str = ip.to_string();
// ... used in 6+ DB calls
```
This allocates a `String` on every quota check. Accept `&str` or `IpAddr` directly in DB methods, or cache the string.
---
### 9. `chrono_date_today()` Recomputes Calendar from Epoch (`src/server_pro/user_db.rs:617-638`)
A hand-rolled date calculation loops through years from 1970 and months every time it's called (which is before almost every DB write). The `chrono` crate is already used indirectly by `rusqlite`; add it as a direct dependency and replace with `chrono::Local::now().format("%Y-%m-%d")`.
---
## 🟢 Low Severity / Easy Wins
### 10. CSV File Reopened on Every Write (`src/csv_output.rs:77`)
```rust
if let Ok(mut f) = OpenOptions::new().append(true).open(path) {
    let _ = writeln!(f, "{}", row);
}
```
Called once per test, not per-packet, but still suboptimal. Consider keeping a lazily-initialized `Mutex<Option<File>>` or using `std::fs::OpenOptions` once at init and storing the handle.
---
### 11. Global Syslog Mutex Held During I/O (`src/syslog_logger.rs`)
```rust
static SYSLOG: Mutex<Option<SyslogSender>> = Mutex::new(None);
```
The global `std::sync::Mutex` is held while formatting the timestamp (expensive manual calendar math) and sending UDP. Switch to `parking_lot::Mutex` (faster) or `tokio::sync::Mutex` if async, and format the message outside the lock. Better yet, use `tracing`'s built-in syslog integration or a structured appender.
---
### 12. `hash_password()` Uses `format!` + `format!` in Hex Loop (`src/server_pro/user_db.rs:612-614`)
```rust
hasher.update(format!("{}:{}", username, password).as_bytes());
result.iter().map(|b| format!("{:02x}", b)).collect() // N allocs for N bytes
```
The hex encoding allocates one `String` per byte. Use a small fixed buffer or `hex` crate (already used elsewhere in `ecsrp5.rs`).
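A single-allocation replacement using only the stdlib (`hex_encode` is a hypothetical name; the fix could equally use the `hex` crate already in the tree):

```rust
use std::fmt::Write;

/// Hex-encode into one pre-sized String instead of N intermediate
/// format!-allocated Strings (one per byte).
fn hex_encode(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // write! into a String is infallible
        let _ = write!(out, "{:02x}", b);
    }
    out
}

fn main() {
    assert_eq!(hex_encode(&[0x00, 0xff, 0x0a]), "00ff0a");
    // A 32-byte SHA256 digest becomes exactly 64 hex chars, one allocation.
    assert_eq!(hex_encode(&[0u8; 32]).len(), 64);
    println!("{}", hex_encode(&[0xde, 0xad, 0xbe, 0xef]));
}
```

The output string is byte-for-byte identical to the `format!`-per-byte version, so stored password hashes are unaffected.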
---
### 13. Redundant `Instant::now()` Calls in TX Loop (`src/server.rs:593,606`)
```rust
if send_status && Instant::now() >= next_status {
    // ...
    next_status = Instant::now() + Duration::from_secs(1);
}
```
Two monotonic clock reads per loop iteration. Cache `let now = Instant::now();` at the top of the loop.
---
## Architecture Observations
### What the Code Does Well
- **Zero-allocation protocol layer:** `serialize()` returns fixed-size stack arrays (`[u8; 12]`, `[u8; 16]`). Excellent.
- **Atomic bandwidth tracking:** `BandwidthState` uses `AtomicU64` with `Relaxed` ordering in the per-packet path. No locks in the data plane.
- **Buffer reuse:** TX/RX loops allocate `vec![0u8; ...]` once before the loop. Good.
- **Aggressive release profile:** `lto = true`, `codegen-units = 1`, `opt-level = 3`.
### Async Runtime Usage
- `tokio` with `full` features is used. For a primarily I/O-bound tool, this is appropriate.
- `tokio::task::yield_now().await` is used in unlimited-rate mode to prevent starving the runtime. This is correct but consider whether `tokio::task::spawn_blocking` or dedicated CPU pinning is needed for the EC-SRP5 math (which is CPU-bound and currently runs on the async runtime during auth).
### Memory Safety
- Several `unwrap()`/`expect()` calls in setup paths (socket binding, address parsing). These are acceptable for config errors but should use `?` propagation where possible to allow graceful degradation.
---
## Missing Performance Infrastructure
| Infrastructure | Status | Recommendation |
|----------------|--------|----------------|
| **Benchmarks** | ❌ None | Add `criterion` + `benches/` for `BandwidthState`, protocol ser/de, and EC-SRP5 auth |
| **Profiling hooks** | ❌ None | Add optional `pprof` or `dhat` dev-deps for heap profiling |
| **Throughput regression tests** | ⚠️ Partial | Integration tests assert `tx > 0` and `rx > 0` but don't measure sustained throughput |
| **Load tests** | ❌ None | Add a `benches/load_test.rs` that spawns 100+ concurrent tests against a local server |
| **CI performance gates** | ❌ None | Consider a benchmark action that fails on >5% regression |
---
## Priority Action Plan
### Phase 1: Hot-Path Fixes (1-2 days)
1. Replace buffer scan with `memchr` or ring-buffer approach in `tcp_client_rx_loop`
2. Cache `WCurve` in a global `LazyLock`
3. Replace 100ms poll with `tokio::sync::Notify` in multi-conn wait
### Phase 2: Scalability (2-3 days)
4. Add SQLite connection pooling or channel-based writer in `server_pro`
5. Cache `remaining_budget()` results for 5-10 seconds
6. Fix FreeBSD CPU sampling to use `libc::sysctl` FFI
### Phase 3: Polish & Tooling (1-2 days)
7. Replace manual date arithmetic with `chrono`
8. Add `criterion` benchmarks for auth and bandwidth state
9. Fix low-severity allocation issues (CSV, syslog, hex encoding)
---
## Appendix: File-by-File Quick Reference
| File | Lines | Hot Path? | Key Concern |
|------|-------|-----------|-------------|
| `src/client.rs` | 531 | ✅ Yes | O(n) 256KB scan per TCP read |
| `src/server.rs` | 1094 | ✅ Yes | 100ms poll wait, status injection timing |
| `src/ecsrp5.rs` | 660 | ✅ Yes (auth) | `WCurve::new()` recomputed per auth |
| `src/bandwidth.rs` | 263 | ✅ Yes (atomics) | Well-designed; no issues |
| `src/protocol.rs` | 214 | ✅ Yes (ser/de) | Zero-allocation; excellent |
| `src/cpu.rs` | 215 | ⚠️ Periodic | FreeBSD `fork+exec` every second |
| `src/server_pro/quota.rs` | 470 | ⚠️ Periodic | 15 DB queries per budget check |
| `src/server_pro/user_db.rs` | 641 | ⚠️ All DB ops | Single mutex serializes everything |
| `src/server_pro/server_loop.rs` | 449 | ✅ Yes | DB auth locks during connection setup |
| `src/server_pro/enforcer.rs` | 411 | ⚠️ Periodic | String error matching allocates |
| `src/csv_output.rs` | 86 | ❌ No | File reopen per write |
| `src/syslog_logger.rs` | 154 | ❌ No | Global mutex + manual calendar math |
| `src/auth.rs` | 164 | ⚠️ Auth only | Minor; double MD5 per auth |
| `src/main.rs` | 243 | ❌ No | Entry point only |

PERFORMANCE_PRDS.md (new file)
@@ -0,0 +1,634 @@
# Performance Improvement PRDs
**Project:** btest-rs
**Constraint:** 100% MikroTik BTest protocol compatibility — no wire-format or behavioral changes visible to MikroTik devices
**Date:** 2026-04-30
---
## How to Read This Document
Each PRD is sorted by **recommended execution order**, which balances:
- **Effort** (development + review + test time)
- **Risk** (probability of regression or compatibility break)
- **Performance Effect** (measured or estimated throughput/latency improvement)
- **MikroTik Compatibility Risk** (whether the change could affect interoperability)
**Sorting rationale:** Execute *quick wins* first to build velocity and reduce risk surface, then tackle *high-impact* items with full attention.
---
## Summary Matrix
| # | PRD | Effort | Risk | Perf Impact | MikroTik Risk | Tier |
|---|-----|--------|------|-------------|---------------|------|
| 1 | WCurve Global Cache | 30 min | None | Medium | None | Quick Win |
| 2 | Redundant `Instant::now()` | 15 min | None | Low | None | Quick Win |
| 3 | `hash_password` Hex Fix | 30 min | None | Low | None | Quick Win |
| 4 | CSV File Handle Cache | 30 min | None | Low | None | Quick Win |
| 5 | Error String Matching | 30 min | None | Low | None | Quick Win |
| 6 | `chrono_date_today` Replace | 1 hr | Low | Low | None | Quick Win |
| 7 | Syslog Mutex + Timestamp | 1 hr | Low | Low | None | Quick Win |
| 8 | `ip.to_string()` Cache | 1 hr | Low | Low | None | Quick Win |
| 9 | FreeBSD CPU FFI | 3 hrs | Medium | Medium | None | Platform Fix |
| 10 | Multi-Conn Notify Wake | 2 hrs | Medium | Medium | None | Latency Fix |
| 11 | UDP Timer Reuse | 2 hrs | Medium | Medium | None | Throughput Fix |
| 12 | TCP RX Scan Optimization | 4 hrs | Medium | **High** | Low | Hot Path Fix |
| 13 | SQLite Connection Pool | 1-2 days | High | **High** | None | Scalability Fix |
---
## Tier 1: Quick Wins (Do These First)
---
### PRD-001: Cache `WCurve` in Global `LazyLock`
**Background:**
`WCurve::new()` is called on every EC-SRP5 authentication (client and server). It recomputes the Weierstrass curve generator point via `lift_x(9)` → `prime_mod_sqrt()`, which performs heavy `BigUint` modular arithmetic. The result is deterministic and immutable.
**MikroTik Compatibility:**
- **100% safe.** This is pure internal mathematics. The wire bytes, auth handshake order, and hash outputs are identical. No protocol-visible change.
**Objective:**
Eliminate redundant `BigUint` modular square root computation per authentication.
**Design:**
```rust
// src/ecsrp5.rs
static WCURVE: std::sync::LazyLock<WCurve> = std::sync::LazyLock::new(WCurve::new);
```
Replace all call sites:
- `src/ecsrp5.rs:363` (`client_authenticate`)
- `src/ecsrp5.rs:499` (`server_authenticate`)
Change `let w = WCurve::new();` to `let w = &*WCURVE;`. Update any `WCurve` methods that take `self` to take `&self` if they don't already.
**Acceptance Criteria:**
- [ ] `ecsrp5_test.rs` passes unchanged.
- [ ] `full_integration_test.rs` EC-SRP5 tests pass unchanged.
- [ ] `WCurve::new()` is called exactly once per process lifetime.
- [ ] No change to serialized auth bytes on the wire.
**Effort:** 30 min
**Risk:** None — stateless deterministic cache
**Performance Impact:** Medium — reduces per-auth CPU time by ~30-50% (estimated), especially noticeable under concurrent logins.
---
### PRD-002: Deduplicate `Instant::now()` in `tcp_tx_loop_inner`
**Background:**
The TCP TX loop calls `Instant::now()` twice per iteration (status check and interval scheduling). Monotonic clock reads are cheap but not free, and occur in the hottest loop in the system.
**MikroTik Compatibility:**
- **100% safe.** Timing granularity remains identical.
**Objective:**
Reduce syscalls in the per-packet hot path.
**Design:**
```rust
let now = Instant::now();
if send_status && now >= next_status { ... next_status = now + Duration::from_secs(1); }
// ... reuse `now` for interval math
```
**Acceptance Criteria:**
- [ ] TCP send/receive/both integration tests pass.
- [ ] No behavioral change in status injection timing.
**Effort:** 15 min
**Risk:** None
**Performance Impact:** Low — micro-optimization, but trivial.
---
### PRD-003: Fix `hash_password()` Hex Encoding Allocations
**Background:**
`user_db.rs:614` allocates one `String` per byte when hex-encoding a 32-byte SHA256 hash:
```rust
result.iter().map(|b| format!("{:02x}", b)).collect()
```
**MikroTik Compatibility:**
- **100% safe.** Output string is identical.
**Objective:**
Replace N-allocation hex encoding with a single-allocation approach.
**Design:**
Use `hex` crate (already in dependency tree via `ecsrp5.rs` debug logging) or a small `[u8; 64]` buffer with `write!` to a `String::with_capacity(64)`.
**Acceptance Criteria:**
- [ ] Same hex string output for all inputs.
- [ ] `pro` feature tests pass.
**Effort:** 30 min
**Risk:** None
**Performance Impact:** Low — removes 32 allocations per password hash.
---
### PRD-004: Cache CSV File Handle
**Background:**
`csv_output::write_result()` re-opens the file via `OpenOptions::new().append(true).open(path)` on every call (once per test). Safe but wasteful.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement.
**Objective:**
Hold the file handle open for the process lifetime.
**Design:**
Change `static CSV_FILE: Mutex<Option<String>>` to `Mutex<Option<(String, std::fs::File)>>`, or open once during `init()` and store `Mutex<Option<File>>`.
**Acceptance Criteria:**
- [ ] CSV tests in `full_integration_test.rs` pass.
- [ ] File is created with headers on `init()`.
- [ ] Multiple `write_result` calls append correctly.
**Effort:** 30 min
**Risk:** None
**Performance Impact:** Low — removes one `open()` syscall per test.
---
### PRD-005: Remove Allocating Error String Matching
**Background:**
`src/server_pro/enforcer.rs:157-161` does:
```rust
match format!("{}", e).as_str() {
s if s.contains("daily") => ...
}
```
This allocates a `String` from the error just for substring matching.
**MikroTik Compatibility:**
- **100% safe.** Server-pro internal logic only.
**Objective:**
Match without allocation.
**Design:**
Use `e.to_string().contains("daily")` (still allocates but clearer) or, better, downcast the `rusqlite::Error` or match on structured error variants. If the error is `anyhow::Error`, use `.downcast_ref::<rusqlite::Error>()`.
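A sketch of variant matching via `downcast_ref` (the `QuotaError` type is a stand-in invented for illustration; the real enforcer would downcast to `rusqlite::Error` or its own error enum):

```rust
use std::error::Error;
use std::fmt;

// Hypothetical structured error standing in for the real quota error type.
#[derive(Debug)]
enum QuotaError {
    DailyExceeded,
    WeeklyExceeded,
}

impl fmt::Display for QuotaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            QuotaError::DailyExceeded => write!(f, "daily quota exceeded"),
            QuotaError::WeeklyExceeded => write!(f, "weekly quota exceeded"),
        }
    }
}

impl Error for QuotaError {}

// Classify by variant — no String allocation, no substring matching.
fn classify(e: &(dyn Error + 'static)) -> &'static str {
    match e.downcast_ref::<QuotaError>() {
        Some(QuotaError::DailyExceeded) => "daily",
        Some(QuotaError::WeeklyExceeded) => "weekly",
        None => "other",
    }
}
```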
**Acceptance Criteria:**
- [ ] Quota enforcement behavior unchanged.
- [ ] Enforcer tests pass.
**Effort:** 30 min
**Risk:** None
**Performance Impact:** Low — removes one allocation per enforcer tick.
---
### PRD-006: Replace `chrono_date_today()` with `chrono` Crate
**Background:**
`user_db.rs:617-638` contains a hand-rolled Gregorian calendar converter that loops from 1970 to compute today's date. Called before almost every DB write. The `chrono` crate is already pulled in transitively by `rusqlite`.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement.
**Objective:**
Replace 30 lines of error-prone manual date math with one `chrono` call.
**Design:**
Add `chrono = { version = "0.4", optional = true }` gated behind `pro` feature (or use the transitive dep directly). Replace `chrono_date_today()` with:
```rust
chrono::Local::now().format("%Y-%m-%d").to_string()
```
**Acceptance Criteria:**
- [ ] `pro` feature compiles.
- [ ] Date strings match format `YYYY-MM-DD`.
- [ ] DB write tests pass.
**Effort:** 1 hr
**Risk:** Low — adds explicit dep that already exists transitively
**Performance Impact:** Low — eliminates loop overhead, but called infrequently.
---
### PRD-007: Optimize Syslog Mutex and Timestamp Formatting
**Background:**
`syslog_logger.rs` holds a global `std::sync::Mutex` while formatting a timestamp (manual calendar math) and sending UDP. `std::sync::Mutex` is relatively slow, and the timestamp logic duplicates `chrono_date_today()` issues.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement.
**Objective:**
Reduce lock contention and allocation in logging path.
**Design:**
1. Use `parking_lot::Mutex` (faster, no poisoning) OR switch to `std::sync::Mutex` but clone the `SyslogSender` config outside the lock.
2. Replace `bsd_timestamp()` with `chrono::Local::now().format("%b %e %H:%M:%S")`.
3. Pre-allocate the `String` with `with_capacity(256)`.
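The lock-scope half of the design can be sketched without `chrono` (all names are illustrative; the real sender writes to a UDP socket rather than a Vec, and the timestamp is omitted here to keep the sketch dependency-free):

```rust
use std::sync::Mutex;

// Stand-in for the syslog socket: formatting happens before the lock,
// so the critical section covers only the hand-off/send.
static SENT_LINES: Mutex<Vec<String>> = Mutex::new(Vec::new());

fn syslog_send(hostname: &str, tag: &str, msg: &str) {
    // Step 3: one pre-sized allocation per message.
    let mut line = String::with_capacity(256);
    line.push_str("<134>"); // PRI = facility local0 (16 * 8) + severity info (6)
    // The RFC 3164 timestamp ("%b %e %H:%M:%S") would be appended here.
    line.push_str(hostname);
    line.push(' ');
    line.push_str(tag);
    line.push_str(": ");
    line.push_str(msg);
    // The lock is taken only now, after all formatting is done.
    SENT_LINES.lock().unwrap().push(line);
}
```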
**Acceptance Criteria:**
- [ ] Syslog output format remains RFC 3164 compliant.
- [ ] `test_syslog_events` in `full_integration_test.rs` passes.
**Effort:** 1 hr
**Risk:** Low
**Performance Impact:** Low — logging is not a hot path, but reduces global lock hold time.
---
### PRD-008: Cache `ip.to_string()` in Quota Checks
**Background:**
`quota.rs:389` calls `ip.to_string()` and then passes `&ip_str` to multiple DB methods, allocating a new `String` on every `remaining_budget()` call.
**MikroTik Compatibility:**
- **100% safe.** Server-pro internal logic.
**Objective:**
Eliminate redundant IP stringification.
**Design:**
Change DB methods to accept `&std::net::IpAddr` directly and stringify inside only when needed for SQL parameter binding (which `rusqlite` may already handle via `ToSql`). Alternatively, pass `ip_str: &str` from a single `to_string()` call and avoid re-stringifying in sub-calls.
**Acceptance Criteria:**
- [ ] Quota checks return identical results.
- [ ] `pro` feature tests pass.
**Effort:** 1 hr
**Risk:** Low
**Performance Impact:** Low — one allocation removed per quota check.
---
## Tier 2: Moderate Fixes (Platform & Latency)
---
### PRD-009: FreeBSD CPU Sampling via `libc::sysctl` FFI
**Background:**
On FreeBSD, `cpu.rs` spawns `sysctl -n kern.cp_time` as a child process every second. `fork()` + `exec()` is orders of magnitude slower than a direct syscall.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement. Platform-specific internal code.
**Objective:**
Replace subprocess with direct `sysctl(3)` syscall.
**Design:**
```rust
#[cfg(target_os = "freebsd")]
fn get_cpu_times() -> (u64, u64) {
    // FreeBSD exposes kern.cp_time by name; read it with sysctlbyname(3)
    // instead of spawning `sysctl -n kern.cp_time`. (The numeric
    // KERN_CP_TIME MIB constant is a NetBSD/OpenBSD convention and is not
    // available in the libc crate for FreeBSD.)
    let mut cp_time: [libc::c_ulong; 5] = [0; 5]; // user, nice, sys, intr, idle
    let mut len = std::mem::size_of_val(&cp_time);
    let name = b"kern.cp_time\0";
    unsafe {
        if libc::sysctlbyname(
            name.as_ptr() as *const libc::c_char,
            cp_time.as_mut_ptr() as *mut libc::c_void,
            &mut len,
            std::ptr::null_mut(),
            0,
        ) == 0
        {
            let total: u64 = cp_time.iter().map(|&t| t as u64).sum();
            return (total, cp_time[4] as u64); // (total, idle)
        }
    }
    (0, 0)
}
```
**Acceptance Criteria:**
- [ ] Compiles on FreeBSD.
- [ ] Returns same values as previous `sysctl` command approach.
- [ ] No child process spawned (verify with `ktrace` or `ps`).
**Effort:** 3 hrs
**Risk:** Medium — requires FreeBSD test environment; FFI is unsafe
**Performance Impact:** Medium — eliminates 1 fork/exec per second on FreeBSD.
---
### PRD-010: Replace 100ms Poll with `tokio::sync::Notify`
**Background:**
In `server.rs:313-332`, the primary connection of a multi-connection TCP test busy-polls the session map every 100ms waiting for secondary connections to join.
**MikroTik Compatibility:**
- **100% safe.** This is internal server-side coordination. The wire behavior (waiting for connections, then starting the test) is unchanged. MikroTik clients will not observe a difference except potentially faster test startup.
**Objective:**
Eliminate polling latency and unnecessary mutex acquisitions.
**Design:**
1. Add a `tokio::sync::Notify` to `TcpSession`:
```rust
struct TcpSession {
    peer_ip: IpAddr,
    streams: Vec<OwnedTcpStream>,
    expected: u8,
    notify: tokio::sync::Notify,
}
```
2. In the secondary connection handler, after pushing to `streams`, call `session.notify.notify_one()`.
3. In the primary wait loop, replace the sleep loop with:
```rust
let count = { /* lock, get count, drop lock */ };
if count + 1 >= conn_count { break; } // already complete; exits the outer wait
// Wait for notification or 10s deadline
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
    tokio::select! {
        _ = session.notify.notified() => {
            let count = { /* lock, get count */ };
            if count + 1 >= conn_count { break; }
        }
        _ = &mut timeout => { break; }
    }
}
```
**Acceptance Criteria:**
- [ ] Multi-connection TCP tests pass.
- [ ] Test startup latency is ≤ 1ms after last connection joins (was up to 100ms).
- [ ] No deadlock under concurrent multi-connection tests.
**Effort:** 2 hrs
**Risk:** Medium — concurrency change; must carefully manage lock/notify ordering to avoid races
**Performance Impact:** Medium — improves multi-conn test startup latency by up to 100ms per test.
---
### PRD-011: Reuse UDP RX Timer Instead of Per-Call Timeout
**Background:**
Both client and server UDP RX loops create a new `tokio::time::timeout` on every `recv`/`recv_from` call:
```rust
tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await
```
At high packet rates, this registers and cancels timers on Tokio's timer wheel constantly.
**MikroTik Compatibility:**
- **100% safe.** Internal async timing only. UDP packet processing is unchanged.
**Objective:**
Reduce timer wheel churn in high-rate UDP RX loops.
**Design:**
Option A — `tokio::select!` with a pinned sleep future:
```rust
// Note: `reset` takes a tokio::time::Instant, not std::time::Instant.
let timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
loop {
    tokio::select! {
        biased; // prioritize recv over the timeout arm
        res = socket.recv(&mut buf) => {
            /* handle `res` */
            timeout.as_mut().reset(Instant::now() + Duration::from_secs(5));
        }
        _ = &mut timeout => {
            tracing::debug!("UDP RX timeout");
        }
    }
}
```
Option B — Use `socket2` to set `SO_RCVTIMEO` on the underlying socket. Kernel receive timeouts only apply to blocking reads, so this means running the RX loop on a blocking thread (e.g. `spawn_blocking`) rather than through Tokio's non-blocking recv. This moves timeout handling into the kernel, which is even cheaper.
**Recommendation:** Start with Option A (pure Tokio, no platform risk). Option B can be a follow-up.
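Option B can be previewed with std's kernel-level receive timeout (the async version would route the socket through `socket2`; the helper name here is hypothetical):

```rust
use std::io::ErrorKind;
use std::net::UdpSocket;

// With SO_RCVTIMEO set (std: set_read_timeout), an expired timeout
// surfaces as WouldBlock on Unix or TimedOut on Windows.
fn recv_with_kernel_timeout(sock: &UdpSocket, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
    match sock.recv_from(buf) {
        Ok((n, _from)) => Ok(Some(n)),
        Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => Ok(None),
        Err(e) => Err(e),
    }
}
```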
**Acceptance Criteria:**
- [ ] UDP send/receive/both tests pass.
- [ ] UDP RX still times out correctly when no packets arrive.
- [ ] No change to packet parsing or sequence tracking.
**Effort:** 2 hrs
**Risk:** Medium — changes timeout behavior; must ensure test abortion still works correctly
**Performance Impact:** Medium — reduces timer wheel registration overhead, noticeable at >50K pps.
---
## Tier 3: High Impact (Do These With Full Focus)
---
### PRD-012: Optimize TCP Client RX Status Message Scan
**Background:**
`tcp_client_rx_loop` (`client.rs:210-216`) scans up to 256KB byte-by-byte on every `read()` call looking for a 12-byte status marker (`0x07` + `0x80|cpu`). Since data is all zeros, this is almost always a full scan.
**MikroTik Compatibility Consideration:**
- **High confidence of safety.** The protocol is: MikroTik injects 12-byte status messages into the TCP stream. Our client must detect them. Changing *how* we detect them (faster scan) does not change:
- What bytes are sent on the wire
- What bytes we expect
- How we respond to status messages
- **One edge case to handle:** TCP is a stream. A status message may be split across two `read()` calls. The current code does **not** handle this correctly (it scans each buffer independently). The optimized version *should* handle split messages to be strictly more correct than the current implementation.
**Objective:**
Replace O(n) byte-by-byte scan with SIMD-accelerated or state-machine-based detection, while correctly handling split messages.
**Design — Recommended: Ring Buffer Approach**
Since status messages are 12 bytes and all other bytes are zeros, maintain a 12-byte ring buffer across reads:
```rust
const STATUS_MSG_SIZE: usize = 12;

async fn tcp_client_rx_loop(mut reader: OwnedReadHalf, state: Arc<BandwidthState>) {
    let mut buf = vec![0u8; 256 * 1024];
    let mut carry = [0u8; STATUS_MSG_SIZE - 1]; // up to 11 bytes from previous read
    let mut carry_len = 0usize;
    while state.running.load(Ordering::Relaxed) {
        match reader.read(&mut buf).await {
            Ok(0) | Err(_) => break,
            Ok(n) => {
                state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
                // Check if a status message starting at the carry spans into buf.
                // (The production code should check every alignment across
                // carry ++ buf prefix, not just a message starting at carry[0].)
                if carry_len > 0 {
                    let needed = STATUS_MSG_SIZE - carry_len;
                    if n >= needed {
                        let mut candidate = [0u8; STATUS_MSG_SIZE];
                        candidate[..carry_len].copy_from_slice(&carry[..carry_len]);
                        candidate[carry_len..].copy_from_slice(&buf[..needed]);
                        if candidate[0] == STATUS_MSG_TYPE && candidate[1] >= 0x80 {
                            state.remote_cpu.store(candidate[1] & 0x7F, Ordering::Relaxed);
                        }
                    }
                }
                // Scan within buf for status messages. Since data is zeros,
                // use memchr to find 0x07 candidates.
                if n >= STATUS_MSG_SIZE {
                    let search_end = n - STATUS_MSG_SIZE + 1;
                    let mut offset = 0;
                    while let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[offset..search_end]) {
                        let i = offset + pos;
                        if buf[i + 1] >= 0x80 {
                            state.remote_cpu.store(buf[i + 1] & 0x7F, Ordering::Relaxed);
                            break;
                        }
                        // Empty slice when offset reaches search_end ends the loop.
                        offset = i + 1;
                    }
                }
                // Save trailing bytes for the next read (carry_len <= n always).
                carry_len = n.min(STATUS_MSG_SIZE - 1);
                carry[..carry_len].copy_from_slice(&buf[n - carry_len..n]);
            }
        }
    }
}
```
**Alternative: `memchr` crate only**
If we determine split messages are extremely rare and the current behavior is "good enough," simply replace the `for` loop with:
```rust
if n >= STATUS_MSG_SIZE { // guard against underflow on short reads
    if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..n - STATUS_MSG_SIZE + 1]) {
        if buf[pos + 1] >= 0x80 { /* ... */ }
    }
}
```
This is a 5-line change with massive speedup (SIMD scan). However, the ring buffer approach is strictly more correct and not much more complex.
**Acceptance Criteria:**
- [ ] TCP bidirectional tests pass.
- [ ] Remote CPU reporting still works.
- [ ] Status messages split across reads are correctly detected (unit test for this).
- [ ] `memchr` crate added to deps (very lightweight).
- [ ] No change to wire bytes or server behavior.
**Effort:** 4 hrs
**Risk:** Medium — hot path change; must be carefully reviewed and tested
**Performance Impact:** **High** — eliminates 256KB byte scan per read. At 10K reads/sec, saves ~2.5GB of memory scanning per second.
---
### PRD-013: SQLite Connection Pool / Channel-Based Writer
**Background:**
`server_pro` uses a single `Arc<Mutex<Connection>>`. All quota checks, usage recordings, and auth lookups serialize through one lock. `remaining_budget()` issues 15 queries, locking 15+ times. This is the primary scalability bottleneck for the pro server.
**MikroTik Compatibility:**
- **100% safe.** Server-side infrastructure only. No protocol change.
**Objective:**
Enable concurrent quota checks and usage recording without mutex contention.
**Design — Option A: Connection Pool (Recommended for reads)**
Use `r2d2_sqlite` or `deadpool-sqlite`:
1. Open a pool of ~4-8 connections to the same SQLite file (WAL mode supports this).
2. Read-only operations (`remaining_budget`, `get_user`, `check_user`) borrow a connection from the pool.
3. Write operations (`record_usage`, `record_session`) also borrow from the pool (WAL allows concurrent readers + one writer).
**Design — Option B: Channel-Based Writer (Recommended for writes)**
1. Keep one dedicated `Connection` owned by a single Tokio task.
2. Expose an `mpsc::channel` where other tasks send write requests (`RecordUsage { user, tx, rx }`).
3. The writer task batches or sequentially executes writes without any mutex.
4. Reads use a separate read-only connection or pool.
**Hybrid Recommendation:**
- **Reads:** Small connection pool (4 connections) for quota checks and auth lookups.
- **Writes:** Single dedicated async task with an `mpsc::unbounded_channel` for usage recording.
- **Cache:** Add a 5-second TTL cache for `remaining_budget()` results per user+IP to avoid redundant DB hits during test setup.
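The single-writer half of the hybrid can be sketched with std channels (the real code would use `tokio::sync::mpsc` and an async task; the `DbWrite` enum and the byte counter standing in for the SQLite connection are hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical write-request type mirroring record_usage arguments.
enum DbWrite {
    RecordUsage { user: String, tx_bytes: u64, rx_bytes: u64 },
    Shutdown,
}

fn spawn_writer() -> (mpsc::Sender<DbWrite>, thread::JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // Only this thread touches the "connection", so no mutex is needed.
        let mut total_recorded = 0u64;
        while let Ok(req) = rx.recv() {
            match req {
                DbWrite::RecordUsage { tx_bytes, rx_bytes, .. } => {
                    total_recorded += tx_bytes + rx_bytes; // stands in for an INSERT
                }
                DbWrite::Shutdown => break,
            }
        }
        total_recorded
    });
    (tx, handle)
}
```

Any number of sessions can clone the sender and fire writes without contending on a lock; back-pressure and batching are then writer-task policy decisions.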
**Acceptance Criteria:**
- [ ] `pro` feature compiles and all tests pass.
- [ ] Concurrent test launches scale linearly up to at least 50 concurrent sessions.
- [ ] Quota enforcement remains correct (no over-quota usage).
- [ ] Session logging and interval recording remain accurate.
- [ ] No SQLite "database is locked" errors under load.
**Effort:** 1-2 days
**Risk:** High — touches every DB interaction in `server_pro`; potential for data races, quota leaks, or connection exhaustion
**Performance Impact:** **High** — enables horizontal scaling of concurrent tests; removes the primary pro server bottleneck.
---
## Execution Roadmap
### Sprint 1: Quick Wins + Foundation (1 day)
- [ ] PRD-001: WCurve cache
- [ ] PRD-002: `Instant::now()` dedup
- [ ] PRD-003: `hash_password` hex fix
- [ ] PRD-004: CSV file handle cache
- [ ] PRD-005: Error string matching
- [ ] PRD-006: `chrono` date replacement
- [ ] PRD-007: Syslog optimization
- [ ] PRD-008: `ip.to_string()` cache
**Deliverable:** Low-risk PR with 8 clean commits. Run full integration tests.
### Sprint 2: Platform & Async Fixes (1 day)
- [ ] PRD-009: FreeBSD CPU FFI
- [ ] PRD-010: Multi-conn Notify wake
- [ ] PRD-011: UDP timer reuse
**Deliverable:** PR with platform + latency improvements.
### Sprint 3: Hot Path Optimization (1-2 days)
- [ ] PRD-012: TCP RX scan optimization
- [ ] Add unit test for split status messages
- [ ] Benchmark before/after with `criterion` (or manual throughput test)
**Deliverable:** PR with benchmark numbers proving improvement.
### Sprint 4: Scalability (2-3 days)
- [ ] PRD-013: SQLite connection pool / channel writer
- [ ] Load test: 50 concurrent tests, verify no DB lock contention
- [ ] Add `remaining_budget` cache
**Deliverable:** PR with load test results.
---
## Testing Requirements for All PRDs
Since **no wire protocol changes** are made, the existing integration test suite is the primary validation tool. However, for PRD-012 and PRD-013, additional tests are required:
### New Tests to Add
1. **Split Status Message Unit Test (for PRD-012)**
```rust
#[test]
fn test_status_message_split_across_reads() {
    // Feed first 5 bytes, then remaining 7 bytes
    // Assert CPU value is extracted correctly
}
```
2. **Concurrent Quota Load Test (for PRD-013)**
```rust
#[tokio::test]
async fn test_concurrent_quota_checks() {
    // Spawn 50 tasks doing remaining_budget() + record_usage()
    // Assert no panics, no SQLite locked errors
}
```
3. **FreeBSD CPU Parity Test (for PRD-009)**
Manual verification on FreeBSD that FFI `sysctl` returns same values as command.
---
## Appendix: MikroTik Compatibility Checklist
For every PRD, verify:
- [ ] No change to `Command` or `StatusMessage` struct layouts or serialization
- [ ] No change to MD5 challenge-response handshake order
- [ ] No change to EC-SRP5 handshake order or byte values
- [ ] No change to TCP packet sizes or UDP payload format
- [ ] No change to status injection timing (1-second interval)
- [ ] No change to NAT probe behavior
- [ ] Client can still authenticate against stock RouterOS `btest` server
- [ ] Server can still accept connections from stock RouterOS `btest` client
All PRDs in this document satisfy the above checklist by construction.


@@ -2,6 +2,25 @@
A Rust reimplementation of the [MikroTik Bandwidth Test (btest)](https://wiki.mikrotik.com/wiki/Manual:Tools/Bandwidth_Test) protocol. Both server and client modes, fully compatible with MikroTik RouterOS devices.
## Free Public Servers
Test your MikroTik link speed right now — no setup, no registration:
| Server | Location | Dashboard |
|--------|----------|-----------|
| `104.225.217.60` | US | [btest.home.kg](https://btest.home.kg) |
| `188.245.59.196` | EU | [btest.mikata.ru](https://btest.mikata.ru) |
```
/tool bandwidth-test address=104.225.217.60 user=btest password=btest protocol=tcp direction=both
```
After the test, visit `https://btest.home.kg/dashboard/YOUR_IP` to see your results, throughput history, and quota usage. Per-IP limits: 2 GB daily / 8 GB weekly / 24 GB monthly.
> **Note:** TCP is recommended for remote testing. UDP bidirectional through NAT will only show one direction — this is a btest protocol limitation, not specific to btest-rs. See [KNOWN_ISSUES.md](KNOWN_ISSUES.md) for details.
Want to run your own public server? Build with `cargo build --release --features pro` — see [Server Pro](#server-pro) below.
## Features
- **Full protocol support** -- TCP and UDP data transfer, IPv4 and IPv6
@@ -16,7 +35,7 @@ A Rust reimplementation of the [MikroTik Bandwidth Test (btest)](https://wiki.mi
- **Quiet mode** -- suppress terminal output for scripted/automated use
- **NAT traversal** -- probe packet to open firewall holes for UDP receive
- **Single static binary** -- ~2 MB, zero runtime dependencies (musl build)
- **Cross-platform** -- macOS, Linux (x86_64, ARM64, ARMv7), Windows, Android (Termux), Docker
- **Async I/O** -- tokio-based, handles many concurrent connections efficiently
## Performance
@@ -61,6 +80,10 @@ sudo mv btest /usr/local/bin/
# Windows
# Download btest-windows-x86_64.zip from releases
# Android (Termux, no root needed)
curl -L <release-url>/btest-android-aarch64.tar.gz | tar xz
mv btest $PREFIX/bin/
```
### Raspberry Pi
@@ -267,6 +290,29 @@ scripts/test-mikrotik.sh <ip> # Test against MikroTik device
scripts/test-docker.sh # Docker container test
```
## Server Pro
An optional superset of the standard server with multi-user support, quotas, and a web dashboard. Build with `--features pro`:
```bash
cargo build --release --features pro --bin btest-server-pro
```
Features:
- **SQLite user database** — add/remove users, per-user quotas
- **Per-IP bandwidth quotas** — daily, weekly, monthly limits with inline byte budget enforcement
- **Web dashboard** — session history, throughput stats, quota progress bars, JSON export
- **TCP multi-connection** — handles MikroTik's default 20-connection mode
- **MD5 auth against DB** — proper challenge-response verification
```bash
# Create a user and start the server
btest-server-pro --users-db users.db useradd btest btest
btest-server-pro --users-db users.db --ip-daily 2147483648 --ip-weekly 8589934592 --web-port 8080
```
The pro features are completely optional and don't affect the standard `btest` binary.
## Credits
- **[btest-opensource](https://github.com/samm-git/btest-opensource)** by [Alex Samorukov](https://github.com/samm-git) -- original C implementation and protocol reverse-engineering. Licensed under **MIT**.

benches/bandwidth.rs (new file, 79 lines)

@@ -0,0 +1,79 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use btest_rs::bandwidth::{BandwidthState, calc_send_interval, advance_next_send};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

fn bench_atomic_fetch_add(c: &mut Criterion) {
    let state = BandwidthState::new();
    c.bench_function("bandwidth_rx_bytes_fetch_add", |b| {
        b.iter(|| {
            black_box(state.rx_bytes.fetch_add(1500, Ordering::Relaxed));
        })
    });
    c.bench_function("bandwidth_tx_bytes_fetch_add", |b| {
        b.iter(|| {
            black_box(state.tx_bytes.fetch_add(32768, Ordering::Relaxed));
        })
    });
}

fn bench_spend_budget(c: &mut Criterion) {
    // Unlimited budget (fast path)
    let unlimited = BandwidthState::new();
    c.bench_function("spend_budget_unlimited", |b| {
        b.iter(|| black_box(unlimited.spend_budget(black_box(1500))))
    });
    // Limited budget
    let limited = BandwidthState::new();
    limited.byte_budget.store(1_000_000_000, Ordering::SeqCst);
    c.bench_function("spend_budget_limited", |b| {
        b.iter(|| black_box(limited.spend_budget(black_box(1500))))
    });
}

fn bench_calc_send_interval(c: &mut Criterion) {
    c.bench_function("calc_interval_100mbps_1500b", |b| {
        b.iter(|| black_box(calc_send_interval(black_box(100_000_000), black_box(1500))))
    });
    c.bench_function("calc_interval_1gbps_32768b", |b| {
        b.iter(|| black_box(calc_send_interval(black_box(1_000_000_000), black_box(32768))))
    });
    c.bench_function("calc_interval_unlimited", |b| {
        b.iter(|| black_box(calc_send_interval(black_box(0), black_box(1500))))
    });
}

fn bench_advance_next_send(c: &mut Criterion) {
    let iv = Duration::from_micros(120);
    let now = Instant::now();
    let mut next = now;
    c.bench_function("advance_next_send", |b| {
        b.iter(|| {
            let r = advance_next_send(&mut next, iv, now);
            black_box(r);
        });
        next = now;
    });
}

fn bench_summary(c: &mut Criterion) {
    let state = BandwidthState::new();
    // Pre-populate some values so loads are real
    state.total_tx_bytes.store(1_000_000_000, Ordering::Relaxed);
    state.total_rx_bytes.store(2_000_000_000, Ordering::Relaxed);
    state.intervals.store(100, Ordering::Relaxed);
    c.bench_function("bandwidth_summary", |b| {
        b.iter(|| black_box(state.summary()))
    });
}

criterion_group!(
    bandwidth_benches,
    bench_atomic_fetch_add,
    bench_spend_budget,
    bench_calc_send_interval,
    bench_advance_next_send,
    bench_summary
);
criterion_main!(bandwidth_benches);

benches/ecsrp5.rs (new file, 19 lines)

@@ -0,0 +1,19 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use btest_rs::ecsrp5::{WCurve, WCURVE};

fn bench_wcurve_new(c: &mut Criterion) {
    c.bench_function("wcurve_new_uncached", |b| {
        b.iter(|| black_box(WCurve::new()))
    });
}

fn bench_wcurve_cached(c: &mut Criterion) {
    // Force initialization before benchmarking
    let _ = &*WCURVE;
    c.bench_function("wcurve_cached_access", |b| {
        b.iter(|| black_box(&*WCURVE))
    });
}

criterion_group!(ecsrp5_benches, bench_wcurve_new, bench_wcurve_cached);
criterion_main!(ecsrp5_benches);

benches/protocol.rs (new file, 65 lines)

@@ -0,0 +1,65 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use btest_rs::protocol::{Command, StatusMessage, CMD_PROTO_TCP, CMD_DIR_BOTH};

fn bench_command_serialize(c: &mut Criterion) {
    let cmd = Command::new(CMD_PROTO_TCP, CMD_DIR_BOTH);
    c.bench_function("command_serialize", |b| {
        b.iter(|| black_box(cmd.serialize()))
    });
}

fn bench_command_deserialize(c: &mut Criterion) {
    let bytes = [
        0x01, 0x03, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    ];
    c.bench_function("command_deserialize", |b| {
        b.iter(|| black_box(Command::deserialize(black_box(&bytes))))
    });
}

fn bench_status_message_serialize(c: &mut Criterion) {
    let msg = StatusMessage {
        seq: 42,
        bytes_received: 1_000_000,
        cpu_load: 50,
    };
    c.bench_function("status_message_serialize", |b| {
        b.iter(|| black_box(msg.serialize()))
    });
}

fn bench_status_message_deserialize(c: &mut Criterion) {
    let bytes = [0x07, 0xB2, 0x00, 0x00, 0x2A, 0x00, 0x00, 0x00, 0x40, 0x42, 0x0F, 0x00];
    c.bench_function("status_message_deserialize", |b| {
        b.iter(|| black_box(StatusMessage::deserialize(black_box(&bytes))))
    });
}

fn bench_roundtrip(c: &mut Criterion) {
    let cmd = Command::new(CMD_PROTO_TCP, CMD_DIR_BOTH);
    let msg = StatusMessage {
        seq: 99,
        bytes_received: 50_000,
        cpu_load: 75,
    };
    c.bench_function("command_roundtrip", |b| {
        b.iter(|| {
            let s = black_box(cmd.serialize());
            black_box(Command::deserialize(&s))
        })
    });
    c.bench_function("status_message_roundtrip", |b| {
        b.iter(|| {
            let s = black_box(msg.serialize());
            black_box(StatusMessage::deserialize(&s))
        })
    });
}

criterion_group!(
    protocol_benches,
    bench_command_serialize,
    bench_command_deserialize,
    bench_status_message_serialize,
    bench_status_message_deserialize,
    bench_roundtrip
);
criterion_main!(protocol_benches);

benches/tcp_rx_scan.rs (new file, 100 lines)

@@ -0,0 +1,100 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use btest_rs::client::scan_status_message;
use btest_rs::protocol::STATUS_MSG_TYPE;

/// Naive O(n) byte-by-byte scan — the old implementation.
fn naive_scan(buf: &[u8]) -> Option<u8> {
    const STATUS_MSG_SIZE: usize = 12;
    if buf.len() < STATUS_MSG_SIZE {
        return None;
    }
    for i in 0..=(buf.len() - STATUS_MSG_SIZE) {
        if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
            return Some((buf[i + 1] & 0x7F).min(100));
        }
    }
    None
}

fn make_buffer(size: usize, status_at: Option<usize>) -> Vec<u8> {
    let mut buf = vec![0u8; size];
    if let Some(pos) = status_at {
        buf[pos] = STATUS_MSG_TYPE;
        buf[pos + 1] = 0x80 | 50; // CPU = 50%
    }
    buf
}

fn bench_scan_all_zeros(c: &mut Criterion) {
    let mut group = c.benchmark_group("tcp_rx_scan_all_zeros");
    for size in [4096, 65536, 262144] {
        let buf = make_buffer(size, None);
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(BenchmarkId::new("naive", size), &buf, |b, buf| {
            b.iter(|| black_box(naive_scan(black_box(buf))))
        });
        group.bench_with_input(BenchmarkId::new("memchr", size), &buf, |b, buf| {
            b.iter(|| black_box(scan_status_message(black_box(&[]), black_box(buf))))
        });
    }
    group.finish();
}

fn bench_scan_status_at_start(c: &mut Criterion) {
    let mut group = c.benchmark_group("tcp_rx_scan_status_at_start");
    for size in [4096, 65536, 262144] {
        let buf = make_buffer(size, Some(0));
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(BenchmarkId::new("naive", size), &buf, |b, buf| {
            b.iter(|| black_box(naive_scan(black_box(buf))))
        });
        group.bench_with_input(BenchmarkId::new("memchr", size), &buf, |b, buf| {
            b.iter(|| black_box(scan_status_message(black_box(&[]), black_box(buf))))
        });
    }
    group.finish();
}

fn bench_scan_status_at_end(c: &mut Criterion) {
    let mut group = c.benchmark_group("tcp_rx_scan_status_at_end");
    for size in [4096, 65536, 262144] {
        let buf = make_buffer(size, Some(size - 12));
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(BenchmarkId::new("naive", size), &buf, |b, buf| {
            b.iter(|| black_box(naive_scan(black_box(buf))))
        });
        group.bench_with_input(BenchmarkId::new("memchr", size), &buf, |b, buf| {
            b.iter(|| black_box(scan_status_message(black_box(&[]), black_box(buf))))
        });
    }
    group.finish();
}

fn bench_scan_split_message(c: &mut Criterion) {
    // Simulate a status message split across two reads:
    // carry has first 5 bytes, buf has remaining 7 bytes
    let mut carry = vec![0u8; 5];
    carry[0] = STATUS_MSG_TYPE;
    carry[1] = 0x80 | 75;
    let buf = vec![0u8; 7];
    c.bench_function("scan_split_5_7", |b| {
        b.iter(|| black_box(scan_status_message(black_box(&carry), black_box(&buf))))
    });
    // Split with 2 bytes in carry (status type + cpu byte), 10 in buf
    let carry_2 = vec![STATUS_MSG_TYPE, 0x80 | 33];
    let buf_10 = vec![0u8; 10];
    c.bench_function("scan_split_2_10", |b| {
        b.iter(|| black_box(scan_status_message(black_box(&carry_2), black_box(&buf_10))))
    });
}

criterion_group!(
    tcp_rx_scan_benches,
    bench_scan_all_zeros,
    bench_scan_status_at_start,
    bench_scan_status_at_end,
    bench_scan_split_message
);
criterion_main!(tcp_rx_scan_benches);


@@ -2,282 +2,181 @@
## Overview
btest-rs is a Rust reimplementation of the MikroTik Bandwidth Test protocol. It operates in two modes: **server** (accepts connections from MikroTik devices) and **client** (connects to MikroTik btest servers). An optional **server-pro** mode adds multi-user support, quotas, and a web dashboard.
## Module Structure
```mermaid
graph TB
    main["main.rs<br/>CLI parsing (clap)"]
    server["server.rs<br/>Server mode"]
    client["client.rs<br/>Client mode"]
    protocol["protocol.rs<br/>Wire protocol types"]
    auth["auth.rs<br/>MD5 authentication"]
    ecsrp5["ecsrp5.rs<br/>EC-SRP5 authentication<br/>(Curve25519 Weierstrass)"]
    bandwidth["bandwidth.rs<br/>Rate control & reporting"]
    csv_output["csv_output.rs<br/>CSV result logging"]
    syslog["syslog_logger.rs<br/>Remote syslog (RFC 3164)"]
    lib["lib.rs<br/>Public API for tests"]
    main --> server
    main --> client
    main --> bandwidth
    main --> csv_output
    main --> syslog
    server --> protocol
    server --> auth
    server --> ecsrp5
    server --> bandwidth
    server --> syslog
    client --> protocol
    client --> auth
    client --> ecsrp5
    client --> bandwidth
    lib --> server
    lib --> client
    lib --> protocol
    lib --> auth
    lib --> ecsrp5
    lib --> bandwidth
```
```
src/
├── main.rs          # CLI entry point, argument parsing (clap)
├── lib.rs           # Public API (re-exports all modules for tests/pro)
├── protocol.rs      # Wire format: Command, StatusMessage, constants
├── auth.rs          # MD5 challenge-response authentication
├── ecsrp5.rs        # EC-SRP5 authentication (Curve25519 Weierstrass)
├── server.rs        # Server mode: listener, TCP/UDP handlers, multi-conn
├── client.rs        # Client mode: connector, TCP/UDP handlers, status parsing
├── bandwidth.rs     # Rate limiting, formatting, shared BandwidthState, byte budget
├── cpu.rs           # CPU sampler (macOS, Linux, Android, Windows, FreeBSD)
├── csv_output.rs    # CSV result logging (append-mode, auto-header)
├── syslog_logger.rs # Remote syslog sender (RFC 3164 / BSD format)
├── bin/
│   ├── client_only.rs # Stripped client binary for embedded/OpenWrt
│   └── server_only.rs # Stripped server binary for embedded/OpenWrt
└── server_pro/      # Optional (--features pro)
    ├── main.rs        # Pro CLI: user management, quota flags, web port
    ├── server_loop.rs # Accept loop with auth, quotas, multi-conn sessions
    ├── user_db.rs     # SQLite: users, usage, ip_usage, sessions, intervals
    ├── quota.rs       # QuotaManager: per-user + per-IP limits, remaining_budget()
    ├── enforcer.rs    # QuotaEnforcer: periodic checks, max_duration, StopReason
    ├── ldap_auth.rs   # LDAP auth scaffold (not yet wired)
    └── web/
        └── mod.rs     # Axum web dashboard: Chart.js, quota bars, JSON export
```
## CLI Output Format
The client outputs one line per second per direction:
```
[ 5] TX 285.47 Mbps (35684352 bytes) cpu: 20%/62%
[ 5] RX 283.64 Mbps (35454988 bytes) cpu: 20%/62% lost: 12
```
Format: `[interval] direction speed (bytes) cpu: local%/remote% [lost: N]`
At test end, a summary line:
```
TEST_END peer=172.16.81.1 proto=TCP dir=both duration=60s tx_avg=284.94Mbps rx_avg=272.83Mbps tx_bytes=2137030656 rx_bytes=2046260728 lost=0
```
## Data Flow
### Server Mode (MikroTik connects to us)
```mermaid
sequenceDiagram
participant MK as MikroTik Client
participant TCP as TCP Control<br/>(port 2000)
participant SRV as btest-rs Server
participant UDP as UDP Data<br/>(port 2001+)
MK->>TCP: Connect
SRV->>TCP: HELLO [01 00 00 00]
MK->>TCP: Command [16 bytes]
Note over SRV: Parse proto, direction,<br/>tx_size, speeds
alt No auth configured
SRV->>TCP: AUTH_OK [01 00 00 00]
else MD5 auth (RouterOS < 6.43)
SRV->>TCP: AUTH_REQUIRED [02 00 00 00]
SRV->>TCP: Challenge [16 random bytes]
MK->>TCP: Response [16 hash + 32 username]
Note over SRV: Verify MD5(pass + MD5(pass + challenge))
SRV->>TCP: AUTH_OK or AUTH_FAILED
else EC-SRP5 auth (RouterOS >= 6.43, --ecsrp5 flag)
SRV->>TCP: EC-SRP5 [03 00 00 00]
MK->>TCP: [len][username\0][client_pubkey:32][parity:1]
SRV->>TCP: [len][server_pubkey:32][parity:1][salt:16]
MK->>TCP: [len][client_confirmation:32]
SRV->>TCP: [len][server_confirmation:32]
Note over SRV: Curve25519 Weierstrass EC-SRP5<br/>See docs/ecsrp5-research.md
SRV->>TCP: AUTH_OK [01 00 00 00]
end
alt TCP mode
Note over SRV,MK: Data flows on same TCP connection
loop Every second
SRV-->>SRV: Print bandwidth stats
end
else UDP mode
SRV->>TCP: UDP port [2 bytes BE]
Note over SRV: Bind UDP socket
par TX Thread (if server transmits)
loop Continuous
SRV->>UDP: Data packets [seq + payload]
end
and RX Thread (if server receives)
loop Continuous
UDP->>SRV: Data packets [seq + payload]
end
and Status Loop (TCP control)
loop Every 1 second
MK->>TCP: Status [12 bytes]
SRV->>TCP: Status [12 bytes]
Note over SRV: Adjust TX speed<br/>based on client feedback
end
end
end
```
```
MikroTik → TCP:2000 → HELLO → Command [16 bytes] → Auth → Data Transfer
```
1. Server sends HELLO `[01 00 00 00]`
2. Client sends 16-byte command (protocol, direction, tx_size, speeds, conn_count)
3. Auth: none (`01`), MD5 (`02`), or EC-SRP5 (`03`)
4. TCP: data flows on same connection, 12-byte status messages interleaved every 1s
5. UDP: server sends port number, data on UDP, status exchange stays on TCP
### Client Mode (we connect to MikroTik)
```mermaid
sequenceDiagram
participant CLI as btest-rs Client
participant TCP as TCP Control
participant MK as MikroTik Server
CLI->>TCP: Connect to MikroTik:2000
MK->>TCP: HELLO
CLI->>TCP: Command [16 bytes]
Note over CLI: direction bits tell server<br/>what to do (TX/RX/BOTH)
alt Auth response 01 (no auth)
Note over CLI: No auth, proceed
else Auth response 02 (MD5)
MK->>TCP: Challenge [16 random bytes]
CLI->>TCP: MD5 response [48 bytes]
MK->>TCP: AUTH_OK
else Auth response 03 (EC-SRP5)
CLI->>TCP: [len][username\0][client_pubkey:32][parity:1]
MK->>TCP: [len][server_pubkey:32][parity:1][salt:16]
CLI->>TCP: [len][client_confirmation:32]
MK->>TCP: [len][server_confirmation:32]
MK->>TCP: AUTH_OK
end
Note over CLI,MK: Data transfer begins<br/>(TCP or UDP, same as server)
```
1. Connect to MikroTik:2000
2. Read HELLO, send command
3. Auto-detect auth type from response byte, authenticate
4. Start data transfer with status exchange
### Status Message Format (12 bytes)
```
[0x07][cpu:1][pad:2][seq:4 LE][bytes_received:4 LE]
```
- Byte 0: `0x07` (STATUS_MSG_TYPE)
- Byte 1: `0x80 | cpu_percentage` (MikroTik encoding)
- Bytes 2-3: padding
- Bytes 4-7: sequence number (little-endian u32)
- Bytes 8-11: bytes received this interval (little-endian u32)
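In code, the layout above round-trips with a few lines of byte handling. A sketch mirroring the spec (not the actual `StatusMessage` type in `protocol.rs`):

```rust
const STATUS_MSG_TYPE: u8 = 0x07;

/// Build a 12-byte status message using the layout described above.
fn encode_status(cpu_pct: u8, seq: u32, bytes_received: u32) -> [u8; 12] {
    let mut m = [0u8; 12];
    m[0] = STATUS_MSG_TYPE;
    m[1] = 0x80 | cpu_pct.min(100); // MikroTik CPU encoding
    // bytes 2-3 stay zero (padding)
    m[4..8].copy_from_slice(&seq.to_le_bytes());
    m[8..12].copy_from_slice(&bytes_received.to_le_bytes());
    m
}

/// Parse back (cpu, seq, bytes_received); None if the marker or CPU byte is invalid.
fn decode_status(m: &[u8; 12]) -> Option<(u8, u32, u32)> {
    if m[0] != STATUS_MSG_TYPE || m[1] < 0x80 {
        return None;
    }
    let cpu = (m[1] & 0x7F).min(100);
    let seq = u32::from_le_bytes(m[4..8].try_into().unwrap());
    let bytes = u32::from_le_bytes(m[8..12].try_into().unwrap());
    Some((cpu, seq, bytes))
}
```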
## Threading Model
```mermaid
graph TB
subgraph "Server Process"
LISTEN["Main Loop<br/>Accept connections"]
LISTEN -->|spawn per client| HANDLER
subgraph "Per-Client Tasks (tokio)"
HANDLER["Connection Handler<br/>Handshake + Auth"]
HANDLER --> TX["TX Task<br/>Send data packets"]
HANDLER --> RX["RX Task<br/>Receive data packets"]
HANDLER --> STATUS["Status Loop<br/>Exchange stats every 1s"]
end
end
subgraph "Shared State (Arc + Atomics)"
STATE["BandwidthState"]
TX_BYTES["tx_bytes: AtomicU64"]
RX_BYTES["rx_bytes: AtomicU64"]
TX_SPEED["tx_speed: AtomicU32"]
RUNNING["running: AtomicBool"]
end
TX --> TX_BYTES
RX --> RX_BYTES
STATUS --> TX_BYTES
STATUS --> RX_BYTES
STATUS --> TX_SPEED
TX --> TX_SPEED
TX --> RUNNING
RX --> RUNNING
STATUS --> RUNNING
```
All I/O is async via tokio. Per-client:
- **TX task**: sends data packets at target rate
- **RX task**: receives data, counts bytes, extracts status messages (TCP BOTH mode)
- **Status loop**: exchanges 12-byte status messages every 1s, prints bandwidth
- **Status reader** (TCP TX-only): reads server's status messages for remote CPU

Shared state via `Arc<BandwidthState>` with atomic counters — no mutexes.
### BandwidthState Fields
| Field | Type | Purpose |
|-------|------|---------|
| `tx_bytes` | AtomicU64 | Bytes sent this interval (reset by swap) |
| `rx_bytes` | AtomicU64 | Bytes received this interval |
| `tx_speed` | AtomicU32 | Target TX speed (dynamic, from server feedback) |
| `running` | AtomicBool | Test active flag |
| `remote_cpu` | AtomicU8 | Remote peer's CPU (from status messages) |
| `byte_budget` | AtomicU64 | Remaining quota bytes (u64::MAX = unlimited) |
| `total_tx_bytes` | AtomicU64 | Cumulative TX (never reset) |
| `total_rx_bytes` | AtomicU64 | Cumulative RX (never reset) |
## Server Pro Architecture
Optional feature (`--features pro`) providing a multi-user public btest server.
```
Accept → IP check → HELLO → Command → Auth (DB) → Quota check → Budget set → Test

QuotaEnforcer (parallel)
- checks every N seconds
- max_duration timeout
- sets running=false on exceed
```
**Byte budget**: Before the test starts, `remaining_budget()` computes the minimum remaining quota across all applicable limits. This is stored in `BandwidthState.byte_budget`. Every TX/RX loop checks `spend_budget()` per-packet — when budget hits 0, the test stops immediately. This prevents quota overshoot even on 10+ Gbps links.
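The per-packet check can be sketched as a compare-and-swap loop with a fast path for unlimited tests; this is illustrative and the real `spend_budget()` may differ in detail:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Try to spend `n` bytes from the budget. `u64::MAX` means unlimited (fast path,
/// no atomic read-modify-write). Returns false once the budget cannot cover `n`,
/// which is the signal to stop the test before any overshoot.
fn spend_budget(budget: &AtomicU64, n: u64) -> bool {
    let mut cur = budget.load(Ordering::Relaxed);
    if cur == u64::MAX {
        return true; // unlimited: return immediately
    }
    loop {
        if cur < n {
            return false; // exhausted: stop the test
        }
        match budget.compare_exchange_weak(cur, cur - n, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return true,
            Err(actual) => cur = actual, // lost a race with another task; retry
        }
    }
}
```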
**Multi-connection TCP**: MikroTik sends `tcp_conn_count` connections. The first authenticates and registers a session token. Subsequent connections match by token and join. When all connections arrive, the test starts with per-stream TX/RX tasks.
**Web dashboard** (axum):
- `GET /` — landing page with instructions
- `GET /dashboard/{ip}` — per-IP dashboard with Chart.js graph, session table, quota bars
- `GET /api/ip/{ip}/stats` — aggregate stats JSON
- `GET /api/ip/{ip}/sessions` — session list JSON
- `GET /api/ip/{ip}/quota` — quota usage JSON
- `GET /api/ip/{ip}/export` — full export with human-readable fields
- `GET /api/session/{id}/intervals` — per-second throughput data
## CPU Usage Monitoring
A background OS thread samples system CPU every 1 second:
| Platform | Method |
|----------|--------|
| macOS | `host_statistics(HOST_CPU_LOAD_INFO)` |
| Linux | `/proc/stat` aggregate CPU line |
| Android | `/proc/stat` (same as Linux) |
| Windows | `GetSystemTimes()` FFI |
| FreeBSD | `sysctl kern.cp_time` |
Stored in global `AtomicU8`, included in status messages as `0x80 | percentage`.
## Build Targets
| Target | Binary | Notes |
|--------|--------|-------|
| `x86_64-unknown-linux-musl` | btest | Static, zero deps |
| `aarch64-unknown-linux-musl` | btest | RPi 4/5, ARM servers |
| `armv7-unknown-linux-musleabihf` | btest | RPi 3, OpenWrt |
| `x86_64-pc-windows-gnu` | btest.exe | Cross-compiled |
| `aarch64-linux-android` | btest | Termux ARMv8 |
| `armv7-linux-androideabi` | btest | Termux ARMv7 |
| macOS (native) | btest | Apple Silicon + Intel |
| Docker (multi-arch) | image | amd64 + arm64 |
## Key Design Decisions
1. **Tokio async runtime** — all I/O is async, handles hundreds of concurrent connections
2. **Lock-free shared state** — AtomicU64 counters, `swap(0)` reads and resets per interval
3. **Direction bits from server perspective** — `0x01`=server RX, `0x02`=server TX, `0x03`=both
4. **TCP socket half keepalive** — dropping `OwnedWriteHalf` sends FIN, so unused halves are kept alive
5. **Static musl binary** — ~2 MB, zero runtime dependencies
6. **EC-SRP5 with big integer arithmetic** — Curve25519 Weierstrass form via `num-bigint`
7. **Global singletons for syslog/CSV** — `Mutex<Option<...>>` statics, initialized once at startup
8. **Shared BandwidthState for timeout survival** — state created in main(), survives tokio cancellation
9. **Inline byte budget** — per-packet quota check with fast path (u64::MAX = unlimited, returns immediately)
10. **TCP status message scanning** — RX loop detects 12-byte status messages in the data stream by scanning for `0x07` marker byte to extract remote CPU

### 1. Tokio async runtime
All I/O is async via tokio. Each client connection spawns independent tasks for TX, RX, and status exchange. This allows handling hundreds of concurrent connections on a single thread pool.
### 2. Lock-free shared state
TX/RX threads and the status loop share bandwidth counters via `AtomicU64`. No mutexes needed -- `swap(0)` atomically reads and resets counters each interval.
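The read-and-reset is a single atomic operation, so no bytes added by a TX/RX task can fall between a separate "read" and "reset" step:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// One status-loop tick: atomically take the interval's byte count and zero it.
/// Any fetch_add that lands before the swap is counted in this interval;
/// any that lands after is counted in the next one. Nothing is ever lost.
fn take_interval(counter: &AtomicU64) -> u64 {
    counter.swap(0, Ordering::Relaxed)
}
```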
### 3. Sequential status loop (matching C pselect)
The UDP status exchange uses a sequential timeout-read-then-send pattern rather than `tokio::select!`. This ensures our status messages are sent exactly every 1 second, preventing MikroTik's speed adaptation from seeing irregular feedback.
### 4. Direction bits from server perspective
The direction byte in the protocol means what the **server** should do:
- `0x01` (CMD_DIR_RX) = server receives
- `0x02` (CMD_DIR_TX) = server transmits
- `0x03` (CMD_DIR_BOTH) = bidirectional
The client inverts before sending: client "transmit" sends `CMD_DIR_RX` (telling server to receive).
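The inversion can be sketched as a small mapping (constant values as defined above; the helper itself is illustrative):

```rust
const CMD_DIR_RX: u8 = 0x01;   // server receives
const CMD_DIR_TX: u8 = 0x02;   // server transmits
const CMD_DIR_BOTH: u8 = 0x03; // bidirectional

/// Map the client-side view ("do we transmit / receive?") to the wire byte,
/// which always describes what the *server* should do.
fn wire_direction(client_tx: bool, client_rx: bool) -> u8 {
    match (client_tx, client_rx) {
        (true, true) => CMD_DIR_BOTH,
        (true, false) => CMD_DIR_RX, // we transmit, so the server receives
        (false, true) => CMD_DIR_TX, // we receive, so the server transmits
        (false, false) => 0,
    }
}
```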
### 5. TCP socket half keepalive
When only one direction is active (e.g., TX only), the unused socket half is kept alive. Dropping `OwnedWriteHalf` sends a TCP FIN, which MikroTik interprets as disconnection.
### 6. Static musl binary
Release builds use musl for a fully static binary with zero runtime dependencies. The binary is approximately 2 MB and runs on any Linux distribution.
### 7. EC-SRP5 with big integer arithmetic
The EC-SRP5 implementation uses `num-bigint` for Curve25519 Weierstrass-form elliptic curve arithmetic. MikroTik's authentication uses the Weierstrass form (not the more common Montgomery or Edwards forms), requiring direct field arithmetic over the prime `2^255 - 19`. The implementation includes point multiplication, `lift_x`, `redp1` (hash-to-curve), and Montgomery coordinate conversion.
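Field inversion over a prime modulus follows from Fermat's little theorem (`a^(p-2) mod p`), which is the technique `modinv` in `ecsrp5.rs` applies with `num-bigint` over `2^255 - 19`. A self-contained u64 miniature of the same idea (toy prime, not the real curve field):

```rust
/// Modular exponentiation via square-and-multiply, with u128 intermediates
/// so the u64 multiplications cannot overflow.
fn modpow(mut base: u64, mut exp: u64, m: u64) -> u64 {
    let mut result = 1u64;
    base %= m;
    while exp > 0 {
        if exp & 1 == 1 {
            result = ((result as u128 * base as u128) % m as u128) as u64;
        }
        base = ((base as u128 * base as u128) % m as u128) as u64;
        exp >>= 1;
    }
    result
}

/// Modular inverse via Fermat's little theorem: a^(p-2) mod p, valid for prime p.
fn modinv(a: u64, p: u64) -> u64 {
    modpow(a, p - 2, p)
}
```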
### 8. Global singletons for syslog and CSV
The syslog and CSV modules use `Mutex<Option<...>>` global statics. This avoids threading state through every function call while remaining safe. Both modules are initialized once at startup and used from any async task via their public API functions.
### 9. Shared BandwidthState for client duration timeout
When running with `--duration`, the tokio timeout cancels the client future. To preserve stats accumulated during the test, `BandwidthState` is created in `main()` and passed as an `Arc` into `run_client()`. The state survives cancellation because `main()` holds a reference. The `record_interval()` method accumulates totals that `summary()` returns.
### 10. IPv6 socket handling
IPv6 requires special handling on macOS:
- UDP sockets bind to `[::]` for IPv6 peers, `0.0.0.0` for IPv4
- Socket send/receive buffers set to 4MB via `socket2` before wrapping with tokio
- `SocketAddr::new()` used instead of string formatting (avoids `[addr]:port` parsing issues)
- Connected sockets preferred for single-connection (avoids ENOBUFS on `send_to()`)
- NDP probe packet sent before data blast to populate neighbor cache
- Adaptive backoff on ENOBUFS (200μs→10ms, resets on success)
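The backoff schedule can be sketched as exponential growth between a floor and a ceiling; the exact growth factor is an assumption here, since the text states only the 200μs to 10ms range and the reset on success:

```rust
use std::time::Duration;

/// Adaptive ENOBUFS backoff: start at 200µs after the first failure, grow on
/// each consecutive failure up to a 10ms cap. Pass `None` after a successful
/// send to reset to the floor. (Doubling is an assumed growth factor.)
fn next_backoff(cur: Option<Duration>) -> Duration {
    const FLOOR: Duration = Duration::from_micros(200);
    const CEIL: Duration = Duration::from_millis(10);
    match cur {
        None => FLOOR,                // first failure after a success
        Some(d) => (d * 2).min(CEIL), // exponential growth, capped
    }
}
```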
### 11. CPU usage monitoring
A background OS thread samples system CPU every 1 second via:
- **macOS:** `host_statistics(HOST_CPU_LOAD_INFO)` — returns user/system/idle/nice ticks
- **Linux:** `/proc/stat` — reads aggregate CPU line
The percentage is stored in a global `AtomicU8` and included in every status message at byte 1 using MikroTik's encoding: `0x80 | percentage`. On receive, the remote CPU is decoded with `byte & 0x7F` and capped at 100%. Both local and remote CPU are displayed per interval and logged to CSV/syslog.
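Given two `(total_ticks, idle_ticks)` samples from the platform readers, the usage is the busy delta over the total delta; a sketch of that computation (the exact rounding in `cpu.rs` may differ):

```rust
/// Compute CPU usage percent from two (total_ticks, idle_ticks) samples,
/// as returned by the per-platform get_cpu_times() readers.
fn cpu_percent(prev: (u64, u64), cur: (u64, u64)) -> u8 {
    let total = cur.0.saturating_sub(prev.0);
    let idle = cur.1.saturating_sub(prev.1);
    if total == 0 {
        return 0; // no ticks elapsed (or unsupported platform returning zeros)
    }
    let busy = total - idle;
    ((busy * 100) / total).min(100) as u8
}
```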
## File Layout
```
btest-rs/
├── src/
│ ├── main.rs # CLI entry point, argument parsing (clap)
│ ├── lib.rs # Public API (used by integration tests)
│ ├── protocol.rs # Wire format: Command, StatusMessage, constants
│ ├── auth.rs # MD5 challenge-response authentication
│ ├── ecsrp5.rs # EC-SRP5 authentication (Curve25519 Weierstrass)
│ ├── server.rs # Server mode: listener, TCP/UDP handlers
│ ├── client.rs # Client mode: connector, TCP/UDP handlers
│ ├── bandwidth.rs # Rate limiting, formatting, shared state
│ ├── cpu.rs # CPU usage sampler (macOS, Linux, Android, Windows, FreeBSD)
│ ├── csv_output.rs # CSV result logging (append-mode, auto-header)
│ └── syslog_logger.rs # Remote syslog sender (RFC 3164 / BSD format)
├── tests/
│ ├── integration_test.rs # Basic server/client handshake tests
│ ├── ecsrp5_test.rs # EC-SRP5 authentication tests
│ └── full_integration_test.rs # Comprehensive: all protocols, IPv4/6, CSV, syslog
├── scripts/
│ ├── build-linux.sh # Cross-compile for x86_64 Linux (musl)
│ ├── build-macos-release.sh # macOS release build
│ ├── install-service.sh # systemd service installer
│ ├── push-docker.sh # Push Docker image to registry
│ ├── test-local.sh # Loopback self-test
│ ├── test-mikrotik.sh # Test against MikroTik device
│ ├── test-docker.sh # Docker container test
│ └── debug-capture.sh # Packet capture for debugging
├── docs/
│ ├── architecture.md # This file
│ ├── protocol.md # Protocol specification
│ ├── user-guide.md # Usage documentation
│ ├── docker.md # Docker & deployment guide
│ ├── ecsrp5-research.md # EC-SRP5 reverse-engineering notes
│ └── man/
│ └── btest.1 # Unix manual page (troff format)
├── deploy/
│ └── syslog-ng-btest.conf # syslog-ng configuration for btest events
├── proto-test/ # Python EC-SRP5 prototype (research branch)
│ ├── btest_ecsrp5_client.py # Working Python btest EC-SRP5 client
│ ├── btest_mitm.py # MITM proxy for protocol analysis
│ └── elliptic_curves.py # Curve25519 Weierstrass (MarginResearch)
├── KNOWN_ISSUES.md # Known bugs and platform limitations
├── Dockerfile # Production Docker image (multi-stage)
├── Dockerfile.cross # Cross-compilation for Linux x86_64
├── docker-compose.yml # Docker Compose configuration
├── Cargo.toml # Rust package manifest
├── Cargo.lock # Dependency lock file
├── LICENSE # MIT License
└── btest-opensource/ # Original C implementation (git submodule)
```
## Tests
| Suite | Count | What |
|-------|-------|------|
| Unit tests (lib) | 12 | Bandwidth parsing, CPU sampling, auth hash vectors |
| Enforcer tests (pro) | 10 | Budget, quota, duration, flush |
| Integration tests | 8 | Server/client handshake, auth, TCP data |
| EC-SRP5 tests | 6 | Full auth flow, wrong password, UDP bidir |
| Full integration | 23 | All protocols × directions, IPv4/6, CSV, syslog, CPU |
| **Total** | **59** | |

src/bandwidth.rs (View File)

@@ -73,6 +73,7 @@ impl BandwidthState {
}
/// Set the byte budget (total bytes allowed for the entire test).
#[cfg(feature = "pro")]
pub fn set_budget(&self, budget: u64) {
self.byte_budget.store(budget, std::sync::atomic::Ordering::SeqCst);
}

src/client.rs (View File)

@@ -27,6 +27,11 @@ pub async fn run_client(
let mut stream = TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?;
// Set TCP socket buffers to 4MB for high throughput
let sock_ref = socket2::SockRef::from(&stream);
let _ = sock_ref.set_send_buffer_size(4 * 1024 * 1024);
let _ = sock_ref.set_recv_buffer_size(4 * 1024 * 1024);
recv_hello(&mut stream).await?;
tracing::info!("Connected to server");
@@ -127,6 +132,12 @@ async fn run_tcp_test_client(stream: TcpStream, cmd: Command, state: Arc<Bandwid
Some(tokio::spawn(async move {
tcp_client_rx_loop(reader, state_rx).await
}))
} else if client_should_tx {
// TX-only: still need to read the server's status messages to get remote CPU.
// Don't count these bytes as RX data.
Some(tokio::spawn(async move {
tcp_client_status_reader(reader, state_rx).await
}))
} else {
_reader_keepalive = Some(reader);
None
@@ -148,15 +159,17 @@ async fn tcp_client_tx_loop(
) {
tokio::time::sleep(Duration::from_millis(100)).await;
let packet = vec![0u8; tx_size]; // TCP data is all zeros
let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16);
// Use larger writes when running unlimited to reduce syscall overhead
let effective_size = if interval.is_none() { tx_size.max(256 * 1024) } else { tx_size };
let packet = vec![0u8; effective_size]; // TCP data is all zeros
let mut next_send = Instant::now();
while state.running.load(Ordering::Relaxed) {
if writer.write_all(&packet).await.is_err() {
break;
}
state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed);
state.tx_bytes.fetch_add(effective_size as u64, Ordering::Relaxed);
if state.tx_speed_changed.load(Ordering::Relaxed) {
state.tx_speed_changed.store(false, Ordering::Relaxed);
@@ -183,17 +196,92 @@ async fn tcp_client_rx_loop(
mut reader: tokio::net::tcp::OwnedReadHalf,
state: Arc<BandwidthState>,
) {
let mut buf = vec![0u8; 65536];
let mut buf = vec![0u8; 256 * 1024];
// Carry trailing bytes from the previous read to detect status messages
// that are split across TCP read boundaries.
let mut carry = [0u8; STATUS_MSG_SIZE - 1];
let mut carry_len = 0usize;
while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
// 1) Check if a status message spans the carry + start of buf.
if carry_len > 0 {
for offset in 0..carry_len {
if carry[offset] != STATUS_MSG_TYPE {
continue;
}
let from_carry = carry_len - offset;
let from_buf = STATUS_MSG_SIZE - from_carry;
if n < from_buf {
continue;
}
let cpu_byte = if from_carry >= 2 {
carry[offset + 1]
} else {
buf[0]
};
if cpu_byte >= 0x80 {
state.remote_cpu.store((cpu_byte & 0x7F).min(100), Ordering::Relaxed);
break;
}
}
}
// 2) Fast scan within buf for status messages.
// Data packets are all zeros, so memchr (SIMD) exits almost instantly.
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) {
if buf[pos + 1] >= 0x80 {
let cpu = buf[pos + 1] & 0x7F;
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
}
}
}
// 3) Save trailing bytes for the next read.
carry_len = n.min(STATUS_MSG_SIZE - 1);
if n >= carry_len {
carry[..carry_len].copy_from_slice(&buf[n - carry_len..n]);
}
}
}
}
}
/// Read only status messages from the server (TX-only mode).
/// The server sends 12-byte status messages on the TCP connection even when
/// the client is only transmitting. We need to read them to get remote CPU
/// and to prevent the TCP receive buffer from filling up.
async fn tcp_client_status_reader(
mut reader: tokio::net::tcp::OwnedReadHalf,
state: Arc<BandwidthState>,
) {
let mut buf = [0u8; STATUS_MSG_SIZE];
while state.running.load(Ordering::Relaxed) {
match reader.read_exact(&mut buf).await {
Ok(_) => {
if buf[0] == STATUS_MSG_TYPE && buf[1] >= 0x80 {
let status = StatusMessage::deserialize(&buf);
state.remote_cpu.store(status.cpu_load, Ordering::Relaxed);
// Use server's bytes_received for TX speed adaptation
if status.bytes_received > 0 {
let new_speed =
((status.bytes_received as u64 * 8 * 3) / 2) as u32;
state.tx_speed.store(new_speed, Ordering::Relaxed);
state.tx_speed_changed.store(true, Ordering::Relaxed);
}
}
}
Err(_) => break,
}
}
}
// --- UDP Test Client ---
async fn run_udp_test_client(
@@ -333,30 +421,39 @@ async fn udp_client_tx_loop(
async fn udp_client_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536];
let mut last_seq: Option<u32> = None;
let mut timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
while state.running.load(Ordering::Relaxed) {
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await {
Ok(Ok(n)) if n >= 4 => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
tokio::select! {
biased;
res = socket.recv(&mut buf) => {
match res {
Ok(n) if n >= 4 => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
}
}
last_seq = Some(seq);
}
Ok(_) => {}
Err(e) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
last_seq = Some(seq);
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(_) => {
_ = &mut timeout => {
tracing::debug!("UDP RX timeout");
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
}
}
@@ -474,3 +571,84 @@ async fn udp_client_status_loop(
}
}
}
/// Scan for a status message in `carry` + `buf` and return the CPU value if found.
#[allow(dead_code)]
pub fn scan_status_message(carry: &[u8], buf: &[u8]) -> Option<u8> {
let carry_len = carry.len();
// 1) Check split across carry + buf
if carry_len > 0 {
for offset in 0..carry_len {
if carry[offset] != STATUS_MSG_TYPE {
continue;
}
let from_carry = carry_len - offset;
let from_buf = STATUS_MSG_SIZE - from_carry;
if buf.len() < from_buf {
continue;
}
let cpu_byte = if from_carry >= 2 {
carry[offset + 1]
} else {
buf[0]
};
if cpu_byte >= 0x80 {
return Some((cpu_byte & 0x7F).min(100));
}
}
}
// 2) Check within buf
let n = buf.len();
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) {
if buf[pos + 1] >= 0x80 {
return Some((buf[pos + 1] & 0x7F).min(100));
}
}
}
None
}
#[cfg(test)]
mod status_scan_tests {
use super::*;
#[test]
fn test_status_within_buffer() {
let mut buf = [0u8; 256];
buf[10] = STATUS_MSG_TYPE;
buf[11] = 0x80 | 42;
assert_eq!(scan_status_message(&[], &buf), Some(42));
}
#[test]
fn test_status_split_across_reads() {
// 12-byte status message: [0x07, 0x80|50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// Split: first 5 bytes in carry, last 7 bytes in buf
let carry = [0x07, 0xB2, 0, 0, 0];
let buf = [0, 0, 0, 0, 0, 0, 0];
assert_eq!(scan_status_message(&carry, &buf), Some(50));
}
#[test]
fn test_status_split_at_boundary() {
// Split: first 1 byte (0x07) in carry, rest in buf
let carry = [0x07];
let mut buf = [0u8; 20];
buf[0] = 0x80 | 77;
assert_eq!(scan_status_message(&carry, &buf), Some(77));
}
#[test]
fn test_no_status_in_zeros() {
let buf = [0u8; 256];
assert_eq!(scan_status_message(&[], &buf), None);
}
#[test]
fn test_short_buffer_no_panic() {
let buf = [0x07, 0x80];
assert_eq!(scan_status_message(&[], &buf), None);
}
}

src/cpu.rs (View File)

@@ -29,7 +29,7 @@ pub fn get() -> u8 {
// --- Platform-specific implementation ---
#[cfg(target_os = "linux")]
#[cfg(any(target_os = "linux", target_os = "android"))]
fn get_cpu_times() -> (u64, u64) {
// Read /proc/stat: cpu user nice system idle iowait irq softirq steal
if let Ok(content) = std::fs::read_to_string("/proc/stat") {
@@ -97,6 +97,7 @@ fn get_cpu_times() -> (u64, u64) {
fn get_cpu_times() -> (u64, u64) {
#[repr(C)]
#[derive(Default)]
#[allow(non_snake_case)]
struct FILETIME {
dwLowDateTime: u32,
dwHighDateTime: u32,
@@ -138,33 +139,39 @@ fn get_cpu_times() -> (u64, u64) {
#[cfg(target_os = "freebsd")]
fn get_cpu_times() -> (u64, u64) {
// kern.cp_time returns: user nice system interrupt idle
if let Ok(output) = std::process::Command::new("sysctl")
.arg("-n")
.arg("kern.cp_time")
.output()
{
if output.status.success() {
let text = String::from_utf8_lossy(&output.stdout);
let parts: Vec<u64> = text
.split_whitespace()
.filter_map(|s| s.parse().ok())
.collect();
if parts.len() >= 5 {
let user = parts[0];
let nice = parts[1];
let system = parts[2];
let interrupt = parts[3];
let idle = parts[4];
let total = user + nice + system + interrupt + idle;
return (total, idle);
}
}
const CTL_KERN: libc::c_int = 1;
const KERN_CP_TIME: libc::c_int = 40;
let mut mib: [libc::c_int; 2] = [CTL_KERN, KERN_CP_TIME];
let mut cp_time: [libc::c_ulong; 5] = [0; 5];
let mut len = std::mem::size_of_val(&cp_time);
let ret = unsafe {
libc::sysctl(
mib.as_mut_ptr(),
mib.len() as u32,
&mut cp_time as *mut _ as *mut libc::c_void,
&mut len,
std::ptr::null_mut(),
0,
)
};
if ret == 0 {
let user = cp_time[0] as u64;
let nice = cp_time[1] as u64;
let system = cp_time[2] as u64;
let interrupt = cp_time[3] as u64;
let idle = cp_time[4] as u64;
let total = user + nice + system + interrupt + idle;
return (total, idle);
}
(0, 0)
}
#[cfg(not(any(
target_os = "linux",
target_os = "android",
target_os = "macos",
target_os = "windows",
target_os = "freebsd",
@@ -193,6 +200,7 @@ mod tests {
// On supported platforms, total should be > 0
if cfg!(any(
target_os = "linux",
target_os = "android",
target_os = "macos",
target_os = "windows",
target_os = "freebsd",

src/csv_output.rs (View File)

@@ -9,7 +9,7 @@ use std::path::Path;
use std::sync::Mutex;
use std::time::SystemTime;
static CSV_FILE: Mutex<Option<String>> = Mutex::new(None);
static CSV_FILE: Mutex<Option<(String, std::fs::File)>> = Mutex::new(None);
static QUIET: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
const HEADER: &str = "timestamp,host,port,protocol,direction,duration_s,tx_avg_mbps,rx_avg_mbps,tx_bytes,rx_bytes,lost_packets,local_cpu_pct,remote_cpu_pct,auth_type";
@@ -18,12 +18,12 @@ const HEADER: &str = "timestamp,host,port,protocol,direction,duration_s,tx_avg_m
pub fn init(path: &str) -> std::io::Result<()> {
let needs_header = !Path::new(path).exists() || std::fs::metadata(path)?.len() == 0;
let mut f = OpenOptions::new().create(true).append(true).open(path)?;
if needs_header {
let mut f = OpenOptions::new().create(true).write(true).open(path)?;
writeln!(f, "{}", HEADER)?;
}
*CSV_FILE.lock().unwrap() = Some(path.to_string());
*CSV_FILE.lock().unwrap() = Some((path.to_string(), f));
Ok(())
}
@@ -49,8 +49,8 @@ pub fn write_result(
remote_cpu: u8,
auth_type: &str,
) {
let guard = CSV_FILE.lock().unwrap();
if let Some(ref path) = *guard {
let mut guard = CSV_FILE.lock().unwrap();
if let Some((ref _path, ref mut file)) = *guard {
let tx_mbps = if duration_secs > 0 {
tx_bytes as f64 * 8.0 / duration_secs as f64 / 1_000_000.0
} else {
@@ -74,9 +74,8 @@ pub fn write_result(
local_cpu, remote_cpu, auth_type,
);
if let Ok(mut f) = OpenOptions::new().append(true).open(path) {
let _ = writeln!(f, "{}", row);
}
let _ = writeln!(file, "{}", row);
let _ = file.flush();
}
}

src/ecsrp5.rs (View File)

@@ -6,6 +6,8 @@
//!
//! btest framing: `[len:1][payload]` (no 0x06 handler byte, unlike Winbox).
use std::sync::LazyLock;
use num_bigint::BigUint;
use num_integer::Integer;
use num_traits::{One, Zero};
@@ -14,31 +16,33 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::protocol::{BtestError, Result};
// --- Curve25519 parameters in Weierstrass form ---
// --- Curve25519 parameters in Weierstrass form (cached, computed once) ---
fn p() -> BigUint {
static P: LazyLock<BigUint> = LazyLock::new(|| {
BigUint::parse_bytes(
b"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffed",
16,
)
.unwrap()
}
});
fn curve_order() -> BigUint {
static CURVE_ORDER: LazyLock<BigUint> = LazyLock::new(|| {
BigUint::parse_bytes(
b"1000000000000000000000000000000014def9dea2f79cd65812631a5cf5d3ed",
16,
)
.unwrap()
}
});
fn weierstrass_a() -> BigUint {
static WEIERSTRASS_A: LazyLock<BigUint> = LazyLock::new(|| {
BigUint::parse_bytes(
b"2aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa984914a144",
16,
)
.unwrap()
}
});
pub static WCURVE: LazyLock<WCurve> = LazyLock::new(WCurve::new);
const MONT_A: u64 = 486662;
@@ -50,10 +54,10 @@ fn modinv(a: &BigUint, modulus: &BigUint) -> BigUint {
a.modpow(&exp, modulus)
}
fn legendre_symbol(a: &BigUint, p_val: &BigUint) -> i32 {
let exp = (p_val - BigUint::one()) / BigUint::from(2u32);
let l = a.modpow(&exp, p_val);
if l == p_val - BigUint::one() {
fn legendre_symbol(a: &BigUint, p: &BigUint) -> i32 {
let exp = (p - BigUint::one()) / BigUint::from(2u32);
let l = a.modpow(&exp, p);
if l == p - BigUint::one() {
-1
} else if l == BigUint::zero() {
0
@@ -166,7 +170,7 @@ impl Point {
}
fn add(&self, other: &Point) -> Point {
let p_val = p();
let p_val = &*P;
if self.infinity {
return other.clone();
}
@@ -179,44 +183,44 @@ impl Point {
let lam = if self.x == other.x && self.y == other.y {
// Point doubling
let three_x_sq = (BigUint::from(3u32) * &self.x * &self.x + &weierstrass_a()) % &p_val;
let two_y = (BigUint::from(2u32) * &self.y) % &p_val;
(three_x_sq * modinv(&two_y, &p_val)) % &p_val
let three_x_sq = (BigUint::from(3u32) * &self.x * &self.x + &*WEIERSTRASS_A) % p_val;
let two_y = (BigUint::from(2u32) * &self.y) % p_val;
(three_x_sq * modinv(&two_y, p_val)) % p_val
} else {
// Point addition
let dy = if other.y >= self.y {
(&other.y - &self.y) % &p_val
(&other.y - &self.y) % p_val
} else {
(&p_val - (&self.y - &other.y) % &p_val) % &p_val
(p_val - (&self.y - &other.y) % p_val) % p_val
};
let dx = if other.x >= self.x {
(&other.x - &self.x) % &p_val
(&other.x - &self.x) % p_val
} else {
(&p_val - (&self.x - &other.x) % &p_val) % &p_val
(p_val - (&self.x - &other.x) % p_val) % p_val
};
(dy * modinv(&dx, &p_val)) % &p_val
(dy * modinv(&dx, p_val)) % p_val
};
let x3 = {
let lam_sq = (&lam * &lam) % &p_val;
let sum_x = (&self.x + &other.x) % &p_val;
let lam_sq = (&lam * &lam) % p_val;
let sum_x = (&self.x + &other.x) % p_val;
if lam_sq >= sum_x {
(lam_sq - sum_x) % &p_val
(lam_sq - sum_x) % p_val
} else {
(&p_val - (sum_x - lam_sq) % &p_val) % &p_val
(p_val - (sum_x - lam_sq) % p_val) % p_val
}
};
let y3 = {
let dx = if self.x >= x3 {
(&self.x - &x3) % &p_val
(&self.x - &x3) % p_val
} else {
(&p_val - (&x3 - &self.x) % &p_val) % &p_val
(p_val - (&x3 - &self.x) % p_val) % p_val
};
let prod = (&lam * dx) % &p_val;
let prod = (&lam * dx) % p_val;
if prod >= self.y {
(prod - &self.y) % &p_val
(prod - &self.y) % p_val
} else {
(&p_val - (&self.y - prod) % &p_val) % &p_val
(p_val - (&self.y - prod) % p_val) % p_val
}
};
@@ -226,14 +230,13 @@ impl Point {
fn scalar_mul(&self, scalar: &BigUint) -> Point {
let mut result = Point::infinity();
let mut base = self.clone();
let mut k = scalar.clone();
let bits = scalar.bits();
while !k.is_zero() {
if &k & &BigUint::one() == BigUint::one() {
for i in 0..bits {
if scalar.bit(i) {
result = result.add(&base);
}
base = base.add(&base);
k >>= 1;
}
result
}
@@ -241,19 +244,19 @@ impl Point {
// --- WCurve: Curve25519 in Weierstrass form ---
struct WCurve {
pub struct WCurve {
g: Point,
conversion_from_m: BigUint,
conversion_to_m: BigUint,
}
impl WCurve {
fn new() -> Self {
let p_val = p();
pub fn new() -> Self {
let p_val = &*P;
let mont_a = BigUint::from(MONT_A);
let three_inv = modinv(&BigUint::from(3u32), &p_val);
let conversion_from_m = (&mont_a * &three_inv) % &p_val;
let conversion_to_m = (&p_val - &conversion_from_m) % &p_val;
let three_inv = modinv(&BigUint::from(3u32), p_val);
let conversion_from_m = (&mont_a * &three_inv) % p_val;
let conversion_to_m = (p_val - &conversion_from_m) % p_val;
let mut curve = WCurve {
g: Point::infinity(),
@@ -265,8 +268,8 @@ impl WCurve {
}
fn to_montgomery(&self, pt: &Point) -> ([u8; 32], u8) {
let p_val = p();
let x = (&pt.x + &self.conversion_to_m) % &p_val;
let p_val = &*P;
let x = (&pt.x + &self.conversion_to_m) % p_val;
let parity = if pt.y.bit(0) { 1u8 } else { 0u8 };
let mut bytes = [0u8; 32];
let x_bytes = x.to_bytes_be();
@@ -276,14 +279,14 @@ impl WCurve {
}
fn lift_x(&self, x_mont: &BigUint, parity: bool) -> Point {
- let p_val = p();
- let x = x_mont % &p_val;
+ let p_val = &*P;
+ let x = x_mont % p_val;
// y^2 = x^3 + Ax^2 + x (Montgomery)
- let y_squared = (&x * &x * &x + BigUint::from(MONT_A) * &x * &x + &x) % &p_val;
+ let y_squared = (&x * &x * &x + BigUint::from(MONT_A) * &x * &x + &x) % p_val;
// Convert x to Weierstrass
- let x_w = (&x + &self.conversion_from_m) % &p_val;
+ let x_w = (&x + &self.conversion_from_m) % p_val;
- if let Some((y1, y2)) = prime_mod_sqrt(&y_squared, &p_val) {
+ if let Some((y1, y2)) = prime_mod_sqrt(&y_squared, p_val) {
let pt1 = Point::new(x_w.clone(), y1);
let pt2 = Point::new(x_w, y2);
if parity {
@@ -323,7 +326,7 @@ impl WCurve {
password: &str,
salt: &[u8; 16],
) -> [u8; 32] {
- let inner = sha256_bytes(&format!("{}:{}", username, password).as_bytes().to_vec());
+ let inner = sha256_bytes(format!("{}:{}", username, password).as_bytes());
let mut input = Vec::with_capacity(16 + 32);
input.extend_from_slice(salt);
input.extend_from_slice(&inner);
@@ -359,7 +362,7 @@ pub async fn client_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
password: &str,
) -> Result<()> {
tracing::info!("Starting EC-SRP5 authentication");
- let w = WCurve::new();
+ let w = &*WCURVE;
// Generate client ephemeral keypair
let s_a: [u8; 32] = rand::random();
@@ -415,8 +418,8 @@ pub async fn client_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
let i_int = BigUint::from_bytes_be(&i);
let j_int = BigUint::from_bytes_be(&j);
let s_a_int = BigUint::from_bytes_be(&s_a);
- let order = curve_order();
- let scalar = ((&i_int * &j_int) + &s_a_int) % &order;
+ let order = &*CURVE_ORDER;
+ let scalar = ((&i_int * &j_int) + &s_a_int) % order;
let z_point = w_b_unblinded.scalar_mul(&scalar);
let (z, _) = w.to_montgomery(&z_point);
@@ -476,7 +479,7 @@ impl EcSrp5Credentials {
/// Derive EC-SRP5 credentials from username/password (done once at startup).
pub fn derive(username: &str, password: &str) -> Self {
let salt: [u8; 16] = rand::random();
- let w = WCurve::new();
+ let w = &*WCURVE;
let i = w.gen_password_validator_priv(username, password, &salt);
let (x_gamma, parity) = w.gen_public_key(&i);
Self {
@@ -495,7 +498,7 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
creds: &EcSrp5Credentials,
) -> Result<()> {
tracing::info!("Starting EC-SRP5 server authentication");
- let w = WCurve::new();
+ let w = &*WCURVE;
// MSG1: read [len][username\0][pubkey:32][parity:1]
let mut len_buf = [0u8; 1];
@@ -598,7 +601,12 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
mod hex {
pub fn encode(data: &[u8]) -> String {
- data.iter().map(|b| format!("{:02x}", b)).collect()
+ let mut s = String::with_capacity(data.len() * 2);
+ for b in data {
+ use std::fmt::Write;
+ let _ = write!(s, "{:02x}", b);
+ }
+ s
}
}
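The hex change above trades one `String` allocation per byte (`format!` inside `map`) for a single pre-sized buffer written with `write!`. Both forms produce identical output, which a quick side-by-side confirms:

```rust
use std::fmt::Write;

// Old form: allocates a fresh 2-char String per byte, then concatenates.
fn encode_collect(data: &[u8]) -> String {
    data.iter().map(|b| format!("{:02x}", b)).collect()
}

// New form: one allocation, sized up front, appended in place.
fn encode_write(data: &[u8]) -> String {
    let mut s = String::with_capacity(data.len() * 2);
    for b in data {
        let _ = write!(s, "{:02x}", b);
    }
    s
}

fn main() {
    let data = [0x00u8, 0x07, 0xab, 0xff];
    assert_eq!(encode_collect(&data), "0007abff");
    assert_eq!(encode_write(&data), "0007abff");
    assert_eq!(encode_collect(&data), encode_write(&data));
}
```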

View File

@@ -4,6 +4,8 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::Mutex;
@@ -18,6 +20,7 @@ struct TcpSession {
peer_ip: std::net::IpAddr,
streams: Vec<TcpStream>,
expected: u8,
notify: Arc<Notify>,
}
type SessionMap = Arc<Mutex<HashMap<u16, TcpSession>>>;
@@ -135,6 +138,11 @@ async fn handle_client(
) -> Result<()> {
stream.set_nodelay(true)?;
// Set TCP socket buffers to 4MB (matching UDP path) for high throughput
let sock_ref = socket2::SockRef::from(&stream);
let _ = sock_ref.set_send_buffer_size(4 * 1024 * 1024);
let _ = sock_ref.set_recv_buffer_size(4 * 1024 * 1024);
send_hello(&mut stream).await?;
// Read 16-byte command (or whatever the client sends)
@@ -164,6 +172,7 @@ async fn handle_client(
stream.flush().await?;
session.streams.push(stream);
session.notify.notify_one();
tracing::info!(
"Secondary connection joined ({}/{})",
session.streams.len() + 1,
@@ -244,6 +253,7 @@ async fn handle_client(
for (_t, s) in map.iter_mut() {
if s.peer_ip == peer.ip() && s.streams.len() < s.expected as usize {
s.streams.push(stream);
s.notify.notify_one();
return Ok(());
}
}
@@ -294,12 +304,14 @@ async fn handle_client(
let conn_count = cmd.tcp_conn_count;
// Register session for secondary connections to find
let notify = Arc::new(Notify::new());
{
let mut map = sessions.lock().await;
map.insert(session_token, TcpSession {
peer_ip: peer.ip(),
streams: Vec::new(),
expected: conn_count,
notify: notify.clone(),
});
}
@@ -315,7 +327,8 @@ async fn handle_client(
if count + 1 >= conn_count as usize {
break;
}
- if Instant::now() > deadline {
+ let now = Instant::now();
+ if now >= deadline {
tracing::warn!(
"Timeout waiting for TCP connections ({}/{}), proceeding",
count + 1,
@@ -323,7 +336,17 @@ async fn handle_client(
);
break;
}
- tokio::time::sleep(Duration::from_millis(100)).await;
+ match tokio::time::timeout(deadline - now, notify.notified()).await {
+ Ok(()) => continue,
+ Err(_) => {
+ tracing::warn!(
+ "Timeout waiting for TCP connections ({}/{}), proceeding",
+ count + 1,
+ conn_count,
+ );
+ break;
+ }
+ }
}
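The change above replaces a 100 ms polling sleep with an event-driven wait: the accept path calls `notify_one()` when a secondary connection joins, and the waiter blocks on `notified()` with the *remaining* deadline as the timeout, so it wakes immediately on a join yet still honors the overall cap. A synchronous sketch of the same shape, using `std::sync::Condvar` as a stand-in for `tokio::sync::Notify` (function and variable names are illustrative):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

// Block until `expected` joins arrive or the deadline passes; return the count.
fn wait_for_joins(expected: usize, deadline: Instant, pair: &(Mutex<usize>, Condvar)) -> usize {
    let (count, notify) = pair;
    let mut joined = count.lock().unwrap();
    while *joined < expected {
        let now = Instant::now();
        if now >= deadline {
            break; // proceed with however many joined
        }
        // Wake on notify OR when the remaining deadline elapses.
        let (guard, res) = notify.wait_timeout(joined, deadline - now).unwrap();
        joined = guard;
        if res.timed_out() {
            break;
        }
    }
    *joined
}

fn main() {
    let pair = Arc::new((Mutex::new(0usize), Condvar::new()));
    // Simulated secondary connections joining shortly after start.
    for _ in 0..3 {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(30));
            *pair.0.lock().unwrap() += 1;
            pair.1.notify_one(); // mirrors session.notify.notify_one()
        });
    }
    let joined = wait_for_joins(3, Instant::now() + Duration::from_secs(5), &pair);
    assert_eq!(joined, 3);
}
```

The `while` loop also absorbs spurious wakeups, which both `Condvar` and `Notify` permit.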
let extra_streams = {
@@ -367,6 +390,7 @@ async fn handle_client(
// --- TCP Test Server ---
/// Public TX task for multi-connection use by server_pro.
#[cfg(feature = "pro")]
pub async fn tcp_tx_task(
writer: tokio::net::tcp::OwnedWriteHalf,
tx_size: usize,
@@ -377,6 +401,7 @@ pub async fn tcp_tx_task(
}
/// Public RX task for multi-connection use by server_pro.
#[cfg(feature = "pro")]
pub async fn tcp_rx_task(
reader: tokio::net::tcp::OwnedReadHalf,
state: Arc<BandwidthState>,
@@ -386,6 +411,7 @@ pub async fn tcp_rx_task(
/// Run a TCP bandwidth test on an already-authenticated stream.
/// Public API for use by server_pro.
#[cfg(feature = "pro")]
pub async fn run_tcp_test(
stream: TcpStream,
cmd: Command,
@@ -470,6 +496,7 @@ async fn run_tcp_test_inner(stream: TcpStream, cmd: Command, state: Arc<Bandwidt
}
/// Public API for multi-connection TCP test with external state. Used by server_pro.
#[cfg(feature = "pro")]
pub async fn run_tcp_multiconn_test(
streams: Vec<TcpStream>,
cmd: Command,
@@ -571,15 +598,19 @@ async fn tcp_tx_loop_inner(
) {
tokio::time::sleep(Duration::from_millis(100)).await;
- let packet = vec![0u8; tx_size];
let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16);
+ // Use larger writes when running unlimited to reduce syscall overhead
+ let effective_size = if interval.is_none() { tx_size.max(256 * 1024) } else { tx_size };
+ let packet = vec![0u8; effective_size];
let mut next_send = Instant::now();
let mut next_status = Instant::now() + Duration::from_secs(1);
let mut status_seq: u32 = 0;
while state.running.load(Ordering::Relaxed) {
let now = Instant::now();
// Inject status message every ~1 second if in bidirectional mode
- if send_status && Instant::now() >= next_status {
+ if send_status && now >= next_status {
status_seq += 1;
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage { cpu_load: crate::cpu::get(),
@@ -592,28 +623,27 @@ async fn tcp_tx_loop_inner(
}
state.record_interval(0, rx_bytes, 0);
bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None);
- next_status = Instant::now() + Duration::from_secs(1);
+ next_status = now + Duration::from_secs(1);
}
- if !state.spend_budget(tx_size as u64) {
+ if !state.spend_budget(effective_size as u64) {
break;
}
if writer.write_all(&packet).await.is_err() {
state.running.store(false, Ordering::SeqCst);
break;
}
- state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed);
+ state.tx_bytes.fetch_add(effective_size as u64, Ordering::Relaxed);
if state.tx_speed_changed.load(Ordering::Relaxed) {
state.tx_speed_changed.store(false, Ordering::Relaxed);
let new_speed = state.tx_speed.load(Ordering::Relaxed);
interval = bandwidth::calc_send_interval(new_speed, tx_size as u16);
- next_send = Instant::now();
+ next_send = now;
}
match interval {
Some(iv) => {
- let now = Instant::now();
if let Some(delay) = bandwidth::advance_next_send(&mut next_send, iv, now) {
tokio::time::sleep(delay).await;
}
@@ -626,7 +656,7 @@ async fn tcp_tx_loop_inner(
}
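The TX loop above keys everything off `calc_send_interval`: `None` means unlimited (so the diff batches 256 KB writes to cut syscall count), otherwise the gap between sends is the packet's wire time at the target rate. A hypothetical sketch of that pacing math — not the crate's actual implementation, just the semantics the call sites imply:

```rust
use std::time::{Duration, Instant};

// Hypothetical: rate 0 = unlimited (None); otherwise gap = packet_bits / rate.
fn calc_send_interval(speed_bps: u64, packet_size: u16) -> Option<Duration> {
    if speed_bps == 0 {
        return None; // unlimited: caller batches large writes instead
    }
    let nanos = (packet_size as u64 * 8).saturating_mul(1_000_000_000) / speed_bps;
    Some(Duration::from_nanos(nanos))
}

// Advance the schedule by one interval; if we are ahead, return the sleep.
// Keying off the *schedule* (next_send += iv) rather than `now` avoids
// drift accumulating across iterations.
fn advance_next_send(next_send: &mut Instant, iv: Duration, now: Instant) -> Option<Duration> {
    *next_send += iv;
    if *next_send > now {
        Some(*next_send - now)
    } else {
        None // behind schedule: send immediately
    }
}

fn main() {
    // 1500-byte packets at 12 Mbit/s -> 12000 bits / 12e6 bps = 1 ms apart.
    assert_eq!(calc_send_interval(12_000_000, 1500), Some(Duration::from_millis(1)));
    assert_eq!(calc_send_interval(0, 1500), None);

    let now = Instant::now();
    let mut next = now;
    let delay = advance_next_send(&mut next, Duration::from_millis(1), now);
    assert_eq!(delay, Some(Duration::from_millis(1)));
}
```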
async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<BandwidthState>) {
- let mut buf = vec![0u8; 65536];
+ let mut buf = vec![0u8; 256 * 1024];
while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await {
Ok(0) | Err(_) => {
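The commit message describes the RX-side fix this buffer feeds: a 12-byte status message starting with `0x07` can straddle two `read()` calls, so up to 11 trailing bytes are carried over and every alignment spanning the carry and the new buffer is rechecked. A simplified sketch of that carry idea (illustrative only — it joins carry and buffer into one `Vec`, whereas the real code scans without copying the 256 KB buffer, and `position` stands in for `memchr::memchr`):

```rust
const STATUS_LEN: usize = 12; // 0x07 marker + cpu byte + payload
const MAGIC: u8 = 0x07;

fn scan(carry: &mut Vec<u8>, buf: &[u8], found: &mut Vec<[u8; STATUS_LEN]>) {
    // Join carry + buffer so split messages become contiguous (sketch only).
    let mut joined = carry.clone();
    joined.extend_from_slice(buf);
    let mut i = 0;
    while i + STATUS_LEN <= joined.len() {
        if joined[i] == MAGIC {
            let mut msg = [0u8; STATUS_LEN];
            msg.copy_from_slice(&joined[i..i + STATUS_LEN]);
            found.push(msg);
            i += STATUS_LEN;
        } else {
            // Jump straight to the next candidate marker (memchr's job).
            match joined[i + 1..].iter().position(|&b| b == MAGIC) {
                Some(off) => i += 1 + off,
                None => i = joined.len(),
            }
        }
    }
    // Keep up to STATUS_LEN-1 = 11 unconsumed trailing bytes for next read.
    let tail = joined.len().saturating_sub(i).min(STATUS_LEN - 1);
    *carry = joined[joined.len() - tail..].to_vec();
}

fn main() {
    let mut msg = [0u8; STATUS_LEN];
    msg[0] = MAGIC;
    msg[1] = 42; // cpu byte
    // Message embedded in zero-filled data, split mid-message across reads.
    let mut stream = vec![0u8; 100];
    stream.extend_from_slice(&msg);
    stream.extend_from_slice(&[0u8; 100]);
    let (a, b) = stream.split_at(105); // cut 5 bytes into the message
    let (mut carry, mut found) = (Vec::new(), Vec::new());
    scan(&mut carry, a, &mut found);
    scan(&mut carry, b, &mut found);
    assert_eq!(found.len(), 1); // the split message is still recovered
    assert_eq!(found[0][1], 42);
}
```

On all-zero payloads the marker search finds nothing and the scan exits after one pass, which is the fast path the benchmark measures.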
@@ -686,6 +716,7 @@ async fn tcp_status_sender(
/// Run a UDP bandwidth test on an already-authenticated stream.
/// Public API for use by server_pro. Caller provides the UDP port offset.
#[cfg(feature = "pro")]
pub async fn run_udp_test(
stream: &mut TcpStream,
peer: SocketAddr,
@@ -906,36 +937,43 @@ async fn udp_tx_loop(
async fn udp_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536];
let mut last_seq: Option<u32> = None;
+ let mut timeout = tokio::time::sleep(Duration::from_secs(5));
+ tokio::pin!(timeout);
while state.running.load(Ordering::Relaxed) {
// Use recv_from to accept packets from any source port
// (multi-connection MikroTik sends from multiple ports)
- match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
- Ok(Ok((n, _src))) if n >= 4 => {
- if !state.spend_budget(n as u64) {
- break;
- }
- state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
- state.rx_packets.fetch_add(1, Ordering::Relaxed);
- let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
- if let Some(last) = last_seq {
- let expected = last.wrapping_add(1);
- if seq > expected {
- let lost = seq - expected;
- state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
- }
- }
- last_seq = Some(seq);
- state.last_udp_seq.store(seq, Ordering::Relaxed);
- }
- Ok(Ok(_)) => {}
- Ok(Err(e)) => {
- tracing::debug!("UDP recv error: {}", e);
- tokio::time::sleep(Duration::from_millis(10)).await;
- }
- Err(_) => {
- tracing::debug!("UDP RX timeout");
- }
- }
+ tokio::select! {
+ biased;
+ res = socket.recv_from(&mut buf) => {
+ match res {
+ Ok((n, _src)) if n >= 4 => {
+ if !state.spend_budget(n as u64) {
+ return;
+ }
+ state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
+ state.rx_packets.fetch_add(1, Ordering::Relaxed);
+ let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
+ if let Some(last) = last_seq {
+ let expected = last.wrapping_add(1);
+ if seq > expected {
+ let lost = seq - expected;
+ state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
+ }
+ }
+ last_seq = Some(seq);
+ state.last_udp_seq.store(seq, Ordering::Relaxed);
+ }
+ Ok(_) => {}
+ Err(e) => {
+ tracing::debug!("UDP recv error: {}", e);
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+ }
+ timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
+ }
+ _ = &mut timeout => {
+ tracing::debug!("UDP RX timeout");
+ timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
+ }
+ }
}
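The loss accounting inside `udp_rx_loop` is worth pulling out: each datagram carries a big-endian `u32` sequence number in its first four bytes, and any jump past `last + 1` counts the skipped numbers as lost. Extracted as a standalone sketch:

```rust
// Sequence-gap loss counting, as in udp_rx_loop: a jump past last+1
// counts the gap as lost packets.
fn count_lost(seqs: &[u32]) -> u64 {
    let mut last_seq: Option<u32> = None;
    let mut lost = 0u64;
    for &seq in seqs {
        if let Some(last) = last_seq {
            let expected = last.wrapping_add(1);
            if seq > expected {
                lost += (seq - expected) as u64;
            }
        }
        last_seq = Some(seq);
    }
    lost
}

fn main() {
    assert_eq!(count_lost(&[0, 1, 2, 3]), 0); // no gaps
    assert_eq!(count_lost(&[0, 1, 5, 6]), 3); // 2, 3, 4 missing
    // Known limitation of this scheme: reordering inflates the count,
    // because the late packet resets last_seq backwards.
    assert_eq!(count_lost(&[0, 2, 1, 3]), 2);
}
```

Note the `seq > expected` comparison is unsigned, so a wrap from `u32::MAX` to 0 is not counted as a huge loss — `expected` wraps with it.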

View File

@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use btest_rs::bandwidth::BandwidthState;
- use super::quota::{Direction, QuotaManager};
+ use super::quota::{Direction, QuotaError, QuotaManager};
/// Enforces quotas during an active test session.
/// Call `run()` as a spawned task — it will set `state.running = false`
@@ -154,10 +154,10 @@ impl QuotaEnforcer {
// The DB has usage from PREVIOUS sessions; we add current session bytes
if let Err(e) = self.quota_mgr.check_user(&self.username) {
// Already exceeded from previous sessions
- return match format!("{}", e).as_str() {
- s if s.contains("daily") => StopReason::UserDailyQuota,
- s if s.contains("weekly") => StopReason::UserWeeklyQuota,
- s if s.contains("monthly") => StopReason::UserMonthlyQuota,
+ return match e {
+ QuotaError::DailyExceeded { .. } => StopReason::UserDailyQuota,
+ QuotaError::WeeklyExceeded { .. } => StopReason::UserWeeklyQuota,
+ QuotaError::MonthlyExceeded { .. } => StopReason::UserMonthlyQuota,
_ => StopReason::UserDailyQuota,
};
}
@@ -169,13 +169,13 @@ impl QuotaEnforcer {
StopReason::Running
}
- fn check_ip_with_session(&self, ip_str: &str, session_tx: u64, session_rx: u64) -> StopReason {
+ fn check_ip_with_session(&self, _ip_str: &str, _session_tx: u64, _session_rx: u64) -> StopReason {
if let Err(e) = self.quota_mgr.check_ip(&self.ip, Direction::Both) {
- return match format!("{}", e).as_str() {
- s if s.contains("IP daily") => StopReason::IpDailyQuota,
- s if s.contains("IP weekly") => StopReason::IpWeeklyQuota,
- s if s.contains("IP monthly") => StopReason::IpMonthlyQuota,
- s if s.contains("connections") => StopReason::IpDailyQuota, // reuse
+ return match e {
+ QuotaError::IpDailyExceeded { .. } | QuotaError::IpInboundDailyExceeded { .. } | QuotaError::IpOutboundDailyExceeded { .. } => StopReason::IpDailyQuota,
+ QuotaError::IpWeeklyExceeded { .. } | QuotaError::IpInboundWeeklyExceeded { .. } | QuotaError::IpOutboundWeeklyExceeded { .. } => StopReason::IpWeeklyQuota,
+ QuotaError::IpMonthlyExceeded { .. } | QuotaError::IpInboundMonthlyExceeded { .. } | QuotaError::IpOutboundMonthlyExceeded { .. } => StopReason::IpMonthlyQuota,
+ QuotaError::TooManyConnections { .. } => StopReason::IpDailyQuota, // reuse
_ => StopReason::IpDailyQuota,
};
}
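Both hunks above swap substring matching on `format!("{}", e)` for matching typed `QuotaError` variants. The win is that the compiler now checks exhaustiveness, and a reworded error message can no longer silently misclassify. A toy version of the pattern (the variants here are illustrative, not the crate's full set):

```rust
#[derive(Debug)]
enum QuotaError {
    DailyExceeded { used: u64, limit: u64 },
    WeeklyExceeded { used: u64, limit: u64 },
    MonthlyExceeded { used: u64, limit: u64 },
}

#[derive(Debug, PartialEq)]
enum StopReason {
    UserDailyQuota,
    UserWeeklyQuota,
    UserMonthlyQuota,
}

fn classify(e: &QuotaError) -> StopReason {
    // Exhaustive match: adding a QuotaError variant is a compile error here,
    // whereas `format!("{}", e).contains("weekly")` would just fall through.
    match e {
        QuotaError::DailyExceeded { .. } => StopReason::UserDailyQuota,
        QuotaError::WeeklyExceeded { .. } => StopReason::UserWeeklyQuota,
        QuotaError::MonthlyExceeded { .. } => StopReason::UserMonthlyQuota,
    }
}

fn main() {
    let e = QuotaError::WeeklyExceeded { used: 10, limit: 5 };
    assert_eq!(classify(&e), StopReason::UserWeeklyQuota);
}
```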
@@ -183,13 +183,13 @@ impl QuotaEnforcer {
}
/// Flush session bytes to DB. Call periodically and at session end.
- pub fn flush_to_db(&self) {
+ pub fn flush_to_db(&self, ip_str: &str) {
let tx = self.state.total_tx_bytes.load(Ordering::Relaxed);
let rx = self.state.total_rx_bytes.load(Ordering::Relaxed);
// From server perspective: tx = outbound (we sent), rx = inbound (we received)
self.quota_mgr.record_usage(
&self.username,
- &self.ip.to_string(),
+ ip_str,
rx, // inbound = what we received from client
tx, // outbound = what we sent to client
);
@@ -330,7 +330,7 @@ mod tests {
qm, "testuser".into(), "127.0.0.1".parse().unwrap(),
state, 10, 0,
);
- enforcer.flush_to_db();
+ enforcer.flush_to_db("127.0.0.1");
// flush_to_db: total_tx=5000→outbound, total_rx=3000→inbound
// quota_mgr.record_usage(inbound=3000, outbound=5000)

View File

@@ -15,6 +15,22 @@ pub struct LdapAuth {
config: LdapConfig,
}
/// Escape special characters in LDAP filter values per RFC 4515.
fn ldap_escape(input: &str) -> String {
let mut out = String::with_capacity(input.len());
for c in input.chars() {
match c {
'\\' => out.push_str("\\5c"),
'*' => out.push_str("\\2a"),
'(' => out.push_str("\\28"),
')' => out.push_str("\\29"),
'\0' => out.push_str("\\00"),
_ => out.push(c),
}
}
out
}
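The `ldap_escape` function above is what keeps a crafted username from rewriting the search filter — without it, input like `*)(uid=*` would widen `(&(objectClass=person)(uid=...))` into a wildcard match. Exercising the exact function from the diff:

```rust
/// Escape special characters in LDAP filter values per RFC 4515
/// (same body as the ldap_escape added in the diff above).
fn ldap_escape(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '\\' => out.push_str("\\5c"),
            '*' => out.push_str("\\2a"),
            '(' => out.push_str("\\28"),
            ')' => out.push_str("\\29"),
            '\0' => out.push_str("\\00"),
            _ => out.push(c),
        }
    }
    out
}

fn main() {
    // Benign input passes through unchanged.
    assert_eq!(ldap_escape("alice"), "alice");
    // Filter-injection attempt is neutralized byte-for-byte.
    assert_eq!(ldap_escape("*)(uid=*"), "\\2a\\29\\28uid=\\2a");
    // Escaped value embeds safely in a filter.
    let filter = format!("(uid={})", ldap_escape("a*b"));
    assert_eq!(filter, "(uid=a\\2ab)");
}
```

Backslash must be escaped first in any hand-rolled variant; the per-character `match` here handles that ordering concern automatically.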
impl LdapAuth {
pub fn new(config: LdapConfig) -> Self {
Self { config }
@@ -26,6 +42,8 @@ impl LdapAuth {
let (conn, mut ldap) = LdapConnAsync::new(&self.config.url).await?;
ldap3::drive!(conn);
let safe_username = ldap_escape(username);
// If service account configured, bind first to search for user DN
let user_dn = if let (Some(ref bind_dn), Some(ref bind_pass)) =
(&self.config.bind_dn, &self.config.bind_pass)
@@ -39,7 +57,7 @@ impl LdapAuth {
// Search for the user
let filter = format!(
"(&(objectClass=person)(|(uid={})(sAMAccountName={})(cn={})))",
- username, username, username
+ safe_username, safe_username, safe_username
);
let (results, _) = ldap
.search(&self.config.base_dn, Scope::Subtree, &filter, vec!["dn"])
@@ -51,11 +69,17 @@ impl LdapAuth {
return Ok(false);
}
- let entry = SearchEntry::construct(results.into_iter().next().unwrap());
+ let entry = match results.into_iter().next() {
+ Some(r) => SearchEntry::construct(r),
+ None => {
+ tracing::debug!("LDAP user not found: {}", username);
+ return Ok(false);
+ }
+ };
entry.dn
} else {
// No service account — construct DN directly
- format!("uid={},{}", username, self.config.base_dn)
+ format!("uid={},{}", safe_username, self.config.base_dn)
};
// Attempt user bind

View File

@@ -609,32 +609,20 @@ impl UserDb {
fn hash_password(username: &str, password: &str) -> String {
use sha2::{Sha256, Digest};
let mut hasher = Sha256::new();
- hasher.update(format!("{}:{}", username, password).as_bytes());
+ hasher.update(username.as_bytes());
+ hasher.update(b":");
+ hasher.update(password.as_bytes());
let result = hasher.finalize();
- result.iter().map(|b| format!("{:02x}", b)).collect()
+ let mut hex = String::with_capacity(64);
+ for b in result {
+ use std::fmt::Write;
+ let _ = write!(hex, "{:02x}", b);
+ }
+ hex
}
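Splitting the single `update(format!(...))` into three `update()` calls is safe because a streaming hash only sees the byte sequence, not the call boundaries: `"user"`, `":"`, `"pass"` fed incrementally hash identically to one `"user:pass"` buffer — only the intermediate `String` allocation disappears. Demonstrated with FNV-1a standing in for SHA-256 (the property holds for any byte-streaming hash):

```rust
// Feed chunks to a streaming FNV-1a; the digest depends only on the
// concatenated byte sequence, not on how it was chunked.
fn fnv1a(chunks: &[&[u8]]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325; // FNV offset basis
    for chunk in chunks {
        for &b in *chunk {
            h ^= b as u64;
            h = h.wrapping_mul(0x100000001b3); // FNV prime
        }
    }
    h
}

fn main() {
    let (user, pass) = ("alice", "s3cret");
    // Old style: allocate "user:pass", hash it in one update.
    let one_shot = fnv1a(&[format!("{}:{}", user, pass).as_bytes()]);
    // New style: three updates, zero intermediate allocation.
    let streamed = fnv1a(&[user.as_bytes(), b":", pass.as_bytes()]);
    assert_eq!(one_shot, streamed);
}
```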
fn chrono_date_today() -> String {
- // Simple date without chrono crate
- use std::time::{SystemTime, UNIX_EPOCH};
- let secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
- let days = secs / 86400;
- let mut y = 1970u64;
- let mut remaining = days;
- loop {
- let leap = if y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) { 366 } else { 365 };
- if remaining < leap { break; }
- remaining -= leap;
- y += 1;
- }
- let leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
- let days_in_months = [31u64, if leap { 29 } else { 28 }, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
- let mut m = 0usize;
- for i in 0..12 {
- if remaining < days_in_months[i] { m = i; break; }
- remaining -= days_in_months[i];
- }
- format!("{:04}-{:02}-{:02}", y, m + 1, remaining + 1)
+ chrono::Local::now().format("%Y-%m-%d").to_string()
}
// Re-export for use by rusqlite

View File

@@ -36,12 +36,12 @@ pub fn init(target: &str) -> std::io::Result<()> {
/// Send a syslog message with the given severity and message.
/// Severity: 6=info, 4=warning, 3=error
fn send(severity: u8, msg: &str) {
+ // Format timestamp outside the lock to minimize contention
+ let priority = 128 + severity;
+ let timestamp = bsd_timestamp();
let guard = SYSLOG.lock().unwrap();
if let Some(ref sender) = *guard {
// RFC 3164 (BSD syslog): <priority>Mon DD HH:MM:SS hostname program: message
// facility=16 (local0) * 8 + severity
- let priority = 128 + severity;
- let timestamp = bsd_timestamp();
let syslog_msg = format!(
"<{}>{} {} btest-rs: {}",
priority, timestamp, sender.hostname, msg,
@@ -52,44 +52,7 @@ fn send(severity: u8, msg: &str) {
fn bsd_timestamp() -> String {
// RFC 3164 format: "Mon DD HH:MM:SS" (no year)
use std::time::{SystemTime, UNIX_EPOCH};
- let now = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap_or_default()
- .as_secs();
- // Simple conversion — good enough for syslog
- let secs_in_day = 86400u64;
- let days = now / secs_in_day;
- let time_of_day = now % secs_in_day;
- let hours = time_of_day / 3600;
- let minutes = (time_of_day % 3600) / 60;
- let seconds = time_of_day % 60;
- // Day of year calculation (approximate months)
- let months = ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"];
- let days_in_months = [31u64,28,31,30,31,30,31,31,30,31,30,31];
- // Days since epoch to year/month/day
- let mut y = 1970u64;
- let mut remaining = days;
- loop {
- let leap = if y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) { 366 } else { 365 };
- if remaining < leap { break; }
- remaining -= leap;
- y += 1;
- }
- let leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
- let mut m = 0usize;
- for i in 0..12 {
- let mut d = days_in_months[i];
- if i == 1 && leap { d += 1; }
- if remaining < d { m = i; break; }
- remaining -= d;
- }
- let day = remaining + 1;
- format!("{} {:2} {:02}:{:02}:{:02}", months[m], day, hours, minutes, seconds)
+ chrono::Local::now().format("%b %e %H:%M:%S").to_string()
}
// --- Public logging functions ---

View File

@@ -235,7 +235,7 @@ async fn test_csv_created_client() {
// Initialize CSV
btest_rs::csv_output::init(&csv_path).unwrap();
- let (tx, rx, lost, intervals) = run_client_test(
+ let (tx, rx, lost, _intervals) = run_client_test(
"127.0.0.1", port, false, true, false, None, None,
).await;
@@ -336,3 +336,67 @@ async fn test_bandwidth_state_running_flag() {
state.running.store(false, Ordering::SeqCst);
assert!(!state.running.load(Ordering::Relaxed));
}
// --- CPU Reporting Tests ---
/// Helper that returns the full BandwidthState (not just summary) so we can check remote_cpu.
async fn run_client_with_state(
host: &str, port: u16, transmit: bool, receive: bool, udp: bool,
secs: u64,
) -> std::sync::Arc<btest_rs::bandwidth::BandwidthState> {
let direction = match (transmit, receive) {
(true, false) => btest_rs::protocol::CMD_DIR_RX,
(false, true) => btest_rs::protocol::CMD_DIR_TX,
(true, true) => btest_rs::protocol::CMD_DIR_BOTH,
_ => panic!("must specify direction"),
};
let state = btest_rs::bandwidth::BandwidthState::new();
let state_clone = state.clone();
let host = host.to_string();
let handle = tokio::spawn(async move {
btest_rs::client::run_client(
&host, port, direction, udp,
0, 0, None, None, false, state_clone,
).await
});
tokio::time::sleep(Duration::from_secs(secs)).await;
state.running.store(false, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(500)).await;
handle.abort();
state
}
#[test]
fn test_local_cpu_nonzero() {
// CPU sampler should return > 0 on supported platforms after warming up
btest_rs::cpu::start_sampler();
std::thread::sleep(Duration::from_secs(2));
let cpu = btest_rs::cpu::get();
// On CI or idle machines, CPU may genuinely be 0, so just check it doesn't panic
// and returns a value in range
assert!(cpu <= 100, "CPU should be 0-100, got {}", cpu);
}
#[tokio::test]
async fn test_tcp_remote_cpu_both() {
let port = BASE_PORT + 20;
start_server_noauth(port).await;
let state = run_client_with_state("127.0.0.1", port, true, true, false, 3).await;
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
// On loopback with bidirectional traffic, server CPU should be > 0
// The status messages are interleaved in the TCP data stream
assert!(remote_cpu > 0, "TCP BOTH: remote CPU should be > 0 on loopback, got {}", remote_cpu);
}
#[tokio::test]
async fn test_tcp_remote_cpu_tx_only() {
let port = BASE_PORT + 21;
start_server_noauth(port).await;
let state = run_client_with_state("127.0.0.1", port, true, false, false, 3).await;
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
// TX-only: server sends status messages that the status reader should parse
assert!(remote_cpu > 0, "TCP TX-only: remote CPU should be > 0 on loopback, got {}", remote_cpu);
}