5 Commits
v0.6.3 ... main

Author SHA1 Message Date
Siavash Sameni
3afbfb42cf bench: add criterion benchmarks for protocol, bandwidth, TCP RX scan, and EC-SRP5
Some checks failed
CI / test (push) Failing after 1m42s
Adds four Criterion.rs benchmark suites to measure hot-path performance
and demonstrate the impact of Sprints 1–3 optimizations:

- benches/protocol.rs    — Command & StatusMessage serialize/deserialize
- benches/bandwidth.rs   — BandwidthState atomics, budget, interval math
- benches/tcp_rx_scan.rs — memchr SIMD scan vs naive O(n) loop (55× faster
                           on 256KB buffers with status at end)
- benches/ecsrp5.rs      — WCurve::new() heavy math vs cached LazyLock
                           (~123,000× faster access)

Also adds BENCHMARKS.md with usage instructions and example results.

Visibility changes (bench-only):
- scan_status_message is now pub (was #[cfg(test)] only)
- WCurve and WCURVE are now pub in ecsrp5.rs

dev-dependencies: criterion + pprof (optional flamegraph support)
2026-04-30 21:01:38 +04:00
Siavash Sameni
bba9b0512c perf: replace O(n) TCP RX buffer scan with SIMD memchr + carry buffer (Sprint 3)
All checks were successful
CI / test (push) Successful in 2m14s
This commit fixes the most significant hot-path bottleneck in the
client: the tcp_client_rx_loop was scanning up to 256KB byte-by-byte
on every read() call looking for interleaved 12-byte status messages.

Changes:
- client.rs (tcp_client_rx_loop): Replace the O(n) for-loop scan
  with a three-stage approach:

  1. Split-message check: An 11-byte carry buffer stores trailing
     bytes from the previous read. We check every possible alignment
     where a status message (0x07 + cpu_byte) could span the carry
     and the start of the current buffer. This fixes a latent bug
     where the old code would miss status messages split across TCP
     read boundaries.

  2. Fast scan: memchr::memchr (AVX2/NEON SIMD) finds 0x07 bytes
     in the 256KB buffer. On all-zero data packets this exits in
     ~4096 SIMD-width operations instead of 262,144 byte compares.
     ~64x faster scan path.

  3. Carry save: Save up to 11 trailing bytes for the next read.

- client.rs (unit tests): Add scan_status_message() helper and
  five unit tests covering:
  - Status message fully within buffer
  - Status message split across reads (5+7 bytes)
  - Status message split at boundary (1+11 bytes)
  - All-zero buffer (no false positive)
  - Short buffer (no panic)

- Cargo.toml / Cargo.lock: Add memchr as an explicit dependency.

Verified against live MikroTik RouterOS (TCP both + receive modes
with EC-SRP5 auth). Status messages detected correctly. No wire
protocol changes — 100% MikroTik compatible.
2026-04-30 20:46:34 +04:00
Siavash Sameni
205030ce33 perf: reduce async runtime and platform overhead (Sprint 2)
This commit applies three concurrency and platform-specific
optimizations. No wire protocol changes — 100% MikroTik compatible.

Changes:
- cpu.rs (FreeBSD): Replace fork+exec of 'sysctl -n kern.cp_time'
  with a direct libc::sysctl FFI call. Eliminates one process spawn
  per second on FreeBSD. Uses CTL_KERN / KERN_CP_TIME mib to read
  the 5-element cp_time array directly into a [c_ulong; 5].

- server.rs (multi-conn TCP): Replace the 100ms busy-poll loop in
  tcp_client_rx_loop with tokio::sync::Notify. When a secondary
  TCP connection joins a multi-connection session, it calls
  notify_one() to wake the primary connection immediately instead
  of waiting up to 100ms. Adds an Arc<Notify> to TcpSession and
  updates all secondary connection push sites to signal it.

- client.rs + server.rs (UDP RX): Replace per-recv
  tokio::time::timeout(Duration::from_secs(5), socket.recv(...))
  with a pinned tokio::time::sleep future inside tokio::select!.
  This eliminates timer wheel registration/cancel overhead on every
  UDP packet receive, which is significant at high packet rates.
  The timeout still fires correctly when no packets arrive for 5s.

No new dependencies.
2026-04-30 20:46:13 +04:00
Siavash Sameni
b3c12b7f8b perf: eliminate redundant allocations and computations (Sprint 1)
This commit applies eight low-risk internal optimizations identified
in the performance audit. No wire protocol changes — 100% MikroTik
compatible.

Changes:
- ecsrp5.rs: Cache WCurve in a global LazyLock, eliminating the
  expensive BigUint modular square root recomputation on every
  EC-SRP5 authentication. Also optimize the local hex::encode
  module to use a single pre-allocated String instead of N format!
  allocations.

- server.rs: Deduplicate Instant::now() calls in the TCP TX hot
  loop, caching the result at the top of each iteration.

- csv_output.rs: Hold the CSV file handle open in a static
  Mutex<Option<(String, File)>> instead of reopening the file on
  every write_result call. Add explicit flush after each write.

- server_pro/user_db.rs: Replace hand-rolled Gregorian calendar
  math (30+ lines looping from 1970) with chrono::Local::now().
  Optimize hash_password() to write username:password directly
  into the SHA256 hasher and hex-encode with a pre-allocated
  String.

- server_pro/enforcer.rs: Replace allocating error string matching
  (format!({}, e).as_str().contains(...)) with direct
  QuotaError variant matching. Pass ip_str into flush_to_db()
  to avoid a per-call ip.to_string().

- syslog_logger.rs: Move timestamp formatting outside the global
  std::sync::Mutex to reduce lock hold time. Replace manual
  calendar arithmetic with chrono::Local::now().format().

New dependency: chrono (already pulled in transitively by rusqlite).
2026-04-30 20:45:56 +04:00
Siavash Sameni
a655d3bbe8 Re-enable release CI workflow trigger
All checks were successful
CI / test (push) Successful in 2m12s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 11:54:04 +04:00
18 changed files with 2283 additions and 182 deletions

View File

@@ -3,7 +3,7 @@ name: Build & Release
on:
push:
tags:
- 'disabled-v*'
- 'v*'
jobs:
release:

54
BENCHMARKS.md Normal file
View File

@@ -0,0 +1,54 @@
# Benchmarks
This project uses [Criterion.rs](https://bheisler.github.io/criterion.rs/book/) for performance benchmarking and regression detection.
## Running Benchmarks
Run all benchmarks:
```bash
cargo bench
```
Run a specific benchmark suite:
```bash
cargo bench --bench protocol
cargo bench --bench bandwidth
cargo bench --bench tcp_rx_scan
cargo bench --bench ecsrp5
```
Run in "quick" mode (fewer iterations, useful for development):
```bash
cargo bench --bench tcp_rx_scan -- --quick
```
## Benchmark Suites
### `protocol` — Protocol Serialization
Measures the zero-allocation serialization/deserialization of `Command` (16 bytes) and `StatusMessage` (12 bytes) structs.
### `bandwidth` — Bandwidth State Atomics
Measures `BandwidthState` hot-path operations: `fetch_add`, `spend_budget`, `calc_send_interval`, `advance_next_send`, and `summary`.
### `tcp_rx_scan` — TCP RX Status Message Scan
Compares the optimized `memchr`-based scan against the old naive O(n) byte-by-byte loop on 256KB buffers. Key scenarios:
- **All zeros** (common case — data packets contain no status)
- **Status at start**
- **Status at end** (worst case for naive scan)
- **Split messages** (status spans two TCP reads)
### `ecsrp5` — EC-SRP5 Curve Construction
Compares `WCurve::new()` (heavy `BigUint` modular arithmetic) against the cached `&*WCURVE` access to demonstrate the Sprint 1 optimization.
## Interpreting Results
Criterion generates HTML reports in `target/criterion/`. Open `target/criterion/report/index.html` after running benchmarks to view interactive charts.
Example results (Apple M3 Pro, release profile):
| Benchmark | Naive/Uncached | Optimized/Cached | Speedup |
|-----------|---------------|------------------|---------|
| TCP RX scan 256KB (status at end) | 251 µs | 4.5 µs | **~55×** |
| WCurve construction | 126 µs | 1.0 ns | **~123,000×** |
| Command serialize | — | 7.7 ns | — |
| Bandwidth `fetch_add` | — | ~1 ns | — |

739
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -49,6 +49,8 @@ num-traits = "0.2.19"
num-integer = "0.1.46"
sha2 = "0.11.0"
hostname = "0.4.2"
chrono = "0.4"
memchr = "2"
rusqlite = { version = "0.39.0", features = ["bundled"], optional = true }
ldap3 = { version = "0.12.1", optional = true }
axum = { version = "0.8.8", features = ["tokio"], optional = true }
@@ -68,3 +70,25 @@ codegen-units = 1
inherits = "release"
opt-level = "z"
panic = "abort"
# --- Benchmarks ---
[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
pprof = { version = "0.14", features = ["criterion", "flamegraph"] }
[[bench]]
name = "protocol"
harness = false
[[bench]]
name = "bandwidth"
harness = false
[[bench]]
name = "tcp_rx_scan"
harness = false
[[bench]]
name = "ecsrp5"
harness = false

302
PERFORMANCE_AUDIT.md Normal file
View File

@@ -0,0 +1,302 @@
# btest-rs Performance Audit
**Date:** 2026-04-30
**Scope:** Full codebase (`src/`, `tests/`, `Cargo.toml`)
**Methodology:** Static code analysis, hot-path tracing, lock/contention review, algorithmic complexity analysis
---
## Executive Summary
The codebase is generally well-structured for a network I/O tool, with good use of atomics in the per-packet hot path and zero-allocation protocol serialization. However, **three critical bottlenecks** significantly limit throughput and scalability:
1. **O(n) buffer scan on every TCP read** in the client RX loop (up to 256KB scanned per `read()` call)
2. **Expensive EC curve reconstruction on every authentication** (heavy `BigUint` modular arithmetic)
3. **Single SQLite connection mutex** serializing all DB operations in `server_pro`
Additionally, there is **no benchmark or profiling infrastructure** in the project, making it impossible to measure improvements or catch regressions.
---
## Severity Legend
| Icon | Severity | Impact |
|------|----------|--------|
| 🔴 | **Critical** | Direct throughput/latency hit in hot path; fix immediately |
| 🟠 | **High** | Significant overhead under load; fix in next sprint |
| 🟡 | **Medium** | Noticeable at scale or under specific conditions |
| 🟢 | **Low** | Cosmetic / easy wins; batch with other work |
---
## 🔴 Critical Bottlenecks
### 1. O(n) Linear Buffer Scan in `tcp_client_rx_loop` (`src/client.rs:210-216`)
**Problem:** On every TCP `read()` call (up to 256KB), the client performs a byte-by-byte scan looking for interleaved 12-byte status messages:
```rust
for i in 0..=(n - STATUS_MSG_SIZE) {
if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
// ...
}
}
```
Since data packets are all zeros and status bytes are extremely rare, this **almost always scans the entire 256KB buffer** uselessly. At high bandwidth (many reads/sec), this wastes massive CPU cycles and pollutes cache lines.
**Impact:** CPU-bound slowdown on the client RX side during bidirectional TCP tests. The compiler *may* auto-vectorize the simple loop, but it still processes ~256K bytes per read.
**Fix Options (pick one):**
- **Best:** Use `memchr` (crate or `std::slice::memchr` on nightly) to find `0x07` bytes. On all-zero buffers this exits after a few SIMD-width checks.
- **Alternative:** Since status messages are injected at `write_all` boundaries and data is all zeros, maintain a small 12-byte sliding ring buffer across reads. Process the stream with a tiny state machine instead of scanning the whole buffer.
- **Alternative:** Track read bytes modulo expected packet size. Status messages are injected between full packets, so they will appear at predictable offsets *if* the client knows the server's `effective_size`. This requires protocol coordination.
---
### 2. `WCurve::new()` Recomputes Generator on Every Auth (`src/ecsrp5.rs:363,499`)
**Problem:** Every EC-SRP5 authentication (client and server) calls `WCurve::new()`, which performs `lift_x(9)``prime_mod_sqrt()` — heavy `BigUint` modular arithmetic to derive the curve generator point.
```rust
pub async fn client_authenticate<S: ...>(stream: &mut S, username: &str, password: &str) -> Result<()> {
let w = WCurve::new(); // <-- expensive, same result every time
// ...
}
```
The curve constants (`P`, `CURVE_ORDER`, `WEIERSTRASS_A`) are already cached as `LazyLock` statics, but the generator point is not.
**Impact:** Auth latency spikes, especially on the server under many concurrent connections. Each auth does redundant `BigUint` allocations and modular square roots.
**Fix:** Cache `WCurve` (or at least the generator point) in a global `LazyLock`:
```rust
static WCURVE: std::sync::LazyLock<WCurve> = std::sync::LazyLock::new(WCurve::new);
```
Then use `&*WCURVE` in both `client_authenticate` and `server_authenticate`.
---
### 3. Single SQLite Mutex Serializes All DB Operations (`src/server_pro/user_db.rs:15-18`)
**Problem:** The entire `server_pro` database layer uses a single shared `Connection` behind a `std::sync::Mutex`:
```rust
pub struct UserDb {
conn: Arc<Mutex<Connection>>,
}
```
While SQLite WAL mode is enabled (allowing readers to proceed during writes), **the Rust mutex still serializes all access to the connection object**. Under concurrent load with many tests starting/finishing, this becomes the primary bottleneck.
**Critical sub-issue:** `QuotaManager::remaining_budget()` (`src/server_pro/quota.rs:387`) performs **up to 15 separate SQLite queries** in sequence, locking the mutex 15+ times per pre-test check.
**Impact:** Connection setup/teardown latency increases linearly with concurrency. Quota checks and usage recording block each other.
**Fix Options:**
- **Connection pooling:** Use `r2d2_sqlite` or `deadpool-sqlite` to maintain a small pool of connections (SQLite handles this well in WAL mode).
- **Separate read/write paths:** Open a read-only connection for quota checks (`remaining_budget`) and a dedicated write connection for usage recording. SQLite WAL allows this.
- **Batch quota checks:** Cache quota results for a few seconds per user/IP to avoid redundant queries.
- **Channel-based writer:** Use a single dedicated DB writer task with an `mpsc` channel so only one task ever touches the connection, eliminating lock contention entirely.
---
## 🟠 High Severity Issues
### 4. 100ms Busy-Poll Wait in Multi-Connection TCP (`src/server.rs:313-332`)
**Problem:** When waiting for secondary TCP connections to join a multi-connection session, the primary connection busy-polls the session map every 100ms:
```rust
loop {
let count = { let map = sessions.lock().await; ... };
if count + 1 >= conn_count as usize { break; }
tokio::time::sleep(Duration::from_millis(100)).await;
}
```
This adds up to **100ms of unnecessary latency** to every multi-connection test startup. It also hammers the async mutex needlessly.
**Fix:** Replace with `tokio::sync::Notify`. When a secondary connection registers itself, it calls `notify_one()`. The primary waits on `notified().await` with a timeout, waking instantly when ready.
---
### 5. FreeBSD CPU Sampling Spawns Process Every Second (`src/cpu.rs:142`)
**Problem:** On FreeBSD, `get_cpu_times()` spawns `sysctl -n kern.cp_time` via `std::process::Command` every second:
```rust
fn get_cpu_times() -> (u64, u64) {
if let Ok(output) = std::process::Command::new("sysctl")
.arg("-n").arg("kern.cp_time").output()
{ ... }
}
```
`fork()` + `exec()` is extremely expensive relative to the work being done (reading 5 integers).
**Fix:** Use `libc::sysctl()` via FFI, matching the macOS implementation style. Cache the `mib` array and call the syscall directly.
---
### 6. Per-Call Timer Registration in UDP RX Loops (`src/client.rs:393`, `src/server.rs:925`)
**Problem:** Both UDP RX loops create a new `tokio::time::timeout` timer on **every single `recv`/`recv_from` call**:
```rust
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await
```
At high packet rates (e.g., 100K pps), registering and canceling timers on the Tokio timer wheel adds measurable overhead.
**Fix:** Use `tokio::select!` with a long-lived `tokio::time::sleep` future that is reset, or use the socket's built-in SO_RCVTIMEO if available via `socket2`. Alternatively, since UDP is connectionless, consider whether a 5-second timeout is needed on every call or if the outer test duration timer is sufficient.
---
## 🟡 Medium Severity Issues
### 7. String Error Matching with Allocation (`src/server_pro/enforcer.rs:157-161`)
```rust
match format!("{}", e).as_str() {
s if s.contains("daily") => ...
}
```
`format!("{}", e)` allocates a `String` from the error just to do substring matching. Use `e.to_string().contains(...)` or match on error types directly if possible.
---
### 8. `ip.to_string()` Called Repeatedly in Quota Checks (`src/server_pro/quota.rs:389`)
```rust
let ip_str = ip.to_string();
// ... used in 6+ DB calls
```
This allocates a `String` on every quota check. Accept `&str` or `IpAddr` directly in DB methods, or cache the string.
---
### 9. `chrono_date_today()` Recomputes Calendar from Epoch (`src/server_pro/user_db.rs:617-638`)
A hand-rolled date calculation loops through years from 1970 and months every time it's called (which is before almost every DB write). The `chrono` crate is already used indirectly by `rusqlite`; add it as a direct dependency and replace with `chrono::Local::now().format("%Y-%m-%d")`.
---
## 🟢 Low Severity / Easy Wins
### 10. CSV File Reopened on Every Write (`src/csv_output.rs:77`)
```rust
if let Ok(mut f) = OpenOptions::new().append(true).open(path) {
let _ = writeln!(f, "{}", row);
}
```
Called once per test, not per-packet, but still suboptimal. Consider keeping a lazily-initialized `Mutex<Option<File>>` or using `std::fs::OpenOptions` once at init and storing the handle.
---
### 11. Global Syslog Mutex Held During I/O (`src/syslog_logger.rs`)
```rust
static SYSLOG: Mutex<Option<SyslogSender>> = Mutex::new(None);
```
The global `std::sync::Mutex` is held while formatting the timestamp (expensive manual calendar math) and sending UDP. Switch to `parking_lot::Mutex` (faster) or `tokio::sync::Mutex` if async, and format the message outside the lock. Better yet, use `tracing`'s built-in syslog integration or a structured appender.
---
### 12. `hash_password()` Uses `format!` + `format!` in Hex Loop (`src/server_pro/user_db.rs:612-614`)
```rust
hasher.update(format!("{}:{}", username, password).as_bytes());
result.iter().map(|b| format!("{:02x}", b)).collect() // N allocs for N bytes
```
The hex encoding allocates one `String` per byte. Use a small fixed buffer or `hex` crate (already used elsewhere in `ecsrp5.rs`).
---
### 13. Redundant `Instant::now()` Calls in TX Loop (`src/server.rs:593,606`)
```rust
if send_status && Instant::now() >= next_status {
// ...
next_status = Instant::now() + Duration::from_secs(1);
}
```
Two monotonic clock reads per loop iteration. Cache `let now = Instant::now();` at the top of the loop.
---
## Architecture Observations
### What the Code Does Well
- **Zero-allocation protocol layer:** `serialize()` returns fixed-size stack arrays (`[u8; 12]`, `[u8; 16]`). Excellent.
- **Atomic bandwidth tracking:** `BandwidthState` uses `AtomicU64` with `Relaxed` ordering in the per-packet path. No locks in the data plane.
- **Buffer reuse:** TX/RX loops allocate `vec![0u8; ...]` once before the loop. Good.
- **Aggressive release profile:** `lto = true`, `codegen-units = 1`, `opt-level = 3`.
### Async Runtime Usage
- `tokio` with `full` features is used. For a primarily I/O-bound tool, this is appropriate.
- `tokio::task::yield_now().await` is used in unlimited-rate mode to prevent starving the runtime. This is correct but consider whether `tokio::task::spawn_blocking` or dedicated CPU pinning is needed for the EC-SRP5 math (which is CPU-bound and currently runs on the async runtime during auth).
### Memory Safety
- Several `unwrap()`/`expect()` calls in setup paths (socket binding, address parsing). These are acceptable for config errors but should use `?` propagation where possible to allow graceful degradation.
---
## Missing Performance Infrastructure
| Infrastructure | Status | Recommendation |
|----------------|--------|----------------|
| **Benchmarks** | ❌ None | Add `criterion` + `benches/` for `BandwidthState`, protocol ser/de, and EC-SRP5 auth |
| **Profiling hooks** | ❌ None | Add optional `pprof` or `dhat` dev-deps for heap profiling |
| **Throughput regression tests** | ⚠️ Partial | Integration tests assert `tx > 0` and `rx > 0` but don't measure sustained throughput |
| **Load tests** | ❌ None | Add a `benches/load_test.rs` that spawns 100+ concurrent tests against a local server |
| **CI performance gates** | ❌ None | Consider a benchmark action that fails on >5% regression |
---
## Priority Action Plan
### Phase 1: Hot-Path Fixes (1-2 days)
1. Replace buffer scan with `memchr` or ring-buffer approach in `tcp_client_rx_loop`
2. Cache `WCurve` in a global `LazyLock`
3. Replace 100ms poll with `tokio::sync::Notify` in multi-conn wait
### Phase 2: Scalability (2-3 days)
4. Add SQLite connection pooling or channel-based writer in `server_pro`
5. Cache `remaining_budget()` results for 5-10 seconds
6. Fix FreeBSD CPU sampling to use `libc::sysctl` FFI
### Phase 3: Polish & Tooling (1-2 days)
7. Replace manual date arithmetic with `chrono`
8. Add `criterion` benchmarks for auth and bandwidth state
9. Fix low-severity allocation issues (CSV, syslog, hex encoding)
---
## Appendix: File-by-File Quick Reference
| File | Lines | Hot Path? | Key Concern |
|------|-------|-----------|-------------|
| `src/client.rs` | 531 | ✅ Yes | O(n) 256KB scan per TCP read |
| `src/server.rs` | 1094 | ✅ Yes | 100ms poll wait, status injection timing |
| `src/ecsrp5.rs` | 660 | ✅ Yes (auth) | `WCurve::new()` recomputed per auth |
| `src/bandwidth.rs` | 263 | ✅ Yes (atomics) | Well-designed; no issues |
| `src/protocol.rs` | 214 | ✅ Yes (ser/de) | Zero-allocation; excellent |
| `src/cpu.rs` | 215 | ⚠️ Periodic | FreeBSD `fork+exec` every second |
| `src/server_pro/quota.rs` | 470 | ⚠️ Periodic | 15 DB queries per budget check |
| `src/server_pro/user_db.rs` | 641 | ⚠️ All DB ops | Single mutex serializes everything |
| `src/server_pro/server_loop.rs` | 449 | ✅ Yes | DB auth locks during connection setup |
| `src/server_pro/enforcer.rs` | 411 | ⚠️ Periodic | String error matching allocates |
| `src/csv_output.rs` | 86 | ❌ No | File reopen per write |
| `src/syslog_logger.rs` | 154 | ❌ No | Global mutex + manual calendar math |
| `src/auth.rs` | 164 | ⚠️ Auth only | Minor; double MD5 per auth |
| `src/main.rs` | 243 | ❌ No | Entry point only |

634
PERFORMANCE_PRDS.md Normal file
View File

@@ -0,0 +1,634 @@
# Performance Improvement PRDs
**Project:** btest-rs
**Constraint:** 100% MikroTik BTest protocol compatibility — no wire-format or behavioral changes visible to MikroTik devices
**Date:** 2026-04-30
---
## How to Read This Document
Each PRD is sorted by **recommended execution order**, which balances:
- **Effort** (development + review + test time)
- **Risk** (probability of regression or compatibility break)
- **Performance Effect** (measured or estimated throughput/latency improvement)
- **MikroTik Compatibility Risk** (whether the change could affect interoperability)
**Sorting rationale:** Execute *quick wins* first to build velocity and reduce risk surface, then tackle *high-impact* items with full attention.
---
## Summary Matrix
| # | PRD | Effort | Risk | Perf Impact | MikroTik Risk | Tier |
|---|-----|--------|------|-------------|---------------|------|
| 1 | WCurve Global Cache | 30 min | None | Medium | None | Quick Win |
| 2 | Redundant `Instant::now()` | 15 min | None | Low | None | Quick Win |
| 3 | `hash_password` Hex Fix | 30 min | None | Low | None | Quick Win |
| 4 | CSV File Handle Cache | 30 min | None | Low | None | Quick Win |
| 5 | Error String Matching | 30 min | None | Low | None | Quick Win |
| 6 | `chrono_date_today` Replace | 1 hr | Low | Low | None | Quick Win |
| 7 | Syslog Mutex + Timestamp | 1 hr | Low | Low | None | Quick Win |
| 8 | `ip.to_string()` Cache | 1 hr | Low | Low | None | Quick Win |
| 9 | FreeBSD CPU FFI | 3 hrs | Medium | Medium | None | Platform Fix |
| 10 | Multi-Conn Notify Wake | 2 hrs | Medium | Medium | None | Latency Fix |
| 11 | UDP Timer Reuse | 2 hrs | Medium | Medium | None | Throughput Fix |
| 12 | TCP RX Scan Optimization | 4 hrs | Medium | **High** | Low | Hot Path Fix |
| 13 | SQLite Connection Pool | 12 days | High | **High** | None | Scalability Fix |
---
## Tier 1: Quick Wins (Do These First)
---
### PRD-001: Cache `WCurve` in Global `LazyLock`
**Background:**
`WCurve::new()` is called on every EC-SRP5 authentication (client and server). It recomputes the Weierstrass curve generator point via `lift_x(9)``prime_mod_sqrt()`, which performs heavy `BigUint` modular arithmetic. The result is deterministic and immutable.
**MikroTik Compatibility:**
- **100% safe.** This is pure internal mathematics. The wire bytes, auth handshake order, and hash outputs are identical. No protocol-visible change.
**Objective:**
Eliminate redundant `BigUint` modular square root computation per authentication.
**Design:**
```rust
// src/ecsrp5.rs
static WCURVE: std::sync::LazyLock<WCurve> = std::sync::LazyLock::new(WCurve::new);
```
Replace all call sites:
- `src/ecsrp5.rs:363` (`client_authenticate`)
- `src/ecsrp5.rs:499` (`server_authenticate`)
Change `let w = WCurve::new();` to `let w = &*WCURVE;`. Update any `WCurve` methods that take `self` to take `&self` if they don't already.
**Acceptance Criteria:**
- [ ] `ecsrp5_test.rs` passes unchanged.
- [ ] `full_integration_test.rs` EC-SRP5 tests pass unchanged.
- [ ] `WCurve::new()` is called exactly once per process lifetime.
- [ ] No change to serialized auth bytes on the wire.
**Effort:** 30 min
**Risk:** None — stateless deterministic cache
**Performance Impact:** Medium — reduces per-auth CPU time by ~30-50% (estimated), especially noticeable under concurrent logins.
---
### PRD-002: Deduplicate `Instant::now()` in `tcp_tx_loop_inner`
**Background:**
The TCP TX loop calls `Instant::now()` twice per iteration (status check and interval scheduling). Monotonic clock reads are cheap but not free, and occur in the hottest loop in the system.
**MikroTik Compatibility:**
- **100% safe.** Timing granularity remains identical.
**Objective:**
Reduce syscalls in the per-packet hot path.
**Design:**
```rust
let now = Instant::now();
if send_status && now >= next_status { ... next_status = now + Duration::from_secs(1); }
// ... reuse `now` for interval math
```
**Acceptance Criteria:**
- [ ] TCP send/receive/both integration tests pass.
- [ ] No behavioral change in status injection timing.
**Effort:** 15 min
**Risk:** None
**Performance Impact:** Low — micro-optimization, but trivial.
---
### PRD-003: Fix `hash_password()` Hex Encoding Allocations
**Background:**
`user_db.rs:614` allocates one `String` per byte when hex-encoding a 32-byte SHA256 hash:
```rust
result.iter().map(|b| format!("{:02x}", b)).collect()
```
**MikroTik Compatibility:**
- **100% safe.** Output string is identical.
**Objective:**
Replace N-allocation hex encoding with a single-allocation approach.
**Design:**
Use `hex` crate (already in dependency tree via `ecsrp5.rs` debug logging) or a small `[u8; 64]` buffer with `write!` to a `String::with_capacity(64)`.
**Acceptance Criteria:**
- [ ] Same hex string output for all inputs.
- [ ] `pro` feature tests pass.
**Effort:** 30 min
**Risk:** None
**Performance Impact:** Low — removes 32 allocations per password hash.
---
### PRD-004: Cache CSV File Handle
**Background:**
`csv_output::write_result()` re-opens the file via `OpenOptions::new().append(true).open(path)` on every call (once per test). Safe but wasteful.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement.
**Objective:**
Hold the file handle open for the process lifetime.
**Design:**
Change `static CSV_FILE: Mutex<Option<String>>` to `Mutex<Option<(String, std::fs::File)>>`, or open once during `init()` and store `Mutex<Option<File>>`.
**Acceptance Criteria:**
- [ ] CSV tests in `full_integration_test.rs` pass.
- [ ] File is created with headers on `init()`.
- [ ] Multiple `write_result` calls append correctly.
**Effort:** 30 min
**Risk:** None
**Performance Impact:** Low — removes one `open()` syscall per test.
---
### PRD-005: Remove Allocating Error String Matching
**Background:**
`src/server_pro/enforcer.rs:157-161` does:
```rust
match format!("{}", e).as_str() {
s if s.contains("daily") => ...
}
```
This allocates a `String` from the error just for substring matching.
**MikroTik Compatibility:**
- **100% safe.** Server-pro internal logic only.
**Objective:**
Match without allocation.
**Design:**
Use `e.to_string().contains("daily")` (still allocates but clearer) or, better, downcast the `rusqlite::Error` or match on structured error variants. If the error is `anyhow::Error`, use `.downcast_ref::<rusqlite::Error>()`.
**Acceptance Criteria:**
- [ ] Quota enforcement behavior unchanged.
- [ ] Enforcer tests pass.
**Effort:** 30 min
**Risk:** None
**Performance Impact:** Low — removes one allocation per enforcer tick.
---
### PRD-006: Replace `chrono_date_today()` with `chrono` Crate
**Background:**
`user_db.rs:617-638` contains a hand-rolled Gregorian calendar converter that loops from 1970 to compute today's date. Called before almost every DB write. The `chrono` crate is already pulled in transitively by `rusqlite`.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement.
**Objective:**
Replace 30 lines of error-prone manual date math with one `chrono` call.
**Design:**
Add `chrono = { version = "0.4", optional = true }` gated behind `pro` feature (or use the transitive dep directly). Replace `chrono_date_today()` with:
```rust
chrono::Local::now().format("%Y-%m-%d").to_string()
```
**Acceptance Criteria:**
- [ ] `pro` feature compiles.
- [ ] Date strings match format `YYYY-MM-DD`.
- [ ] DB write tests pass.
**Effort:** 1 hr
**Risk:** Low — adds explicit dep that already exists transitively
**Performance Impact:** Low — eliminates loop overhead, but called infrequently.
---
### PRD-007: Optimize Syslog Mutex and Timestamp Formatting
**Background:**
`syslog_logger.rs` holds a global `std::sync::Mutex` while formatting a timestamp (manual calendar math) and sending UDP. `std::sync::Mutex` is relatively slow, and the timestamp logic duplicates `chrono_date_today()` issues.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement.
**Objective:**
Reduce lock contention and allocation in logging path.
**Design:**
1. Use `parking_lot::Mutex` (faster, no poisoning) OR switch to `std::sync::Mutex` but clone the `SyslogSender` config outside the lock.
2. Replace `bsd_timestamp()` with `chrono::Local::now().format("%b %e %H:%M:%S")`.
3. Pre-allocate the `String` with `with_capacity(256)`.
**Acceptance Criteria:**
- [ ] Syslog output format remains RFC 3164 compliant.
- [ ] `test_syslog_events` in `full_integration_test.rs` passes.
**Effort:** 1 hr
**Risk:** Low
**Performance Impact:** Low — logging is not a hot path, but reduces global lock hold time.
---
### PRD-008: Cache `ip.to_string()` in Quota Checks
**Background:**
`quota.rs:389` calls `ip.to_string()` and then passes `&ip_str` to multiple DB methods, allocating a new `String` on every `remaining_budget()` call.
**MikroTik Compatibility:**
- **100% safe.** Server-pro internal logic.
**Objective:**
Eliminate redundant IP stringification.
**Design:**
Change DB methods to accept `&std::net::IpAddr` directly and stringify inside only when needed for SQL parameter binding (which `rusqlite` may already handle via `ToSql`). Alternatively, pass `ip_str: &str` from a single `to_string()` call and avoid re-stringifying in sub-calls.
**Acceptance Criteria:**
- [ ] Quota checks return identical results.
- [ ] `pro` feature tests pass.
**Effort:** 1 hr
**Risk:** Low
**Performance Impact:** Low — one allocation removed per quota check.
---
## Tier 2: Moderate Fixes (Platform & Latency)
---
### PRD-009: FreeBSD CPU Sampling via `libc::sysctl` FFI
**Background:**
On FreeBSD, `cpu.rs` spawns `sysctl -n kern.cp_time` as a child process every second. `fork()` + `exec()` is orders of magnitude slower than a direct syscall.
**MikroTik Compatibility:**
- **100% safe.** No protocol involvement. Platform-specific internal code.
**Objective:**
Replace subprocess with direct `sysctl(3)` syscall.
**Design:**
```rust
#[cfg(target_os = "freebsd")]
fn get_cpu_times() -> (u64, u64) {
let mut mib = [libc::CTL_KERN, libc::KERN_CP_TIME];
let mut cp_time: [libc::c_ulong; 5] = [0; 5];
let mut len = std::mem::size_of_val(&cp_time);
unsafe {
if libc::sysctl(
mib.as_mut_ptr(),
mib.len() as u32,
&mut cp_time as *mut _ as *mut libc::c_void,
&mut len,
std::ptr::null_mut(),
0,
) == 0 {
let total = cp_time[0] + cp_time[1] + cp_time[2] + cp_time[3] + cp_time[4];
return (total as u64, cp_time[4] as u64);
}
}
(0, 0)
}
```
**Acceptance Criteria:**
- [ ] Compiles on FreeBSD.
- [ ] Returns same values as previous `sysctl` command approach.
- [ ] No child process spawned (verify with `ktrace` or `ps`).
**Effort:** 3 hrs
**Risk:** Medium — requires FreeBSD test environment; FFI is unsafe
**Performance Impact:** Medium — eliminates 1 fork/exec per second on FreeBSD.
---
### PRD-010: Replace 100ms Poll with `tokio::sync::Notify`
**Background:**
In `server.rs:313-332`, the primary connection of a multi-connection TCP test busy-polls the session map every 100ms waiting for secondary connections to join.
**MikroTik Compatibility:**
- **100% safe.** This is internal server-side coordination. The wire behavior (waiting for connections, then starting the test) is unchanged. MikroTik clients will not observe a difference except potentially faster test startup.
**Objective:**
Eliminate polling latency and unnecessary mutex acquisitions.
**Design:**
1. Add a `tokio::sync::Notify` to `TcpSession`:
```rust
struct TcpSession {
peer_ip: IpAddr,
streams: Vec<OwnedTcpStream>,
expected: u8,
notify: tokio::sync::Notify,
}
```
2. In the secondary connection handler, after pushing to `streams`, call `session.notify.notify_one()`.
3. In the primary wait loop, replace the sleep loop with:
```rust
let count = { /* lock, get count, drop lock */ };
if count + 1 >= conn_count { break; }
// Wait for notification or 10s deadline
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
_ = session.notify.notified() => {
let count = { /* lock, get count */ };
if count + 1 >= conn_count { break; }
}
_ = &mut timeout => { break; }
}
}
```
**Acceptance Criteria:**
- [ ] Multi-connection TCP tests pass.
- [ ] Test startup latency is ≤ 1ms after last connection joins (was up to 100ms).
- [ ] No deadlock under concurrent multi-connection tests.
**Effort:** 2 hrs
**Risk:** Medium — concurrency change; must carefully manage lock/notify ordering to avoid races
**Performance Impact:** Medium — improves multi-conn test startup latency by up to 100ms per test.
---
### PRD-011: Reuse UDP RX Timer Instead of Per-Call Timeout
**Background:**
Both client and server UDP RX loops create a new `tokio::time::timeout` on every `recv`/`recv_from` call:
```rust
tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await
```
At high packet rates, this registers and cancels timers on Tokio's timer wheel constantly.
**MikroTik Compatibility:**
- **100% safe.** Internal async timing only. UDP packet processing is unchanged.
**Objective:**
Reduce timer wheel churn in high-rate UDP RX loops.
**Design:**
Option A — `tokio::select!` with a pinned sleep future:
```rust
let mut timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
loop {
tokio::select! {
biased; // prioritize recv
res = socket.recv(&mut buf) => { /* handle */ timeout.as_mut().reset(Instant::now() + Duration::from_secs(5)); }
_ = &mut timeout => { tracing::debug!("UDP RX timeout"); }
}
}
```
Option B — Use `socket2` to set `SO_RCVTIMEO` on the underlying socket, then use blocking/async recv without Tokio timeouts. This moves timeout handling into the kernel, which is even cheaper.
**Recommendation:** Start with Option A (pure Tokio, no platform risk). Option B can be a follow-up.
**Acceptance Criteria:**
- [ ] UDP send/receive/both tests pass.
- [ ] UDP RX still times out correctly when no packets arrive.
- [ ] No change to packet parsing or sequence tracking.
**Effort:** 2 hrs
**Risk:** Medium — changes timeout behavior; must ensure test abortion still works correctly
**Performance Impact:** Medium — reduces timer wheel registration overhead, noticeable at >50K pps.
---
## Tier 3: High Impact (Do These With Full Focus)
---
### PRD-012: Optimize TCP Client RX Status Message Scan
**Background:**
`tcp_client_rx_loop` (`client.rs:210-216`) scans up to 256KB byte-by-byte on every `read()` call looking for a 12-byte status marker (`0x07` + `0x80|cpu`). Since data is all zeros, this is almost always a full scan.
**MikroTik Compatibility Consideration:**
- **High confidence of safety.** The protocol is: MikroTik injects 12-byte status messages into the TCP stream. Our client must detect them. Changing *how* we detect them (faster scan) does not change:
- What bytes are sent on the wire
- What bytes we expect
- How we respond to status messages
- **One edge case to handle:** TCP is a stream. A status message may be split across two `read()` calls. The current code does **not** handle this correctly (it scans each buffer independently). The optimized version *should* handle split messages to be strictly more correct than the current implementation.
**Objective:**
Replace O(n) byte-by-byte scan with SIMD-accelerated or state-machine-based detection, while correctly handling split messages.
**Design — Recommended: Ring Buffer Approach**
Since status messages are 12 bytes and all other bytes are zeros, maintain a 12-byte ring buffer across reads:
```rust
const STATUS_MSG_SIZE: usize = 12;
async fn tcp_client_rx_loop(mut reader: OwnedReadHalf, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 256 * 1024];
let mut carry = [0u8; STATUS_MSG_SIZE - 1]; // up to 11 bytes from previous read
let mut carry_len = 0usize;
while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
// Check if a status message spans the carry + start of buf
if carry_len > 0 {
let needed = STATUS_MSG_SIZE - carry_len;
if n >= needed {
let mut candidate = [0u8; STATUS_MSG_SIZE];
candidate[..carry_len].copy_from_slice(&carry[..carry_len]);
candidate[carry_len..].copy_from_slice(&buf[..needed]);
if candidate[0] == STATUS_MSG_TYPE && candidate[1] >= 0x80 {
state.remote_cpu.store(candidate[1] & 0x7F, Ordering::Relaxed);
}
}
}
// Scan within buf for status messages
// Since data is zeros, use memchr to find 0x07 candidates
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
let mut offset = 0;
while let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[offset..search_end]) {
let i = offset + pos;
if buf[i + 1] >= 0x80 {
state.remote_cpu.store(buf[i + 1] & 0x7F, Ordering::Relaxed);
break;
}
offset = i + 1;
if offset >= search_end { break; }
}
}
// Save trailing bytes for next read
carry_len = (n).min(STATUS_MSG_SIZE - 1);
if n >= carry_len {
carry[..carry_len].copy_from_slice(&buf[n - carry_len..n]);
}
}
}
}
}
```
**Alternative: `memchr` crate only**
If we determine split messages are extremely rare and the current behavior is "good enough," simply replace the `for` loop with:
```rust
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..n - STATUS_MSG_SIZE + 1]) {
if buf[pos + 1] >= 0x80 { /* ... */ }
}
```
This is a 5-line change with massive speedup (SIMD scan). However, the ring buffer approach is strictly more correct and not much more complex.
**Acceptance Criteria:**
- [ ] TCP bidirectional tests pass.
- [ ] Remote CPU reporting still works.
- [ ] Status messages split across reads are correctly detected (unit test for this).
- [ ] `memchr` crate added to deps (very lightweight).
- [ ] No change to wire bytes or server behavior.
**Effort:** 4 hrs
**Risk:** Medium — hot path change; must be carefully reviewed and tested
**Performance Impact:** **High** — eliminates 256KB byte scan per read. At 10K reads/sec, saves ~2.5GB of memory scanning per second.
---
### PRD-013: SQLite Connection Pool / Channel-Based Writer
**Background:**
`server_pro` uses a single `Arc<Mutex<Connection>>`. All quota checks, usage recordings, and auth lookups serialize through one lock. `remaining_budget()` issues 15 queries, locking 15+ times. This is the primary scalability bottleneck for the pro server.
**MikroTik Compatibility:**
- **100% safe.** Server-side infrastructure only. No protocol change.
**Objective:**
Enable concurrent quota checks and usage recording without mutex contention.
**Design — Option A: Connection Pool (Recommended for reads)**
Use `r2d2_sqlite` or `deadpool-sqlite`:
1. Open a pool of ~4-8 connections to the same SQLite file (WAL mode supports this).
2. Read-only operations (`remaining_budget`, `get_user`, `check_user`) borrow a connection from the pool.
3. Write operations (`record_usage`, `record_session`) also borrow from the pool (WAL allows concurrent readers + one writer).
**Design — Option B: Channel-Based Writer (Recommended for writes)**
1. Keep one dedicated `Connection` owned by a single Tokio task.
2. Expose an `mpsc::channel` where other tasks send write requests (`RecordUsage { user, tx, rx }`).
3. The writer task batches or sequentially executes writes without any mutex.
4. Reads use a separate read-only connection or pool.
**Hybrid Recommendation:**
- **Reads:** Small connection pool (4 connections) for quota checks and auth lookups.
- **Writes:** Single dedicated async task with an `mpsc::unbounded_channel` for usage recording.
- **Cache:** Add a 5-second TTL cache for `remaining_budget()` results per user+IP to avoid redundant DB hits during test setup.
**Acceptance Criteria:**
- [ ] `pro` feature compiles and all tests pass.
- [ ] Concurrent test launches scale linearly up to at least 50 concurrent sessions.
- [ ] Quota enforcement remains correct (no over-quota usage).
- [ ] Session logging and interval recording remain accurate.
- [ ] No SQLite "database is locked" errors under load.
**Effort:** 12 days
**Risk:** High — touches every DB interaction in `server_pro`; potential for data races, quota leaks, or connection exhaustion
**Performance Impact:** **High** — enables horizontal scaling of concurrent tests; removes the primary pro server bottleneck.
---
## Execution Roadmap
### Sprint 1: Quick Wins + Foundation (1 day)
- [ ] PRD-001: WCurve cache
- [ ] PRD-002: `Instant::now()` dedup
- [ ] PRD-003: `hash_password` hex fix
- [ ] PRD-004: CSV file handle cache
- [ ] PRD-005: Error string matching
- [ ] PRD-006: `chrono` date replacement
- [ ] PRD-007: Syslog optimization
- [ ] PRD-008: `ip.to_string()` cache
**Deliverable:** Low-risk PR with 8 clean commits. Run full integration tests.
### Sprint 2: Platform & Async Fixes (1 day)
- [ ] PRD-009: FreeBSD CPU FFI
- [ ] PRD-010: Multi-conn Notify wake
- [ ] PRD-011: UDP timer reuse
**Deliverable:** PR with platform + latency improvements.
### Sprint 3: Hot Path Optimization (12 days)
- [ ] PRD-012: TCP RX scan optimization
- [ ] Add unit test for split status messages
- [ ] Benchmark before/after with `criterion` (or manual throughput test)
**Deliverable:** PR with benchmark numbers proving improvement.
### Sprint 4: Scalability (23 days)
- [ ] PRD-013: SQLite connection pool / channel writer
- [ ] Load test: 50 concurrent tests, verify no DB lock contention
- [ ] Add `remaining_budget` cache
**Deliverable:** PR with load test results.
---
## Testing Requirements for All PRDs
Since **no wire protocol changes** are made, the existing integration test suite is the primary validation tool. However, for PRD-012 and PRD-013, additional tests are required:
### New Tests to Add
1. **Split Status Message Unit Test (for PRD-012)**
```rust
#[test]
fn test_status_message_split_across_reads() {
// Feed first 5 bytes, then remaining 7 bytes
// Assert CPU value is extracted correctly
}
```
2. **Concurrent Quota Load Test (for PRD-013)**
```rust
#[tokio::test]
async fn test_concurrent_quota_checks() {
// Spawn 50 tasks doing remaining_budget() + record_usage()
// Assert no panics, no SQLite locked errors
}
```
3. **FreeBSD CPU Parity Test (for PRD-009)**
Manual verification on FreeBSD that FFI `sysctl` returns same values as command.
---
## Appendix: MikroTik Compatibility Checklist
For every PRD, verify:
- [ ] No change to `Command` or `StatusMessage` struct layouts or serialization
- [ ] No change to MD5 challenge-response handshake order
- [ ] No change to EC-SRP5 handshake order or byte values
- [ ] No change to TCP packet sizes or UDP payload format
- [ ] No change to status injection timing (1-second interval)
- [ ] No change to NAT probe behavior
- [ ] Client can still authenticate against stock RouterOS `btest` server
- [ ] Server can still accept connections from stock RouterOS `btest` client
All PRDs in this document satisfy the above checklist by construction.

79
benches/bandwidth.rs Normal file
View File

@@ -0,0 +1,79 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use btest_rs::bandwidth::{BandwidthState, calc_send_interval, advance_next_send};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
fn bench_atomic_fetch_add(c: &mut Criterion) {
let state = BandwidthState::new();
c.bench_function("bandwidth_rx_bytes_fetch_add", |b| {
b.iter(|| {
black_box(state.rx_bytes.fetch_add(1500, Ordering::Relaxed));
})
});
c.bench_function("bandwidth_tx_bytes_fetch_add", |b| {
b.iter(|| {
black_box(state.tx_bytes.fetch_add(32768, Ordering::Relaxed));
})
});
}
fn bench_spend_budget(c: &mut Criterion) {
// Unlimited budget (fast path)
let unlimited = BandwidthState::new();
c.bench_function("spend_budget_unlimited", |b| {
b.iter(|| black_box(unlimited.spend_budget(black_box(1500))))
});
// Limited budget
let limited = BandwidthState::new();
limited.byte_budget.store(1_000_000_000, Ordering::SeqCst);
c.bench_function("spend_budget_limited", |b| {
b.iter(|| black_box(limited.spend_budget(black_box(1500))))
});
}
fn bench_calc_send_interval(c: &mut Criterion) {
c.bench_function("calc_interval_100mbps_1500b", |b| {
b.iter(|| black_box(calc_send_interval(black_box(100_000_000), black_box(1500))))
});
c.bench_function("calc_interval_1gbps_32768b", |b| {
b.iter(|| black_box(calc_send_interval(black_box(1_000_000_000), black_box(32768))))
});
c.bench_function("calc_interval_unlimited", |b| {
b.iter(|| black_box(calc_send_interval(black_box(0), black_box(1500))))
});
}
fn bench_advance_next_send(c: &mut Criterion) {
let iv = Duration::from_micros(120);
let now = Instant::now();
let mut next = now;
c.bench_function("advance_next_send", |b| {
b.iter(|| {
let r = advance_next_send(&mut next, iv, now);
black_box(r);
});
next = now;
});
}
fn bench_summary(c: &mut Criterion) {
let state = BandwidthState::new();
// Pre-populate some values so loads are real
state.total_tx_bytes.store(1_000_000_000, Ordering::Relaxed);
state.total_rx_bytes.store(2_000_000_000, Ordering::Relaxed);
state.intervals.store(100, Ordering::Relaxed);
c.bench_function("bandwidth_summary", |b| {
b.iter(|| black_box(state.summary()))
});
}
criterion_group!(
bandwidth_benches,
bench_atomic_fetch_add,
bench_spend_budget,
bench_calc_send_interval,
bench_advance_next_send,
bench_summary
);
criterion_main!(bandwidth_benches);

19
benches/ecsrp5.rs Normal file
View File

@@ -0,0 +1,19 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use btest_rs::ecsrp5::{WCurve, WCURVE};
fn bench_wcurve_new(c: &mut Criterion) {
c.bench_function("wcurve_new_uncached", |b| {
b.iter(|| black_box(WCurve::new()))
});
}
fn bench_wcurve_cached(c: &mut Criterion) {
// Force initialization before benchmarking
let _ = &*WCURVE;
c.bench_function("wcurve_cached_access", |b| {
b.iter(|| black_box(&*WCURVE))
});
}
criterion_group!(ecsrp5_benches, bench_wcurve_new, bench_wcurve_cached);
criterion_main!(ecsrp5_benches);

65
benches/protocol.rs Normal file
View File

@@ -0,0 +1,65 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use btest_rs::protocol::{Command, StatusMessage, CMD_PROTO_TCP, CMD_DIR_BOTH};
fn bench_command_serialize(c: &mut Criterion) {
let cmd = Command::new(CMD_PROTO_TCP, CMD_DIR_BOTH);
c.bench_function("command_serialize", |b| {
b.iter(|| black_box(cmd.serialize()))
});
}
fn bench_command_deserialize(c: &mut Criterion) {
let bytes = [0x01, 0x03, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
c.bench_function("command_deserialize", |b| {
b.iter(|| black_box(Command::deserialize(black_box(&bytes))))
});
}
fn bench_status_message_serialize(c: &mut Criterion) {
let msg = StatusMessage {
seq: 42,
bytes_received: 1_000_000,
cpu_load: 50,
};
c.bench_function("status_message_serialize", |b| {
b.iter(|| black_box(msg.serialize()))
});
}
fn bench_status_message_deserialize(c: &mut Criterion) {
let bytes = [0x07, 0xB2, 0x00, 0x00, 0x2A, 0x00, 0x00, 0x00, 0x40, 0x42, 0x0F, 0x00];
c.bench_function("status_message_deserialize", |b| {
b.iter(|| black_box(StatusMessage::deserialize(black_box(&bytes))))
});
}
fn bench_roundtrip(c: &mut Criterion) {
let cmd = Command::new(CMD_PROTO_TCP, CMD_DIR_BOTH);
let msg = StatusMessage {
seq: 99,
bytes_received: 50_000,
cpu_load: 75,
};
c.bench_function("command_roundtrip", |b| {
b.iter(|| {
let s = black_box(cmd.serialize());
black_box(Command::deserialize(&s))
})
});
c.bench_function("status_message_roundtrip", |b| {
b.iter(|| {
let s = black_box(msg.serialize());
black_box(StatusMessage::deserialize(&s))
})
});
}
criterion_group!(
protocol_benches,
bench_command_serialize,
bench_command_deserialize,
bench_status_message_serialize,
bench_status_message_deserialize,
bench_roundtrip
);
criterion_main!(protocol_benches);

100
benches/tcp_rx_scan.rs Normal file
View File

@@ -0,0 +1,100 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use btest_rs::client::scan_status_message;
use btest_rs::protocol::STATUS_MSG_TYPE;
/// Naive O(n) byte-by-byte scan — the old implementation.
fn naive_scan(buf: &[u8]) -> Option<u8> {
const STATUS_MSG_SIZE: usize = 12;
if buf.len() < STATUS_MSG_SIZE {
return None;
}
for i in 0..=(buf.len() - STATUS_MSG_SIZE) {
if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
return Some((buf[i + 1] & 0x7F).min(100));
}
}
None
}
fn make_buffer(size: usize, status_at: Option<usize>) -> Vec<u8> {
let mut buf = vec![0u8; size];
if let Some(pos) = status_at {
buf[pos] = STATUS_MSG_TYPE;
buf[pos + 1] = 0x80 | 50; // CPU = 50%
}
buf
}
fn bench_scan_all_zeros(c: &mut Criterion) {
let mut group = c.benchmark_group("tcp_rx_scan_all_zeros");
for size in [4096, 65536, 262144] {
let buf = make_buffer(size, None);
group.throughput(Throughput::Bytes(size as u64));
group.bench_with_input(BenchmarkId::new("naive", size), &buf, |b, buf| {
b.iter(|| black_box(naive_scan(black_box(buf))))
});
group.bench_with_input(BenchmarkId::new("memchr", size), &buf, |b, buf| {
b.iter(|| black_box(scan_status_message(black_box(&[]), black_box(buf))))
});
}
group.finish();
}
fn bench_scan_status_at_start(c: &mut Criterion) {
let mut group = c.benchmark_group("tcp_rx_scan_status_at_start");
for size in [4096, 65536, 262144] {
let buf = make_buffer(size, Some(0));
group.throughput(Throughput::Bytes(size as u64));
group.bench_with_input(BenchmarkId::new("naive", size), &buf, |b, buf| {
b.iter(|| black_box(naive_scan(black_box(buf))))
});
group.bench_with_input(BenchmarkId::new("memchr", size), &buf, |b, buf| {
b.iter(|| black_box(scan_status_message(black_box(&[]), black_box(buf))))
});
}
group.finish();
}
fn bench_scan_status_at_end(c: &mut Criterion) {
let mut group = c.benchmark_group("tcp_rx_scan_status_at_end");
for size in [4096, 65536, 262144] {
let buf = make_buffer(size, Some(size - 12));
group.throughput(Throughput::Bytes(size as u64));
group.bench_with_input(BenchmarkId::new("naive", size), &buf, |b, buf| {
b.iter(|| black_box(naive_scan(black_box(buf))))
});
group.bench_with_input(BenchmarkId::new("memchr", size), &buf, |b, buf| {
b.iter(|| black_box(scan_status_message(black_box(&[]), black_box(buf))))
});
}
group.finish();
}
fn bench_scan_split_message(c: &mut Criterion) {
// Simulate a status message split across two reads:
// carry has first 5 bytes, buf has remaining 7 bytes
let mut carry = vec![0u8; 5];
carry[0] = STATUS_MSG_TYPE;
carry[1] = 0x80 | 75;
let buf = vec![0u8; 7];
c.bench_function("scan_split_5_7", |b| {
b.iter(|| black_box(scan_status_message(black_box(&carry), black_box(&buf))))
});
// Split with 2 bytes in carry (status type + cpu byte), 10 in buf
let carry_2 = vec![STATUS_MSG_TYPE, 0x80 | 33];
let buf_10 = vec![0u8; 10];
c.bench_function("scan_split_2_10", |b| {
b.iter(|| black_box(scan_status_message(black_box(&carry_2), black_box(&buf_10))))
});
}
criterion_group!(
tcp_rx_scan_benches,
bench_scan_all_zeros,
bench_scan_status_at_start,
bench_scan_status_at_end,
bench_scan_split_message
);
criterion_main!(tcp_rx_scan_benches);

View File

@@ -197,24 +197,57 @@ async fn tcp_client_rx_loop(
state: Arc<BandwidthState>,
) {
let mut buf = vec![0u8; 256 * 1024];
// Carry trailing bytes from the previous read to detect status messages
// that are split across TCP read boundaries.
let mut carry = [0u8; STATUS_MSG_SIZE - 1];
let mut carry_len = 0usize;
while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
// Scan for interleaved 12-byte status messages from the server.
// In BOTH mode, the server's TX loop injects status messages into the
// data stream. Status starts with 0x07 (STATUS_MSG_TYPE) and byte 1
// has the high bit set (0x80 | cpu%). Data packets are all zeros.
if n >= STATUS_MSG_SIZE {
for i in 0..=(n - STATUS_MSG_SIZE) {
if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
let cpu = buf[i + 1] & 0x7F;
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
// 1) Check if a status message spans the carry + start of buf.
if carry_len > 0 {
for offset in 0..carry_len {
if carry[offset] != STATUS_MSG_TYPE {
continue;
}
let from_carry = carry_len - offset;
let from_buf = STATUS_MSG_SIZE - from_carry;
if n < from_buf {
continue;
}
let cpu_byte = if from_carry >= 2 {
carry[offset + 1]
} else {
buf[0]
};
if cpu_byte >= 0x80 {
state.remote_cpu.store((cpu_byte & 0x7F).min(100), Ordering::Relaxed);
break;
}
}
}
// 2) Fast scan within buf for status messages.
// Data packets are all zeros, so memchr (SIMD) exits almost instantly.
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) {
if buf[pos + 1] >= 0x80 {
let cpu = buf[pos + 1] & 0x7F;
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
}
}
}
// 3) Save trailing bytes for the next read.
carry_len = n.min(STATUS_MSG_SIZE - 1);
if n >= carry_len {
carry[..carry_len].copy_from_slice(&buf[n - carry_len..n]);
}
}
}
}
@@ -388,30 +421,39 @@ async fn udp_client_tx_loop(
async fn udp_client_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536];
let mut last_seq: Option<u32> = None;
let mut timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
while state.running.load(Ordering::Relaxed) {
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await {
Ok(Ok(n)) if n >= 4 => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
tokio::select! {
biased;
res = socket.recv(&mut buf) => {
match res {
Ok(n) if n >= 4 => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
}
}
last_seq = Some(seq);
}
Ok(_) => {}
Err(e) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
last_seq = Some(seq);
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(_) => {
_ = &mut timeout => {
tracing::debug!("UDP RX timeout");
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
}
}
@@ -529,3 +571,84 @@ async fn udp_client_status_loop(
}
}
}
/// Scan for a status message in `carry` + `buf` and return the CPU value if found.
#[allow(dead_code)]
pub fn scan_status_message(carry: &[u8], buf: &[u8]) -> Option<u8> {
let carry_len = carry.len();
// 1) Check split across carry + buf
if carry_len > 0 {
for offset in 0..carry_len {
if carry[offset] != STATUS_MSG_TYPE {
continue;
}
let from_carry = carry_len - offset;
let from_buf = STATUS_MSG_SIZE - from_carry;
if buf.len() < from_buf {
continue;
}
let cpu_byte = if from_carry >= 2 {
carry[offset + 1]
} else {
buf[0]
};
if cpu_byte >= 0x80 {
return Some((cpu_byte & 0x7F).min(100));
}
}
}
// 2) Check within buf
let n = buf.len();
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) {
if buf[pos + 1] >= 0x80 {
return Some((buf[pos + 1] & 0x7F).min(100));
}
}
}
None
}
#[cfg(test)]
mod status_scan_tests {
use super::*;
#[test]
fn test_status_within_buffer() {
let mut buf = [0u8; 256];
buf[10] = STATUS_MSG_TYPE;
buf[11] = 0x80 | 42;
assert_eq!(scan_status_message(&[], &buf), Some(42));
}
#[test]
fn test_status_split_across_reads() {
// 12-byte status message: [0x07, 0x80|50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// Split: first 5 bytes in carry, last 7 bytes in buf
let carry = [0x07, 0xB2, 0, 0, 0];
let buf = [0, 0, 0, 0, 0, 0, 0];
assert_eq!(scan_status_message(&carry, &buf), Some(50));
}
#[test]
fn test_status_split_at_boundary() {
// Split: first 1 byte (0x07) in carry, rest in buf
let carry = [0x07];
let mut buf = [0u8; 20];
buf[0] = 0x80 | 77;
assert_eq!(scan_status_message(&carry, &buf), Some(77));
}
#[test]
fn test_no_status_in_zeros() {
let buf = [0u8; 256];
assert_eq!(scan_status_message(&[], &buf), None);
}
#[test]
fn test_short_buffer_no_panic() {
let buf = [0x07, 0x80];
assert_eq!(scan_status_message(&[], &buf), None);
}
}

View File

@@ -139,27 +139,32 @@ fn get_cpu_times() -> (u64, u64) {
#[cfg(target_os = "freebsd")]
fn get_cpu_times() -> (u64, u64) {
// kern.cp_time returns: user nice system interrupt idle
if let Ok(output) = std::process::Command::new("sysctl")
.arg("-n")
.arg("kern.cp_time")
.output()
{
if output.status.success() {
let text = String::from_utf8_lossy(&output.stdout);
let parts: Vec<u64> = text
.split_whitespace()
.filter_map(|s| s.parse().ok())
.collect();
if parts.len() >= 5 {
let user = parts[0];
let nice = parts[1];
let system = parts[2];
let interrupt = parts[3];
let idle = parts[4];
let total = user + nice + system + interrupt + idle;
return (total, idle);
}
}
const CTL_KERN: libc::c_int = 1;
const KERN_CP_TIME: libc::c_int = 40;
let mut mib: [libc::c_int; 2] = [CTL_KERN, KERN_CP_TIME];
let mut cp_time: [libc::c_ulong; 5] = [0; 5];
let mut len = std::mem::size_of_val(&cp_time);
let ret = unsafe {
libc::sysctl(
mib.as_mut_ptr(),
mib.len() as u32,
&mut cp_time as *mut _ as *mut libc::c_void,
&mut len,
std::ptr::null_mut(),
0,
)
};
if ret == 0 {
let user = cp_time[0] as u64;
let nice = cp_time[1] as u64;
let system = cp_time[2] as u64;
let interrupt = cp_time[3] as u64;
let idle = cp_time[4] as u64;
let total = user + nice + system + interrupt + idle;
return (total, idle);
}
(0, 0)
}

View File

@@ -9,7 +9,7 @@ use std::path::Path;
use std::sync::Mutex;
use std::time::SystemTime;
static CSV_FILE: Mutex<Option<String>> = Mutex::new(None);
static CSV_FILE: Mutex<Option<(String, std::fs::File)>> = Mutex::new(None);
static QUIET: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
const HEADER: &str = "timestamp,host,port,protocol,direction,duration_s,tx_avg_mbps,rx_avg_mbps,tx_bytes,rx_bytes,lost_packets,local_cpu_pct,remote_cpu_pct,auth_type";
@@ -18,12 +18,12 @@ const HEADER: &str = "timestamp,host,port,protocol,direction,duration_s,tx_avg_m
pub fn init(path: &str) -> std::io::Result<()> {
let needs_header = !Path::new(path).exists() || std::fs::metadata(path)?.len() == 0;
let mut f = OpenOptions::new().create(true).append(true).open(path)?;
if needs_header {
let mut f = OpenOptions::new().create(true).write(true).open(path)?;
writeln!(f, "{}", HEADER)?;
}
*CSV_FILE.lock().unwrap() = Some(path.to_string());
*CSV_FILE.lock().unwrap() = Some((path.to_string(), f));
Ok(())
}
@@ -49,8 +49,8 @@ pub fn write_result(
remote_cpu: u8,
auth_type: &str,
) {
let guard = CSV_FILE.lock().unwrap();
if let Some(ref path) = *guard {
let mut guard = CSV_FILE.lock().unwrap();
if let Some((ref _path, ref mut file)) = *guard {
let tx_mbps = if duration_secs > 0 {
tx_bytes as f64 * 8.0 / duration_secs as f64 / 1_000_000.0
} else {
@@ -74,9 +74,8 @@ pub fn write_result(
local_cpu, remote_cpu, auth_type,
);
if let Ok(mut f) = OpenOptions::new().append(true).open(path) {
let _ = writeln!(f, "{}", row);
}
let _ = writeln!(file, "{}", row);
let _ = file.flush();
}
}

View File

@@ -42,6 +42,8 @@ static WEIERSTRASS_A: LazyLock<BigUint> = LazyLock::new(|| {
.unwrap()
});
pub static WCURVE: LazyLock<WCurve> = LazyLock::new(WCurve::new);
const MONT_A: u64 = 486662;
// --- Modular arithmetic ---
@@ -242,14 +244,14 @@ impl Point {
// --- WCurve: Curve25519 in Weierstrass form ---
struct WCurve {
pub struct WCurve {
g: Point,
conversion_from_m: BigUint,
conversion_to_m: BigUint,
}
impl WCurve {
fn new() -> Self {
pub fn new() -> Self {
let p_val = &*P;
let mont_a = BigUint::from(MONT_A);
let three_inv = modinv(&BigUint::from(3u32), p_val);
@@ -360,7 +362,7 @@ pub async fn client_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
password: &str,
) -> Result<()> {
tracing::info!("Starting EC-SRP5 authentication");
let w = WCurve::new();
let w = &*WCURVE;
// Generate client ephemeral keypair
let s_a: [u8; 32] = rand::random();
@@ -477,7 +479,7 @@ impl EcSrp5Credentials {
/// Derive EC-SRP5 credentials from username/password (done once at startup).
pub fn derive(username: &str, password: &str) -> Self {
let salt: [u8; 16] = rand::random();
let w = WCurve::new();
let w = &*WCURVE;
let i = w.gen_password_validator_priv(username, password, &salt);
let (x_gamma, parity) = w.gen_public_key(&i);
Self {
@@ -496,7 +498,7 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
creds: &EcSrp5Credentials,
) -> Result<()> {
tracing::info!("Starting EC-SRP5 server authentication");
let w = WCurve::new();
let w = &*WCURVE;
// MSG1: read [len][username\0][pubkey:32][parity:1]
let mut len_buf = [0u8; 1];
@@ -599,7 +601,12 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
mod hex {
pub fn encode(data: &[u8]) -> String {
data.iter().map(|b| format!("{:02x}", b)).collect()
let mut s = String::with_capacity(data.len() * 2);
for b in data {
use std::fmt::Write;
let _ = write!(s, "{:02x}", b);
}
s
}
}

View File

@@ -4,6 +4,8 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::Mutex;
@@ -18,6 +20,7 @@ struct TcpSession {
peer_ip: std::net::IpAddr,
streams: Vec<TcpStream>,
expected: u8,
notify: Arc<Notify>,
}
type SessionMap = Arc<Mutex<HashMap<u16, TcpSession>>>;
@@ -169,6 +172,7 @@ async fn handle_client(
stream.flush().await?;
session.streams.push(stream);
session.notify.notify_one();
tracing::info!(
"Secondary connection joined ({}/{})",
session.streams.len() + 1,
@@ -249,6 +253,7 @@ async fn handle_client(
for (_t, s) in map.iter_mut() {
if s.peer_ip == peer.ip() && s.streams.len() < s.expected as usize {
s.streams.push(stream);
s.notify.notify_one();
return Ok(());
}
}
@@ -299,12 +304,14 @@ async fn handle_client(
let conn_count = cmd.tcp_conn_count;
// Register session for secondary connections to find
let notify = Arc::new(Notify::new());
{
let mut map = sessions.lock().await;
map.insert(session_token, TcpSession {
peer_ip: peer.ip(),
streams: Vec::new(),
expected: conn_count,
notify: notify.clone(),
});
}
@@ -320,7 +327,8 @@ async fn handle_client(
if count + 1 >= conn_count as usize {
break;
}
if Instant::now() > deadline {
let now = Instant::now();
if now >= deadline {
tracing::warn!(
"Timeout waiting for TCP connections ({}/{}), proceeding",
count + 1,
@@ -328,7 +336,17 @@ async fn handle_client(
);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
match tokio::time::timeout(deadline - now, notify.notified()).await {
Ok(()) => continue,
Err(_) => {
tracing::warn!(
"Timeout waiting for TCP connections ({}/{}), proceeding",
count + 1,
conn_count,
);
break;
}
}
}
let extra_streams = {
@@ -589,8 +607,10 @@ async fn tcp_tx_loop_inner(
let mut status_seq: u32 = 0;
while state.running.load(Ordering::Relaxed) {
let now = Instant::now();
// Inject status message every ~1 second if in bidirectional mode
if send_status && Instant::now() >= next_status {
if send_status && now >= next_status {
status_seq += 1;
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage { cpu_load: crate::cpu::get(),
@@ -603,7 +623,7 @@ async fn tcp_tx_loop_inner(
}
state.record_interval(0, rx_bytes, 0);
bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None);
next_status = Instant::now() + Duration::from_secs(1);
next_status = now + Duration::from_secs(1);
}
if !state.spend_budget(effective_size as u64) {
@@ -619,12 +639,11 @@ async fn tcp_tx_loop_inner(
state.tx_speed_changed.store(false, Ordering::Relaxed);
let new_speed = state.tx_speed.load(Ordering::Relaxed);
interval = bandwidth::calc_send_interval(new_speed, tx_size as u16);
next_send = Instant::now();
next_send = now;
}
match interval {
Some(iv) => {
let now = Instant::now();
if let Some(delay) = bandwidth::advance_next_send(&mut next_send, iv, now) {
tokio::time::sleep(delay).await;
}
@@ -918,36 +937,43 @@ async fn udp_tx_loop(
async fn udp_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536];
let mut last_seq: Option<u32> = None;
let mut timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
while state.running.load(Ordering::Relaxed) {
// Use recv_from to accept packets from any source port
// (multi-connection MikroTik sends from multiple ports)
match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
Ok(Ok((n, _src))) if n >= 4 => {
if !state.spend_budget(n as u64) {
break;
}
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
tokio::select! {
biased;
res = socket.recv_from(&mut buf) => {
match res {
Ok((n, _src)) if n >= 4 => {
if !state.spend_budget(n as u64) {
return;
}
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
}
}
last_seq = Some(seq);
state.last_udp_seq.store(seq, Ordering::Relaxed);
}
Ok(_) => {}
Err(e) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
last_seq = Some(seq);
state.last_udp_seq.store(seq, Ordering::Relaxed);
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(_) => {
_ = &mut timeout => {
tracing::debug!("UDP RX timeout");
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
}
}

View File

@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use btest_rs::bandwidth::BandwidthState;
use super::quota::{Direction, QuotaManager};
use super::quota::{Direction, QuotaError, QuotaManager};
/// Enforces quotas during an active test session.
/// Call `run()` as a spawned task — it will set `state.running = false`
@@ -154,10 +154,10 @@ impl QuotaEnforcer {
// The DB has usage from PREVIOUS sessions; we add current session bytes
if let Err(e) = self.quota_mgr.check_user(&self.username) {
// Already exceeded from previous sessions
return match format!("{}", e).as_str() {
s if s.contains("daily") => StopReason::UserDailyQuota,
s if s.contains("weekly") => StopReason::UserWeeklyQuota,
s if s.contains("monthly") => StopReason::UserMonthlyQuota,
return match e {
QuotaError::DailyExceeded { .. } => StopReason::UserDailyQuota,
QuotaError::WeeklyExceeded { .. } => StopReason::UserWeeklyQuota,
QuotaError::MonthlyExceeded { .. } => StopReason::UserMonthlyQuota,
_ => StopReason::UserDailyQuota,
};
}
@@ -169,13 +169,13 @@ impl QuotaEnforcer {
StopReason::Running
}
fn check_ip_with_session(&self, ip_str: &str, session_tx: u64, session_rx: u64) -> StopReason {
fn check_ip_with_session(&self, _ip_str: &str, _session_tx: u64, _session_rx: u64) -> StopReason {
if let Err(e) = self.quota_mgr.check_ip(&self.ip, Direction::Both) {
return match format!("{}", e).as_str() {
s if s.contains("IP daily") => StopReason::IpDailyQuota,
s if s.contains("IP weekly") => StopReason::IpWeeklyQuota,
s if s.contains("IP monthly") => StopReason::IpMonthlyQuota,
s if s.contains("connections") => StopReason::IpDailyQuota, // reuse
return match e {
QuotaError::IpDailyExceeded { .. } | QuotaError::IpInboundDailyExceeded { .. } | QuotaError::IpOutboundDailyExceeded { .. } => StopReason::IpDailyQuota,
QuotaError::IpWeeklyExceeded { .. } | QuotaError::IpInboundWeeklyExceeded { .. } | QuotaError::IpOutboundWeeklyExceeded { .. } => StopReason::IpWeeklyQuota,
QuotaError::IpMonthlyExceeded { .. } | QuotaError::IpInboundMonthlyExceeded { .. } | QuotaError::IpOutboundMonthlyExceeded { .. } => StopReason::IpMonthlyQuota,
QuotaError::TooManyConnections { .. } => StopReason::IpDailyQuota, // reuse
_ => StopReason::IpDailyQuota,
};
}
@@ -183,13 +183,13 @@ impl QuotaEnforcer {
}
/// Flush session bytes to DB. Call periodically and at session end.
pub fn flush_to_db(&self) {
pub fn flush_to_db(&self, ip_str: &str) {
let tx = self.state.total_tx_bytes.load(Ordering::Relaxed);
let rx = self.state.total_rx_bytes.load(Ordering::Relaxed);
// From server perspective: tx = outbound (we sent), rx = inbound (we received)
self.quota_mgr.record_usage(
&self.username,
&self.ip.to_string(),
ip_str,
rx, // inbound = what we received from client
tx, // outbound = what we sent to client
);
@@ -330,7 +330,7 @@ mod tests {
qm, "testuser".into(), "127.0.0.1".parse().unwrap(),
state, 10, 0,
);
enforcer.flush_to_db();
enforcer.flush_to_db("127.0.0.1");
// flush_to_db: total_tx=5000→outbound, total_rx=3000→inbound
// quota_mgr.record_usage(inbound=3000, outbound=5000)

View File

@@ -609,32 +609,20 @@ impl UserDb {
fn hash_password(username: &str, password: &str) -> String {
use sha2::{Sha256, Digest};
let mut hasher = Sha256::new();
hasher.update(format!("{}:{}", username, password).as_bytes());
hasher.update(username.as_bytes());
hasher.update(b":");
hasher.update(password.as_bytes());
let result = hasher.finalize();
result.iter().map(|b| format!("{:02x}", b)).collect()
let mut hex = String::with_capacity(64);
for b in result {
use std::fmt::Write;
let _ = write!(hex, "{:02x}", b);
}
hex
}
fn chrono_date_today() -> String {
// Simple date without chrono crate
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
let days = secs / 86400;
let mut y = 1970u64;
let mut remaining = days;
loop {
let leap = if y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) { 366 } else { 365 };
if remaining < leap { break; }
remaining -= leap;
y += 1;
}
let leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
let days_in_months = [31u64, if leap { 29 } else { 28 }, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
let mut m = 0usize;
for i in 0..12 {
if remaining < days_in_months[i] { m = i; break; }
remaining -= days_in_months[i];
}
format!("{:04}-{:02}-{:02}", y, m + 1, remaining + 1)
chrono::Local::now().format("%Y-%m-%d").to_string()
}
// Re-export for use by rusqlite

View File

@@ -36,12 +36,12 @@ pub fn init(target: &str) -> std::io::Result<()> {
/// Send a syslog message with the given severity and message.
/// Severity: 6=info, 4=warning, 3=error
fn send(severity: u8, msg: &str) {
// Format timestamp outside the lock to minimize contention
let priority = 128 + severity;
let timestamp = bsd_timestamp();
let guard = SYSLOG.lock().unwrap();
if let Some(ref sender) = *guard {
// RFC 3164 (BSD syslog): <priority>Mon DD HH:MM:SS hostname program: message
// facility=16 (local0) * 8 + severity
let priority = 128 + severity;
let timestamp = bsd_timestamp();
let syslog_msg = format!(
"<{}>{} {} btest-rs: {}",
priority, timestamp, sender.hostname, msg,
@@ -52,44 +52,7 @@ fn send(severity: u8, msg: &str) {
fn bsd_timestamp() -> String {
// RFC 3164 format: "Mon DD HH:MM:SS" (no year)
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
// Simple conversion — good enough for syslog
let secs_in_day = 86400u64;
let days = now / secs_in_day;
let time_of_day = now % secs_in_day;
let hours = time_of_day / 3600;
let minutes = (time_of_day % 3600) / 60;
let seconds = time_of_day % 60;
// Day of year calculation (approximate months)
let months = ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"];
let days_in_months = [31u64,28,31,30,31,30,31,31,30,31,30,31];
// Days since epoch to year/month/day
let mut y = 1970u64;
let mut remaining = days;
loop {
let leap = if y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) { 366 } else { 365 };
if remaining < leap { break; }
remaining -= leap;
y += 1;
}
let leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
let mut m = 0usize;
for i in 0..12 {
let mut d = days_in_months[i];
if i == 1 && leap { d += 1; }
if remaining < d { m = i; break; }
remaining -= d;
}
let day = remaining + 1;
format!("{} {:2} {:02}:{:02}:{:02}", months[m], day, hours, minutes, seconds)
chrono::Local::now().format("%b %e %H:%M:%S").to_string()
}
// --- Public logging functions ---