11 Commits

Author SHA1 Message Date
Siavash Sameni
82ea10f2d5 Bump version to 0.6.3, temporarily disable release CI
Some checks failed
CI / test (push) Failing after 1m35s
Disable release workflow trigger to prevent duplicate builds
when tagging manually with pre-built binaries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 11:51:34 +04:00
Siavash Sameni
e6cecc7bd8 Perf: cache EC-SRP5 constants, optimize TCP I/O, fix LDAP security
- Cache Curve25519 constants (P, CURVE_ORDER, WEIERSTRASS_A) with LazyLock
  eliminating ~768 BigUint heap allocations per auth handshake
- Optimize scalar_mul to use bit() instead of clone+shift
- Set TCP socket buffers to 4MB via socket2 (matching UDP path)
- Increase TCP RX buffers from 64KB to 256KB
- Use 256KB writes at unlimited rate (vs 32KB) reducing syscall overhead
- Fix LDAP filter injection with RFC 4515 escaping
- Fix unwrap panic on empty LDAP search results

Benchmarked on WiFi against MikroTik:
  TCP Download: +67% (19.7 → 32.9 Mbps avg)
  TCP Upload:   +87% (3.6 → 6.7 Mbps avg)
  Local CPU:    lower across all tests (29-36% vs 32-58%)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 10:06:21 +04:00
Siavash Sameni
da76c76c93 Update architecture docs: server-pro, Android, CPU platforms, byte budget
All checks were successful
CI / test (push) Successful in 2m27s
Complete rewrite reflecting current state: server-pro module structure,
BandwidthState fields, all 6 build targets, CPU sampling on 5 platforms,
web dashboard API endpoints, test counts, and key design decisions
including inline byte budget and TCP status message scanning.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:51:07 +04:00
Siavash Sameni
27c69d8982 Fix unused variable warning in test
Some checks failed
Build & Release / release (push) Has been cancelled
CI / test (push) Successful in 2m35s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:40:40 +04:00
Siavash Sameni
2cb8519c95 Suppress non_snake_case warning for Win32 FILETIME struct
Some checks failed
CI / test (push) Failing after 1m40s
Build & Release / release (push) Successful in 4m46s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:34:21 +04:00
Siavash Sameni
9ca124cb76 Fix CPU reporting: Android support, TCP remote CPU parsing
All checks were successful
CI / test (push) Successful in 2m33s
Build & Release / release (push) Successful in 5m11s
- Add target_os = "android" to CPU sampler (reads /proc/stat like Linux)
- Parse remote CPU from interleaved TCP status messages in BOTH mode
- Add dedicated status reader for TX-only mode (reads server's 12-byte
  status messages to get remote CPU and enable speed adaptation)
- Add 3 CPU integration tests: local CPU, TCP BOTH remote, TCP TX-only

Fixes: Android always showing cpu: 0%/0%, TCP remote CPU always 0%
on all platforms (btest-to-btest and btest-to-MikroTik).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 08:28:45 +04:00
Siavash Sameni
c06a4d0c9a Add public server links to README, fix dead_code warnings
All checks were successful
CI / test (push) Successful in 2m12s
- Add Free Public Servers section with US/EU endpoints and usage examples
- Add Server Pro section documenting the optional pro build
- Add Android/Termux to supported platforms and installation guide
- Gate pro-only public functions with #[cfg(feature = "pro")] to eliminate
  6 dead_code warnings in the standard build

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 19:57:18 +04:00
Siavash Sameni
817535a0ad Add Android aarch64/armv7 targets to release builds
Some checks failed
CI / test (push) Failing after 1m31s
Build & Release / release (push) Successful in 4m49s
Adds ARMv8 (aarch64-linux-android) and ARMv7 (armv7-linux-androideabi)
builds for Termux/Android using the Android NDK r27c. Release artifacts
now include btest-android-aarch64.tar.gz and btest-android-armv7.tar.gz.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 19:24:35 +04:00
Siavash Sameni
ba02ed36b5 Merge feature/server-pro into main
Adds btest-server-pro: multi-user bandwidth test server with SQLite DB,
per-IP quotas (daily/weekly/monthly), inline byte budget enforcement,
TCP multi-connection support, MD5 auth, web dashboard with Chart.js
graphs, quota progress bars, and JSON export.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 18:44:16 +04:00
Siavash Sameni
4cdcc4e6c4 Public btest server: byte budget, multi-conn, web dashboard, quotas
- Inline byte budget in BandwidthState prevents quota overshoot at any
  link speed (TX/RX loops check per-packet, not per-interval)
- TCP multi-connection support for server-pro (session tokens, secondary
  connection joins, delegates to standard multi-conn handler)
- MD5 password verification against stored raw passwords in user DB
- Web dashboard: quota progress bars (daily/weekly/monthly), JSON export
  endpoint (/api/ip/{ip}/export), quota API (/api/ip/{ip}/quota)
- Landing page with usage instructions, UDP NAT warning, credentials
- Fix IP usage double-counting bug in QuotaManager::record_usage
- UserDb now stores DB path and raw passwords for MD5 auth
- 10 enforcer tests (4 new: budget calc, budget stop, budget exhausted,
  unlimited passthrough)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 18:43:09 +04:00
Siavash Sameni
89391e1781 Add OpenWrt ipk packaging + split client/server binaries
Some checks failed
CI / test (push) Failing after 1m27s
OpenWrt package (deploy/openwrt/):
- build-ipk.sh: creates .ipk from pre-built binary (no SDK needed)
- Makefile: for OpenWrt SDK integration
- ProCD init script with UCI config
- Supports all architectures (x86_64, aarch64, mipsel, mips)

Split binaries for embedded (src/bin/):
- btest-client: client-only, no server/syslog/csv
- btest-server: server-only, no client
- release-small profile: opt-level=z + panic=abort

Sizes (compressed .tar.gz):
  Full btest:    ~1 MB
  btest-client:  ~500 KB (release-small)
  btest-server:  ~550 KB (release-small)

Install on OpenWrt:
  opkg install btest-rs_0.6.0-1_x86_64.ipk

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 14:44:57 +04:00
23 changed files with 1629 additions and 412 deletions

View File

@@ -3,7 +3,7 @@ name: Build & Release
on: on:
push: push:
tags: tags:
- 'v*' - 'disabled-v*'
jobs: jobs:
release: release:
@@ -14,7 +14,7 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: | run: |
apt-get update && apt-get install -y --no-install-recommends \ apt-get update && apt-get install -y --no-install-recommends \
git curl jq ca-certificates zip \ git curl jq ca-certificates zip unzip \
musl-tools \ musl-tools \
gcc-aarch64-linux-gnu \ gcc-aarch64-linux-gnu \
gcc-arm-linux-gnueabihf \ gcc-arm-linux-gnueabihf \
@@ -23,7 +23,14 @@ jobs:
x86_64-unknown-linux-musl \ x86_64-unknown-linux-musl \
aarch64-unknown-linux-musl \ aarch64-unknown-linux-musl \
armv7-unknown-linux-musleabihf \ armv7-unknown-linux-musleabihf \
x86_64-pc-windows-gnu x86_64-pc-windows-gnu \
aarch64-linux-android \
armv7-linux-androideabi
# Install Android NDK for cross-compilation
NDK_VER=r27c
curl -sL https://dl.google.com/android/repository/android-ndk-${NDK_VER}-linux.zip -o /tmp/ndk.zip
unzip -q /tmp/ndk.zip -d /opt && rm /tmp/ndk.zip
export ANDROID_NDK_HOME=/opt/android-ndk-${NDK_VER}
- name: Ensure code is present - name: Ensure code is present
run: | run: |
@@ -47,6 +54,12 @@ jobs:
[target.x86_64-pc-windows-gnu] [target.x86_64-pc-windows-gnu]
linker = "x86_64-w64-mingw32-gcc" linker = "x86_64-w64-mingw32-gcc"
[target.aarch64-linux-android]
linker = "/opt/android-ndk-r27c/toolchains/llvm/prebuilt/linux-x86_64/bin/aarch64-linux-android35-clang"
[target.armv7-linux-androideabi]
linker = "/opt/android-ndk-r27c/toolchains/llvm/prebuilt/linux-x86_64/bin/armv7a-linux-androideabi35-clang"
TOML TOML
- name: Build Linux x86_64 - name: Build Linux x86_64
@@ -61,6 +74,12 @@ jobs:
- name: Build Windows x86_64 - name: Build Windows x86_64
run: cargo build --release --target x86_64-pc-windows-gnu run: cargo build --release --target x86_64-pc-windows-gnu
- name: Build Android aarch64 (ARMv8)
run: cargo build --release --target aarch64-linux-android
- name: Build Android armv7 (ARMv7)
run: cargo build --release --target armv7-linux-androideabi
- name: Package all - name: Package all
run: | run: |
mkdir -p /artifacts mkdir -p /artifacts
@@ -81,6 +100,14 @@ jobs:
zip /artifacts/btest-windows-x86_64.zip btest.exe zip /artifacts/btest-windows-x86_64.zip btest.exe
cd - cd -
cd target/aarch64-linux-android/release
tar czf /artifacts/btest-android-aarch64.tar.gz btest
cd -
cd target/armv7-linux-androideabi/release
tar czf /artifacts/btest-android-armv7.tar.gz btest
cd -
cd /artifacts cd /artifacts
sha256sum * > checksums-sha256.txt sha256sum * > checksums-sha256.txt
cat checksums-sha256.txt cat checksums-sha256.txt
@@ -103,6 +130,8 @@ jobs:
| Linux | aarch64 (RPi 64-bit) | btest-linux-aarch64.tar.gz | | Linux | aarch64 (RPi 64-bit) | btest-linux-aarch64.tar.gz |
| Linux | armv7 (RPi 32-bit) | btest-linux-armv7.tar.gz | | Linux | armv7 (RPi 32-bit) | btest-linux-armv7.tar.gz |
| Windows | x86_64 | btest-windows-x86_64.zip | | Windows | x86_64 | btest-windows-x86_64.zip |
| Android | aarch64 (ARMv8, Termux) | btest-android-aarch64.tar.gz |
| Android | armv7 (ARMv7, Termux) | btest-android-armv7.tar.gz |
| macOS | aarch64 / x86_64 | Run \`scripts/build-macos-release.sh --upload ${TAG}\` | | macOS | aarch64 / x86_64 | Run \`scripts/build-macos-release.sh --upload ${TAG}\` |
| Docker | x86_64 | \`docker pull ${REGISTRY}/manawenuz/btest-rs:${TAG}\` | | Docker | x86_64 | \`docker pull ${REGISTRY}/manawenuz/btest-rs:${TAG}\` |

2
Cargo.lock generated
View File

@@ -229,7 +229,7 @@ dependencies = [
[[package]] [[package]]
name = "btest-rs" name = "btest-rs"
version = "0.6.0" version = "0.6.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"askama", "askama",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "btest-rs" name = "btest-rs"
version = "0.6.0" version = "0.6.3"
edition = "2021" edition = "2021"
description = "MikroTik Bandwidth Test (btest) server and client with EC-SRP5 auth — a Rust reimplementation" description = "MikroTik Bandwidth Test (btest) server and client with EC-SRP5 auth — a Rust reimplementation"
license = "MIT AND Apache-2.0" license = "MIT AND Apache-2.0"
@@ -16,6 +16,14 @@ path = "src/lib.rs"
name = "btest" name = "btest"
path = "src/main.rs" path = "src/main.rs"
[[bin]]
name = "btest-client"
path = "src/bin/client_only.rs"
[[bin]]
name = "btest-server"
path = "src/bin/server_only.rs"
[[bin]] [[bin]]
name = "btest-server-pro" name = "btest-server-pro"
path = "src/server_pro/main.rs" path = "src/server_pro/main.rs"
@@ -54,3 +62,9 @@ opt-level = 3
lto = true lto = true
strip = true strip = true
codegen-units = 1 codegen-units = 1
# Minimal size profile for embedded/OpenWrt targets
[profile.release-small]
inherits = "release"
opt-level = "z"
panic = "abort"

View File

@@ -2,6 +2,25 @@
A Rust reimplementation of the [MikroTik Bandwidth Test (btest)](https://wiki.mikrotik.com/wiki/Manual:Tools/Bandwidth_Test) protocol. Both server and client modes, fully compatible with MikroTik RouterOS devices. A Rust reimplementation of the [MikroTik Bandwidth Test (btest)](https://wiki.mikrotik.com/wiki/Manual:Tools/Bandwidth_Test) protocol. Both server and client modes, fully compatible with MikroTik RouterOS devices.
## Free Public Servers
Test your MikroTik link speed right now — no setup, no registration:
| Server | Location | Dashboard |
|--------|----------|-----------|
| `104.225.217.60` | US | [btest.home.kg](https://btest.home.kg) |
| `188.245.59.196` | EU | [btest.mikata.ru](https://btest.mikata.ru) |
```
/tool bandwidth-test address=104.225.217.60 user=btest password=btest protocol=tcp direction=both
```
After the test, visit `https://btest.home.kg/dashboard/YOUR_IP` to see your results, throughput history, and quota usage. Per-IP limits: 2 GB daily / 8 GB weekly / 24 GB monthly.
> **Note:** TCP is recommended for remote testing. UDP bidirectional through NAT will only show one direction — this is a btest protocol limitation, not specific to btest-rs. See [KNOWN_ISSUES.md](KNOWN_ISSUES.md) for details.
Want to run your own public server? Build with `cargo build --release --features pro` — see [Server Pro](#server-pro) below.
## Features ## Features
- **Full protocol support** -- TCP and UDP data transfer, IPv4 and IPv6 - **Full protocol support** -- TCP and UDP data transfer, IPv4 and IPv6
@@ -16,7 +35,7 @@ A Rust reimplementation of the [MikroTik Bandwidth Test (btest)](https://wiki.mi
- **Quiet mode** -- suppress terminal output for scripted/automated use - **Quiet mode** -- suppress terminal output for scripted/automated use
- **NAT traversal** -- probe packet to open firewall holes for UDP receive - **NAT traversal** -- probe packet to open firewall holes for UDP receive
- **Single static binary** -- ~2 MB, zero runtime dependencies (musl build) - **Single static binary** -- ~2 MB, zero runtime dependencies (musl build)
- **Cross-platform** -- macOS, Linux (x86_64, ARM64), Docker - **Cross-platform** -- macOS, Linux (x86_64, ARM64, ARMv7), Windows, Android (Termux), Docker
- **Async I/O** -- tokio-based, handles many concurrent connections efficiently - **Async I/O** -- tokio-based, handles many concurrent connections efficiently
## Performance ## Performance
@@ -61,6 +80,10 @@ sudo mv btest /usr/local/bin/
# Windows # Windows
# Download btest-windows-x86_64.zip from releases # Download btest-windows-x86_64.zip from releases
# Android (Termux, no root needed)
curl -L <release-url>/btest-android-aarch64.tar.gz | tar xz
mv btest $PREFIX/bin/
``` ```
### Raspberry Pi ### Raspberry Pi
@@ -267,6 +290,29 @@ scripts/test-mikrotik.sh <ip> # Test against MikroTik device
scripts/test-docker.sh # Docker container test scripts/test-docker.sh # Docker container test
``` ```
## Server Pro
An optional superset of the standard server with multi-user support, quotas, and a web dashboard. Build with `--features pro`:
```bash
cargo build --release --features pro --bin btest-server-pro
```
Features:
- **SQLite user database** — add/remove users, per-user quotas
- **Per-IP bandwidth quotas** — daily, weekly, monthly limits with inline byte budget enforcement
- **Web dashboard** — session history, throughput stats, quota progress bars, JSON export
- **TCP multi-connection** — handles MikroTik's default 20-connection mode
- **MD5 auth against DB** — proper challenge-response verification
```bash
# Create a user and start the server
btest-server-pro --users-db users.db useradd btest btest
btest-server-pro --users-db users.db --ip-daily 2147483648 --ip-weekly 8589934592 --web-port 8080
```
The pro features are completely optional and don't affect the standard `btest` binary.
## Credits ## Credits
- **[btest-opensource](https://github.com/samm-git/btest-opensource)** by [Alex Samorukov](https://github.com/samm-git) -- original C implementation and protocol reverse-engineering. Licensed under **MIT**. - **[btest-opensource](https://github.com/samm-git/btest-opensource)** by [Alex Samorukov](https://github.com/samm-git) -- original C implementation and protocol reverse-engineering. Licensed under **MIT**.

57
deploy/openwrt/Makefile Normal file
View File

@@ -0,0 +1,57 @@
# OpenWrt package Makefile for btest-rs
#
# To build:
# 1. Clone the OpenWrt SDK for your target
# 2. Copy this directory to package/btest-rs/ in the SDK
# 3. Run: make package/btest-rs/compile V=s
#
# Or use the pre-built binary approach (see build-ipk.sh)
include $(TOPDIR)/rules.mk
PKG_NAME:=btest-rs
PKG_VERSION:=0.6.0
PKG_RELEASE:=1
PKG_SOURCE:=$(PKG_NAME)-$(PKG_VERSION).tar.gz
PKG_SOURCE_URL:=https://github.com/manawenuz/btest-rs/archive/refs/tags/v$(PKG_VERSION).tar.gz
PKG_HASH:=skip
PKG_BUILD_DEPENDS:=rust/host
PKG_BUILD_DIR:=$(BUILD_DIR)/$(PKG_NAME)-$(PKG_VERSION)
include $(INCLUDE_DIR)/package.mk
define Package/btest-rs
SECTION:=net
CATEGORY:=Network
TITLE:=MikroTik Bandwidth Test server and client
URL:=https://github.com/manawenuz/btest-rs
DEPENDS:=
PKGARCH:=$(ARCH)
endef
define Package/btest-rs/description
A Rust reimplementation of the MikroTik Bandwidth Test (btest) protocol.
Supports TCP/UDP, IPv4/IPv6, EC-SRP5 and MD5 authentication,
multi-connection, syslog, CSV output, and CPU monitoring.
endef
define Build/Compile
cd $(PKG_BUILD_DIR) && \
CARGO_TARGET_DIR=$(PKG_BUILD_DIR)/target \
cargo build --release --target $(RUSTC_TARGET)
endef
define Package/btest-rs/install
$(INSTALL_DIR) $(1)/usr/bin
$(INSTALL_BIN) $(PKG_BUILD_DIR)/target/$(RUSTC_TARGET)/release/btest $(1)/usr/bin/btest
$(INSTALL_DIR) $(1)/etc/init.d
$(INSTALL_BIN) ./files/btest.init $(1)/etc/init.d/btest
$(INSTALL_DIR) $(1)/etc/config
$(INSTALL_CONF) ./files/btest.config $(1)/etc/config/btest
endef
$(eval $(call BuildPackage,btest-rs))

117
deploy/openwrt/build-ipk.sh Executable file
View File

@@ -0,0 +1,117 @@
#!/usr/bin/env bash
# Build an OpenWrt .ipk package from a pre-built static binary.
# No OpenWrt SDK needed — just packages the binary with metadata.
#
# Usage:
# ./deploy/openwrt/build-ipk.sh <arch> [binary-path]
#
# Examples:
# ./deploy/openwrt/build-ipk.sh x86_64 dist/btest # from cross-compiled binary
# ./deploy/openwrt/build-ipk.sh aarch64 dist/btest # for RPi/ARM64 routers
# ./deploy/openwrt/build-ipk.sh mipsel target/release/btest # for MIPS little-endian
#
# Supported architectures: x86_64, aarch64, arm_cortex-a7, mipsel_24kc, mips_24kc
set -euo pipefail
cd "$(dirname "$0")/../.."
ARCH="${1:?Usage: $0 <arch> [binary-path]}"
BINARY="${2:-dist/btest}"
VERSION="0.6.0"
PKG_NAME="btest-rs"
OUTPUT_DIR="dist"
if [ ! -f "$BINARY" ]; then
echo "Error: binary not found at $BINARY"
echo "Build it first: cargo build --release --target <target>"
exit 1
fi
mkdir -p "$OUTPUT_DIR"
WORKDIR=$(mktemp -d)
trap "rm -rf $WORKDIR" EXIT
echo "=== Building ${PKG_NAME}_${VERSION}_${ARCH}.ipk ==="
# Create package structure
mkdir -p "$WORKDIR/data/usr/bin"
mkdir -p "$WORKDIR/data/etc/init.d"
mkdir -p "$WORKDIR/data/etc/config"
mkdir -p "$WORKDIR/control"
# Install files
cp "$BINARY" "$WORKDIR/data/usr/bin/btest"
chmod 755 "$WORKDIR/data/usr/bin/btest"
cp deploy/openwrt/files/btest.init "$WORKDIR/data/etc/init.d/btest"
chmod 755 "$WORKDIR/data/etc/init.d/btest"
cp deploy/openwrt/files/btest.config "$WORKDIR/data/etc/config/btest"
# Calculate installed size
INSTALLED_SIZE=$(du -sk "$WORKDIR/data" | awk '{print $1}')
# Control file
cat > "$WORKDIR/control/control" << EOF
Package: ${PKG_NAME}
Version: ${VERSION}-1
Depends: libc
Source: https://github.com/manawenuz/btest-rs
License: MIT AND Apache-2.0
Section: net
SourceName: ${PKG_NAME}
Maintainer: Siavash Sameni <manwe@manko.yoga>
Architecture: ${ARCH}
Installed-Size: ${INSTALLED_SIZE}
Description: MikroTik Bandwidth Test server and client
A Rust reimplementation of the MikroTik btest protocol.
Supports TCP/UDP, EC-SRP5 and MD5 auth, IPv4/IPv6.
EOF
# Post-install script
cat > "$WORKDIR/control/postinst" << 'EOF'
#!/bin/sh
[ "${IPKG_NO_SCRIPT}" = "1" ] && exit 0
/etc/init.d/btest enable 2>/dev/null || true
exit 0
EOF
chmod 755 "$WORKDIR/control/postinst"
# Pre-remove script
cat > "$WORKDIR/control/prerm" << 'EOF'
#!/bin/sh
/etc/init.d/btest stop 2>/dev/null || true
/etc/init.d/btest disable 2>/dev/null || true
exit 0
EOF
chmod 755 "$WORKDIR/control/prerm"
# Conffiles
cat > "$WORKDIR/control/conffiles" << EOF
/etc/config/btest
EOF
# Build the .ipk (it's just a tar.gz of tar.gz's)
cd "$WORKDIR"
# Create data.tar.gz
(cd data && tar czf ../data.tar.gz .)
# Create control.tar.gz
(cd control && tar czf ../control.tar.gz .)
# Create debian-binary
echo "2.0" > debian-binary
# Package it all
tar czf "${PKG_NAME}_${VERSION}-1_${ARCH}.ipk" debian-binary control.tar.gz data.tar.gz
cd -
cp "$WORKDIR/${PKG_NAME}_${VERSION}-1_${ARCH}.ipk" "$OUTPUT_DIR/"
echo ""
echo "Package: $OUTPUT_DIR/${PKG_NAME}_${VERSION}-1_${ARCH}.ipk"
ls -lh "$OUTPUT_DIR/${PKG_NAME}_${VERSION}-1_${ARCH}.ipk"
echo ""
echo "Install on OpenWrt:"
echo " scp $OUTPUT_DIR/${PKG_NAME}_${VERSION}-1_${ARCH}.ipk root@router:/tmp/"
echo " ssh root@router 'opkg install /tmp/${PKG_NAME}_${VERSION}-1_${ARCH}.ipk'"
echo " ssh root@router '/etc/init.d/btest enable && /etc/init.d/btest start'"

View File

@@ -0,0 +1,7 @@
config server
option enabled '0'
option port '2000'
option auth_user ''
option auth_pass ''
option ecsrp5 '0'
option syslog ''

34
deploy/openwrt/files/btest.init Executable file
View File

@@ -0,0 +1,34 @@
#!/bin/sh /etc/rc.common
# btest-rs OpenWrt init script
START=90
STOP=10
USE_PROCD=1
start_service() {
local enabled port auth_user auth_pass ecsrp5 syslog
config_load btest
config_get_bool enabled server enabled 0
[ "$enabled" -eq 0 ] && return
config_get port server port 2000
config_get auth_user server auth_user ''
config_get auth_pass server auth_pass ''
config_get_bool ecsrp5 server ecsrp5 0
config_get syslog server syslog ''
procd_open_instance
procd_set_param command /usr/bin/btest -s -P "$port"
[ -n "$auth_user" ] && procd_append_param command -a "$auth_user"
[ -n "$auth_pass" ] && procd_append_param command -p "$auth_pass"
[ "$ecsrp5" -eq 1 ] && procd_append_param command --ecsrp5
[ -n "$syslog" ] && procd_append_param command --syslog "$syslog"
procd_set_param respawn
procd_set_param stdout 1
procd_set_param stderr 1
procd_close_instance
}

View File

@@ -2,282 +2,181 @@
## Overview ## Overview
btest-rs is a Rust reimplementation of the MikroTik Bandwidth Test protocol. It operates in two modes: **server** (accepts connections from MikroTik devices) and **client** (connects to MikroTik btest servers). btest-rs is a Rust reimplementation of the MikroTik Bandwidth Test protocol. It operates in two modes: **server** (accepts connections from MikroTik devices) and **client** (connects to MikroTik btest servers). An optional **server-pro** mode adds multi-user support, quotas, and a web dashboard.
## Module Structure ## Module Structure
```mermaid ```
graph TB src/
main["main.rs<br/>CLI parsing (clap)"] ├── main.rs # CLI entry point, argument parsing (clap)
server["server.rs<br/>Server mode"] ├── lib.rs # Public API (re-exports all modules for tests/pro)
client["client.rs<br/>Client mode"] ├── protocol.rs # Wire format: Command, StatusMessage, constants
protocol["protocol.rs<br/>Wire protocol types"] ├── auth.rs # MD5 challenge-response authentication
auth["auth.rs<br/>MD5 authentication"] ├── ecsrp5.rs # EC-SRP5 authentication (Curve25519 Weierstrass)
ecsrp5["ecsrp5.rs<br/>EC-SRP5 authentication<br/>(Curve25519 Weierstrass)"] ├── server.rs # Server mode: listener, TCP/UDP handlers, multi-conn
bandwidth["bandwidth.rs<br/>Rate control & reporting"] ├── client.rs # Client mode: connector, TCP/UDP handlers, status parsing
csv_output["csv_output.rs<br/>CSV result logging"] ├── bandwidth.rs # Rate limiting, formatting, shared BandwidthState, byte budget
syslog["syslog_logger.rs<br/>Remote syslog (RFC 3164)"] ├── cpu.rs # CPU sampler (macOS, Linux, Android, Windows, FreeBSD)
lib["lib.rs<br/>Public API for tests"] ├── csv_output.rs # CSV result logging (append-mode, auto-header)
├── syslog_logger.rs # Remote syslog sender (RFC 3164 / BSD format)
├── bin/
│ ├── client_only.rs # Stripped client binary for embedded/OpenWrt
│ └── server_only.rs # Stripped server binary for embedded/OpenWrt
└── server_pro/ # Optional (--features pro)
├── main.rs # Pro CLI: user management, quota flags, web port
├── server_loop.rs # Accept loop with auth, quotas, multi-conn sessions
├── user_db.rs # SQLite: users, usage, ip_usage, sessions, intervals
├── quota.rs # QuotaManager: per-user + per-IP limits, remaining_budget()
├── enforcer.rs # QuotaEnforcer: periodic checks, max_duration, StopReason
├── ldap_auth.rs # LDAP auth scaffold (not yet wired)
└── web/
└── mod.rs # Axum web dashboard: Chart.js, quota bars, JSON export
```
main --> server ## CLI Output Format
main --> client
main --> bandwidth The client outputs one line per second per direction:
main --> csv_output
main --> syslog ```
server --> protocol [ 5] TX 285.47 Mbps (35684352 bytes) cpu: 20%/62%
server --> auth [ 5] RX 283.64 Mbps (35454988 bytes) cpu: 20%/62% lost: 12
server --> ecsrp5 ```
server --> bandwidth
server --> syslog Format: `[interval] direction speed (bytes) cpu: local%/remote% [lost: N]`
client --> protocol
client --> auth At test end, a summary line:
client --> ecsrp5 ```
client --> bandwidth TEST_END peer=172.16.81.1 proto=TCP dir=both duration=60s tx_avg=284.94Mbps rx_avg=272.83Mbps tx_bytes=2137030656 rx_bytes=2046260728 lost=0
lib --> server
lib --> client
lib --> protocol
lib --> auth
lib --> ecsrp5
lib --> bandwidth
``` ```
## Data Flow ## Data Flow
### Server Mode (MikroTik connects to us) ### Server Mode (MikroTik connects to us)
```mermaid
sequenceDiagram
participant MK as MikroTik Client
participant TCP as TCP Control<br/>(port 2000)
participant SRV as btest-rs Server
participant UDP as UDP Data<br/>(port 2001+)
MK->>TCP: Connect
SRV->>TCP: HELLO [01 00 00 00]
MK->>TCP: Command [16 bytes]
Note over SRV: Parse proto, direction,<br/>tx_size, speeds
alt No auth configured
SRV->>TCP: AUTH_OK [01 00 00 00]
else MD5 auth (RouterOS < 6.43)
SRV->>TCP: AUTH_REQUIRED [02 00 00 00]
SRV->>TCP: Challenge [16 random bytes]
MK->>TCP: Response [16 hash + 32 username]
Note over SRV: Verify MD5(pass + MD5(pass + challenge))
SRV->>TCP: AUTH_OK or AUTH_FAILED
else EC-SRP5 auth (RouterOS >= 6.43, --ecsrp5 flag)
SRV->>TCP: EC-SRP5 [03 00 00 00]
MK->>TCP: [len][username\0][client_pubkey:32][parity:1]
SRV->>TCP: [len][server_pubkey:32][parity:1][salt:16]
MK->>TCP: [len][client_confirmation:32]
SRV->>TCP: [len][server_confirmation:32]
Note over SRV: Curve25519 Weierstrass EC-SRP5<br/>See docs/ecsrp5-research.md
SRV->>TCP: AUTH_OK [01 00 00 00]
end
alt TCP mode
Note over SRV,MK: Data flows on same TCP connection
loop Every second
SRV-->>SRV: Print bandwidth stats
end
else UDP mode
SRV->>TCP: UDP port [2 bytes BE]
Note over SRV: Bind UDP socket
par TX Thread (if server transmits)
loop Continuous
SRV->>UDP: Data packets [seq + payload]
end
and RX Thread (if server receives)
loop Continuous
UDP->>SRV: Data packets [seq + payload]
end
and Status Loop (TCP control)
loop Every 1 second
MK->>TCP: Status [12 bytes]
SRV->>TCP: Status [12 bytes]
Note over SRV: Adjust TX speed<br/>based on client feedback
end
end
end
``` ```
MikroTik → TCP:2000 → HELLO → Command [16 bytes] → Auth → Data Transfer
```
1. Server sends HELLO `[01 00 00 00]`
2. Client sends 16-byte command (protocol, direction, tx_size, speeds, conn_count)
3. Auth: none (`01`), MD5 (`02`), or EC-SRP5 (`03`)
4. TCP: data flows on same connection, 12-byte status messages interleaved every 1s
5. UDP: server sends port number, data on UDP, status exchange stays on TCP
### Client Mode (we connect to MikroTik) ### Client Mode (we connect to MikroTik)
```mermaid 1. Connect to MikroTik:2000
sequenceDiagram 2. Read HELLO, send command
participant CLI as btest-rs Client 3. Auto-detect auth type from response byte, authenticate
participant TCP as TCP Control 4. Start data transfer with status exchange
participant MK as MikroTik Server
CLI->>TCP: Connect to MikroTik:2000 ### Status Message Format (12 bytes)
MK->>TCP: HELLO
CLI->>TCP: Command [16 bytes]
Note over CLI: direction bits tell server<br/>what to do (TX/RX/BOTH)
alt Auth response 01 (no auth)
Note over CLI: No auth, proceed
else Auth response 02 (MD5)
MK->>TCP: Challenge [16 random bytes]
CLI->>TCP: MD5 response [48 bytes]
MK->>TCP: AUTH_OK
else Auth response 03 (EC-SRP5)
CLI->>TCP: [len][username\0][client_pubkey:32][parity:1]
MK->>TCP: [len][server_pubkey:32][parity:1][salt:16]
CLI->>TCP: [len][client_confirmation:32]
MK->>TCP: [len][server_confirmation:32]
MK->>TCP: AUTH_OK
end
Note over CLI,MK: Data transfer begins<br/>(TCP or UDP, same as server)
``` ```
[0x07][cpu:1][pad:2][seq:4 LE][bytes_received:4 LE]
```
- Byte 0: `0x07` (STATUS_MSG_TYPE)
- Byte 1: `0x80 | cpu_percentage` (MikroTik encoding)
- Bytes 4-7: sequence number (little-endian u32)
- Bytes 8-11: bytes received this interval (little-endian u32)
## Threading Model ## Threading Model
```mermaid All I/O is async via tokio. Per-client:
graph TB - **TX task**: sends data packets at target rate
subgraph "Server Process" - **RX task**: receives data, counts bytes, extracts status messages (TCP BOTH mode)
LISTEN["Main Loop<br/>Accept connections"] - **Status loop**: exchanges 12-byte status messages every 1s, prints bandwidth
LISTEN -->|spawn per client| HANDLER - **Status reader** (TCP TX-only): reads server's status messages for remote CPU
subgraph "Per-Client Tasks (tokio)" Shared state via `Arc<BandwidthState>` with atomic counters — no mutexes.
HANDLER["Connection Handler<br/>Handshake + Auth"]
HANDLER --> TX["TX Task<br/>Send data packets"]
HANDLER --> RX["RX Task<br/>Receive data packets"]
HANDLER --> STATUS["Status Loop<br/>Exchange stats every 1s"]
end
end
subgraph "Shared State (Arc + Atomics)" ### BandwidthState Fields
STATE["BandwidthState"]
TX_BYTES["tx_bytes: AtomicU64"] | Field | Type | Purpose |
RX_BYTES["rx_bytes: AtomicU64"] |-------|------|---------|
TX_SPEED["tx_speed: AtomicU32"] | `tx_bytes` | AtomicU64 | Bytes sent this interval (reset by swap) |
RUNNING["running: AtomicBool"] | `rx_bytes` | AtomicU64 | Bytes received this interval |
end | `tx_speed` | AtomicU32 | Target TX speed (dynamic, from server feedback) |
| `running` | AtomicBool | Test active flag |
| `remote_cpu` | AtomicU8 | Remote peer's CPU (from status messages) |
| `byte_budget` | AtomicU64 | Remaining quota bytes (u64::MAX = unlimited) |
| `total_tx_bytes` | AtomicU64 | Cumulative TX (never reset) |
| `total_rx_bytes` | AtomicU64 | Cumulative RX (never reset) |
## Server Pro Architecture
Optional feature (`--features pro`) providing a multi-user public btest server.
TX --> TX_BYTES
RX --> RX_BYTES
STATUS --> TX_BYTES
STATUS --> RX_BYTES
STATUS --> TX_SPEED
TX --> TX_SPEED
TX --> RUNNING
RX --> RUNNING
STATUS --> RUNNING
``` ```
Accept → IP check → HELLO → Command → Auth (DB) → Quota check → Budget set → Test
QuotaEnforcer (parallel)
- checks every N seconds
- max_duration timeout
- sets running=false on exceed
```
**Byte budget**: Before the test starts, `remaining_budget()` computes the minimum remaining quota across all applicable limits. This is stored in `BandwidthState.byte_budget`. Every TX/RX loop checks `spend_budget()` per-packet — when budget hits 0, the test stops immediately. This prevents quota overshoot even on 10+ Gbps links.
**Multi-connection TCP**: MikroTik sends `tcp_conn_count` connections. The first authenticates and registers a session token. Subsequent connections match by token and join. When all connections arrive, the test starts with per-stream TX/RX tasks.
**Web dashboard** (axum):
- `GET /` — landing page with instructions
- `GET /dashboard/{ip}` — per-IP dashboard with Chart.js graph, session table, quota bars
- `GET /api/ip/{ip}/stats` — aggregate stats JSON
- `GET /api/ip/{ip}/sessions` — session list JSON
- `GET /api/ip/{ip}/quota` — quota usage JSON
- `GET /api/ip/{ip}/export` — full export with human-readable fields
- `GET /api/session/{id}/intervals` — per-second throughput data
## CPU Usage Monitoring
A background OS thread samples system CPU every 1 second:
| Platform | Method |
|----------|--------|
| macOS | `host_statistics(HOST_CPU_LOAD_INFO)` |
| Linux | `/proc/stat` aggregate CPU line |
| Android | `/proc/stat` (same as Linux) |
| Windows | `GetSystemTimes()` FFI |
| FreeBSD | `sysctl kern.cp_time` |
Stored in global `AtomicU8`, included in status messages as `0x80 | percentage`.
## Build Targets
| Target | Binary | Notes |
|--------|--------|-------|
| `x86_64-unknown-linux-musl` | btest | Static, zero deps |
| `aarch64-unknown-linux-musl` | btest | RPi 4/5, ARM servers |
| `armv7-unknown-linux-musleabihf` | btest | RPi 3, OpenWrt |
| `x86_64-pc-windows-gnu` | btest.exe | Cross-compiled |
| `aarch64-linux-android` | btest | Termux ARMv8 |
| `armv7-linux-androideabi` | btest | Termux ARMv7 |
| macOS (native) | btest | Apple Silicon + Intel |
| Docker (multi-arch) | image | amd64 + arm64 |
## Key Design Decisions ## Key Design Decisions
### 1. Tokio async runtime 1. **Tokio async runtime** — all I/O is async, handles hundreds of concurrent connections
2. **Lock-free shared state** — AtomicU64 counters, `swap(0)` reads and resets per interval
3. **Direction bits from server perspective**`0x01`=server RX, `0x02`=server TX, `0x03`=both
4. **TCP socket half keepalive** — dropping `OwnedWriteHalf` sends FIN, so unused halves are kept alive
5. **Static musl binary** — ~2 MB, zero runtime dependencies
6. **EC-SRP5 with big integer arithmetic** — Curve25519 Weierstrass form via `num-bigint`
7. **Global singletons for syslog/CSV**`Mutex<Option<...>>` statics, initialized once at startup
8. **Shared BandwidthState for timeout survival** — state created in main(), survives tokio cancellation
9. **Inline byte budget** — per-packet quota check with fast path (u64::MAX = unlimited, returns immediately)
10. **TCP status message scanning** — RX loop detects 12-byte status messages in the data stream by scanning for `0x07` marker byte to extract remote CPU
All I/O is async via tokio. Each client connection spawns independent tasks for TX, RX, and status exchange. This allows handling hundreds of concurrent connections on a single thread pool. ## Tests
### 2. Lock-free shared state | Suite | Count | What |
|-------|-------|------|
TX/RX threads and the status loop share bandwidth counters via `AtomicU64`. No mutexes needed -- `swap(0)` atomically reads and resets counters each interval. | Unit tests (lib) | 12 | Bandwidth parsing, CPU sampling, auth hash vectors |
| Enforcer tests (pro) | 10 | Budget, quota, duration, flush |
### 3. Sequential status loop (matching C pselect) | Integration tests | 8 | Server/client handshake, auth, TCP data |
| EC-SRP5 tests | 6 | Full auth flow, wrong password, UDP bidir |
The UDP status exchange uses a sequential timeout-read-then-send pattern rather than `tokio::select!`. This ensures our status messages are sent exactly every 1 second, preventing MikroTik's speed adaptation from seeing irregular feedback. | Full integration | 23 | All protocols × directions, IPv4/6, CSV, syslog, CPU |
| **Total** | **59** | |
### 4. Direction bits from server perspective
The direction byte in the protocol means what the **server** should do:
- `0x01` (CMD_DIR_RX) = server receives
- `0x02` (CMD_DIR_TX) = server transmits
- `0x03` (CMD_DIR_BOTH) = bidirectional
The client inverts before sending: client "transmit" sends `CMD_DIR_RX` (telling server to receive).
### 5. TCP socket half keepalive
When only one direction is active (e.g., TX only), the unused socket half is kept alive. Dropping `OwnedWriteHalf` sends a TCP FIN, which MikroTik interprets as disconnection.
### 6. Static musl binary
Release builds use musl for a fully static binary with zero runtime dependencies. The binary is approximately 2 MB and runs on any Linux distribution.
### 7. EC-SRP5 with big integer arithmetic
The EC-SRP5 implementation uses `num-bigint` for Curve25519 Weierstrass-form elliptic curve arithmetic. MikroTik's authentication uses the Weierstrass form (not the more common Montgomery or Edwards forms), requiring direct field arithmetic over the prime `2^255 - 19`. The implementation includes point multiplication, `lift_x`, `redp1` (hash-to-curve), and Montgomery coordinate conversion.
### 8. Global singletons for syslog and CSV
The syslog and CSV modules use `Mutex<Option<...>>` global statics. This avoids threading state through every function call while remaining safe. Both modules are initialized once at startup and used from any async task via their public API functions.
### 9. Shared BandwidthState for client duration timeout
When running with `--duration`, the tokio timeout cancels the client future. To preserve stats accumulated during the test, `BandwidthState` is created in `main()` and passed as an `Arc` into `run_client()`. The state survives cancellation because `main()` holds a reference. The `record_interval()` method accumulates totals that `summary()` returns.
### 10. IPv6 socket handling
IPv6 requires special handling on macOS:
- UDP sockets bind to `[::]` for IPv6 peers, `0.0.0.0` for IPv4
- Socket send/receive buffers set to 4MB via `socket2` before wrapping with tokio
- `SocketAddr::new()` used instead of string formatting (avoids `[addr]:port` parsing issues)
- Connected sockets preferred for single-connection (avoids ENOBUFS on `send_to()`)
- NDP probe packet sent before data blast to populate neighbor cache
- Adaptive backoff on ENOBUFS (200μs→10ms, resets on success)
### 11. CPU usage monitoring
A background OS thread samples system CPU every 1 second via:
- **macOS:** `host_statistics(HOST_CPU_LOAD_INFO)` — returns user/system/idle/nice ticks
- **Linux:** `/proc/stat` — reads aggregate CPU line
The percentage is stored in a global `AtomicU8` and included in every status message at byte 1 using MikroTik's encoding: `0x80 | percentage`. On receive, the remote CPU is decoded with `byte & 0x7F` and capped at 100%. Both local and remote CPU are displayed per interval and logged to CSV/syslog.
## File Layout
```
btest-rs/
├── src/
│ ├── main.rs # CLI entry point, argument parsing (clap)
│ ├── lib.rs # Public API (used by integration tests)
│ ├── protocol.rs # Wire format: Command, StatusMessage, constants
│ ├── auth.rs # MD5 challenge-response authentication
│ ├── ecsrp5.rs # EC-SRP5 authentication (Curve25519 Weierstrass)
│ ├── server.rs # Server mode: listener, TCP/UDP handlers
│ ├── client.rs # Client mode: connector, TCP/UDP handlers
│ ├── bandwidth.rs # Rate limiting, formatting, shared state
│ ├── cpu.rs # CPU usage sampler (macOS + Linux)
│ ├── csv_output.rs # CSV result logging (append-mode, auto-header)
│ └── syslog_logger.rs # Remote syslog sender (RFC 3164 / BSD format)
├── tests/
│ └── integration_test.rs # End-to-end server/client tests
├── scripts/
│ ├── build-linux.sh # Cross-compile for x86_64 Linux (musl)
│ ├── build-macos-release.sh # macOS release build
│ ├── install-service.sh # systemd service installer
│ ├── push-docker.sh # Push Docker image to registry
│ ├── test-local.sh # Loopback self-test
│ ├── test-mikrotik.sh # Test against MikroTik device
│ ├── test-docker.sh # Docker container test
│ └── debug-capture.sh # Packet capture for debugging
├── docs/
│ ├── architecture.md # This file
│ ├── protocol.md # Protocol specification
│ ├── user-guide.md # Usage documentation
│ ├── docker.md # Docker & deployment guide
│ ├── ecsrp5-research.md # EC-SRP5 reverse-engineering notes
│ └── man/
│ └── btest.1 # Unix manual page (troff format)
├── tests/
│ ├── integration_test.rs # Basic server/client handshake tests
│ ├── ecsrp5_test.rs # EC-SRP5 authentication tests
│ └── full_integration_test.rs # Comprehensive: all protocols, IPv4/6, CSV, syslog
├── deploy/
│ └── syslog-ng-btest.conf # syslog-ng configuration for btest events
├── proto-test/ # Python EC-SRP5 prototype (research branch)
│ ├── btest_ecsrp5_client.py # Working Python btest EC-SRP5 client
│ ├── btest_mitm.py # MITM proxy for protocol analysis
│ └── elliptic_curves.py # Curve25519 Weierstrass (MarginResearch)
├── KNOWN_ISSUES.md # Known bugs and platform limitations
├── Dockerfile # Production Docker image (multi-stage)
├── Dockerfile.cross # Cross-compilation for Linux x86_64
├── docker-compose.yml # Docker Compose configuration
├── Cargo.toml # Rust package manifest
├── Cargo.lock # Dependency lock file
├── LICENSE # MIT License
└── btest-opensource/ # Original C implementation (git submodule)
```

View File

@@ -20,6 +20,9 @@ pub struct BandwidthState {
pub intervals: AtomicU32, pub intervals: AtomicU32,
/// Remote peer's CPU usage (received via status messages) /// Remote peer's CPU usage (received via status messages)
pub remote_cpu: AtomicU8, pub remote_cpu: AtomicU8,
/// Remaining byte budget (TX + RX combined). When this reaches 0 the test
/// stops immediately. u64::MAX means unlimited (default for non-pro server).
pub byte_budget: AtomicU64,
} }
impl BandwidthState { impl BandwidthState {
@@ -38,6 +41,7 @@ impl BandwidthState {
total_lost_packets: AtomicU64::new(0), total_lost_packets: AtomicU64::new(0),
intervals: AtomicU32::new(0), intervals: AtomicU32::new(0),
remote_cpu: AtomicU8::new(0), remote_cpu: AtomicU8::new(0),
byte_budget: AtomicU64::new(u64::MAX),
}) })
} }
@@ -50,6 +54,30 @@ impl BandwidthState {
self.intervals.fetch_add(1, Relaxed); self.intervals.fetch_add(1, Relaxed);
} }
/// Try to spend `amount` bytes from the budget. Returns `true` if allowed,
/// `false` if the budget is exhausted (and sets `running = false`).
#[inline]
pub fn spend_budget(&self, amount: u64) -> bool {
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
// Fast path: unlimited budget (non-pro server)
let current = self.byte_budget.load(Relaxed);
if current == u64::MAX {
return true;
}
if current < amount {
self.running.store(false, SeqCst);
return false;
}
self.byte_budget.fetch_sub(amount, Relaxed);
true
}
/// Set the byte budget (total bytes allowed for the entire test).
#[cfg(feature = "pro")]
pub fn set_budget(&self, budget: u64) {
self.byte_budget.store(budget, std::sync::atomic::Ordering::SeqCst);
}
/// Get summary for syslog reporting. /// Get summary for syslog reporting.
pub fn summary(&self) -> (u64, u64, u64, u32) { pub fn summary(&self) -> (u64, u64, u64, u32) {
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;

127
src/bin/client_only.rs Normal file
View File

@@ -0,0 +1,127 @@
//! btest-client: minimal bandwidth test client for embedded/OpenWrt systems.
//!
//! Stripped-down client that connects to MikroTik btest servers.
//! No server mode, no syslog, smaller binary footprint.
//!
//! Build: cargo build --profile release-small --bin btest-client
use clap::Parser;
use std::sync::atomic::Ordering;
#[derive(Parser)]
#[command(name = "btest-client", about = "MikroTik Bandwidth Test client", version)]
struct Cli {
/// Server address to connect to
#[arg(short = 'c', long = "client", required = true)]
host: String,
/// Transmit data (upload)
#[arg(short = 't', long = "transmit")]
transmit: bool,
/// Receive data (download)
#[arg(short = 'r', long = "receive")]
receive: bool,
/// Use UDP
#[arg(short = 'u', long = "udp")]
udp: bool,
/// Bandwidth limit (e.g., 100M)
#[arg(short = 'b', long = "bandwidth")]
bandwidth: Option<String>,
/// Port
#[arg(short = 'P', long = "port", default_value_t = 2000)]
port: u16,
/// Username
#[arg(short = 'a', long = "authuser")]
auth_user: Option<String>,
/// Password
#[arg(short = 'p', long = "authpass")]
auth_pass: Option<String>,
/// NAT mode
#[arg(short = 'n', long = "nat")]
nat: bool,
/// Duration in seconds (0=unlimited)
#[arg(short = 'd', long = "duration", default_value_t = 0)]
duration: u64,
/// Verbose
#[arg(short = 'v', long = "verbose", action = clap::ArgAction::Count)]
verbose: u8,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let filter = match cli.verbose {
0 => "info",
1 => "debug",
_ => "trace",
};
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(filter)),
)
.with_target(false)
.init();
btest_rs::cpu::start_sampler();
if !cli.transmit && !cli.receive {
eprintln!("Error: specify -t (transmit) and/or -r (receive)");
std::process::exit(1);
}
let direction = match (cli.transmit, cli.receive) {
(true, false) => btest_rs::protocol::CMD_DIR_RX,
(false, true) => btest_rs::protocol::CMD_DIR_TX,
(true, true) => btest_rs::protocol::CMD_DIR_BOTH,
_ => unreachable!(),
};
let bw = match &cli.bandwidth {
Some(b) => btest_rs::bandwidth::parse_bandwidth(b)?,
None => 0,
};
let (tx_speed, rx_speed) = match direction {
btest_rs::protocol::CMD_DIR_TX => (bw, 0),
btest_rs::protocol::CMD_DIR_RX => (0, bw),
_ => (bw, bw),
};
let state = btest_rs::bandwidth::BandwidthState::new();
let state_clone = state.clone();
let host = cli.host.clone();
let client_fut = btest_rs::client::run_client(
&host, cli.port, direction, cli.udp,
tx_speed, rx_speed,
cli.auth_user, cli.auth_pass, cli.nat,
state_clone,
);
if cli.duration > 0 {
match tokio::time::timeout(
std::time::Duration::from_secs(cli.duration),
client_fut,
).await {
Ok(r) => { let _ = r?; }
Err(_) => {
state.running.store(false, Ordering::SeqCst);
}
}
} else {
let _ = client_fut.await?;
}
Ok(())
}

62
src/bin/server_only.rs Normal file
View File

@@ -0,0 +1,62 @@
//! btest-server: minimal bandwidth test server for embedded/OpenWrt systems.
//!
//! Stripped-down server that accepts MikroTik client connections.
//! No client mode, no syslog, no CSV, smaller binary footprint.
//!
//! Build: cargo build --profile release-small --bin btest-server
use clap::Parser;
#[derive(Parser)]
#[command(name = "btest-server", about = "MikroTik Bandwidth Test server", version)]
struct Cli {
/// Port
#[arg(short = 'P', long = "port", default_value_t = 2000)]
port: u16,
/// IPv4 listen address
#[arg(long = "listen", default_value = "0.0.0.0")]
listen_addr: String,
/// Username
#[arg(short = 'a', long = "authuser")]
auth_user: Option<String>,
/// Password
#[arg(short = 'p', long = "authpass")]
auth_pass: Option<String>,
/// Use EC-SRP5 authentication
#[arg(long = "ecsrp5")]
ecsrp5: bool,
/// Verbose
#[arg(short = 'v', long = "verbose", action = clap::ArgAction::Count)]
verbose: u8,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let filter = match cli.verbose {
0 => "info",
1 => "debug",
_ => "trace",
};
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(filter)),
)
.with_target(false)
.init();
btest_rs::cpu::start_sampler();
let v4 = if cli.listen_addr.eq_ignore_ascii_case("none") { None } else { Some(cli.listen_addr) };
tracing::info!("btest-server starting on port {}", cli.port);
btest_rs::server::run_server(cli.port, cli.auth_user, cli.auth_pass, cli.ecsrp5, v4, None).await?;
Ok(())
}

View File

@@ -27,6 +27,11 @@ pub async fn run_client(
let mut stream = TcpStream::connect(&addr).await?; let mut stream = TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
// Set TCP socket buffers to 4MB for high throughput
let sock_ref = socket2::SockRef::from(&stream);
let _ = sock_ref.set_send_buffer_size(4 * 1024 * 1024);
let _ = sock_ref.set_recv_buffer_size(4 * 1024 * 1024);
recv_hello(&mut stream).await?; recv_hello(&mut stream).await?;
tracing::info!("Connected to server"); tracing::info!("Connected to server");
@@ -127,6 +132,12 @@ async fn run_tcp_test_client(stream: TcpStream, cmd: Command, state: Arc<Bandwid
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
tcp_client_rx_loop(reader, state_rx).await tcp_client_rx_loop(reader, state_rx).await
})) }))
} else if client_should_tx {
// TX-only: still need to read the server's status messages to get remote CPU.
// Don't count these bytes as RX data.
Some(tokio::spawn(async move {
tcp_client_status_reader(reader, state_rx).await
}))
} else { } else {
_reader_keepalive = Some(reader); _reader_keepalive = Some(reader);
None None
@@ -148,15 +159,17 @@ async fn tcp_client_tx_loop(
) { ) {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let packet = vec![0u8; tx_size]; // TCP data is all zeros
let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16); let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16);
// Use larger writes when running unlimited to reduce syscall overhead
let effective_size = if interval.is_none() { tx_size.max(256 * 1024) } else { tx_size };
let packet = vec![0u8; effective_size]; // TCP data is all zeros
let mut next_send = Instant::now(); let mut next_send = Instant::now();
while state.running.load(Ordering::Relaxed) { while state.running.load(Ordering::Relaxed) {
if writer.write_all(&packet).await.is_err() { if writer.write_all(&packet).await.is_err() {
break; break;
} }
state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed); state.tx_bytes.fetch_add(effective_size as u64, Ordering::Relaxed);
if state.tx_speed_changed.load(Ordering::Relaxed) { if state.tx_speed_changed.load(Ordering::Relaxed) {
state.tx_speed_changed.store(false, Ordering::Relaxed); state.tx_speed_changed.store(false, Ordering::Relaxed);
@@ -183,17 +196,59 @@ async fn tcp_client_rx_loop(
mut reader: tokio::net::tcp::OwnedReadHalf, mut reader: tokio::net::tcp::OwnedReadHalf,
state: Arc<BandwidthState>, state: Arc<BandwidthState>,
) { ) {
let mut buf = vec![0u8; 65536]; let mut buf = vec![0u8; 256 * 1024];
while state.running.load(Ordering::Relaxed) { while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await { match reader.read(&mut buf).await {
Ok(0) | Err(_) => break, Ok(0) | Err(_) => break,
Ok(n) => { Ok(n) => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
// Scan for interleaved 12-byte status messages from the server.
// In BOTH mode, the server's TX loop injects status messages into the
// data stream. Status starts with 0x07 (STATUS_MSG_TYPE) and byte 1
// has the high bit set (0x80 | cpu%). Data packets are all zeros.
if n >= STATUS_MSG_SIZE {
for i in 0..=(n - STATUS_MSG_SIZE) {
if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
let cpu = buf[i + 1] & 0x7F;
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
break;
}
}
}
} }
} }
} }
} }
/// Read only status messages from the server (TX-only mode).
/// The server sends 12-byte status messages on the TCP connection even when
/// the client is only transmitting. We need to read them to get remote CPU
/// and to prevent the TCP receive buffer from filling up.
async fn tcp_client_status_reader(
mut reader: tokio::net::tcp::OwnedReadHalf,
state: Arc<BandwidthState>,
) {
let mut buf = [0u8; STATUS_MSG_SIZE];
while state.running.load(Ordering::Relaxed) {
match reader.read_exact(&mut buf).await {
Ok(_) => {
if buf[0] == STATUS_MSG_TYPE && buf[1] >= 0x80 {
let status = StatusMessage::deserialize(&buf);
state.remote_cpu.store(status.cpu_load, Ordering::Relaxed);
// Use server's bytes_received for TX speed adaptation
if status.bytes_received > 0 {
let new_speed =
((status.bytes_received as u64 * 8 * 3) / 2) as u32;
state.tx_speed.store(new_speed, Ordering::Relaxed);
state.tx_speed_changed.store(true, Ordering::Relaxed);
}
}
}
Err(_) => break,
}
}
}
// --- UDP Test Client --- // --- UDP Test Client ---
async fn run_udp_test_client( async fn run_udp_test_client(

View File

@@ -29,7 +29,7 @@ pub fn get() -> u8 {
// --- Platform-specific implementation --- // --- Platform-specific implementation ---
#[cfg(target_os = "linux")] #[cfg(any(target_os = "linux", target_os = "android"))]
fn get_cpu_times() -> (u64, u64) { fn get_cpu_times() -> (u64, u64) {
// Read /proc/stat: cpu user nice system idle iowait irq softirq steal // Read /proc/stat: cpu user nice system idle iowait irq softirq steal
if let Ok(content) = std::fs::read_to_string("/proc/stat") { if let Ok(content) = std::fs::read_to_string("/proc/stat") {
@@ -97,6 +97,7 @@ fn get_cpu_times() -> (u64, u64) {
fn get_cpu_times() -> (u64, u64) { fn get_cpu_times() -> (u64, u64) {
#[repr(C)] #[repr(C)]
#[derive(Default)] #[derive(Default)]
#[allow(non_snake_case)]
struct FILETIME { struct FILETIME {
dwLowDateTime: u32, dwLowDateTime: u32,
dwHighDateTime: u32, dwHighDateTime: u32,
@@ -165,6 +166,7 @@ fn get_cpu_times() -> (u64, u64) {
#[cfg(not(any( #[cfg(not(any(
target_os = "linux", target_os = "linux",
target_os = "android",
target_os = "macos", target_os = "macos",
target_os = "windows", target_os = "windows",
target_os = "freebsd", target_os = "freebsd",
@@ -193,6 +195,7 @@ mod tests {
// On supported platforms, total should be > 0 // On supported platforms, total should be > 0
if cfg!(any( if cfg!(any(
target_os = "linux", target_os = "linux",
target_os = "android",
target_os = "macos", target_os = "macos",
target_os = "windows", target_os = "windows",
target_os = "freebsd", target_os = "freebsd",

View File

@@ -6,6 +6,8 @@
//! //!
//! btest framing: `[len:1][payload]` (no 0x06 handler byte, unlike Winbox). //! btest framing: `[len:1][payload]` (no 0x06 handler byte, unlike Winbox).
use std::sync::LazyLock;
use num_bigint::BigUint; use num_bigint::BigUint;
use num_integer::Integer; use num_integer::Integer;
use num_traits::{One, Zero}; use num_traits::{One, Zero};
@@ -14,31 +16,31 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::protocol::{BtestError, Result}; use crate::protocol::{BtestError, Result};
// --- Curve25519 parameters in Weierstrass form --- // --- Curve25519 parameters in Weierstrass form (cached, computed once) ---
fn p() -> BigUint { static P: LazyLock<BigUint> = LazyLock::new(|| {
BigUint::parse_bytes( BigUint::parse_bytes(
b"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffed", b"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffed",
16, 16,
) )
.unwrap() .unwrap()
} });
fn curve_order() -> BigUint { static CURVE_ORDER: LazyLock<BigUint> = LazyLock::new(|| {
BigUint::parse_bytes( BigUint::parse_bytes(
b"1000000000000000000000000000000014def9dea2f79cd65812631a5cf5d3ed", b"1000000000000000000000000000000014def9dea2f79cd65812631a5cf5d3ed",
16, 16,
) )
.unwrap() .unwrap()
} });
fn weierstrass_a() -> BigUint { static WEIERSTRASS_A: LazyLock<BigUint> = LazyLock::new(|| {
BigUint::parse_bytes( BigUint::parse_bytes(
b"2aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa984914a144", b"2aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa984914a144",
16, 16,
) )
.unwrap() .unwrap()
} });
const MONT_A: u64 = 486662; const MONT_A: u64 = 486662;
@@ -50,10 +52,10 @@ fn modinv(a: &BigUint, modulus: &BigUint) -> BigUint {
a.modpow(&exp, modulus) a.modpow(&exp, modulus)
} }
fn legendre_symbol(a: &BigUint, p_val: &BigUint) -> i32 { fn legendre_symbol(a: &BigUint, p: &BigUint) -> i32 {
let exp = (p_val - BigUint::one()) / BigUint::from(2u32); let exp = (p - BigUint::one()) / BigUint::from(2u32);
let l = a.modpow(&exp, p_val); let l = a.modpow(&exp, p);
if l == p_val - BigUint::one() { if l == p - BigUint::one() {
-1 -1
} else if l == BigUint::zero() { } else if l == BigUint::zero() {
0 0
@@ -166,7 +168,7 @@ impl Point {
} }
fn add(&self, other: &Point) -> Point { fn add(&self, other: &Point) -> Point {
let p_val = p(); let p_val = &*P;
if self.infinity { if self.infinity {
return other.clone(); return other.clone();
} }
@@ -179,44 +181,44 @@ impl Point {
let lam = if self.x == other.x && self.y == other.y { let lam = if self.x == other.x && self.y == other.y {
// Point doubling // Point doubling
let three_x_sq = (BigUint::from(3u32) * &self.x * &self.x + &weierstrass_a()) % &p_val; let three_x_sq = (BigUint::from(3u32) * &self.x * &self.x + &*WEIERSTRASS_A) % p_val;
let two_y = (BigUint::from(2u32) * &self.y) % &p_val; let two_y = (BigUint::from(2u32) * &self.y) % p_val;
(three_x_sq * modinv(&two_y, &p_val)) % &p_val (three_x_sq * modinv(&two_y, p_val)) % p_val
} else { } else {
// Point addition // Point addition
let dy = if other.y >= self.y { let dy = if other.y >= self.y {
(&other.y - &self.y) % &p_val (&other.y - &self.y) % p_val
} else { } else {
(&p_val - (&self.y - &other.y) % &p_val) % &p_val (p_val - (&self.y - &other.y) % p_val) % p_val
}; };
let dx = if other.x >= self.x { let dx = if other.x >= self.x {
(&other.x - &self.x) % &p_val (&other.x - &self.x) % p_val
} else { } else {
(&p_val - (&self.x - &other.x) % &p_val) % &p_val (p_val - (&self.x - &other.x) % p_val) % p_val
}; };
(dy * modinv(&dx, &p_val)) % &p_val (dy * modinv(&dx, p_val)) % p_val
}; };
let x3 = { let x3 = {
let lam_sq = (&lam * &lam) % &p_val; let lam_sq = (&lam * &lam) % p_val;
let sum_x = (&self.x + &other.x) % &p_val; let sum_x = (&self.x + &other.x) % p_val;
if lam_sq >= sum_x { if lam_sq >= sum_x {
(lam_sq - sum_x) % &p_val (lam_sq - sum_x) % p_val
} else { } else {
(&p_val - (sum_x - lam_sq) % &p_val) % &p_val (p_val - (sum_x - lam_sq) % p_val) % p_val
} }
}; };
let y3 = { let y3 = {
let dx = if self.x >= x3 { let dx = if self.x >= x3 {
(&self.x - &x3) % &p_val (&self.x - &x3) % p_val
} else { } else {
(&p_val - (&x3 - &self.x) % &p_val) % &p_val (p_val - (&x3 - &self.x) % p_val) % p_val
}; };
let prod = (&lam * dx) % &p_val; let prod = (&lam * dx) % p_val;
if prod >= self.y { if prod >= self.y {
(prod - &self.y) % &p_val (prod - &self.y) % p_val
} else { } else {
(&p_val - (&self.y - prod) % &p_val) % &p_val (p_val - (&self.y - prod) % p_val) % p_val
} }
}; };
@@ -226,14 +228,13 @@ impl Point {
fn scalar_mul(&self, scalar: &BigUint) -> Point { fn scalar_mul(&self, scalar: &BigUint) -> Point {
let mut result = Point::infinity(); let mut result = Point::infinity();
let mut base = self.clone(); let mut base = self.clone();
let mut k = scalar.clone(); let bits = scalar.bits();
while !k.is_zero() { for i in 0..bits {
if &k & &BigUint::one() == BigUint::one() { if scalar.bit(i) {
result = result.add(&base); result = result.add(&base);
} }
base = base.add(&base); base = base.add(&base);
k >>= 1;
} }
result result
} }
@@ -249,11 +250,11 @@ struct WCurve {
impl WCurve { impl WCurve {
fn new() -> Self { fn new() -> Self {
let p_val = p(); let p_val = &*P;
let mont_a = BigUint::from(MONT_A); let mont_a = BigUint::from(MONT_A);
let three_inv = modinv(&BigUint::from(3u32), &p_val); let three_inv = modinv(&BigUint::from(3u32), p_val);
let conversion_from_m = (&mont_a * &three_inv) % &p_val; let conversion_from_m = (&mont_a * &three_inv) % p_val;
let conversion_to_m = (&p_val - &conversion_from_m) % &p_val; let conversion_to_m = (p_val - &conversion_from_m) % p_val;
let mut curve = WCurve { let mut curve = WCurve {
g: Point::infinity(), g: Point::infinity(),
@@ -265,8 +266,8 @@ impl WCurve {
} }
fn to_montgomery(&self, pt: &Point) -> ([u8; 32], u8) { fn to_montgomery(&self, pt: &Point) -> ([u8; 32], u8) {
let p_val = p(); let p_val = &*P;
let x = (&pt.x + &self.conversion_to_m) % &p_val; let x = (&pt.x + &self.conversion_to_m) % p_val;
let parity = if pt.y.bit(0) { 1u8 } else { 0u8 }; let parity = if pt.y.bit(0) { 1u8 } else { 0u8 };
let mut bytes = [0u8; 32]; let mut bytes = [0u8; 32];
let x_bytes = x.to_bytes_be(); let x_bytes = x.to_bytes_be();
@@ -276,14 +277,14 @@ impl WCurve {
} }
fn lift_x(&self, x_mont: &BigUint, parity: bool) -> Point { fn lift_x(&self, x_mont: &BigUint, parity: bool) -> Point {
let p_val = p(); let p_val = &*P;
let x = x_mont % &p_val; let x = x_mont % p_val;
// y^2 = x^3 + Ax^2 + x (Montgomery) // y^2 = x^3 + Ax^2 + x (Montgomery)
let y_squared = (&x * &x * &x + BigUint::from(MONT_A) * &x * &x + &x) % &p_val; let y_squared = (&x * &x * &x + BigUint::from(MONT_A) * &x * &x + &x) % p_val;
// Convert x to Weierstrass // Convert x to Weierstrass
let x_w = (&x + &self.conversion_from_m) % &p_val; let x_w = (&x + &self.conversion_from_m) % p_val;
if let Some((y1, y2)) = prime_mod_sqrt(&y_squared, &p_val) { if let Some((y1, y2)) = prime_mod_sqrt(&y_squared, p_val) {
let pt1 = Point::new(x_w.clone(), y1); let pt1 = Point::new(x_w.clone(), y1);
let pt2 = Point::new(x_w, y2); let pt2 = Point::new(x_w, y2);
if parity { if parity {
@@ -323,7 +324,7 @@ impl WCurve {
password: &str, password: &str,
salt: &[u8; 16], salt: &[u8; 16],
) -> [u8; 32] { ) -> [u8; 32] {
let inner = sha256_bytes(&format!("{}:{}", username, password).as_bytes().to_vec()); let inner = sha256_bytes(format!("{}:{}", username, password).as_bytes());
let mut input = Vec::with_capacity(16 + 32); let mut input = Vec::with_capacity(16 + 32);
input.extend_from_slice(salt); input.extend_from_slice(salt);
input.extend_from_slice(&inner); input.extend_from_slice(&inner);
@@ -415,8 +416,8 @@ pub async fn client_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
let i_int = BigUint::from_bytes_be(&i); let i_int = BigUint::from_bytes_be(&i);
let j_int = BigUint::from_bytes_be(&j); let j_int = BigUint::from_bytes_be(&j);
let s_a_int = BigUint::from_bytes_be(&s_a); let s_a_int = BigUint::from_bytes_be(&s_a);
let order = curve_order(); let order = &*CURVE_ORDER;
let scalar = ((&i_int * &j_int) + &s_a_int) % &order; let scalar = ((&i_int * &j_int) + &s_a_int) % order;
let z_point = w_b_unblinded.scalar_mul(&scalar); let z_point = w_b_unblinded.scalar_mul(&scalar);
let (z, _) = w.to_montgomery(&z_point); let (z, _) = w.to_montgomery(&z_point);

View File

@@ -135,6 +135,11 @@ async fn handle_client(
) -> Result<()> { ) -> Result<()> {
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
// Set TCP socket buffers to 4MB (matching UDP path) for high throughput
let sock_ref = socket2::SockRef::from(&stream);
let _ = sock_ref.set_send_buffer_size(4 * 1024 * 1024);
let _ = sock_ref.set_recv_buffer_size(4 * 1024 * 1024);
send_hello(&mut stream).await?; send_hello(&mut stream).await?;
// Read 16-byte command (or whatever the client sends) // Read 16-byte command (or whatever the client sends)
@@ -366,8 +371,29 @@ async fn handle_client(
// --- TCP Test Server --- // --- TCP Test Server ---
/// Public TX task for multi-connection use by server_pro.
#[cfg(feature = "pro")]
pub async fn tcp_tx_task(
writer: tokio::net::tcp::OwnedWriteHalf,
tx_size: usize,
tx_speed: u32,
state: Arc<BandwidthState>,
) {
tcp_tx_loop(writer, tx_size, tx_speed, state).await;
}
/// Public RX task for multi-connection use by server_pro.
#[cfg(feature = "pro")]
pub async fn tcp_rx_task(
reader: tokio::net::tcp::OwnedReadHalf,
state: Arc<BandwidthState>,
) {
tcp_rx_loop(reader, state).await;
}
/// Run a TCP bandwidth test on an already-authenticated stream. /// Run a TCP bandwidth test on an already-authenticated stream.
/// Public API for use by server_pro. /// Public API for use by server_pro.
#[cfg(feature = "pro")]
pub async fn run_tcp_test( pub async fn run_tcp_test(
stream: TcpStream, stream: TcpStream,
cmd: Command, cmd: Command,
@@ -451,9 +477,23 @@ async fn run_tcp_test_inner(stream: TcpStream, cmd: Command, state: Arc<Bandwidt
Ok(state.summary()) Ok(state.summary())
} }
/// Public API for multi-connection TCP test with external state. Used by server_pro.
#[cfg(feature = "pro")]
pub async fn run_tcp_multiconn_test(
streams: Vec<TcpStream>,
cmd: Command,
state: Arc<BandwidthState>,
) -> Result<(u64, u64, u64, u32)> {
run_tcp_multiconn_inner(streams, cmd, state).await
}
/// TCP multi-connection. /// TCP multi-connection.
async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<(u64, u64, u64, u32)> { async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<(u64, u64, u64, u32)> {
let state = BandwidthState::new(); let state = BandwidthState::new();
run_tcp_multiconn_inner(streams, cmd, state).await
}
async fn run_tcp_multiconn_inner(streams: Vec<TcpStream>, cmd: Command, state: Arc<BandwidthState>) -> Result<(u64, u64, u64, u32)> {
let tx_size = cmd.tx_size as usize; let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx(); let server_should_tx = cmd.server_tx();
let server_should_rx = cmd.server_rx(); let server_should_rx = cmd.server_rx();
@@ -540,8 +580,10 @@ async fn tcp_tx_loop_inner(
) { ) {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let packet = vec![0u8; tx_size];
let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16); let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16);
// Use larger writes when running unlimited to reduce syscall overhead
let effective_size = if interval.is_none() { tx_size.max(256 * 1024) } else { tx_size };
let packet = vec![0u8; effective_size];
let mut next_send = Instant::now(); let mut next_send = Instant::now();
let mut next_status = Instant::now() + Duration::from_secs(1); let mut next_status = Instant::now() + Duration::from_secs(1);
let mut status_seq: u32 = 0; let mut status_seq: u32 = 0;
@@ -564,11 +606,14 @@ async fn tcp_tx_loop_inner(
next_status = Instant::now() + Duration::from_secs(1); next_status = Instant::now() + Duration::from_secs(1);
} }
if !state.spend_budget(effective_size as u64) {
break;
}
if writer.write_all(&packet).await.is_err() { if writer.write_all(&packet).await.is_err() {
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
break; break;
} }
state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed); state.tx_bytes.fetch_add(effective_size as u64, Ordering::Relaxed);
if state.tx_speed_changed.load(Ordering::Relaxed) { if state.tx_speed_changed.load(Ordering::Relaxed) {
state.tx_speed_changed.store(false, Ordering::Relaxed); state.tx_speed_changed.store(false, Ordering::Relaxed);
@@ -592,7 +637,7 @@ async fn tcp_tx_loop_inner(
} }
async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<BandwidthState>) { async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536]; let mut buf = vec![0u8; 256 * 1024];
while state.running.load(Ordering::Relaxed) { while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await { match reader.read(&mut buf).await {
Ok(0) | Err(_) => { Ok(0) | Err(_) => {
@@ -600,6 +645,9 @@ async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<Band
break; break;
} }
Ok(n) => { Ok(n) => {
if !state.spend_budget(n as u64) {
break;
}
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
} }
} }
@@ -649,6 +697,7 @@ async fn tcp_status_sender(
/// Run a UDP bandwidth test on an already-authenticated stream. /// Run a UDP bandwidth test on an already-authenticated stream.
/// Public API for use by server_pro. Caller provides the UDP port offset. /// Public API for use by server_pro. Caller provides the UDP port offset.
#[cfg(feature = "pro")]
pub async fn run_udp_test( pub async fn run_udp_test(
stream: &mut TcpStream, stream: &mut TcpStream,
peer: SocketAddr, peer: SocketAddr,
@@ -796,6 +845,10 @@ async fn udp_tx_loop(
let mut consecutive_errors: u32 = 0; let mut consecutive_errors: u32 = 0;
while state.running.load(Ordering::Relaxed) { while state.running.load(Ordering::Relaxed) {
if !state.spend_budget(tx_size as u64) {
break;
}
packet[0..4].copy_from_slice(&seq.to_be_bytes()); packet[0..4].copy_from_slice(&seq.to_be_bytes());
let result = if multi_conn { let result = if multi_conn {
@@ -871,6 +924,9 @@ async fn udp_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
// (multi-connection MikroTik sends from multiple ports) // (multi-connection MikroTik sends from multiple ports)
match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await { match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
Ok(Ok((n, _src))) if n >= 4 => { Ok(Ok((n, _src))) if n >= 4 => {
if !state.spend_budget(n as u64) {
break;
}
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed); state.rx_packets.fetch_add(1, Ordering::Relaxed);

View File

@@ -342,4 +342,70 @@ mod tests {
let (ip_in, ip_out) = db.get_ip_daily_usage("127.0.0.1").unwrap(); let (ip_in, ip_out) = db.get_ip_daily_usage("127.0.0.1").unwrap();
assert!(ip_in + ip_out > 0, "IP usage should be recorded"); assert!(ip_in + ip_out > 0, "IP usage should be recorded");
} }
#[test]
fn test_remaining_budget_calculation() {
let (db, qm) = setup_test_db();
let ip: IpAddr = "10.0.0.1".parse().unwrap();
// No usage yet: budget = min(daily=1000, weekly=5000, monthly=10000, ip_daily=500, ...)
// IP daily combined = 500 is the smallest
let budget = qm.remaining_budget("testuser", &ip);
assert_eq!(budget, 500, "budget should be min of all limits (ip_daily=500)");
// Use record_usage which properly records combined + directional
// inbound=200, outbound=200 → combined = 400
qm.record_usage("testuser", "10.0.0.1", 200, 200);
// IP daily combined: 500 - 400 = 100 remaining
// IP daily inbound: 500 - 200 = 300 remaining
// IP daily outbound: 500 - 200 = 300 remaining
// User daily: 1000 - 400 = 600 remaining
let budget = qm.remaining_budget("testuser", &ip);
assert_eq!(budget, 100, "budget should reflect IP combined remaining (100)");
}
#[test]
fn test_budget_zero_when_exhausted() {
let (db, qm) = setup_test_db();
let ip: IpAddr = "10.0.0.2".parse().unwrap();
// Exhaust user daily quota (1000 bytes)
db.record_usage("testuser", 600, 500).unwrap(); // 1100 > 1000
let budget = qm.remaining_budget("testuser", &ip);
assert_eq!(budget, 0, "budget should be 0 when user daily quota is exhausted");
}
#[test]
fn test_byte_budget_stops_transfer() {
let state = BandwidthState::new();
// Set a 1000-byte budget
state.set_budget(1000);
// Spend 500 bytes — should succeed
assert!(state.spend_budget(500));
// Spend another 400 — should succeed (100 remaining)
assert!(state.spend_budget(400));
// Spend 200 — should fail (only 100 remaining)
assert!(!state.spend_budget(200));
// running should be false
assert!(!state.running.load(Ordering::Relaxed));
}
#[test]
fn test_unlimited_budget_always_succeeds() {
let state = BandwidthState::new();
// Default budget is u64::MAX (unlimited)
// Should always succeed
for _ in 0..1000 {
assert!(state.spend_budget(1_000_000_000));
}
assert!(state.running.load(Ordering::Relaxed));
}
} }

View File

@@ -15,6 +15,22 @@ pub struct LdapAuth {
config: LdapConfig, config: LdapConfig,
} }
/// Escape special characters in LDAP filter values per RFC 4515.
fn ldap_escape(input: &str) -> String {
let mut out = String::with_capacity(input.len());
for c in input.chars() {
match c {
'\\' => out.push_str("\\5c"),
'*' => out.push_str("\\2a"),
'(' => out.push_str("\\28"),
')' => out.push_str("\\29"),
'\0' => out.push_str("\\00"),
_ => out.push(c),
}
}
out
}
impl LdapAuth { impl LdapAuth {
pub fn new(config: LdapConfig) -> Self { pub fn new(config: LdapConfig) -> Self {
Self { config } Self { config }
@@ -26,6 +42,8 @@ impl LdapAuth {
let (conn, mut ldap) = LdapConnAsync::new(&self.config.url).await?; let (conn, mut ldap) = LdapConnAsync::new(&self.config.url).await?;
ldap3::drive!(conn); ldap3::drive!(conn);
let safe_username = ldap_escape(username);
// If service account configured, bind first to search for user DN // If service account configured, bind first to search for user DN
let user_dn = if let (Some(ref bind_dn), Some(ref bind_pass)) = let user_dn = if let (Some(ref bind_dn), Some(ref bind_pass)) =
(&self.config.bind_dn, &self.config.bind_pass) (&self.config.bind_dn, &self.config.bind_pass)
@@ -39,7 +57,7 @@ impl LdapAuth {
// Search for the user // Search for the user
let filter = format!( let filter = format!(
"(&(objectClass=person)(|(uid={})(sAMAccountName={})(cn={})))", "(&(objectClass=person)(|(uid={})(sAMAccountName={})(cn={})))",
username, username, username safe_username, safe_username, safe_username
); );
let (results, _) = ldap let (results, _) = ldap
.search(&self.config.base_dn, Scope::Subtree, &filter, vec!["dn"]) .search(&self.config.base_dn, Scope::Subtree, &filter, vec!["dn"])
@@ -51,11 +69,17 @@ impl LdapAuth {
return Ok(false); return Ok(false);
} }
let entry = SearchEntry::construct(results.into_iter().next().unwrap()); let entry = match results.into_iter().next() {
Some(r) => SearchEntry::construct(r),
None => {
tracing::debug!("LDAP user not found: {}", username);
return Ok(false);
}
};
entry.dn entry.dn
} else { } else {
// No service account — construct DN directly // No service account — construct DN directly
format!("uid={},{}", username, self.config.base_dn) format!("uid={},{}", safe_username, self.config.base_dn)
}; };
// Attempt user bind // Attempt user bind

View File

@@ -371,18 +371,92 @@ impl QuotaManager {
tracing::error!("Failed to record user usage for {}: {}", username, e); tracing::error!("Failed to record user usage for {}: {}", username, e);
} }
// Record combined IP usage. // Record IP usage — record_ip_usage already writes both the
// inbound_bytes and outbound_bytes columns in one operation.
// Do NOT also call record_ip_inbound_usage/record_ip_outbound_usage
// as they update the same columns and would double-count.
if let Err(e) = self.db.record_ip_usage(ip, outbound_bytes, inbound_bytes) { if let Err(e) = self.db.record_ip_usage(ip, outbound_bytes, inbound_bytes) {
tracing::error!("Failed to record IP usage for {}: {}", ip, e); tracing::error!("Failed to record IP usage for {}: {}", ip, e);
} }
}
// Record directional IP usage for the new per-direction columns. /// Calculate the remaining byte budget for a user+IP combination.
if let Err(e) = self.db.record_ip_inbound_usage(ip, inbound_bytes) { /// Returns the minimum remaining quota across all applicable limits.
tracing::error!("Failed to record IP inbound usage for {}: {}", ip, e); /// Used to set `BandwidthState::byte_budget` before a test starts,
/// preventing overshoot beyond quota boundaries.
pub fn remaining_budget(&self, username: &str, ip: &IpAddr) -> u64 {
let mut budget = u64::MAX;
let ip_str = ip.to_string();
// Helper: min that ignores 0 (unlimited)
let cap = |budget: &mut u64, limit: u64, used: u64| {
if limit > 0 {
let remaining = limit.saturating_sub(used);
*budget = (*budget).min(remaining);
}
};
// User quotas (combined tx+rx)
if let Ok(Some(user)) = self.db.get_user(username) {
let daily_limit = if user.daily_quota > 0 { user.daily_quota as u64 } else { self.default_daily };
if daily_limit > 0 {
let (tx, rx) = self.db.get_daily_usage(username).unwrap_or((0, 0));
cap(&mut budget, daily_limit, tx + rx);
}
let weekly_limit = if user.weekly_quota > 0 { user.weekly_quota as u64 } else { self.default_weekly };
if weekly_limit > 0 {
let (tx, rx) = self.db.get_weekly_usage(username).unwrap_or((0, 0));
cap(&mut budget, weekly_limit, tx + rx);
}
if self.default_monthly > 0 {
let (tx, rx) = self.db.get_monthly_usage(username).unwrap_or((0, 0));
cap(&mut budget, self.default_monthly, tx + rx);
}
} }
if let Err(e) = self.db.record_ip_outbound_usage(ip, outbound_bytes) {
tracing::error!("Failed to record IP outbound usage for {}: {}", ip, e); // IP combined quotas
if self.ip_daily > 0 {
let (tx, rx) = self.db.get_ip_daily_usage(&ip_str).unwrap_or((0, 0));
cap(&mut budget, self.ip_daily, tx + rx);
} }
if self.ip_weekly > 0 {
let (tx, rx) = self.db.get_ip_weekly_usage(&ip_str).unwrap_or((0, 0));
cap(&mut budget, self.ip_weekly, tx + rx);
}
if self.ip_monthly > 0 {
let (tx, rx) = self.db.get_ip_monthly_usage(&ip_str).unwrap_or((0, 0));
cap(&mut budget, self.ip_monthly, tx + rx);
}
// IP directional quotas — use inbound + outbound as combined ceiling
if self.ip_daily_inbound > 0 {
let used = self.db.get_ip_daily_inbound(&ip_str).unwrap_or(0);
cap(&mut budget, self.ip_daily_inbound, used);
}
if self.ip_daily_outbound > 0 {
let used = self.db.get_ip_daily_outbound(&ip_str).unwrap_or(0);
cap(&mut budget, self.ip_daily_outbound, used);
}
if self.ip_weekly_inbound > 0 {
let used = self.db.get_ip_weekly_inbound(&ip_str).unwrap_or(0);
cap(&mut budget, self.ip_weekly_inbound, used);
}
if self.ip_weekly_outbound > 0 {
let used = self.db.get_ip_weekly_outbound(&ip_str).unwrap_or(0);
cap(&mut budget, self.ip_weekly_outbound, used);
}
if self.ip_monthly_inbound > 0 {
let used = self.db.get_ip_monthly_inbound(&ip_str).unwrap_or(0);
cap(&mut budget, self.ip_monthly_inbound, used);
}
if self.ip_monthly_outbound > 0 {
let used = self.db.get_ip_monthly_outbound(&ip_str).unwrap_or(0);
cap(&mut budget, self.ip_monthly_outbound, used);
}
budget
} }
pub fn max_duration(&self) -> u64 { pub fn max_duration(&self) -> u64 {

View File

@@ -2,14 +2,18 @@
//! //!
//! Wraps the standard btest server connection handler with: //! Wraps the standard btest server connection handler with:
//! - Pre-connection IP/user quota checks //! - Pre-connection IP/user quota checks
//! - MD5 challenge-response auth against user DB
//! - TCP multi-connection session support
//! - Mid-session quota enforcement via QuotaEnforcer //! - Mid-session quota enforcement via QuotaEnforcer
//! - Post-session usage recording //! - Post-session usage recording
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use btest_rs::protocol::*; use btest_rs::protocol::*;
use btest_rs::bandwidth::BandwidthState; use btest_rs::bandwidth::BandwidthState;
@@ -18,22 +22,27 @@ use super::enforcer::{QuotaEnforcer, StopReason};
use super::quota::{Direction, QuotaManager}; use super::quota::{Direction, QuotaManager};
use super::user_db::UserDb; use super::user_db::UserDb;
/// Pending TCP multi-connection session.
struct TcpSession {
peer_ip: std::net::IpAddr,
username: String,
cmd: Command,
streams: Vec<TcpStream>,
expected: u8,
}
type SessionMap = Arc<Mutex<HashMap<u16, TcpSession>>>;
/// Run the pro server with quota enforcement. /// Run the pro server with quota enforcement.
pub async fn run_pro_server( pub async fn run_pro_server(
port: u16, port: u16,
ecsrp5: bool, _ecsrp5: bool,
listen_v4: Option<String>, listen_v4: Option<String>,
listen_v6: Option<String>, listen_v6: Option<String>,
db: UserDb, db: UserDb,
quota_mgr: QuotaManager, quota_mgr: QuotaManager,
quota_check_interval: u64, quota_check_interval: u64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Pre-derive EC-SRP5 creds if needed
// For pro server, we don't use CLI -a/-p — we use the user DB
// EC-SRP5 needs a fixed password for the server challenge, but
// the actual verification happens against the DB.
// For now, the first user in the DB is used for EC-SRP5 derivation.
let v4_listener = if let Some(ref addr) = listen_v4 { let v4_listener = if let Some(ref addr) = listen_v4 {
let bind_addr = format!("{}:{}", addr, port); let bind_addr = format!("{}:{}", addr, port);
Some(TcpListener::bind(&bind_addr).await?) Some(TcpListener::bind(&bind_addr).await?)
@@ -52,6 +61,8 @@ pub async fn run_pro_server(
anyhow::bail!("No listeners bound"); anyhow::bail!("No listeners bound");
} }
let sessions: SessionMap = Arc::new(Mutex::new(HashMap::new()));
tracing::info!("btest-server-pro ready, accepting connections"); tracing::info!("btest-server-pro ready, accepting connections");
loop { loop {
@@ -69,29 +80,14 @@ pub async fn run_pro_server(
tracing::info!("New connection from {}", peer); tracing::info!("New connection from {}", peer);
// Pre-connection IP check
if let Err(e) = quota_mgr.check_ip(&peer.ip(), Direction::Both) {
tracing::warn!("Rejected {} — {}", peer, e);
btest_rs::syslog_logger::auth_failure(
&peer.to_string(), "-", "-", &format!("{}", e),
);
// Send a quick rejection and close
let mut s = stream;
let _ = s.write_all(&HELLO).await;
drop(s);
continue;
}
quota_mgr.connect(&peer.ip());
let db = db.clone(); let db = db.clone();
let qm = quota_mgr.clone(); let qm = quota_mgr.clone();
let qm_disconnect = quota_mgr.clone();
let interval = quota_check_interval; let interval = quota_check_interval;
let sess = sessions.clone();
tokio::spawn(async move { tokio::spawn(async move {
match handle_pro_client(stream, peer, db, qm, interval).await { let is_primary = match handle_pro_connection(stream, peer, db, qm.clone(), interval, sess).await {
Ok((username, stop_reason, tx, rx)) => { Ok(Some((username, stop_reason, tx, rx))) => {
tracing::info!( tracing::info!(
"Client {} (user '{}') finished: {} (tx={}, rx={})", "Client {} (user '{}') finished: {} (tx={}, rx={})",
peer, username, stop_reason, tx, rx, peer, username, stop_reason, tx, rx,
@@ -100,31 +96,100 @@ pub async fn run_pro_server(
&peer.to_string(), "btest", &format!("{}", stop_reason), &peer.to_string(), "btest", &format!("{}", stop_reason),
tx, rx, 0, 0, tx, rx, 0, 0,
); );
true
} }
Ok(None) => false, // secondary connection or pending multi-conn
Err(e) => { Err(e) => {
tracing::error!("Client {} error: {}", peer, e); tracing::error!("Client {} error: {}", peer, e);
true
} }
};
// Only decrement connection count for primary connections
if is_primary {
qm.disconnect(&peer.ip());
} }
qm_disconnect.disconnect(&peer.ip());
}); });
} }
} }
async fn handle_pro_client( /// Handle a single TCP connection. Returns None for secondary multi-conn joins.
async fn handle_pro_connection(
mut stream: TcpStream, mut stream: TcpStream,
peer: SocketAddr, peer: SocketAddr,
db: UserDb, db: UserDb,
quota_mgr: QuotaManager, quota_mgr: QuotaManager,
quota_check_interval: u64, quota_check_interval: u64,
) -> anyhow::Result<(String, StopReason, u64, u64)> { sessions: SessionMap,
) -> anyhow::Result<Option<(String, StopReason, u64, u64)>> {
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
// HELLO // HELLO
stream.write_all(&HELLO).await?; stream.write_all(&HELLO).await?;
// Read command // Read command (or session token for secondary connections)
let mut cmd_buf = [0u8; 16]; let mut cmd_buf = [0u8; 16];
stream.read_exact(&mut cmd_buf).await?; stream.read_exact(&mut cmd_buf).await?;
// Check if this is a secondary connection joining an existing TCP session
// Secondary connections send [HI, LO, ...] matching an existing session token
{
let potential_token = u16::from_be_bytes([cmd_buf[0], cmd_buf[1]]);
let mut map = sessions.lock().await;
if let Some(session) = map.get_mut(&potential_token) {
if session.peer_ip == peer.ip()
&& session.streams.len() < session.expected as usize
{
tracing::info!(
"Secondary connection from {} joining session (token={:04x}, {}/{})",
peer, potential_token,
session.streams.len() + 1, session.expected,
);
// Auth the secondary connection with same token response
let ok = [0x01, cmd_buf[0], cmd_buf[1], 0x00];
stream.write_all(&ok).await?;
stream.flush().await?;
session.streams.push(stream);
// If all connections have joined, start the test
if session.streams.len() >= session.expected as usize {
let session = map.remove(&potential_token).unwrap();
let db2 = db.clone();
let qm2 = quota_mgr.clone();
tokio::spawn(async move {
match run_pro_multiconn_test(
session.streams, session.cmd, peer,
&session.username, db2, qm2, quota_check_interval,
).await {
Ok((stop, tx, rx)) => {
tracing::info!(
"Multi-conn {} (user '{}') finished: {} (tx={}, rx={})",
peer, session.username, stop, tx, rx,
);
}
Err(e) => {
tracing::error!("Multi-conn {} error: {}", peer, e);
}
}
});
}
return Ok(None);
}
}
}
// Primary connection — check IP quota/connection limit now
if let Err(e) = quota_mgr.check_ip(&peer.ip(), Direction::Both) {
tracing::warn!("Rejected {} — {}", peer, e);
btest_rs::syslog_logger::auth_failure(
&peer.to_string(), "-", "-", &format!("{}", e),
);
return Ok(None);
}
quota_mgr.connect(&peer.ip());
let cmd = Command::deserialize(&cmd_buf); let cmd = Command::deserialize(&cmd_buf);
tracing::info!( tracing::info!(
@@ -136,14 +201,25 @@ async fn handle_pro_client(
cmd.tx_size, cmd.tx_size,
); );
// Authenticate — use MD5 auth with DB verification // Build auth OK response with session token for multi-connection
// Send AUTH_REQUIRED let is_tcp_multi = !cmd.is_udp() && cmd.tcp_conn_count > 0;
let session_token: u16 = if is_tcp_multi {
rand::random::<u16>() | 0x0101 // ensure both bytes non-zero
} else {
0
};
let ok_response: [u8; 4] = if is_tcp_multi {
[0x01, (session_token >> 8) as u8, (session_token & 0xFF) as u8, 0x00]
} else {
AUTH_OK
};
// Authenticate — MD5 challenge-response against DB
stream.write_all(&AUTH_REQUIRED).await?; stream.write_all(&AUTH_REQUIRED).await?;
let challenge = btest_rs::auth::generate_challenge(); let challenge = btest_rs::auth::generate_challenge();
stream.write_all(&challenge).await?; stream.write_all(&challenge).await?;
stream.flush().await?; stream.flush().await?;
// Read response
let mut response = [0u8; 48]; let mut response = [0u8; 48];
stream.read_exact(&mut response).await?; stream.read_exact(&mut response).await?;
@@ -176,17 +252,21 @@ async fn handle_pro_client(
anyhow::bail!("User disabled"); anyhow::bail!("User disabled");
} }
// Verify MD5 hash against stored password hash // Verify MD5 hash against stored raw password
// We need to compute the expected hash using the user's password if let Ok(Some(raw_pass)) = db.get_password(&username) {
// But we only store SHA256(user:pass), not the raw password. let expected_hash = btest_rs::auth::compute_auth_hash(&raw_pass, &challenge);
// For MD5 auth, we need the raw password to compute MD5(pass + challenge). if received_hash != expected_hash {
// This is a limitation — MD5 auth needs the raw password. tracing::warn!("Auth failed: password mismatch for user '{}'", username);
// For now, accept any authenticated user (the hash verification stream.write_all(&AUTH_FAILED).await?;
// happens on the client side with MikroTik). btest_rs::syslog_logger::auth_failure(
// TODO: Store password in a reversible form or use EC-SRP5 only. &peer.to_string(), &username, "md5", "password mismatch",
);
anyhow::bail!("Auth failed");
}
}
// If no raw password stored, accept (backwards compat with old DB entries)
// Send AUTH_OK stream.write_all(&ok_response).await?;
stream.write_all(&AUTH_OK).await?;
stream.flush().await?; stream.flush().await?;
tracing::info!("Auth successful for user '{}'", username); tracing::info!("Auth successful for user '{}'", username);
@@ -202,79 +282,168 @@ async fn handle_pro_client(
btest_rs::syslog_logger::auth_failure( btest_rs::syslog_logger::auth_failure(
&peer.to_string(), &username, "quota", &format!("{}", e), &peer.to_string(), &username, "quota", &format!("{}", e),
); );
// Connection is already authenticated, just close it return Ok(Some((username, StopReason::UserDailyQuota, 0, 0)));
return Ok((username, StopReason::UserDailyQuota, 0, 0));
} }
// Start session tracking // TCP multi-connection: register session and wait for secondary connections
if is_tcp_multi {
tracing::info!(
"TCP multi-connection: waiting for {} connections (token={:04x})",
cmd.tcp_conn_count, session_token,
);
let mut map = sessions.lock().await;
map.insert(session_token, TcpSession {
peer_ip: peer.ip(),
username: username.clone(),
cmd: cmd.clone(),
streams: vec![stream],
expected: cmd.tcp_conn_count, // tcp_conn_count includes the primary
});
// The test will be started when all connections join (in the secondary handler above)
return Ok(None);
}
// Single-connection test
run_pro_single_test(stream, cmd, peer, &username, db, quota_mgr, quota_check_interval).await
.map(|(stop, tx, rx)| Some((username, stop, tx, rx)))
}
/// Run a single-connection bandwidth test with quota enforcement.
async fn run_pro_single_test(
stream: TcpStream,
cmd: Command,
peer: SocketAddr,
username: &str,
db: UserDb,
quota_mgr: QuotaManager,
quota_check_interval: u64,
) -> anyhow::Result<(StopReason, u64, u64)> {
let proto_str = if cmd.is_udp() { "UDP" } else { "TCP" }; let proto_str = if cmd.is_udp() { "UDP" } else { "TCP" };
let dir_str = match cmd.direction { let dir_str = match cmd.direction {
CMD_DIR_RX => "RX", CMD_DIR_TX => "TX", _ => "BOTH" CMD_DIR_RX => "RX", CMD_DIR_TX => "TX", _ => "BOTH"
}; };
let session_id = db.start_session( let session_id = db.start_session(
&username, &peer.ip().to_string(), proto_str, dir_str, username, &peer.ip().to_string(), proto_str, dir_str,
)?; )?;
btest_rs::syslog_logger::test_start( btest_rs::syslog_logger::test_start(
&peer.to_string(), proto_str, dir_str, cmd.tcp_conn_count, &peer.to_string(), proto_str, dir_str, cmd.tcp_conn_count,
); );
// Create shared bandwidth state for the test
let state = BandwidthState::new(); let state = BandwidthState::new();
// Spawn quota enforcer // Set byte budget
let budget = quota_mgr.remaining_budget(username, &peer.ip());
if budget < u64::MAX {
state.set_budget(budget);
tracing::info!("Byte budget for '{}' from {}: {} bytes", username, peer.ip(), budget);
}
let enforcer = QuotaEnforcer::new( let enforcer = QuotaEnforcer::new(
quota_mgr.clone(), quota_mgr.clone(),
username.clone(), username.to_string(),
peer.ip(), peer.ip(),
state.clone(), state.clone(),
quota_check_interval, quota_check_interval,
quota_mgr.max_duration(), quota_mgr.max_duration(),
); );
// Spawn quota enforcer — runs alongside the test
let enforcer_state = state.clone(); let enforcer_state = state.clone();
let enforcer_handle = tokio::spawn(async move { let enforcer_handle = tokio::spawn(async move {
enforcer.run().await enforcer.run().await
}); });
// Run the actual bandwidth test using the standard server handlers.
// The enforcer runs concurrently and will set state.running = false
// if any quota is exceeded, which gracefully stops the TX/RX loops.
static UDP_PORT_OFFSET: std::sync::atomic::AtomicU16 = std::sync::atomic::AtomicU16::new(0); static UDP_PORT_OFFSET: std::sync::atomic::AtomicU16 = std::sync::atomic::AtomicU16::new(0);
let mut stream_mut = stream;
let test_result = if cmd.is_udp() { let test_result = if cmd.is_udp() {
let offset = UDP_PORT_OFFSET.fetch_add(1, std::sync::atomic::Ordering::SeqCst); let offset = UDP_PORT_OFFSET.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let udp_port = btest_rs::protocol::BTEST_UDP_PORT_START + offset; let udp_port = btest_rs::protocol::BTEST_UDP_PORT_START + offset;
btest_rs::server::run_udp_test( btest_rs::server::run_udp_test(
&mut stream, peer, &cmd, state.clone(), udp_port, &mut stream_mut, peer, &cmd, state.clone(), udp_port,
).await ).await
} else { } else {
btest_rs::server::run_tcp_test(stream, cmd.clone(), state.clone()).await btest_rs::server::run_tcp_test(stream_mut, cmd.clone(), state.clone()).await
}; };
// Test finished — stop the enforcer if still running
enforcer_state.running.store(false, std::sync::atomic::Ordering::SeqCst); enforcer_state.running.store(false, std::sync::atomic::Ordering::SeqCst);
let stop_reason = enforcer_handle.await.unwrap_or(StopReason::ClientDisconnected); let stop_reason = enforcer_handle.await.unwrap_or(StopReason::ClientDisconnected);
// Determine final stop reason
let final_reason = match &test_result { let final_reason = match &test_result {
Ok(_) => { Ok(_) => {
if stop_reason == StopReason::ClientDisconnected { if stop_reason == StopReason::ClientDisconnected {
StopReason::ClientDisconnected StopReason::ClientDisconnected
} else { } else {
stop_reason // quota or duration exceeded stop_reason
} }
} }
Err(_) => StopReason::ClientDisconnected, Err(_) => StopReason::ClientDisconnected,
}; };
// Record final usage
let (total_tx, total_rx, _, _) = state.summary(); let (total_tx, total_rx, _, _) = state.summary();
quota_mgr.record_usage(username, &peer.ip().to_string(), total_tx, total_rx);
// Flush to DB
quota_mgr.record_usage(&username, &peer.ip().to_string(), total_tx, total_rx);
db.end_session(session_id, total_tx, total_rx)?; db.end_session(session_id, total_tx, total_rx)?;
Ok((username, final_reason, total_tx, total_rx)) Ok((final_reason, total_tx, total_rx))
}
/// Run a TCP multi-connection test with all streams collected.
/// Delegates to the standard multi-conn handler which correctly manages
/// TX+status injection for bidirectional mode.
async fn run_pro_multiconn_test(
streams: Vec<TcpStream>,
cmd: Command,
peer: SocketAddr,
username: &str,
db: UserDb,
quota_mgr: QuotaManager,
quota_check_interval: u64,
) -> anyhow::Result<(StopReason, u64, u64)> {
let dir_str = match cmd.direction {
CMD_DIR_RX => "RX", CMD_DIR_TX => "TX", _ => "BOTH"
};
let session_id = db.start_session(
username, &peer.ip().to_string(), "TCP", dir_str,
)?;
tracing::info!(
"Starting TCP multi-conn test: {} streams, dir={}",
streams.len(), dir_str,
);
let state = BandwidthState::new();
let budget = quota_mgr.remaining_budget(username, &peer.ip());
if budget < u64::MAX {
state.set_budget(budget);
}
let enforcer = QuotaEnforcer::new(
quota_mgr.clone(),
username.to_string(),
peer.ip(),
state.clone(),
quota_check_interval,
quota_mgr.max_duration(),
);
let enforcer_state = state.clone();
let enforcer_handle = tokio::spawn(async move {
enforcer.run().await
});
// Use the standard multi-connection handler which correctly handles
// all direction modes (TX, RX, BOTH with status injection)
let _test_result = btest_rs::server::run_tcp_multiconn_test(
streams, cmd, state.clone(),
).await;
enforcer_state.running.store(false, std::sync::atomic::Ordering::SeqCst);
let stop_reason = enforcer_handle.await.unwrap_or(StopReason::ClientDisconnected);
let (total_tx, total_rx, _, _) = state.summary();
quota_mgr.record_usage(username, &peer.ip().to_string(), total_tx, total_rx);
db.end_session(session_id, total_tx, total_rx)?;
Ok((stop_reason, total_tx, total_rx))
} }

View File

@@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex};
#[derive(Clone)] #[derive(Clone)]
pub struct UserDb { pub struct UserDb {
conn: Arc<Mutex<Connection>>, conn: Arc<Mutex<Connection>>,
path: Arc<String>,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -68,9 +69,15 @@ impl UserDb {
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?; conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
Ok(Self { Ok(Self {
conn: Arc::new(Mutex::new(conn)), conn: Arc::new(Mutex::new(conn)),
path: Arc::new(path.to_string()),
}) })
} }
/// Return the database file path.
pub fn path(&self) -> &str {
&self.path
}
pub fn ensure_tables(&self) -> anyhow::Result<()> { pub fn ensure_tables(&self) -> anyhow::Result<()> {
let conn = self.conn.lock().unwrap(); let conn = self.conn.lock().unwrap();
conn.execute_batch(" conn.execute_batch("
@@ -147,13 +154,26 @@ impl UserDb {
pub fn add_user(&self, username: &str, password: &str) -> anyhow::Result<()> { pub fn add_user(&self, username: &str, password: &str) -> anyhow::Result<()> {
let hash = hash_password(username, password); let hash = hash_password(username, password);
let conn = self.conn.lock().unwrap(); let conn = self.conn.lock().unwrap();
// Ensure password_raw column exists (migration for older databases)
let _ = conn.execute("ALTER TABLE users ADD COLUMN password_raw TEXT DEFAULT ''", []);
conn.execute( conn.execute(
"INSERT OR REPLACE INTO users (username, password_hash) VALUES (?1, ?2)", "INSERT OR REPLACE INTO users (username, password_hash, password_raw) VALUES (?1, ?2, ?3)",
params![username, hash], params![username, hash, password],
)?; )?;
Ok(()) Ok(())
} }
/// Get the raw password for MD5 challenge-response auth.
pub fn get_password(&self, username: &str) -> anyhow::Result<Option<String>> {
let conn = self.conn.lock().unwrap();
let result = conn.query_row(
"SELECT password_raw FROM users WHERE username = ?1 AND enabled = 1",
params![username],
|row| row.get::<_, String>(0),
).optional()?;
Ok(result)
}
pub fn get_user(&self, username: &str) -> anyhow::Result<Option<User>> { pub fn get_user(&self, username: &str) -> anyhow::Result<Option<User>> {
let conn = self.conn.lock().unwrap(); let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare(

View File

@@ -76,7 +76,7 @@ const DEFAULT_DB_PATH: &str = "btest-users.db";
/// the web module is optional and failure during startup should surface /// the web module is optional and failure during startup should surface
/// loudly rather than silently serving broken pages. /// loudly rather than silently serving broken pages.
pub fn create_router(db: UserDb) -> Router { pub fn create_router(db: UserDb) -> Router {
let db_path = std::env::var("BTEST_DB_PATH").unwrap_or_else(|_| DEFAULT_DB_PATH.to_string()); let db_path = db.path().to_string();
let query_conn = Connection::open_with_flags( let query_conn = Connection::open_with_flags(
&db_path, &db_path,
@@ -104,6 +104,8 @@ pub fn create_router(db: UserDb) -> Router {
.route("/dashboard/{ip}", get(dashboard_page)) .route("/dashboard/{ip}", get(dashboard_page))
.route("/api/ip/{ip}/sessions", get(api_sessions)) .route("/api/ip/{ip}/sessions", get(api_sessions))
.route("/api/ip/{ip}/stats", get(api_stats)) .route("/api/ip/{ip}/stats", get(api_stats))
.route("/api/ip/{ip}/export", get(api_export))
.route("/api/ip/{ip}/quota", get(api_quota))
.route("/api/session/{id}/intervals", get(api_intervals)) .route("/api/session/{id}/intervals", get(api_intervals))
.with_state(state) .with_state(state)
} }
@@ -142,47 +144,87 @@ fn ensure_web_tables(db_path: &str) -> anyhow::Result<()> {
<head> <head>
<meta charset="utf-8"> <meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<title>btest-rs Public Bandwidth Test Server</title> <title>btest-rs — Free Public Bandwidth Test Server</title>
<style> <style>
*{margin:0;padding:0;box-sizing:border-box} *{margin:0;padding:0;box-sizing:border-box}
body{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Helvetica,Arial,sans-serif;background:#0f1117;color:#e1e4e8;min-height:100vh;display:flex;flex-direction:column;align-items:center;justify-content:center} body{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Helvetica,Arial,sans-serif;background:#0f1117;color:#e1e4e8;min-height:100vh;display:flex;flex-direction:column;align-items:center;padding:2rem 1rem}
.container{max-width:560px;width:90%;text-align:center;padding:2rem} .container{max-width:720px;width:100%;padding:1rem 0}
h1{font-size:2rem;margin-bottom:.5rem;color:#58a6ff} h1{font-size:2.2rem;margin-bottom:.25rem;color:#58a6ff;text-align:center}
.subtitle{color:#8b949e;margin-bottom:2rem;line-height:1.5} .subtitle{color:#8b949e;margin-bottom:2.5rem;line-height:1.6;text-align:center;font-size:1.05rem}
.search-box{display:flex;gap:.5rem;margin-bottom:1.5rem} .section{background:#161b22;border:1px solid #30363d;border-radius:8px;padding:1.5rem;margin-bottom:1.5rem;text-align:left;line-height:1.7;color:#c9d1d9}
.section h2{color:#e1e4e8;font-size:1.15rem;margin-bottom:.75rem}
.section h3{color:#e1e4e8;font-size:1rem;margin-bottom:.5rem;margin-top:1rem}
.section h3:first-child{margin-top:0}
.section p{margin-bottom:.5rem}
.section ul{margin:.5rem 0 .5rem 1.5rem;color:#8b949e}
.section li{margin-bottom:.35rem}
code{background:#0d1117;padding:.2rem .5rem;border-radius:4px;font-size:.85em;color:#58a6ff;word-break:break-all}
pre{background:#0d1117;border:1px solid #30363d;border-radius:6px;padding:1rem;overflow-x:auto;margin:.75rem 0;line-height:1.5}
pre code{padding:0;background:none;font-size:.85em}
.label-tag{display:inline-block;padding:.15rem .5rem;border-radius:4px;font-size:.75rem;font-weight:600;text-transform:uppercase;letter-spacing:.03em;margin-right:.5rem;vertical-align:middle}
.tag-tcp{background:rgba(63,185,80,0.15);color:#3fb950}
.tag-udp{background:rgba(210,153,34,0.15);color:#d29922}
.note{background:#1c1e26;border-left:3px solid #d29922;padding:.75rem 1rem;border-radius:0 6px 6px 0;margin:.75rem 0;font-size:.92rem;color:#8b949e}
.note strong{color:#d29922}
.search-section{text-align:center}
.search-section h2{text-align:center}
.search-box{display:flex;gap:.5rem;margin-bottom:1rem}
.search-box input{flex:1;padding:.75rem 1rem;border:1px solid #30363d;border-radius:6px;background:#161b22;color:#e1e4e8;font-size:1rem;outline:none} .search-box input{flex:1;padding:.75rem 1rem;border:1px solid #30363d;border-radius:6px;background:#161b22;color:#e1e4e8;font-size:1rem;outline:none}
.search-box input:focus{border-color:#58a6ff} .search-box input:focus{border-color:#58a6ff}
.search-box input::placeholder{color:#484f58} .search-box input::placeholder{color:#484f58}
.search-box button{padding:.75rem 1.5rem;background:#238636;color:#fff;border:none;border-radius:6px;font-size:1rem;cursor:pointer;white-space:nowrap} .search-box button{padding:.75rem 1.5rem;background:#238636;color:#fff;border:none;border-radius:6px;font-size:1rem;cursor:pointer;white-space:nowrap}
.search-box button:hover{background:#2ea043} .search-box button:hover{background:#2ea043}
.info{background:#161b22;border:1px solid #30363d;border-radius:8px;padding:1.5rem;text-align:left;line-height:1.6;color:#8b949e} .auto-link{font-size:.9rem;color:#8b949e}
.info h3{color:#e1e4e8;margin-bottom:.5rem}
.info code{background:#0d1117;padding:.15rem .4rem;border-radius:4px;font-size:.9em;color:#58a6ff}
.auto-link{margin-top:1rem;font-size:.9rem}
.auto-link a{color:#58a6ff;text-decoration:none} .auto-link a{color:#58a6ff;text-decoration:none}
.auto-link a:hover{text-decoration:underline} .auto-link a:hover{text-decoration:underline}
.footer{margin-top:2rem;color:#484f58;font-size:.8rem} .footer{margin-top:2rem;color:#484f58;font-size:.8rem;text-align:center}
.footer a{color:#58a6ff;text-decoration:none}
.footer a:hover{text-decoration:underline}
</style> </style>
</head> </head>
<body> <body>
<div class="container"> <div class="container">
<h1>btest-rs</h1> <h1>btest-rs</h1>
<p class="subtitle">Public MikroTik Bandwidth Test Server &mdash; view your test results and history.</p> <p class="subtitle">Free public MikroTik-compatible bandwidth test server.<br>Test your link speed from any RouterOS device &mdash; no registration required.</p>
<form class="search-box" id="ip-form" onsubmit="return goToDashboard()">
<input type="text" id="ip-input" placeholder="Enter your IP address (e.g. 203.0.113.5)" autocomplete="off"> <div class="section">
<button type="submit">View Results</button> <h2>Quick Start</h2>
</form> <p>Open a terminal on your MikroTik router and run one of the following commands:</p>
<div class="auto-link" id="auto-detect">Detecting your IP...</div> <h3><span class="label-tag tag-tcp">TCP</span> Recommended</h3>
<div class="info"> <pre><code>/tool bandwidth-test address=104.225.217.60 user=btest password=btest protocol=tcp direction=both</code></pre>
<h3>How it works</h3> <h3><span class="label-tag tag-udp">UDP</span></h3>
<p>Run a bandwidth test from your MikroTik router targeting this server. <pre><code>/tool bandwidth-test address=104.225.217.60 user=btest password=btest protocol=udp direction=both</code></pre>
After the test completes, enter your public IP above to see
throughput charts, session history, and aggregate statistics.</p>
<p style="margin-top:0.5rem">
Example: <code>/tool bandwidth-test address=this-server protocol=tcp direction=both</code>
</p>
</div> </div>
<div class="footer">Powered by btest-rs</div>
<div class="section">
<h2>Important Notes</h2>
<ul>
<li><strong style="color:#e1e4e8">Credentials:</strong> <code>user=btest</code> <code>password=btest</code></li>
<li><strong style="color:#e1e4e8">TCP is recommended</strong> for remote testing &mdash; it works reliably through any NAT or firewall</li>
<li><strong style="color:#e1e4e8">Per-IP daily quotas</strong> apply to keep the service fair for everyone</li>
<li><strong style="color:#e1e4e8">Maximum test duration:</strong> 120 seconds</li>
<li><strong style="color:#e1e4e8">Connection limit:</strong> 3 concurrent tests per IP</li>
</ul>
<div class="note">
<strong>UDP bidirectional may not work through NAT/firewall.</strong>
UDP <code>direction=both</code> requires the server to send packets to a pre-calculated client port, which NAT routers typically block. If you need UDP testing:<br>
&bull; Forward UDP ports 2001&ndash;2100 on your router, or<br>
&bull; Use <code>direction=send</code> or <code>direction=receive</code> (one-way works fine), or<br>
&bull; Test from a device with a public IP
</div>
</div>
<div class="section search-section">
<h2>Check Your Results</h2>
<p style="margin-bottom:1rem;color:#8b949e">After running a test, enter your public IP to view throughput charts, session history, and statistics.</p>
<form class="search-box" id="ip-form" onsubmit="return goToDashboard()">
<input type="text" id="ip-input" placeholder="Enter your IP address (e.g. 203.0.113.5)" autocomplete="off">
<button type="submit">View Results</button>
</form>
<div class="auto-link" id="auto-detect">Detecting your IP...</div>
</div>
<div class="footer">Powered by <a href="https://github.com/manawenuz/btest-rs">btest-rs</a> &mdash; open source MikroTik bandwidth test server</div>
</div> </div>
<script> <script>
function goToDashboard(){var ip=document.getElementById('ip-input').value.trim();if(ip){window.location.href='/dashboard/'+encodeURIComponent(ip);}return false;} function goToDashboard(){var ip=document.getElementById('ip-input').value.trim();if(ip){window.location.href='/dashboard/'+encodeURIComponent(ip);}return false;}
@@ -214,6 +256,8 @@ struct IndexTemplate;
.header h1{font-size:1.5rem;color:#58a6ff} .header h1{font-size:1.5rem;color:#58a6ff}
.header .ip-label{font-size:1.1rem;color:#8b949e;font-family:monospace} .header .ip-label{font-size:1.1rem;color:#8b949e;font-family:monospace}
.header .home-link{margin-left:auto} .header .home-link{margin-left:auto}
.btn{display:inline-block;padding:.5rem 1rem;border-radius:6px;font-size:.85rem;font-weight:500;cursor:pointer;border:1px solid #30363d;text-decoration:none}
.btn-json{background:#161b22;color:#3fb950}.btn-json:hover{background:#1c2128;text-decoration:none}
.stats{display:grid;grid-template-columns:repeat(auto-fit,minmax(160px,1fr));gap:1rem;margin-bottom:1.5rem} .stats{display:grid;grid-template-columns:repeat(auto-fit,minmax(160px,1fr));gap:1rem;margin-bottom:1.5rem}
.stat-card{background:#161b22;border:1px solid #30363d;border-radius:8px;padding:1rem} .stat-card{background:#161b22;border:1px solid #30363d;border-radius:8px;padding:1rem}
.stat-card .label{color:#8b949e;font-size:.8rem;text-transform:uppercase;letter-spacing:.05em} .stat-card .label{color:#8b949e;font-size:.8rem;text-transform:uppercase;letter-spacing:.05em}
@@ -231,12 +275,22 @@ struct IndexTemplate;
.chart-placeholder{text-align:center;color:#484f58;padding:3rem 0} .chart-placeholder{text-align:center;color:#484f58;padding:3rem 0}
.footer{text-align:center;color:#484f58;font-size:.8rem;margin-top:2rem} .footer{text-align:center;color:#484f58;font-size:.8rem;margin-top:2rem}
.no-data{text-align:center;padding:3rem;color:#484f58} .no-data{text-align:center;padding:3rem;color:#484f58}
.quota-section{background:#161b22;border:1px solid #30363d;border-radius:8px;padding:1.25rem;margin-bottom:1.5rem}
.quota-section h2{font-size:1rem;color:#8b949e;margin-bottom:1rem}
.quota-row{display:flex;align-items:center;gap:1rem;margin-bottom:.75rem}
.quota-row:last-child{margin-bottom:0}
.quota-label{min-width:70px;font-size:.85rem;color:#8b949e;text-transform:uppercase;letter-spacing:.04em}
.quota-bar-wrap{flex:1;background:#21262d;border-radius:4px;height:22px;position:relative;overflow:hidden}
.quota-bar{height:100%;border-radius:4px;transition:width .5s ease}
.quota-bar.low{background:#238636}.quota-bar.mid{background:#d29922}.quota-bar.high{background:#da3633}
.quota-text{min-width:180px;font-size:.85rem;color:#e1e4e8;text-align:right;font-family:monospace}
</style> </style>
</head> </head>
<body> <body>
<div class="header"> <div class="header">
<h1>btest-rs</h1> <h1>btest-rs</h1>
<span class="ip-label">{{ ip }}</span> <span class="ip-label">{{ ip }}</span>
<a class="btn btn-json" href="/api/ip/{{ ip }}/export" download>Export JSON</a>
<span class="home-link"><a href="/">Home</a></span> <span class="home-link"><a href="/">Home</a></span>
</div> </div>
<div class="stats" id="stats-grid"> <div class="stats" id="stats-grid">
@@ -246,6 +300,12 @@ struct IndexTemplate;
<div class="stat-card"><div class="label">Avg TX Mbps</div><div class="value" id="stat-avg-tx">&mdash;</div></div> <div class="stat-card"><div class="label">Avg TX Mbps</div><div class="value" id="stat-avg-tx">&mdash;</div></div>
<div class="stat-card"><div class="label">Avg RX Mbps</div><div class="value" id="stat-avg-rx">&mdash;</div></div> <div class="stat-card"><div class="label">Avg RX Mbps</div><div class="value" id="stat-avg-rx">&mdash;</div></div>
</div> </div>
<div class="quota-section" id="quota-section">
<h2>Quota Usage</h2>
<div class="quota-row"><span class="quota-label">Daily</span><div class="quota-bar-wrap"><div class="quota-bar low" id="bar-daily" style="width:0%"></div></div><span class="quota-text" id="text-daily">&mdash;</span></div>
<div class="quota-row"><span class="quota-label">Weekly</span><div class="quota-bar-wrap"><div class="quota-bar low" id="bar-weekly" style="width:0%"></div></div><span class="quota-text" id="text-weekly">&mdash;</span></div>
<div class="quota-row"><span class="quota-label">Monthly</span><div class="quota-bar-wrap"><div class="quota-bar low" id="bar-monthly" style="width:0%"></div></div><span class="quota-text" id="text-monthly">&mdash;</span></div>
</div>
<div class="chart-section"> <div class="chart-section">
<h2 id="chart-title">Select a test below to view its throughput chart</h2> <h2 id="chart-title">Select a test below to view its throughput chart</h2>
<div class="chart-container"> <div class="chart-container">
@@ -266,6 +326,19 @@ var currentIp="{{ ip }}";
var throughputChart=null; var throughputChart=null;
function formatBytes(b){if(b===0)return'0 B';var u=['B','KB','MB','GB','TB'];var i=Math.floor(Math.log(b)/Math.log(1024));if(i>=u.length)i=u.length-1;return(b/Math.pow(1024,i)).toFixed(1)+' '+u[i];} function formatBytes(b){if(b===0)return'0 B';var u=['B','KB','MB','GB','TB'];var i=Math.floor(Math.log(b)/Math.log(1024));if(i>=u.length)i=u.length-1;return(b/Math.pow(1024,i)).toFixed(1)+' '+u[i];}
function formatMbps(bps){return(bps*8/1e6).toFixed(2);} function formatMbps(bps){return(bps*8/1e6).toFixed(2);}
fetch('/api/ip/'+encodeURIComponent(currentIp)+'/quota').then(function(r){return r.json();}).then(function(q){
function upd(id,used,limit){
var pct=limit>0?Math.min(used/limit*100,100):0;
var bar=document.getElementById('bar-'+id);
var txt=document.getElementById('text-'+id);
bar.style.width=pct.toFixed(1)+'%';
bar.className='quota-bar '+(pct<50?'low':pct<80?'mid':'high');
txt.textContent=formatBytes(used)+' / '+formatBytes(limit)+' ('+pct.toFixed(1)+'%)';
}
upd('daily',q.daily_used,q.daily_limit);
upd('weekly',q.weekly_used,q.weekly_limit);
upd('monthly',q.monthly_used,q.monthly_limit);
}).catch(function(){});
function durationStr(s,e){if(!s||!e)return'--';var ms=new Date(e)-new Date(s);if(ms<0)return'--';var sec=Math.round(ms/1000);if(sec<60)return sec+'s';return Math.floor(sec/60)+'m '+(sec%60)+'s';} function durationStr(s,e){if(!s||!e)return'--';var ms=new Date(e)-new Date(s);if(ms<0)return'--';var sec=Math.round(ms/1000);if(sec<60)return sec+'s';return Math.floor(sec/60)+'m '+(sec%60)+'s';}
function durationSec(s,e){if(!s||!e)return 0;return Math.max((new Date(e)-new Date(s))/1000,0.001);} function durationSec(s,e){if(!s||!e)return 0;return Math.max((new Date(e)-new Date(s))/1000,0.001);}
fetch('/api/ip/'+encodeURIComponent(currentIp)+'/stats').then(function(r){return r.json();}).then(function(d){ fetch('/api/ip/'+encodeURIComponent(currentIp)+'/stats').then(function(r){return r.json();}).then(function(d){
@@ -495,6 +568,198 @@ async fn api_stats(
Ok(axum::Json(stats)) Ok(axum::Json(stats))
} }
/// Quota usage for an IP — daily/weekly/monthly with limits.
#[derive(Serialize)]
struct QuotaUsageJson {
daily_used: i64,
daily_limit: i64,
weekly_used: i64,
weekly_limit: i64,
monthly_used: i64,
monthly_limit: i64,
}
/// `GET /api/ip/{ip}/quota` -- return current quota usage for the IP.
async fn api_quota(
State(state): State<Arc<WebState>>,
Path(ip): Path<String>,
) -> Result<axum::Json<QuotaUsageJson>, AppError> {
let conn = state.query_conn.lock().map_err(|e| anyhow::anyhow!("lock: {}", e))?;
let daily: i64 = conn.query_row(
"SELECT COALESCE(SUM(inbound_bytes + outbound_bytes), 0) FROM ip_usage WHERE ip = ?1 AND date = date('now')",
params![ip], |row| row.get(0),
).unwrap_or(0);
let weekly: i64 = conn.query_row(
"SELECT COALESCE(SUM(inbound_bytes + outbound_bytes), 0) FROM ip_usage WHERE ip = ?1 AND date >= date('now', '-7 days')",
params![ip], |row| row.get(0),
).unwrap_or(0);
let monthly: i64 = conn.query_row(
"SELECT COALESCE(SUM(inbound_bytes + outbound_bytes), 0) FROM ip_usage WHERE ip = ?1 AND date >= date('now', '-30 days')",
params![ip], |row| row.get(0),
).unwrap_or(0);
// Limits: 2GB daily, 8GB weekly, 24GB monthly
Ok(axum::Json(QuotaUsageJson {
daily_used: daily,
daily_limit: 2_147_483_648,
weekly_used: weekly,
weekly_limit: 8_589_934_592,
monthly_used: monthly,
monthly_limit: 25_769_803_776,
}))
}
/// Full export of all data for an IP — stats + sessions with human-readable fields.
#[derive(Serialize)]
struct ExportJson {
ip: String,
exported_at: String,
stats: StatsJson,
quota: QuotaJson,
sessions: Vec<ExportSessionJson>,
}
#[derive(Serialize)]
struct QuotaJson {
daily_used_bytes: i64,
daily_used_human: String,
daily_limit_bytes: String,
}
#[derive(Serialize)]
struct ExportSessionJson {
id: i64,
started_at: Option<String>,
ended_at: Option<String>,
protocol: Option<String>,
direction: Option<String>,
tx_bytes: i64,
rx_bytes: i64,
tx_human: String,
rx_human: String,
duration_secs: f64,
avg_tx_mbps: f64,
avg_rx_mbps: f64,
}
fn human_bytes(b: i64) -> String {
let b = b as f64;
if b >= 1_073_741_824.0 {
format!("{:.2} GB", b / 1_073_741_824.0)
} else if b >= 1_048_576.0 {
format!("{:.1} MB", b / 1_048_576.0)
} else if b >= 1024.0 {
format!("{:.1} KB", b / 1024.0)
} else {
format!("{} B", b as i64)
}
}
/// `GET /api/ip/{ip}/export` -- return a comprehensive JSON export of all
/// sessions, stats, and quota usage for an IP. Suitable for download/archival.
async fn api_export(
State(state): State<Arc<WebState>>,
Path(ip): Path<String>,
) -> Result<impl IntoResponse, AppError> {
let conn = state
.query_conn
.lock()
.map_err(|e| anyhow::anyhow!("lock: {}", e))?;
// Stats
let stats = conn.query_row(
"SELECT COUNT(*), COALESCE(SUM(tx_bytes),0), COALESCE(SUM(rx_bytes),0),
COALESCE(SUM(CASE WHEN ended_at IS NOT NULL AND started_at IS NOT NULL
THEN (julianday(ended_at)-julianday(started_at))*86400.0 ELSE 0 END),0)
FROM sessions WHERE peer_ip = ?1",
params![ip],
|row| {
let n: i64 = row.get(0)?;
let tx: i64 = row.get(1)?;
let rx: i64 = row.get(2)?;
let secs: f64 = row.get(3)?;
Ok(StatsJson {
total_sessions: n,
total_tx_bytes: tx,
total_rx_bytes: rx,
avg_tx_mbps: if secs > 0.0 { tx as f64 * 8.0 / secs / 1e6 } else { 0.0 },
avg_rx_mbps: if secs > 0.0 { rx as f64 * 8.0 / secs / 1e6 } else { 0.0 },
})
},
)?;
// Quota
let daily_used: i64 = conn.query_row(
"SELECT COALESCE(SUM(inbound_bytes + outbound_bytes), 0) FROM ip_usage
WHERE ip = ?1 AND date = date('now')",
params![ip],
|row| row.get(0),
).unwrap_or(0);
let quota = QuotaJson {
daily_used_bytes: daily_used,
daily_used_human: human_bytes(daily_used),
daily_limit_bytes: "see server config".to_string(),
};
// Sessions with computed fields (duration computed by SQLite)
let mut stmt = conn.prepare(
"SELECT id, started_at, ended_at, protocol, direction, tx_bytes, rx_bytes,
CASE WHEN ended_at IS NOT NULL AND started_at IS NOT NULL
THEN (julianday(ended_at) - julianday(started_at)) * 86400.0
ELSE 0 END AS dur_secs
FROM sessions WHERE peer_ip = ?1 ORDER BY started_at DESC LIMIT 100",
)?;
let sessions: Vec<ExportSessionJson> = stmt.query_map(params![ip], |row| {
let tx: i64 = row.get(5)?;
let rx: i64 = row.get(6)?;
let dur: f64 = row.get(7)?;
Ok(ExportSessionJson {
id: row.get(0)?,
started_at: row.get(1)?,
ended_at: row.get(2)?,
protocol: row.get(3)?,
direction: row.get(4)?,
tx_bytes: tx,
rx_bytes: rx,
tx_human: human_bytes(tx),
rx_human: human_bytes(rx),
duration_secs: dur,
avg_tx_mbps: if dur > 0.0 { tx as f64 * 8.0 / dur / 1e6 } else { 0.0 },
avg_rx_mbps: if dur > 0.0 { rx as f64 * 8.0 / dur / 1e6 } else { 0.0 },
})
})?.filter_map(Result::ok).collect();
let export = ExportJson {
ip: ip.clone(),
exported_at: {
// Simple UTC timestamp without chrono
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
format!("{}", secs) // Unix timestamp — universally parseable
},
stats,
quota,
sessions,
};
let json_string = serde_json::to_string_pretty(&export)
.map_err(|e| anyhow::anyhow!("json serialize: {}", e))?;
Ok((
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, "application/json".to_string()),
(axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"btest-{}.json\"", ip)),
],
json_string,
))
}
/// `GET /api/session/{id}/intervals` -- return per-second throughput data /// `GET /api/session/{id}/intervals` -- return per-second throughput data
/// for a session. /// for a session.
/// ///

View File

@@ -235,7 +235,7 @@ async fn test_csv_created_client() {
// Initialize CSV // Initialize CSV
btest_rs::csv_output::init(&csv_path).unwrap(); btest_rs::csv_output::init(&csv_path).unwrap();
let (tx, rx, lost, intervals) = run_client_test( let (tx, rx, lost, _intervals) = run_client_test(
"127.0.0.1", port, false, true, false, None, None, "127.0.0.1", port, false, true, false, None, None,
).await; ).await;
@@ -336,3 +336,67 @@ async fn test_bandwidth_state_running_flag() {
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
assert!(!state.running.load(Ordering::Relaxed)); assert!(!state.running.load(Ordering::Relaxed));
} }
// --- CPU Reporting Tests ---
/// Helper that returns the full BandwidthState (not just summary) so we can check remote_cpu.
async fn run_client_with_state(
host: &str, port: u16, transmit: bool, receive: bool, udp: bool,
secs: u64,
) -> std::sync::Arc<btest_rs::bandwidth::BandwidthState> {
let direction = match (transmit, receive) {
(true, false) => btest_rs::protocol::CMD_DIR_RX,
(false, true) => btest_rs::protocol::CMD_DIR_TX,
(true, true) => btest_rs::protocol::CMD_DIR_BOTH,
_ => panic!("must specify direction"),
};
let state = btest_rs::bandwidth::BandwidthState::new();
let state_clone = state.clone();
let host = host.to_string();
let handle = tokio::spawn(async move {
btest_rs::client::run_client(
&host, port, direction, udp,
0, 0, None, None, false, state_clone,
).await
});
tokio::time::sleep(Duration::from_secs(secs)).await;
state.running.store(false, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(500)).await;
handle.abort();
state
}
#[test]
fn test_local_cpu_nonzero() {
// CPU sampler should return > 0 on supported platforms after warming up
btest_rs::cpu::start_sampler();
std::thread::sleep(Duration::from_secs(2));
let cpu = btest_rs::cpu::get();
// On CI or idle machines, CPU may genuinely be 0, so just check it doesn't panic
// and returns a value in range
assert!(cpu <= 100, "CPU should be 0-100, got {}", cpu);
}
#[tokio::test]
async fn test_tcp_remote_cpu_both() {
let port = BASE_PORT + 20;
start_server_noauth(port).await;
let state = run_client_with_state("127.0.0.1", port, true, true, false, 3).await;
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
// On loopback with bidirectional traffic, server CPU should be > 0
// The status messages are interleaved in the TCP data stream
assert!(remote_cpu > 0, "TCP BOTH: remote CPU should be > 0 on loopback, got {}", remote_cpu);
}
#[tokio::test]
async fn test_tcp_remote_cpu_tx_only() {
let port = BASE_PORT + 21;
start_server_noauth(port).await;
let state = run_client_with_state("127.0.0.1", port, true, false, false, 3).await;
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
// TX-only: server sends status messages that the status reader should parse
assert!(remote_cpu > 0, "TCP TX-only: remote CPU should be > 0 on loopback, got {}", remote_cpu);
}