Fix TCP send: server sends 12-byte status messages when receiving
All checks were successful
CI / test (push) Successful in 1m19s

pcap of MikroTik-as-server showed it sends periodic 12-byte status
messages back to the client even in RX-only mode. The client needs
these to display speed. Added tcp_status_sender that writes status
messages containing rx_bytes on the TCP write half every 1 second.

Reverted the "always bidirectional" change — TCP direction is
conditional, but RX mode now uses the writer for status instead
of keeping it idle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-31 17:56:36 +04:00
parent fa4fd63fb3
commit 51bc4ddf16

View File

@@ -293,53 +293,83 @@ async fn handle_client(
async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx();
let server_should_rx = cmd.server_rx();
let tx_speed = cmd.remote_tx_speed;
let (reader, writer) = stream.into_split();
// TCP mode: ALWAYS start both TX and RX threads.
// MikroTik expects bidirectional data flow on TCP regardless of direction.
// The direction only controls what gets measured/reported.
let mut _writer_keepalive = None;
let mut _reader_keepalive = None;
let state_tx = state.clone();
let tx_handle = tokio::spawn(async move {
tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await
});
let tx_handle = if server_should_tx {
Some(tokio::spawn(async move {
tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await
}))
} else if server_should_rx {
// Server receives: use the writer to send periodic status messages
// back to the client (MikroTik needs this to show speed)
let st = state.clone();
Some(tokio::spawn(async move {
tcp_status_sender(writer, st).await
}))
} else {
_writer_keepalive = Some(writer);
None
};
let state_rx = state.clone();
let rx_handle = tokio::spawn(async move {
tcp_rx_loop(reader, state_rx).await
});
let rx_handle = if server_should_rx {
Some(tokio::spawn(async move {
tcp_rx_loop(reader, state_rx).await
}))
} else {
_reader_keepalive = Some(reader);
None
};
status_report_loop(&cmd, &state).await;
state.running.store(false, Ordering::SeqCst);
let _ = tx_handle.await;
let _ = rx_handle.await;
if let Some(h) = tx_handle { let _ = h.await; }
if let Some(h) = rx_handle { let _ = h.await; }
Ok(())
}
/// TCP multi-connection: always TX+RX on all streams.
/// TCP multi-connection.
async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<()> {
let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize;
let server_should_tx = cmd.server_tx();
let server_should_rx = cmd.server_rx();
let tx_speed = cmd.remote_tx_speed;
let mut tx_handles = Vec::new();
let mut rx_handles = Vec::new();
let mut _writer_keepalives: Vec<tokio::net::tcp::OwnedWriteHalf> = Vec::new();
let mut _reader_keepalives: Vec<tokio::net::tcp::OwnedReadHalf> = Vec::new();
// TCP: always start both TX and RX on every stream
for tcp_stream in streams {
let (reader, writer) = tcp_stream.into_split();
let st = state.clone();
tx_handles.push(tokio::spawn(async move {
tcp_tx_loop(writer, tx_size, tx_speed, st).await
}));
if server_should_tx {
let st = state.clone();
tx_handles.push(tokio::spawn(async move {
tcp_tx_loop(writer, tx_size, tx_speed, st).await
}));
} else {
_writer_keepalives.push(writer);
}
let st = state.clone();
rx_handles.push(tokio::spawn(async move {
tcp_rx_loop(reader, st).await
}));
if server_should_rx {
let st = state.clone();
rx_handles.push(tokio::spawn(async move {
tcp_rx_loop(reader, st).await
}));
} else {
_reader_keepalives.push(reader);
}
}
tracing::info!(
@@ -413,6 +443,38 @@ async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<Band
}
}
/// Send periodic 12-byte status messages on the TCP connection.
/// Used when server is in RX mode — tells the client how many bytes we received.
async fn tcp_status_sender(
mut writer: tokio::net::tcp::OwnedWriteHalf,
state: Arc<BandwidthState>,
) {
let mut seq: u32 = 0;
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await; // consume first immediate tick
while state.running.load(Ordering::Relaxed) {
interval.tick().await;
if !state.running.load(Ordering::Relaxed) {
break;
}
seq += 1;
let rx_bytes = state.rx_bytes.load(Ordering::Relaxed);
let status = StatusMessage {
seq,
bytes_received: rx_bytes as u32,
};
if writer.write_all(&status.serialize()).await.is_err() {
state.running.store(false, Ordering::SeqCst);
break;
}
let _ = writer.flush().await;
}
}
// --- UDP Test Server ---
async fn run_udp_test_server(