diff --git a/src/client.rs b/src/client.rs index 484481d..d960278 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,6 +10,7 @@ use crate::auth; use crate::bandwidth::{self, BandwidthState}; use crate::protocol::*; +/// Returns (total_tx_bytes, total_rx_bytes, total_lost_packets, duration_secs). pub async fn run_client( host: &str, port: u16, @@ -20,7 +21,7 @@ pub async fn run_client( auth_user: Option, auth_pass: Option, nat_mode: bool, -) -> Result<()> { +) -> Result<(u64, u64, u64, u32)> { let addr = format!("{}:{}", host, port); tracing::info!("Connecting to {}...", addr); let mut stream = TcpStream::connect(&addr).await?; @@ -98,7 +99,7 @@ pub async fn run_client( // --- TCP Test Client --- -async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<()> { +async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<(u64, u64, u64, u32)> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; let client_should_tx = cmd.client_tx(); @@ -137,7 +138,7 @@ async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<()> { state.running.store(false, Ordering::SeqCst); if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = rx_handle { let _ = h.await; } - Ok(()) + Ok(state.summary()) } async fn tcp_client_tx_loop( @@ -202,7 +203,7 @@ async fn run_udp_test_client( host: &str, cmd: &Command, nat_mode: bool, -) -> Result<()> { +) -> Result<(u64, u64, u64, u32)> { let mut port_buf = [0u8; 2]; stream.read_exact(&mut port_buf).await?; let server_udp_port = u16::from_be_bytes(port_buf); @@ -265,7 +266,7 @@ async fn run_udp_test_client( state.running.store(false, Ordering::SeqCst); if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = rx_handle { let _ = h.await; } - Ok(()) + Ok(state.summary()) } async fn udp_client_tx_loop( @@ -378,13 +379,14 @@ async fn client_status_loop(cmd: &Command, state: &BandwidthState) { seq += 1; + let tx = if cmd.client_tx() { state.tx_bytes.swap(0, Ordering::Relaxed) } else { 0 }; + let rx = if cmd.client_rx() { state.rx_bytes.swap(0, Ordering::Relaxed) } else { 0 }; + state.record_interval(tx, rx, 0); + if cmd.client_tx() { - let tx = state.tx_bytes.swap(0, Ordering::Relaxed); bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); } - if cmd.client_rx() { - let rx = state.rx_bytes.swap(0, Ordering::Relaxed); bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), None); } } @@ -450,6 +452,7 @@ async fn udp_client_status_loop( let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed); let tx_bytes = state.tx_bytes.swap(0, Ordering::Relaxed); let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed); + state.record_interval(tx_bytes, rx_bytes, lost); let status = StatusMessage { seq, diff --git a/src/main.rs b/src/main.rs index 14ba3b9..f471d21 100644 --- a/src/main.rs +++ b/src/main.rs @@ -189,28 +189,27 @@ async fn main() -> anyhow::Result<()> { cli.nat, ); - if cli.duration > 0 { + let stats = if cli.duration > 0 { match tokio::time::timeout( std::time::Duration::from_secs(cli.duration), client_fut, ) .await { - Ok(result) => result?, - Err(_) => { - // Timeout — normal exit - } + Ok(result) => Some(result?), + Err(_) => None, // Timeout — stats not available from aborted future } } else { - client_fut.await?; - } + Some(client_fut.await?) + }; let elapsed = start.elapsed().as_secs(); + let (total_tx, total_rx, total_lost, _intervals) = stats.unwrap_or((0, 0, 0, 0)); // Log test end to syslog syslog_logger::test_end( &host, proto_str, dir_str, - 0, 0, 0, elapsed as u32, + total_tx, total_rx, total_lost, elapsed as u32, ); // Write CSV if enabled @@ -218,7 +217,7 @@ async fn main() -> anyhow::Result<()> { let auth_type = if cli.auth_user.is_some() { "auth" } else { "none" }; csv_output::write_result( &host, cli.port, proto_str, dir_str, - elapsed, 0, 0, 0, auth_type, + elapsed, total_tx, total_rx, total_lost, auth_type, ); } } else {