From fa4fd63fb3958c1b885c726f81294214efae40db Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Tue, 31 Mar 2026 17:46:16 +0400 Subject: [PATCH] Fix TCP: always send bidirectional data regardless of direction MITM capture of MikroTik-to-MikroTik showed both sides always send zero-filled TCP streams, regardless of the direction setting. Direction only controls what gets measured. Our server wasn't starting a TX thread when direction=RX, so MikroTik saw no data and reported 0 speed. Now TCP always starts both TX and RX on every connection. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/server.rs | 66 ++++++++++++++++----------------------------------- 1 file changed, 21 insertions(+), 45 deletions(-) diff --git a/src/server.rs b/src/server.rs index f629e60..e1abcc5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -293,77 +293,53 @@ async fn handle_client( async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; - let server_should_tx = cmd.server_tx(); - let server_should_rx = cmd.server_rx(); let tx_speed = cmd.remote_tx_speed; let (reader, writer) = stream.into_split(); - // IMPORTANT: Do NOT drop unused halves - dropping sends TCP FIN - let mut _writer_keepalive = None; - let mut _reader_keepalive = None; - + // TCP mode: ALWAYS start both TX and RX threads. + // MikroTik expects bidirectional data flow on TCP regardless of direction. + // The direction only controls what gets measured/reported. let state_tx = state.clone(); - let tx_handle = if server_should_tx { - Some(tokio::spawn(async move { - tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await - })) - } else { - _writer_keepalive = Some(writer); - None - }; + let tx_handle = tokio::spawn(async move { + tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await + }); let state_rx = state.clone(); - let rx_handle = if server_should_rx { - Some(tokio::spawn(async move { - tcp_rx_loop(reader, state_rx).await - })) - } else { - _reader_keepalive = Some(reader); - None - }; + let rx_handle = tokio::spawn(async move { + tcp_rx_loop(reader, state_rx).await + }); status_report_loop(&cmd, &state).await; state.running.store(false, Ordering::SeqCst); - if let Some(h) = tx_handle { let _ = h.await; } - if let Some(h) = rx_handle { let _ = h.await; } + let _ = tx_handle.await; + let _ = rx_handle.await; Ok(()) } -/// TCP multi-connection: distribute TX across all streams, aggregate RX from all. +/// TCP multi-connection: always TX+RX on all streams. async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Result<()> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; - let server_should_tx = cmd.server_tx(); - let server_should_rx = cmd.server_rx(); let tx_speed = cmd.remote_tx_speed; let mut tx_handles = Vec::new(); let mut rx_handles = Vec::new(); - let mut _writer_keepalives: Vec = Vec::new(); - let mut _reader_keepalives: Vec = Vec::new(); + // TCP: always start both TX and RX on every stream for tcp_stream in streams { let (reader, writer) = tcp_stream.into_split(); - if server_should_tx { - let st = state.clone(); - tx_handles.push(tokio::spawn(async move { - tcp_tx_loop(writer, tx_size, tx_speed, st).await - })); - } else { - _writer_keepalives.push(writer); - } + let st = state.clone(); + tx_handles.push(tokio::spawn(async move { + tcp_tx_loop(writer, tx_size, tx_speed, st).await + })); - if server_should_rx { - let st = state.clone(); - rx_handles.push(tokio::spawn(async move { - tcp_rx_loop(reader, st).await - })); - } else { - _reader_keepalives.push(reader); - } + let st = state.clone(); + rx_handles.push(tokio::spawn(async move { + tcp_rx_loop(reader, st).await + })); } tracing::info!(