From 4cdcc4e6c4653fe60120f4263314327bcfcf435f Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Wed, 1 Apr 2026 18:43:09 +0400 Subject: [PATCH] Public btest server: byte budget, multi-conn, web dashboard, quotas - Inline byte budget in BandwidthState prevents quota overshoot at any link speed (TX/RX loops check per-packet, not per-interval) - TCP multi-connection support for server-pro (session tokens, secondary connection joins, delegates to standard multi-conn handler) - MD5 password verification against stored raw passwords in user DB - Web dashboard: quota progress bars (daily/weekly/monthly), JSON export endpoint (/api/ip/{ip}/export), quota API (/api/ip/{ip}/quota) - Landing page with usage instructions, UDP NAT warning, credentials - Fix IP usage double-counting bug in QuotaManager::record_usage - UserDb now stores DB path and raw passwords for MD5 auth - 10 enforcer tests (4 new: budget calc, budget stop, budget exhausted, unlimited passthrough) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/bandwidth.rs | 27 +++ src/server.rs | 44 +++++ src/server_pro/enforcer.rs | 66 +++++++ src/server_pro/quota.rs | 86 ++++++++- src/server_pro/server_loop.rs | 295 ++++++++++++++++++++++++------- src/server_pro/user_db.rs | 24 ++- src/server_pro/web/mod.rs | 319 +++++++++++++++++++++++++++++++--- 7 files changed, 763 insertions(+), 98 deletions(-) diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 5386a68..f79e4fe 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -20,6 +20,9 @@ pub struct BandwidthState { pub intervals: AtomicU32, /// Remote peer's CPU usage (received via status messages) pub remote_cpu: AtomicU8, + /// Remaining byte budget (TX + RX combined). When this reaches 0 the test + /// stops immediately. u64::MAX means unlimited (default for non-pro server). + pub byte_budget: AtomicU64, } impl BandwidthState { @@ -38,6 +41,7 @@ impl BandwidthState { total_lost_packets: AtomicU64::new(0), intervals: AtomicU32::new(0), remote_cpu: AtomicU8::new(0), + byte_budget: AtomicU64::new(u64::MAX), }) } @@ -50,6 +54,29 @@ impl BandwidthState { self.intervals.fetch_add(1, Relaxed); } + /// Try to spend `amount` bytes from the budget. Returns `true` if allowed, + /// `false` if the budget is exhausted (and sets `running = false`). + #[inline] + pub fn spend_budget(&self, amount: u64) -> bool { + use std::sync::atomic::Ordering::{Relaxed, SeqCst}; + // Fast path: unlimited budget (non-pro server) + let current = self.byte_budget.load(Relaxed); + if current == u64::MAX { + return true; + } + if current < amount { + self.running.store(false, SeqCst); + return false; + } + self.byte_budget.fetch_sub(amount, Relaxed); + true + } + + /// Set the byte budget (total bytes allowed for the entire test). + pub fn set_budget(&self, budget: u64) { + self.byte_budget.store(budget, std::sync::atomic::Ordering::SeqCst); + } + /// Get summary for syslog reporting. pub fn summary(&self) -> (u64, u64, u64, u32) { use std::sync::atomic::Ordering::Relaxed; diff --git a/src/server.rs b/src/server.rs index 57d2ea2..80091cc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -366,6 +366,24 @@ async fn handle_client( // --- TCP Test Server --- +/// Public TX task for multi-connection use by server_pro. +pub async fn tcp_tx_task( + writer: tokio::net::tcp::OwnedWriteHalf, + tx_size: usize, + tx_speed: u32, + state: Arc, +) { + tcp_tx_loop(writer, tx_size, tx_speed, state).await; +} + +/// Public RX task for multi-connection use by server_pro. +pub async fn tcp_rx_task( + reader: tokio::net::tcp::OwnedReadHalf, + state: Arc, +) { + tcp_rx_loop(reader, state).await; +} + /// Run a TCP bandwidth test on an already-authenticated stream. /// Public API for use by server_pro. pub async fn run_tcp_test( @@ -451,9 +469,22 @@ async fn run_tcp_test_inner(stream: TcpStream, cmd: Command, state: Arc, + cmd: Command, + state: Arc, +) -> Result<(u64, u64, u64, u32)> { + run_tcp_multiconn_inner(streams, cmd, state).await +} + /// TCP multi-connection. async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Result<(u64, u64, u64, u32)> { let state = BandwidthState::new(); + run_tcp_multiconn_inner(streams, cmd, state).await +} + +async fn run_tcp_multiconn_inner(streams: Vec, cmd: Command, state: Arc) -> Result<(u64, u64, u64, u32)> { let tx_size = cmd.tx_size as usize; let server_should_tx = cmd.server_tx(); let server_should_rx = cmd.server_rx(); @@ -564,6 +595,9 @@ async fn tcp_tx_loop_inner( next_status = Instant::now() + Duration::from_secs(1); } + if !state.spend_budget(tx_size as u64) { + break; + } if writer.write_all(&packet).await.is_err() { state.running.store(false, Ordering::SeqCst); break; @@ -600,6 +634,9 @@ async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc { + if !state.spend_budget(n as u64) { + break; + } state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); } } @@ -796,6 +833,10 @@ async fn udp_tx_loop( let mut consecutive_errors: u32 = 0; while state.running.load(Ordering::Relaxed) { + if !state.spend_budget(tx_size as u64) { + break; + } + packet[0..4].copy_from_slice(&seq.to_be_bytes()); let result = if multi_conn { @@ -871,6 +912,9 @@ async fn udp_rx_loop(socket: &UdpSocket, state: Arc) { // (multi-connection MikroTik sends from multiple ports) match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await { Ok(Ok((n, _src))) if n >= 4 => { + if !state.spend_budget(n as u64) { + break; + } state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); state.rx_packets.fetch_add(1, Ordering::Relaxed); diff --git a/src/server_pro/enforcer.rs b/src/server_pro/enforcer.rs index 48584dc..2b05847 100644 --- a/src/server_pro/enforcer.rs +++ b/src/server_pro/enforcer.rs @@ -342,4 +342,70 @@ mod tests { let (ip_in, ip_out) = db.get_ip_daily_usage("127.0.0.1").unwrap(); assert!(ip_in + ip_out > 0, "IP usage should be recorded"); } + + #[test] + fn test_remaining_budget_calculation() { + let (db, qm) = setup_test_db(); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // No usage yet: budget = min(daily=1000, weekly=5000, monthly=10000, ip_daily=500, ...) + // IP daily combined = 500 is the smallest + let budget = qm.remaining_budget("testuser", &ip); + assert_eq!(budget, 500, "budget should be min of all limits (ip_daily=500)"); + + // Use record_usage which properly records combined + directional + // inbound=200, outbound=200 → combined = 400 + qm.record_usage("testuser", "10.0.0.1", 200, 200); + + // IP daily combined: 500 - 400 = 100 remaining + // IP daily inbound: 500 - 200 = 300 remaining + // IP daily outbound: 500 - 200 = 300 remaining + // User daily: 1000 - 400 = 600 remaining + let budget = qm.remaining_budget("testuser", &ip); + assert_eq!(budget, 100, "budget should reflect IP combined remaining (100)"); + } + + #[test] + fn test_budget_zero_when_exhausted() { + let (db, qm) = setup_test_db(); + let ip: IpAddr = "10.0.0.2".parse().unwrap(); + + // Exhaust user daily quota (1000 bytes) + db.record_usage("testuser", 600, 500).unwrap(); // 1100 > 1000 + + let budget = qm.remaining_budget("testuser", &ip); + assert_eq!(budget, 0, "budget should be 0 when user daily quota is exhausted"); + } + + #[test] + fn test_byte_budget_stops_transfer() { + let state = BandwidthState::new(); + + // Set a 1000-byte budget + state.set_budget(1000); + + // Spend 500 bytes — should succeed + assert!(state.spend_budget(500)); + + // Spend another 400 — should succeed (100 remaining) + assert!(state.spend_budget(400)); + + // Spend 200 — should fail (only 100 remaining) + assert!(!state.spend_budget(200)); + + // running should be false + assert!(!state.running.load(Ordering::Relaxed)); + } + + #[test] + fn test_unlimited_budget_always_succeeds() { + let state = BandwidthState::new(); + // Default budget is u64::MAX (unlimited) + + // Should always succeed + for _ in 0..1000 { + assert!(state.spend_budget(1_000_000_000)); + } + assert!(state.running.load(Ordering::Relaxed)); + } } diff --git a/src/server_pro/quota.rs b/src/server_pro/quota.rs index 08b7e0f..e059acb 100644 --- a/src/server_pro/quota.rs +++ b/src/server_pro/quota.rs @@ -371,18 +371,92 @@ impl QuotaManager { tracing::error!("Failed to record user usage for {}: {}", username, e); } - // Record combined IP usage. + // Record IP usage — record_ip_usage already writes both the + // inbound_bytes and outbound_bytes columns in one operation. + // Do NOT also call record_ip_inbound_usage/record_ip_outbound_usage + // as they update the same columns and would double-count. if let Err(e) = self.db.record_ip_usage(ip, outbound_bytes, inbound_bytes) { tracing::error!("Failed to record IP usage for {}: {}", ip, e); } + } - // Record directional IP usage for the new per-direction columns. - if let Err(e) = self.db.record_ip_inbound_usage(ip, inbound_bytes) { - tracing::error!("Failed to record IP inbound usage for {}: {}", ip, e); + /// Calculate the remaining byte budget for a user+IP combination. + /// Returns the minimum remaining quota across all applicable limits. + /// Used to set `BandwidthState::byte_budget` before a test starts, + /// preventing overshoot beyond quota boundaries. + pub fn remaining_budget(&self, username: &str, ip: &IpAddr) -> u64 { + let mut budget = u64::MAX; + let ip_str = ip.to_string(); + + // Helper: min that ignores 0 (unlimited) + let cap = |budget: &mut u64, limit: u64, used: u64| { + if limit > 0 { + let remaining = limit.saturating_sub(used); + *budget = (*budget).min(remaining); + } + }; + + // User quotas (combined tx+rx) + if let Ok(Some(user)) = self.db.get_user(username) { + let daily_limit = if user.daily_quota > 0 { user.daily_quota as u64 } else { self.default_daily }; + if daily_limit > 0 { + let (tx, rx) = self.db.get_daily_usage(username).unwrap_or((0, 0)); + cap(&mut budget, daily_limit, tx + rx); + } + + let weekly_limit = if user.weekly_quota > 0 { user.weekly_quota as u64 } else { self.default_weekly }; + if weekly_limit > 0 { + let (tx, rx) = self.db.get_weekly_usage(username).unwrap_or((0, 0)); + cap(&mut budget, weekly_limit, tx + rx); + } + + if self.default_monthly > 0 { + let (tx, rx) = self.db.get_monthly_usage(username).unwrap_or((0, 0)); + cap(&mut budget, self.default_monthly, tx + rx); + } } - if let Err(e) = self.db.record_ip_outbound_usage(ip, outbound_bytes) { - tracing::error!("Failed to record IP outbound usage for {}: {}", ip, e); + + // IP combined quotas + if self.ip_daily > 0 { + let (tx, rx) = self.db.get_ip_daily_usage(&ip_str).unwrap_or((0, 0)); + cap(&mut budget, self.ip_daily, tx + rx); } + if self.ip_weekly > 0 { + let (tx, rx) = self.db.get_ip_weekly_usage(&ip_str).unwrap_or((0, 0)); + cap(&mut budget, self.ip_weekly, tx + rx); + } + if self.ip_monthly > 0 { + let (tx, rx) = self.db.get_ip_monthly_usage(&ip_str).unwrap_or((0, 0)); + cap(&mut budget, self.ip_monthly, tx + rx); + } + + // IP directional quotas — use inbound + outbound as combined ceiling + if self.ip_daily_inbound > 0 { + let used = self.db.get_ip_daily_inbound(&ip_str).unwrap_or(0); + cap(&mut budget, self.ip_daily_inbound, used); + } + if self.ip_daily_outbound > 0 { + let used = self.db.get_ip_daily_outbound(&ip_str).unwrap_or(0); + cap(&mut budget, self.ip_daily_outbound, used); + } + if self.ip_weekly_inbound > 0 { + let used = self.db.get_ip_weekly_inbound(&ip_str).unwrap_or(0); + cap(&mut budget, self.ip_weekly_inbound, used); + } + if self.ip_weekly_outbound > 0 { + let used = self.db.get_ip_weekly_outbound(&ip_str).unwrap_or(0); + cap(&mut budget, self.ip_weekly_outbound, used); + } + if self.ip_monthly_inbound > 0 { + let used = self.db.get_ip_monthly_inbound(&ip_str).unwrap_or(0); + cap(&mut budget, self.ip_monthly_inbound, used); + } + if self.ip_monthly_outbound > 0 { + let used = self.db.get_ip_monthly_outbound(&ip_str).unwrap_or(0); + cap(&mut budget, self.ip_monthly_outbound, used); + } + + budget } pub fn max_duration(&self) -> u64 { diff --git a/src/server_pro/server_loop.rs b/src/server_pro/server_loop.rs index 6a48076..b87f941 100644 --- a/src/server_pro/server_loop.rs +++ b/src/server_pro/server_loop.rs @@ -2,14 +2,18 @@ //! //! Wraps the standard btest server connection handler with: //! - Pre-connection IP/user quota checks +//! - MD5 challenge-response auth against user DB +//! - TCP multi-connection session support //! - Mid-session quota enforcement via QuotaEnforcer //! - Post-session usage recording +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::Mutex; use btest_rs::protocol::*; use btest_rs::bandwidth::BandwidthState; @@ -18,22 +22,27 @@ use super::enforcer::{QuotaEnforcer, StopReason}; use super::quota::{Direction, QuotaManager}; use super::user_db::UserDb; +/// Pending TCP multi-connection session. +struct TcpSession { + peer_ip: std::net::IpAddr, + username: String, + cmd: Command, + streams: Vec, + expected: u8, +} + +type SessionMap = Arc>>; + /// Run the pro server with quota enforcement. pub async fn run_pro_server( port: u16, - ecsrp5: bool, + _ecsrp5: bool, listen_v4: Option, listen_v6: Option, db: UserDb, quota_mgr: QuotaManager, quota_check_interval: u64, ) -> anyhow::Result<()> { - // Pre-derive EC-SRP5 creds if needed - // For pro server, we don't use CLI -a/-p — we use the user DB - // EC-SRP5 needs a fixed password for the server challenge, but - // the actual verification happens against the DB. - // For now, the first user in the DB is used for EC-SRP5 derivation. - let v4_listener = if let Some(ref addr) = listen_v4 { let bind_addr = format!("{}:{}", addr, port); Some(TcpListener::bind(&bind_addr).await?) @@ -52,6 +61,8 @@ pub async fn run_pro_server( anyhow::bail!("No listeners bound"); } + let sessions: SessionMap = Arc::new(Mutex::new(HashMap::new())); + tracing::info!("btest-server-pro ready, accepting connections"); loop { @@ -69,29 +80,14 @@ pub async fn run_pro_server( tracing::info!("New connection from {}", peer); - // Pre-connection IP check - if let Err(e) = quota_mgr.check_ip(&peer.ip(), Direction::Both) { - tracing::warn!("Rejected {} — {}", peer, e); - btest_rs::syslog_logger::auth_failure( - &peer.to_string(), "-", "-", &format!("{}", e), - ); - // Send a quick rejection and close - let mut s = stream; - let _ = s.write_all(&HELLO).await; - drop(s); - continue; - } - - quota_mgr.connect(&peer.ip()); - let db = db.clone(); let qm = quota_mgr.clone(); - let qm_disconnect = quota_mgr.clone(); let interval = quota_check_interval; + let sess = sessions.clone(); tokio::spawn(async move { - match handle_pro_client(stream, peer, db, qm, interval).await { - Ok((username, stop_reason, tx, rx)) => { + let is_primary = match handle_pro_connection(stream, peer, db, qm.clone(), interval, sess).await { + Ok(Some((username, stop_reason, tx, rx))) => { tracing::info!( "Client {} (user '{}') finished: {} (tx={}, rx={})", peer, username, stop_reason, tx, rx, @@ -100,31 +96,100 @@ pub async fn run_pro_server( &peer.to_string(), "btest", &format!("{}", stop_reason), tx, rx, 0, 0, ); + true } + Ok(None) => false, // secondary connection or pending multi-conn Err(e) => { tracing::error!("Client {} error: {}", peer, e); + true } + }; + // Only decrement connection count for primary connections + if is_primary { + qm.disconnect(&peer.ip()); } - qm_disconnect.disconnect(&peer.ip()); }); } } -async fn handle_pro_client( +/// Handle a single TCP connection. Returns None for secondary multi-conn joins. +async fn handle_pro_connection( mut stream: TcpStream, peer: SocketAddr, db: UserDb, quota_mgr: QuotaManager, quota_check_interval: u64, -) -> anyhow::Result<(String, StopReason, u64, u64)> { + sessions: SessionMap, +) -> anyhow::Result> { stream.set_nodelay(true)?; // HELLO stream.write_all(&HELLO).await?; - // Read command + // Read command (or session token for secondary connections) let mut cmd_buf = [0u8; 16]; stream.read_exact(&mut cmd_buf).await?; + + // Check if this is a secondary connection joining an existing TCP session + // Secondary connections send [HI, LO, ...] matching an existing session token + { + let potential_token = u16::from_be_bytes([cmd_buf[0], cmd_buf[1]]); + let mut map = sessions.lock().await; + if let Some(session) = map.get_mut(&potential_token) { + if session.peer_ip == peer.ip() + && session.streams.len() < session.expected as usize + { + tracing::info!( + "Secondary connection from {} joining session (token={:04x}, {}/{})", + peer, potential_token, + session.streams.len() + 1, session.expected, + ); + + // Auth the secondary connection with same token response + let ok = [0x01, cmd_buf[0], cmd_buf[1], 0x00]; + stream.write_all(&ok).await?; + stream.flush().await?; + + session.streams.push(stream); + + // If all connections have joined, start the test + if session.streams.len() >= session.expected as usize { + let session = map.remove(&potential_token).unwrap(); + let db2 = db.clone(); + let qm2 = quota_mgr.clone(); + tokio::spawn(async move { + match run_pro_multiconn_test( + session.streams, session.cmd, peer, + &session.username, db2, qm2, quota_check_interval, + ).await { + Ok((stop, tx, rx)) => { + tracing::info!( + "Multi-conn {} (user '{}') finished: {} (tx={}, rx={})", + peer, session.username, stop, tx, rx, + ); + } + Err(e) => { + tracing::error!("Multi-conn {} error: {}", peer, e); + } + } + }); + } + + return Ok(None); + } + } + } + + // Primary connection — check IP quota/connection limit now + if let Err(e) = quota_mgr.check_ip(&peer.ip(), Direction::Both) { + tracing::warn!("Rejected {} — {}", peer, e); + btest_rs::syslog_logger::auth_failure( + &peer.to_string(), "-", "-", &format!("{}", e), + ); + return Ok(None); + } + quota_mgr.connect(&peer.ip()); + let cmd = Command::deserialize(&cmd_buf); tracing::info!( @@ -136,14 +201,25 @@ async fn handle_pro_client( cmd.tx_size, ); - // Authenticate — use MD5 auth with DB verification - // Send AUTH_REQUIRED + // Build auth OK response with session token for multi-connection + let is_tcp_multi = !cmd.is_udp() && cmd.tcp_conn_count > 0; + let session_token: u16 = if is_tcp_multi { + rand::random::() | 0x0101 // ensure both bytes non-zero + } else { + 0 + }; + let ok_response: [u8; 4] = if is_tcp_multi { + [0x01, (session_token >> 8) as u8, (session_token & 0xFF) as u8, 0x00] + } else { + AUTH_OK + }; + + // Authenticate — MD5 challenge-response against DB stream.write_all(&AUTH_REQUIRED).await?; let challenge = btest_rs::auth::generate_challenge(); stream.write_all(&challenge).await?; stream.flush().await?; - // Read response let mut response = [0u8; 48]; stream.read_exact(&mut response).await?; @@ -176,17 +252,21 @@ async fn handle_pro_client( anyhow::bail!("User disabled"); } - // Verify MD5 hash against stored password hash - // We need to compute the expected hash using the user's password - // But we only store SHA256(user:pass), not the raw password. - // For MD5 auth, we need the raw password to compute MD5(pass + challenge). - // This is a limitation — MD5 auth needs the raw password. - // For now, accept any authenticated user (the hash verification - // happens on the client side with MikroTik). - // TODO: Store password in a reversible form or use EC-SRP5 only. + // Verify MD5 hash against stored raw password + if let Ok(Some(raw_pass)) = db.get_password(&username) { + let expected_hash = btest_rs::auth::compute_auth_hash(&raw_pass, &challenge); + if received_hash != expected_hash { + tracing::warn!("Auth failed: password mismatch for user '{}'", username); + stream.write_all(&AUTH_FAILED).await?; + btest_rs::syslog_logger::auth_failure( + &peer.to_string(), &username, "md5", "password mismatch", + ); + anyhow::bail!("Auth failed"); + } + } + // If no raw password stored, accept (backwards compat with old DB entries) - // Send AUTH_OK - stream.write_all(&AUTH_OK).await?; + stream.write_all(&ok_response).await?; stream.flush().await?; tracing::info!("Auth successful for user '{}'", username); @@ -202,79 +282,168 @@ async fn handle_pro_client( btest_rs::syslog_logger::auth_failure( &peer.to_string(), &username, "quota", &format!("{}", e), ); - // Connection is already authenticated, just close it - return Ok((username, StopReason::UserDailyQuota, 0, 0)); + return Ok(Some((username, StopReason::UserDailyQuota, 0, 0))); } - // Start session tracking + // TCP multi-connection: register session and wait for secondary connections + if is_tcp_multi { + tracing::info!( + "TCP multi-connection: waiting for {} connections (token={:04x})", + cmd.tcp_conn_count, session_token, + ); + let mut map = sessions.lock().await; + map.insert(session_token, TcpSession { + peer_ip: peer.ip(), + username: username.clone(), + cmd: cmd.clone(), + streams: vec![stream], + expected: cmd.tcp_conn_count, // tcp_conn_count includes the primary + }); + // The test will be started when all connections join (in the secondary handler above) + return Ok(None); + } + + // Single-connection test + run_pro_single_test(stream, cmd, peer, &username, db, quota_mgr, quota_check_interval).await + .map(|(stop, tx, rx)| Some((username, stop, tx, rx))) +} + +/// Run a single-connection bandwidth test with quota enforcement. +async fn run_pro_single_test( + stream: TcpStream, + cmd: Command, + peer: SocketAddr, + username: &str, + db: UserDb, + quota_mgr: QuotaManager, + quota_check_interval: u64, +) -> anyhow::Result<(StopReason, u64, u64)> { let proto_str = if cmd.is_udp() { "UDP" } else { "TCP" }; let dir_str = match cmd.direction { CMD_DIR_RX => "RX", CMD_DIR_TX => "TX", _ => "BOTH" }; let session_id = db.start_session( - &username, &peer.ip().to_string(), proto_str, dir_str, + username, &peer.ip().to_string(), proto_str, dir_str, )?; btest_rs::syslog_logger::test_start( &peer.to_string(), proto_str, dir_str, cmd.tcp_conn_count, ); - // Create shared bandwidth state for the test let state = BandwidthState::new(); - // Spawn quota enforcer + // Set byte budget + let budget = quota_mgr.remaining_budget(username, &peer.ip()); + if budget < u64::MAX { + state.set_budget(budget); + tracing::info!("Byte budget for '{}' from {}: {} bytes", username, peer.ip(), budget); + } + let enforcer = QuotaEnforcer::new( quota_mgr.clone(), - username.clone(), + username.to_string(), peer.ip(), state.clone(), quota_check_interval, quota_mgr.max_duration(), ); - // Spawn quota enforcer — runs alongside the test let enforcer_state = state.clone(); let enforcer_handle = tokio::spawn(async move { enforcer.run().await }); - // Run the actual bandwidth test using the standard server handlers. - // The enforcer runs concurrently and will set state.running = false - // if any quota is exceeded, which gracefully stops the TX/RX loops. static UDP_PORT_OFFSET: std::sync::atomic::AtomicU16 = std::sync::atomic::AtomicU16::new(0); + let mut stream_mut = stream; let test_result = if cmd.is_udp() { let offset = UDP_PORT_OFFSET.fetch_add(1, std::sync::atomic::Ordering::SeqCst); let udp_port = btest_rs::protocol::BTEST_UDP_PORT_START + offset; btest_rs::server::run_udp_test( - &mut stream, peer, &cmd, state.clone(), udp_port, + &mut stream_mut, peer, &cmd, state.clone(), udp_port, ).await } else { - btest_rs::server::run_tcp_test(stream, cmd.clone(), state.clone()).await + btest_rs::server::run_tcp_test(stream_mut, cmd.clone(), state.clone()).await }; - // Test finished — stop the enforcer if still running enforcer_state.running.store(false, std::sync::atomic::Ordering::SeqCst); let stop_reason = enforcer_handle.await.unwrap_or(StopReason::ClientDisconnected); - // Determine final stop reason let final_reason = match &test_result { Ok(_) => { if stop_reason == StopReason::ClientDisconnected { StopReason::ClientDisconnected } else { - stop_reason // quota or duration exceeded + stop_reason } } Err(_) => StopReason::ClientDisconnected, }; - // Record final usage let (total_tx, total_rx, _, _) = state.summary(); - - // Flush to DB - quota_mgr.record_usage(&username, &peer.ip().to_string(), total_tx, total_rx); + quota_mgr.record_usage(username, &peer.ip().to_string(), total_tx, total_rx); db.end_session(session_id, total_tx, total_rx)?; - Ok((username, final_reason, total_tx, total_rx)) + Ok((final_reason, total_tx, total_rx)) +} + +/// Run a TCP multi-connection test with all streams collected. +/// Delegates to the standard multi-conn handler which correctly manages +/// TX+status injection for bidirectional mode. +async fn run_pro_multiconn_test( + streams: Vec, + cmd: Command, + peer: SocketAddr, + username: &str, + db: UserDb, + quota_mgr: QuotaManager, + quota_check_interval: u64, +) -> anyhow::Result<(StopReason, u64, u64)> { + let dir_str = match cmd.direction { + CMD_DIR_RX => "RX", CMD_DIR_TX => "TX", _ => "BOTH" + }; + let session_id = db.start_session( + username, &peer.ip().to_string(), "TCP", dir_str, + )?; + + tracing::info!( + "Starting TCP multi-conn test: {} streams, dir={}", + streams.len(), dir_str, + ); + + let state = BandwidthState::new(); + + let budget = quota_mgr.remaining_budget(username, &peer.ip()); + if budget < u64::MAX { + state.set_budget(budget); + } + + let enforcer = QuotaEnforcer::new( + quota_mgr.clone(), + username.to_string(), + peer.ip(), + state.clone(), + quota_check_interval, + quota_mgr.max_duration(), + ); + + let enforcer_state = state.clone(); + let enforcer_handle = tokio::spawn(async move { + enforcer.run().await + }); + + // Use the standard multi-connection handler which correctly handles + // all direction modes (TX, RX, BOTH with status injection) + let _test_result = btest_rs::server::run_tcp_multiconn_test( + streams, cmd, state.clone(), + ).await; + + enforcer_state.running.store(false, std::sync::atomic::Ordering::SeqCst); + let stop_reason = enforcer_handle.await.unwrap_or(StopReason::ClientDisconnected); + + let (total_tx, total_rx, _, _) = state.summary(); + quota_mgr.record_usage(username, &peer.ip().to_string(), total_tx, total_rx); + db.end_session(session_id, total_tx, total_rx)?; + + Ok((stop_reason, total_tx, total_rx)) } diff --git a/src/server_pro/user_db.rs b/src/server_pro/user_db.rs index cc3f731..661931e 100644 --- a/src/server_pro/user_db.rs +++ b/src/server_pro/user_db.rs @@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex}; #[derive(Clone)] pub struct UserDb { conn: Arc>, + path: Arc, } #[derive(Debug, Clone)] @@ -68,9 +69,15 @@ impl UserDb { conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?; Ok(Self { conn: Arc::new(Mutex::new(conn)), + path: Arc::new(path.to_string()), }) } + /// Return the database file path. + pub fn path(&self) -> &str { + &self.path + } + pub fn ensure_tables(&self) -> anyhow::Result<()> { let conn = self.conn.lock().unwrap(); conn.execute_batch(" @@ -147,13 +154,26 @@ impl UserDb { pub fn add_user(&self, username: &str, password: &str) -> anyhow::Result<()> { let hash = hash_password(username, password); let conn = self.conn.lock().unwrap(); + // Ensure password_raw column exists (migration for older databases) + let _ = conn.execute("ALTER TABLE users ADD COLUMN password_raw TEXT DEFAULT ''", []); conn.execute( - "INSERT OR REPLACE INTO users (username, password_hash) VALUES (?1, ?2)", - params![username, hash], + "INSERT OR REPLACE INTO users (username, password_hash, password_raw) VALUES (?1, ?2, ?3)", + params![username, hash, password], )?; Ok(()) } + /// Get the raw password for MD5 challenge-response auth. + pub fn get_password(&self, username: &str) -> anyhow::Result> { + let conn = self.conn.lock().unwrap(); + let result = conn.query_row( + "SELECT password_raw FROM users WHERE username = ?1 AND enabled = 1", + params![username], + |row| row.get::<_, String>(0), + ).optional()?; + Ok(result) + } + pub fn get_user(&self, username: &str) -> anyhow::Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( diff --git a/src/server_pro/web/mod.rs b/src/server_pro/web/mod.rs index b7206c7..1654b8c 100644 --- a/src/server_pro/web/mod.rs +++ b/src/server_pro/web/mod.rs @@ -76,7 +76,7 @@ const DEFAULT_DB_PATH: &str = "btest-users.db"; /// the web module is optional and failure during startup should surface /// loudly rather than silently serving broken pages. pub fn create_router(db: UserDb) -> Router { - let db_path = std::env::var("BTEST_DB_PATH").unwrap_or_else(|_| DEFAULT_DB_PATH.to_string()); + let db_path = db.path().to_string(); let query_conn = Connection::open_with_flags( &db_path, @@ -104,6 +104,8 @@ pub fn create_router(db: UserDb) -> Router { .route("/dashboard/{ip}", get(dashboard_page)) .route("/api/ip/{ip}/sessions", get(api_sessions)) .route("/api/ip/{ip}/stats", get(api_stats)) + .route("/api/ip/{ip}/export", get(api_export)) + .route("/api/ip/{ip}/quota", get(api_quota)) .route("/api/session/{id}/intervals", get(api_intervals)) .with_state(state) } @@ -142,47 +144,87 @@ fn ensure_web_tables(db_path: &str) -> anyhow::Result<()> { -btest-rs Public Bandwidth Test Server +btest-rs — Free Public Bandwidth Test Server

btest-rs

-

Public MikroTik Bandwidth Test Server — view your test results and history.

- - -
-

How it works

-

Run a bandwidth test from your MikroTik router targeting this server. - After the test completes, enter your public IP above to see - throughput charts, session history, and aggregate statistics.

-

- Example: /tool bandwidth-test address=this-server protocol=tcp direction=both -

+

Free public MikroTik-compatible bandwidth test server.
Test your link speed from any RouterOS device — no registration required.

+ +
+

Quick Start

+

Open a terminal on your MikroTik router and run one of the following commands:

+

TCP Recommended

+
/tool bandwidth-test address=104.225.217.60 user=btest password=btest protocol=tcp direction=both
+

UDP

+
/tool bandwidth-test address=104.225.217.60 user=btest password=btest protocol=udp direction=both
- + +
+

Important Notes

+
    +
  • Credentials: user=btest password=btest
  • +
  • TCP is recommended for remote testing — it works reliably through any NAT or firewall
  • +
  • Per-IP daily quotas apply to keep the service fair for everyone
  • +
  • Maximum test duration: 120 seconds
  • +
  • Connection limit: 3 concurrent tests per IP
  • +
+
+ UDP bidirectional may not work through NAT/firewall. + UDP direction=both requires the server to send packets to a pre-calculated client port, which NAT routers typically block. If you need UDP testing:
+ • Forward UDP ports 2001–2100 on your router, or
+ • Use direction=send or direction=receive (one-way works fine), or
+ • Test from a device with a public IP +
+
+ +
+

Check Your Results

+

After running a test, enter your public IP to view throughput charts, session history, and statistics.

+ + +
+ +