From f9289cca55dd36d629502bc4a6cdc9fce18c74dc Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Tue, 31 Mar 2026 18:10:46 +0400 Subject: [PATCH] Add TCP status for bidirectional mode In BOTH direction, the TX loop now injects 12-byte status messages every 1 second between data packets, reporting rx_bytes to the client. Multi-connection mode also updated with same logic for all 3 cases: - TX only: pure data - RX only: status sender on writer - BOTH: TX data + interleaved status messages Co-Authored-By: Claude Opus 4.6 (1M context) --- src/server.rs | 89 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 14 deletions(-) diff --git a/src/server.rs b/src/server.rs index 043689f..3ef2e59 100644 --- a/src/server.rs +++ b/src/server.rs @@ -303,13 +303,18 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { let mut _reader_keepalive = None; let state_tx = state.clone(); - let tx_handle = if server_should_tx { + let tx_handle = if server_should_tx && server_should_rx { + // BOTH mode: TX data + inject status messages for the RX direction + Some(tokio::spawn(async move { + tcp_tx_with_status(writer, tx_size, tx_speed, state_tx).await + })) + } else if server_should_tx { + // TX only Some(tokio::spawn(async move { tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await })) } else if server_should_rx { - // Server receives: use the writer to send periodic status messages - // back to the client (MikroTik needs this to show speed) + // RX only: use writer for status messages let st = state.clone(); Some(tokio::spawn(async move { tcp_status_sender(writer, st).await @@ -329,17 +334,24 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { None }; - if !server_should_rx || server_should_tx { - // Normal status loop (TX-only or BOTH modes) + if server_should_tx && !server_should_rx { + // TX-only: normal status loop reports TX stats status_report_loop(&cmd, &state).await; + } else if server_should_tx && server_should_rx { + // BOTH: TX loop injects status + prints RX. Just report TX here. + let mut seq: u32 = 0; + let mut tick = tokio::time::interval(Duration::from_secs(1)); + loop { + tick.tick().await; + if !state.running.load(Ordering::Relaxed) { break; } + seq += 1; + let tx = state.tx_bytes.swap(0, Ordering::Relaxed); + bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); + } } else { - // RX-only: tcp_status_sender handles both status sending and printing. - // Just wait for it and the RX loop to finish. - if tx_handle.is_some() { - // tx_handle is the status sender in this case - while state.running.load(Ordering::Relaxed) { - tokio::time::sleep(Duration::from_millis(500)).await; - } + // RX-only: tcp_status_sender handles everything. Just wait. + while state.running.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(500)).await; } } @@ -365,11 +377,21 @@ async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Resu for tcp_stream in streams { let (reader, writer) = tcp_stream.into_split(); - if server_should_tx { + if server_should_tx && server_should_rx { + let st = state.clone(); + tx_handles.push(tokio::spawn(async move { + tcp_tx_with_status(writer, tx_size, tx_speed, st).await + })); + } else if server_should_tx { let st = state.clone(); tx_handles.push(tokio::spawn(async move { tcp_tx_loop(writer, tx_size, tx_speed, st).await })); + } else if server_should_rx { + let st = state.clone(); + tx_handles.push(tokio::spawn(async move { + tcp_status_sender(writer, st).await + })); } else { _writer_keepalives.push(writer); } @@ -404,14 +426,53 @@ async fn tcp_tx_loop( tx_size: usize, tx_speed: u32, state: Arc, +) { + tcp_tx_loop_inner(&mut writer, tx_size, tx_speed, &state, false).await; +} + +/// TCP TX loop that also sends status messages when `send_status` is true. +/// Used in bidirectional mode where the writer handles both data and status. +async fn tcp_tx_with_status( + mut writer: tokio::net::tcp::OwnedWriteHalf, + tx_size: usize, + tx_speed: u32, + state: Arc, +) { + tcp_tx_loop_inner(&mut writer, tx_size, tx_speed, &state, true).await; +} + +async fn tcp_tx_loop_inner( + writer: &mut tokio::net::tcp::OwnedWriteHalf, + tx_size: usize, + tx_speed: u32, + state: &Arc, + send_status: bool, ) { tokio::time::sleep(Duration::from_millis(100)).await; - let packet = vec![0u8; tx_size]; // TCP data is all zeros (no 0x07 header) + let packet = vec![0u8; tx_size]; let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16); let mut next_send = Instant::now(); + let mut next_status = Instant::now() + Duration::from_secs(1); + let mut status_seq: u32 = 0; while state.running.load(Ordering::Relaxed) { + // Inject status message every ~1 second if in bidirectional mode + if send_status && Instant::now() >= next_status { + status_seq += 1; + let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed); + let status = StatusMessage { + seq: status_seq, + bytes_received: rx_bytes as u32, + }; + if writer.write_all(&status.serialize()).await.is_err() { + state.running.store(false, Ordering::SeqCst); + break; + } + bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None); + next_status = Instant::now() + Duration::from_secs(1); + } + if writer.write_all(&packet).await.is_err() { state.running.store(false, Ordering::SeqCst); break;