Add speed/bytes/duration to syslog TEST_END events
All checks were successful
CI / test (push) Successful in 1m24s

TEST_END now includes: duration, avg TX/RX Mbps, total bytes, lost packets.
All test functions track cumulative totals via BandwidthState::record_interval()
and return summary stats.

Example:
  TEST_END peer=172.16.81.1:59070 proto=UDP dir=TX duration=6s
    tx_avg=275.00Mbps rx_avg=0.00Mbps tx_bytes=206250000 rx_bytes=0 lost=0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-01 09:23:11 +04:00
parent 7bc54a977c
commit ce01d514b2
3 changed files with 70 additions and 27 deletions

View File

@@ -13,6 +13,11 @@ pub struct BandwidthState {
pub rx_packets: AtomicU64, pub rx_packets: AtomicU64,
pub rx_lost_packets: AtomicU64, pub rx_lost_packets: AtomicU64,
pub last_udp_seq: AtomicU32, pub last_udp_seq: AtomicU32,
/// Cumulative totals (never reset by swap)
pub total_tx_bytes: AtomicU64,
pub total_rx_bytes: AtomicU64,
pub total_lost_packets: AtomicU64,
pub intervals: AtomicU32,
} }
impl BandwidthState { impl BandwidthState {
@@ -26,8 +31,32 @@ impl BandwidthState {
rx_packets: AtomicU64::new(0), rx_packets: AtomicU64::new(0),
rx_lost_packets: AtomicU64::new(0), rx_lost_packets: AtomicU64::new(0),
last_udp_seq: AtomicU32::new(0), last_udp_seq: AtomicU32::new(0),
total_tx_bytes: AtomicU64::new(0),
total_rx_bytes: AtomicU64::new(0),
total_lost_packets: AtomicU64::new(0),
intervals: AtomicU32::new(0),
}) })
} }
/// Record an interval's stats into cumulative totals.
pub fn record_interval(&self, tx: u64, rx: u64, lost: u64) {
use std::sync::atomic::Ordering::Relaxed;
self.total_tx_bytes.fetch_add(tx, Relaxed);
self.total_rx_bytes.fetch_add(rx, Relaxed);
self.total_lost_packets.fetch_add(lost, Relaxed);
self.intervals.fetch_add(1, Relaxed);
}
/// Get summary for syslog reporting.
pub fn summary(&self) -> (u64, u64, u64, u32) {
use std::sync::atomic::Ordering::Relaxed;
(
self.total_tx_bytes.load(Relaxed),
self.total_rx_bytes.load(Relaxed),
self.total_lost_packets.load(Relaxed),
self.intervals.load(Relaxed),
)
}
} }
/// Calculate the sleep interval between packets to achieve target bandwidth. /// Calculate the sleep interval between packets to achieve target bandwidth.

View File

@@ -346,13 +346,20 @@ async fn handle_client(
run_tcp_test_server(stream, cmd).await run_tcp_test_server(stream, cmd).await
}; };
crate::syslog_logger::test_end(&peer.to_string(), proto_str, dir_str); let (total_tx, total_rx, total_lost, intervals) = match &result {
result Ok(summary) => *summary,
Err(_) => (0, 0, 0, 0),
};
crate::syslog_logger::test_end(
&peer.to_string(), proto_str, dir_str,
total_tx, total_rx, total_lost, intervals,
);
result.map(|_| ())
} }
// --- TCP Test Server --- // --- TCP Test Server ---
async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<(u64, u64, u64, u32)> {
let state = BandwidthState::new(); let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize; let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx(); let server_should_tx = cmd.server_tx();
@@ -420,11 +427,11 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = tx_handle { let _ = h.await; }
if let Some(h) = rx_handle { let _ = h.await; } if let Some(h) = rx_handle { let _ = h.await; }
Ok(()) Ok(state.summary())
} }
/// TCP multi-connection. /// TCP multi-connection.
async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<()> { async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<(u64, u64, u64, u32)> {
let state = BandwidthState::new(); let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize; let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx(); let server_should_tx = cmd.server_tx();
@@ -480,7 +487,7 @@ async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Resu
for h in tx_handles { let _ = h.await; } for h in tx_handles { let _ = h.await; }
for h in rx_handles { let _ = h.await; } for h in rx_handles { let _ = h.await; }
tracing::info!("TCP multi-connection test ended"); tracing::info!("TCP multi-connection test ended");
Ok(()) Ok(state.summary())
} }
async fn tcp_tx_loop( async fn tcp_tx_loop(
@@ -531,6 +538,7 @@ async fn tcp_tx_loop_inner(
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
break; break;
} }
state.record_interval(0, rx_bytes, 0);
bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None); bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None);
next_status = Instant::now() + Duration::from_secs(1); next_status = Instant::now() + Duration::from_secs(1);
} }
@@ -612,7 +620,7 @@ async fn tcp_status_sender(
} }
let _ = writer.flush().await; let _ = writer.flush().await;
// Also print locally state.record_interval(0, rx_bytes, 0);
bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), None); bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), None);
} }
} }
@@ -624,7 +632,7 @@ async fn run_udp_test_server(
peer: SocketAddr, peer: SocketAddr,
cmd: &Command, cmd: &Command,
udp_port_offset: Arc<std::sync::atomic::AtomicU16>, udp_port_offset: Arc<std::sync::atomic::AtomicU16>,
) -> Result<()> { ) -> Result<(u64, u64, u64, u32)> {
let offset = udp_port_offset.fetch_add(1, Ordering::SeqCst); let offset = udp_port_offset.fetch_add(1, Ordering::SeqCst);
let server_udp_port = BTEST_UDP_PORT_START + offset; let server_udp_port = BTEST_UDP_PORT_START + offset;
let client_udp_port = server_udp_port + BTEST_PORT_CLIENT_OFFSET; let client_udp_port = server_udp_port + BTEST_PORT_CLIENT_OFFSET;
@@ -729,7 +737,7 @@ async fn run_udp_test_server(
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = tx_handle { let _ = h.await; }
if let Some(h) = rx_handle { let _ = h.await; } if let Some(h) = rx_handle { let _ = h.await; }
Ok(()) Ok(state.summary())
} }
async fn udp_tx_loop( async fn udp_tx_loop(
@@ -863,14 +871,15 @@ async fn status_report_loop(cmd: &Command, state: &BandwidthState) {
seq += 1; seq += 1;
let tx = if cmd.server_tx() { state.tx_bytes.swap(0, Ordering::Relaxed) } else { 0 };
let rx = if cmd.server_rx() { state.rx_bytes.swap(0, Ordering::Relaxed) } else { 0 };
let lost = if cmd.server_rx() { state.rx_lost_packets.swap(0, Ordering::Relaxed) } else { 0 };
state.record_interval(tx, rx, lost);
if cmd.server_tx() { if cmd.server_tx() {
let tx = state.tx_bytes.swap(0, Ordering::Relaxed);
bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None);
} }
if cmd.server_rx() { if cmd.server_rx() {
let rx = state.rx_bytes.swap(0, Ordering::Relaxed);
let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed);
let lost_opt = if cmd.is_udp() { Some(lost) } else { None }; let lost_opt = if cmd.is_udp() { Some(lost) } else { None };
bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), lost_opt); bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), lost_opt);
} }
@@ -972,7 +981,8 @@ async fn udp_status_loop(
} }
let _ = writer.flush().await; let _ = writer.flush().await;
// Print local stats // Print local stats and record totals
state.record_interval(tx_bytes, rx_bytes, lost);
if cmd.server_tx() { if cmd.server_tx() {
bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None); bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None);
} }

View File

@@ -121,24 +121,28 @@ pub fn test_start(peer: &str, proto: &str, direction: &str, conn_count: u8) {
send(6, &msg); send(6, &msg);
} }
pub fn test_end(peer: &str, proto: &str, direction: &str) { pub fn test_end(
let msg = format!(
"TEST_END peer={} proto={} dir={}",
peer, proto, direction,
);
tracing::info!("{}", msg);
send(6, &msg);
}
pub fn test_result(
peer: &str, peer: &str,
proto: &str,
direction: &str, direction: &str,
avg_mbps: f64, total_tx: u64,
total_rx: u64,
total_lost: u64,
duration_secs: u32, duration_secs: u32,
) { ) {
let tx_mbps = if duration_secs > 0 {
total_tx as f64 * 8.0 / duration_secs as f64 / 1_000_000.0
} else {
0.0
};
let rx_mbps = if duration_secs > 0 {
total_rx as f64 * 8.0 / duration_secs as f64 / 1_000_000.0
} else {
0.0
};
let msg = format!( let msg = format!(
"TEST_RESULT peer={} dir={} avg_mbps={:.2} duration={}s", "TEST_END peer={} proto={} dir={} duration={}s tx_avg={:.2}Mbps rx_avg={:.2}Mbps tx_bytes={} rx_bytes={} lost={}",
peer, direction, avg_mbps, duration_secs, peer, proto, direction, duration_secs, tx_mbps, rx_mbps, total_tx, total_rx, total_lost,
); );
tracing::info!("{}", msg); tracing::info!("{}", msg);
send(6, &msg); send(6, &msg);