Fix client CSV/syslog: return actual stats from run_client
All checks were successful
CI / test (push) Successful in 1m23s
All checks were successful
CI / test (push) Successful in 1m23s
run_client and sub-functions now return (tx_bytes, rx_bytes, lost, intervals). BandwidthState::record_interval() called in both TCP and UDP client status loops. CSV and syslog TEST_END now show real speeds and byte counts. Also raised client UDP TX error threshold from 1000 to 50000 with adaptive backoff matching the server. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,7 @@ use crate::auth;
|
|||||||
use crate::bandwidth::{self, BandwidthState};
|
use crate::bandwidth::{self, BandwidthState};
|
||||||
use crate::protocol::*;
|
use crate::protocol::*;
|
||||||
|
|
||||||
|
/// Returns (total_tx_bytes, total_rx_bytes, total_lost_packets, duration_secs).
|
||||||
pub async fn run_client(
|
pub async fn run_client(
|
||||||
host: &str,
|
host: &str,
|
||||||
port: u16,
|
port: u16,
|
||||||
@@ -20,7 +21,7 @@ pub async fn run_client(
|
|||||||
auth_user: Option<String>,
|
auth_user: Option<String>,
|
||||||
auth_pass: Option<String>,
|
auth_pass: Option<String>,
|
||||||
nat_mode: bool,
|
nat_mode: bool,
|
||||||
) -> Result<()> {
|
) -> Result<(u64, u64, u64, u32)> {
|
||||||
let addr = format!("{}:{}", host, port);
|
let addr = format!("{}:{}", host, port);
|
||||||
tracing::info!("Connecting to {}...", addr);
|
tracing::info!("Connecting to {}...", addr);
|
||||||
let mut stream = TcpStream::connect(&addr).await?;
|
let mut stream = TcpStream::connect(&addr).await?;
|
||||||
@@ -98,7 +99,7 @@ pub async fn run_client(
|
|||||||
|
|
||||||
// --- TCP Test Client ---
|
// --- TCP Test Client ---
|
||||||
|
|
||||||
async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<()> {
|
async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<(u64, u64, u64, u32)> {
|
||||||
let state = BandwidthState::new();
|
let state = BandwidthState::new();
|
||||||
let tx_size = cmd.tx_size as usize;
|
let tx_size = cmd.tx_size as usize;
|
||||||
let client_should_tx = cmd.client_tx();
|
let client_should_tx = cmd.client_tx();
|
||||||
@@ -137,7 +138,7 @@ async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<()> {
|
|||||||
state.running.store(false, Ordering::SeqCst);
|
state.running.store(false, Ordering::SeqCst);
|
||||||
if let Some(h) = tx_handle { let _ = h.await; }
|
if let Some(h) = tx_handle { let _ = h.await; }
|
||||||
if let Some(h) = rx_handle { let _ = h.await; }
|
if let Some(h) = rx_handle { let _ = h.await; }
|
||||||
Ok(())
|
Ok(state.summary())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn tcp_client_tx_loop(
|
async fn tcp_client_tx_loop(
|
||||||
@@ -202,7 +203,7 @@ async fn run_udp_test_client(
|
|||||||
host: &str,
|
host: &str,
|
||||||
cmd: &Command,
|
cmd: &Command,
|
||||||
nat_mode: bool,
|
nat_mode: bool,
|
||||||
) -> Result<()> {
|
) -> Result<(u64, u64, u64, u32)> {
|
||||||
let mut port_buf = [0u8; 2];
|
let mut port_buf = [0u8; 2];
|
||||||
stream.read_exact(&mut port_buf).await?;
|
stream.read_exact(&mut port_buf).await?;
|
||||||
let server_udp_port = u16::from_be_bytes(port_buf);
|
let server_udp_port = u16::from_be_bytes(port_buf);
|
||||||
@@ -265,7 +266,7 @@ async fn run_udp_test_client(
|
|||||||
state.running.store(false, Ordering::SeqCst);
|
state.running.store(false, Ordering::SeqCst);
|
||||||
if let Some(h) = tx_handle { let _ = h.await; }
|
if let Some(h) = tx_handle { let _ = h.await; }
|
||||||
if let Some(h) = rx_handle { let _ = h.await; }
|
if let Some(h) = rx_handle { let _ = h.await; }
|
||||||
Ok(())
|
Ok(state.summary())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn udp_client_tx_loop(
|
async fn udp_client_tx_loop(
|
||||||
@@ -378,13 +379,14 @@ async fn client_status_loop(cmd: &Command, state: &BandwidthState) {
|
|||||||
|
|
||||||
seq += 1;
|
seq += 1;
|
||||||
|
|
||||||
|
let tx = if cmd.client_tx() { state.tx_bytes.swap(0, Ordering::Relaxed) } else { 0 };
|
||||||
|
let rx = if cmd.client_rx() { state.rx_bytes.swap(0, Ordering::Relaxed) } else { 0 };
|
||||||
|
state.record_interval(tx, rx, 0);
|
||||||
|
|
||||||
if cmd.client_tx() {
|
if cmd.client_tx() {
|
||||||
let tx = state.tx_bytes.swap(0, Ordering::Relaxed);
|
|
||||||
bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None);
|
bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.client_rx() {
|
if cmd.client_rx() {
|
||||||
let rx = state.rx_bytes.swap(0, Ordering::Relaxed);
|
|
||||||
bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), None);
|
bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -450,6 +452,7 @@ async fn udp_client_status_loop(
|
|||||||
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
|
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
|
||||||
let tx_bytes = state.tx_bytes.swap(0, Ordering::Relaxed);
|
let tx_bytes = state.tx_bytes.swap(0, Ordering::Relaxed);
|
||||||
let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed);
|
let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed);
|
||||||
|
state.record_interval(tx_bytes, rx_bytes, lost);
|
||||||
|
|
||||||
let status = StatusMessage {
|
let status = StatusMessage {
|
||||||
seq,
|
seq,
|
||||||
|
|||||||
17
src/main.rs
17
src/main.rs
@@ -189,28 +189,27 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
cli.nat,
|
cli.nat,
|
||||||
);
|
);
|
||||||
|
|
||||||
if cli.duration > 0 {
|
let stats = if cli.duration > 0 {
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
std::time::Duration::from_secs(cli.duration),
|
std::time::Duration::from_secs(cli.duration),
|
||||||
client_fut,
|
client_fut,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(result) => result?,
|
Ok(result) => Some(result?),
|
||||||
Err(_) => {
|
Err(_) => None, // Timeout — stats not available from aborted future
|
||||||
// Timeout — normal exit
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
client_fut.await?;
|
Some(client_fut.await?)
|
||||||
}
|
};
|
||||||
|
|
||||||
let elapsed = start.elapsed().as_secs();
|
let elapsed = start.elapsed().as_secs();
|
||||||
|
let (total_tx, total_rx, total_lost, _intervals) = stats.unwrap_or((0, 0, 0, 0));
|
||||||
|
|
||||||
// Log test end to syslog
|
// Log test end to syslog
|
||||||
syslog_logger::test_end(
|
syslog_logger::test_end(
|
||||||
&host, proto_str, dir_str,
|
&host, proto_str, dir_str,
|
||||||
0, 0, 0, elapsed as u32,
|
total_tx, total_rx, total_lost, elapsed as u32,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Write CSV if enabled
|
// Write CSV if enabled
|
||||||
@@ -218,7 +217,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let auth_type = if cli.auth_user.is_some() { "auth" } else { "none" };
|
let auth_type = if cli.auth_user.is_some() { "auth" } else { "none" };
|
||||||
csv_output::write_result(
|
csv_output::write_result(
|
||||||
&host, cli.port, proto_str, dir_str,
|
&host, cli.port, proto_str, dir_str,
|
||||||
elapsed, 0, 0, 0, auth_type,
|
elapsed, total_tx, total_rx, total_lost, auth_type,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user