From a87dd7510f9ffa046726dc309eefbaae4e708143 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Tue, 31 Mar 2026 17:17:00 +0400 Subject: [PATCH] Fix TCP multi-connection: TX/RX on ALL streams, not just primary pcap analysis showed MikroTik sends/receives data across all 20 TCP connections, but we only used the primary. Now all streams get their own TX and RX tasks, distributing bandwidth across all connections. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/server.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/src/server.rs b/src/server.rs index cb102ac..bb94006 100644 --- a/src/server.rs +++ b/src/server.rs @@ -274,15 +274,15 @@ async fn handle_client( .unwrap_or_default() }; + let mut all_streams = vec![stream]; + all_streams.extend(extra_streams); + tracing::info!( "TCP multi-connection: starting with {} total streams", - 1 + extra_streams.len(), + all_streams.len(), ); - // Run test - primary stream handles data, extras provide parallel TCP bandwidth - // For now just use the primary; extras keep the connection alive - let _extra_keepalive = extra_streams; - run_tcp_test_server(stream, cmd).await + run_tcp_multiconn_server(all_streams, cmd).await } else { run_tcp_test_server(stream, cmd).await } @@ -331,6 +331,55 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { Ok(()) } +/// TCP multi-connection: distribute TX across all streams, aggregate RX from all. 
+async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<()> { + let state = BandwidthState::new(); + let tx_size = cmd.tx_size as usize; + let server_should_tx = cmd.server_tx(); + let server_should_rx = cmd.server_rx(); + let tx_speed = cmd.remote_tx_speed; + + let mut tx_handles = Vec::new(); + let mut rx_handles = Vec::new(); + let mut _writer_keepalives: Vec<tokio::net::tcp::OwnedWriteHalf> = Vec::new(); + let mut _reader_keepalives: Vec<tokio::net::tcp::OwnedReadHalf> = Vec::new(); + + for tcp_stream in streams { + let (reader, writer) = tcp_stream.into_split(); + + if server_should_tx { + let st = state.clone(); + tx_handles.push(tokio::spawn(async move { + tcp_tx_loop(writer, tx_size, tx_speed, st).await + })); + } else { + _writer_keepalives.push(writer); + } + + if server_should_rx { + let st = state.clone(); + rx_handles.push(tokio::spawn(async move { + tcp_rx_loop(reader, st).await + })); + } else { + _reader_keepalives.push(reader); + } + } + + tracing::info!( + "TCP multi-conn: {} TX tasks, {} RX tasks", + tx_handles.len(), + rx_handles.len(), + ); + + status_report_loop(&cmd, &state).await; + + state.running.store(false, Ordering::SeqCst); + for h in tx_handles { let _ = h.await; } + for h in rx_handles { let _ = h.await; } + Ok(()) +} + async fn tcp_tx_loop( mut writer: tokio::net::tcp::OwnedWriteHalf, tx_size: usize,