Fix TCP status: use swap(0) and skip status_report_loop in RX mode
All checks were successful
CI / test (push) Successful in 1m24s

The status sender and status_report_loop were BOTH calling swap(0)
on rx_bytes, racing each other. Now the status sender owns the swap
and prints stats itself. The report loop is skipped in RX-only TCP mode.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-31 18:03:28 +04:00
parent cdad23ffa0
commit 8b127d833f

View File

@@ -329,7 +329,19 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
None
};
if !server_should_rx || server_should_tx {
// Normal status loop (TX-only or BOTH modes)
status_report_loop(&cmd, &state).await;
} else {
// RX-only: tcp_status_sender handles both status sending and printing.
// Just wait for it and the RX loop to finish.
if tx_handle.is_some() {
// tx_handle is the status sender in this case
while state.running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
state.running.store(false, Ordering::SeqCst);
if let Some(h) = tx_handle { let _ = h.await; }
@@ -445,15 +457,14 @@ async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<Band
/// Send periodic 12-byte status messages on the TCP connection.
/// Used when server is in RX mode — tells the client how many bytes we received.
/// Send periodic 12-byte status messages on the TCP connection.
/// Reports bytes received since last status (delta, not cumulative).
/// MikroTik server sends these ~every 500ms-1s.
/// Send periodic 12-byte status messages on the TCP connection AND print local stats.
/// Used when server is in RX-only mode. Replaces the normal status_report_loop
/// because it needs the writer and must own the rx_bytes swap.
async fn tcp_status_sender(
mut writer: tokio::net::tcp::OwnedWriteHalf,
state: Arc<BandwidthState>,
) {
let mut seq: u32 = 0;
let mut last_rx: u64 = 0;
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;
@@ -464,13 +475,12 @@ async fn tcp_status_sender(
}
seq += 1;
let current_rx = state.rx_bytes.load(Ordering::Relaxed);
let delta = current_rx - last_rx;
last_rx = current_rx;
// Swap to get bytes received this interval (atomic reset)
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage {
seq,
bytes_received: delta as u32,
bytes_received: rx_bytes as u32,
};
if writer.write_all(&status.serialize()).await.is_err() {
@@ -478,6 +488,9 @@ async fn tcp_status_sender(
break;
}
let _ = writer.flush().await;
// Also print locally
bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), None);
}
}