Fix TCP: always send bidirectional data regardless of direction
All checks were successful
CI / test (push) Successful in 1m21s

MITM capture of MikroTik-to-MikroTik showed both sides always send
zero-filled TCP streams, regardless of the direction setting. Direction
only controls what gets measured. Our server wasn't starting a TX thread
when direction=RX, so MikroTik saw no data and reported 0 speed.

Now TCP always starts both TX and RX on every connection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-31 17:46:16 +04:00
parent d8f3b9c189
commit fa4fd63fb3

View File

@@ -293,77 +293,53 @@ async fn handle_client(
async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
let state = BandwidthState::new(); let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize; let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx();
let server_should_rx = cmd.server_rx();
let tx_speed = cmd.remote_tx_speed; let tx_speed = cmd.remote_tx_speed;
let (reader, writer) = stream.into_split(); let (reader, writer) = stream.into_split();
// IMPORTANT: Do NOT drop unused halves - dropping sends TCP FIN // TCP mode: ALWAYS start both TX and RX threads.
let mut _writer_keepalive = None; // MikroTik expects bidirectional data flow on TCP regardless of direction.
let mut _reader_keepalive = None; // The direction only controls what gets measured/reported.
let state_tx = state.clone(); let state_tx = state.clone();
let tx_handle = if server_should_tx { let tx_handle = tokio::spawn(async move {
Some(tokio::spawn(async move { tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await
tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await });
}))
} else {
_writer_keepalive = Some(writer);
None
};
let state_rx = state.clone(); let state_rx = state.clone();
let rx_handle = if server_should_rx { let rx_handle = tokio::spawn(async move {
Some(tokio::spawn(async move { tcp_rx_loop(reader, state_rx).await
tcp_rx_loop(reader, state_rx).await });
}))
} else {
_reader_keepalive = Some(reader);
None
};
status_report_loop(&cmd, &state).await; status_report_loop(&cmd, &state).await;
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
if let Some(h) = tx_handle { let _ = h.await; } let _ = tx_handle.await;
if let Some(h) = rx_handle { let _ = h.await; } let _ = rx_handle.await;
Ok(()) Ok(())
} }
/// TCP multi-connection: distribute TX across all streams, aggregate RX from all. /// TCP multi-connection: always TX+RX on all streams.
async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<()> { async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<()> {
let state = BandwidthState::new(); let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize; let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx();
let server_should_rx = cmd.server_rx();
let tx_speed = cmd.remote_tx_speed; let tx_speed = cmd.remote_tx_speed;
let mut tx_handles = Vec::new(); let mut tx_handles = Vec::new();
let mut rx_handles = Vec::new(); let mut rx_handles = Vec::new();
let mut _writer_keepalives: Vec<tokio::net::tcp::OwnedWriteHalf> = Vec::new();
let mut _reader_keepalives: Vec<tokio::net::tcp::OwnedReadHalf> = Vec::new();
// TCP: always start both TX and RX on every stream
for tcp_stream in streams { for tcp_stream in streams {
let (reader, writer) = tcp_stream.into_split(); let (reader, writer) = tcp_stream.into_split();
if server_should_tx { let st = state.clone();
let st = state.clone(); tx_handles.push(tokio::spawn(async move {
tx_handles.push(tokio::spawn(async move { tcp_tx_loop(writer, tx_size, tx_speed, st).await
tcp_tx_loop(writer, tx_size, tx_speed, st).await }));
}));
} else {
_writer_keepalives.push(writer);
}
if server_should_rx { let st = state.clone();
let st = state.clone(); rx_handles.push(tokio::spawn(async move {
rx_handles.push(tokio::spawn(async move { tcp_rx_loop(reader, st).await
tcp_rx_loop(reader, st).await }));
}));
} else {
_reader_keepalives.push(reader);
}
} }
tracing::info!( tracing::info!(