diff --git a/src/bandwidth.rs b/src/bandwidth.rs index a83caad..ecc1c07 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -13,6 +13,11 @@ pub struct BandwidthState { pub rx_packets: AtomicU64, pub rx_lost_packets: AtomicU64, pub last_udp_seq: AtomicU32, + /// Cumulative totals (never reset by swap) + pub total_tx_bytes: AtomicU64, + pub total_rx_bytes: AtomicU64, + pub total_lost_packets: AtomicU64, + pub intervals: AtomicU32, } impl BandwidthState { @@ -26,8 +31,32 @@ impl BandwidthState { rx_packets: AtomicU64::new(0), rx_lost_packets: AtomicU64::new(0), last_udp_seq: AtomicU32::new(0), + total_tx_bytes: AtomicU64::new(0), + total_rx_bytes: AtomicU64::new(0), + total_lost_packets: AtomicU64::new(0), + intervals: AtomicU32::new(0), }) } + + /// Record an interval's stats into cumulative totals. + pub fn record_interval(&self, tx: u64, rx: u64, lost: u64) { + use std::sync::atomic::Ordering::Relaxed; + self.total_tx_bytes.fetch_add(tx, Relaxed); + self.total_rx_bytes.fetch_add(rx, Relaxed); + self.total_lost_packets.fetch_add(lost, Relaxed); + self.intervals.fetch_add(1, Relaxed); + } + + /// Get summary for syslog reporting. + pub fn summary(&self) -> (u64, u64, u64, u32) { + use std::sync::atomic::Ordering::Relaxed; + ( + self.total_tx_bytes.load(Relaxed), + self.total_rx_bytes.load(Relaxed), + self.total_lost_packets.load(Relaxed), + self.intervals.load(Relaxed), + ) + } } /// Calculate the sleep interval between packets to achieve target bandwidth. diff --git a/src/server.rs b/src/server.rs index be930b8..57219f2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -346,13 +346,20 @@ async fn handle_client( run_tcp_test_server(stream, cmd).await }; - crate::syslog_logger::test_end(&peer.to_string(), proto_str, dir_str); - result + let (total_tx, total_rx, total_lost, intervals) = match &result { + Ok(summary) => *summary, + Err(_) => (0, 0, 0, 0), + }; + crate::syslog_logger::test_end( + &peer.to_string(), proto_str, dir_str, + total_tx, total_rx, total_lost, intervals, + ); + result.map(|_| ()) } // --- TCP Test Server --- -async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { +async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<(u64, u64, u64, u32)> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; let server_should_tx = cmd.server_tx(); @@ -420,11 +427,11 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { state.running.store(false, Ordering::SeqCst); if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = rx_handle { let _ = h.await; } - Ok(()) + Ok(state.summary()) } /// TCP multi-connection. -async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Result<()> { +async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Result<(u64, u64, u64, u32)> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; let server_should_tx = cmd.server_tx(); @@ -480,7 +487,7 @@ async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Resu for h in tx_handles { let _ = h.await; } for h in rx_handles { let _ = h.await; } tracing::info!("TCP multi-connection test ended"); - Ok(()) + Ok(state.summary()) } async fn tcp_tx_loop( @@ -531,6 +538,7 @@ async fn tcp_tx_loop_inner( state.running.store(false, Ordering::SeqCst); break; } + state.record_interval(0, rx_bytes, 0); bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None); next_status = Instant::now() + Duration::from_secs(1); } @@ -612,7 +620,7 @@ async fn tcp_status_sender( } let _ = writer.flush().await; - // Also print locally + state.record_interval(0, rx_bytes, 0); bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), None); } } @@ -624,7 +632,7 @@ async fn run_udp_test_server( peer: SocketAddr, cmd: &Command, udp_port_offset: Arc, -) -> Result<()> { +) -> Result<(u64, u64, u64, u32)> { let offset = udp_port_offset.fetch_add(1, Ordering::SeqCst); let server_udp_port = BTEST_UDP_PORT_START + offset; let client_udp_port = server_udp_port + BTEST_PORT_CLIENT_OFFSET; @@ -729,7 +737,7 @@ async fn run_udp_test_server( state.running.store(false, Ordering::SeqCst); if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = rx_handle { let _ = h.await; } - Ok(()) + Ok(state.summary()) } async fn udp_tx_loop( @@ -863,14 +871,15 @@ async fn status_report_loop(cmd: &Command, state: &BandwidthState) { seq += 1; + let tx = if cmd.server_tx() { state.tx_bytes.swap(0, Ordering::Relaxed) } else { 0 }; + let rx = if cmd.server_rx() { state.rx_bytes.swap(0, Ordering::Relaxed) } else { 0 }; + let lost = if cmd.server_rx() { state.rx_lost_packets.swap(0, Ordering::Relaxed) } else { 0 }; + state.record_interval(tx, rx, lost); + if cmd.server_tx() { - let tx = state.tx_bytes.swap(0, Ordering::Relaxed); bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); } - if cmd.server_rx() { - let rx = state.rx_bytes.swap(0, Ordering::Relaxed); - let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed); let lost_opt = if cmd.is_udp() { Some(lost) } else { None }; bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), lost_opt); } @@ -972,7 +981,8 @@ async fn udp_status_loop( } let _ = writer.flush().await; - // Print local stats + // Print local stats and record totals + state.record_interval(tx_bytes, rx_bytes, lost); if cmd.server_tx() { bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None); } diff --git a/src/syslog_logger.rs b/src/syslog_logger.rs index 772dd7a..c658fdc 100644 --- a/src/syslog_logger.rs +++ b/src/syslog_logger.rs @@ -121,24 +121,28 @@ pub fn test_start(peer: &str, proto: &str, direction: &str, conn_count: u8) { send(6, &msg); } -pub fn test_end(peer: &str, proto: &str, direction: &str) { - let msg = format!( - "TEST_END peer={} proto={} dir={}", - peer, proto, direction, - ); - tracing::info!("{}", msg); - send(6, &msg); -} - -pub fn test_result( +pub fn test_end( peer: &str, + proto: &str, direction: &str, - avg_mbps: f64, + total_tx: u64, + total_rx: u64, + total_lost: u64, duration_secs: u32, ) { + let tx_mbps = if duration_secs > 0 { + total_tx as f64 * 8.0 / duration_secs as f64 / 1_000_000.0 + } else { + 0.0 + }; + let rx_mbps = if duration_secs > 0 { + total_rx as f64 * 8.0 / duration_secs as f64 / 1_000_000.0 + } else { + 0.0 + }; let msg = format!( - "TEST_RESULT peer={} dir={} avg_mbps={:.2} duration={}s", - peer, direction, avg_mbps, duration_secs, + "TEST_END peer={} proto={} dir={} duration={}s tx_avg={:.2}Mbps rx_avg={:.2}Mbps tx_bytes={} rx_bytes={} lost={}", + peer, proto, direction, duration_secs, tx_mbps, rx_mbps, total_tx, total_rx, total_lost, ); tracing::info!("{}", msg); send(6, &msg);