Add TCP status for bidirectional mode
All checks were successful
CI / test (push) Successful in 1m22s

In BOTH direction, the TX loop now injects 12-byte status messages
every 1 second between data packets, reporting rx_bytes to the client.
Multi-connection mode also updated with same logic for all 3 cases:
- TX only: pure data
- RX only: status sender on writer
- BOTH: TX data + interleaved status messages

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-31 18:10:46 +04:00
parent 8b127d833f
commit f9289cca55

View File

@@ -303,13 +303,18 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
let mut _reader_keepalive = None; let mut _reader_keepalive = None;
let state_tx = state.clone(); let state_tx = state.clone();
let tx_handle = if server_should_tx { let tx_handle = if server_should_tx && server_should_rx {
// BOTH mode: TX data + inject status messages for the RX direction
Some(tokio::spawn(async move {
tcp_tx_with_status(writer, tx_size, tx_speed, state_tx).await
}))
} else if server_should_tx {
// TX only
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await
})) }))
} else if server_should_rx { } else if server_should_rx {
// Server receives: use the writer to send periodic status messages // RX only: use writer for status messages
// back to the client (MikroTik needs this to show speed)
let st = state.clone(); let st = state.clone();
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
tcp_status_sender(writer, st).await tcp_status_sender(writer, st).await
@@ -329,19 +334,26 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
None None
}; };
if !server_should_rx || server_should_tx { if server_should_tx && !server_should_rx {
// Normal status loop (TX-only or BOTH modes) // TX-only: normal status loop reports TX stats
status_report_loop(&cmd, &state).await; status_report_loop(&cmd, &state).await;
} else if server_should_tx && server_should_rx {
// BOTH: TX loop injects status + prints RX. Just report TX here.
let mut seq: u32 = 0;
let mut tick = tokio::time::interval(Duration::from_secs(1));
loop {
tick.tick().await;
if !state.running.load(Ordering::Relaxed) { break; }
seq += 1;
let tx = state.tx_bytes.swap(0, Ordering::Relaxed);
bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None);
}
} else { } else {
// RX-only: tcp_status_sender handles both status sending and printing. // RX-only: tcp_status_sender handles everything. Just wait.
// Just wait for it and the RX loop to finish.
if tx_handle.is_some() {
// tx_handle is the status sender in this case
while state.running.load(Ordering::Relaxed) { while state.running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(500)).await; tokio::time::sleep(Duration::from_millis(500)).await;
} }
} }
}
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
if let Some(h) = tx_handle { let _ = h.await; } if let Some(h) = tx_handle { let _ = h.await; }
@@ -365,11 +377,21 @@ async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Resu
for tcp_stream in streams { for tcp_stream in streams {
let (reader, writer) = tcp_stream.into_split(); let (reader, writer) = tcp_stream.into_split();
if server_should_tx { if server_should_tx && server_should_rx {
let st = state.clone();
tx_handles.push(tokio::spawn(async move {
tcp_tx_with_status(writer, tx_size, tx_speed, st).await
}));
} else if server_should_tx {
let st = state.clone(); let st = state.clone();
tx_handles.push(tokio::spawn(async move { tx_handles.push(tokio::spawn(async move {
tcp_tx_loop(writer, tx_size, tx_speed, st).await tcp_tx_loop(writer, tx_size, tx_speed, st).await
})); }));
} else if server_should_rx {
let st = state.clone();
tx_handles.push(tokio::spawn(async move {
tcp_status_sender(writer, st).await
}));
} else { } else {
_writer_keepalives.push(writer); _writer_keepalives.push(writer);
} }
@@ -404,14 +426,53 @@ async fn tcp_tx_loop(
tx_size: usize, tx_size: usize,
tx_speed: u32, tx_speed: u32,
state: Arc<BandwidthState>, state: Arc<BandwidthState>,
) {
tcp_tx_loop_inner(&mut writer, tx_size, tx_speed, &state, false).await;
}
/// TCP TX loop that also sends status messages when `send_status` is true.
/// Used in bidirectional mode where the writer handles both data and status.
async fn tcp_tx_with_status(
mut writer: tokio::net::tcp::OwnedWriteHalf,
tx_size: usize,
tx_speed: u32,
state: Arc<BandwidthState>,
) {
tcp_tx_loop_inner(&mut writer, tx_size, tx_speed, &state, true).await;
}
async fn tcp_tx_loop_inner(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
tx_size: usize,
tx_speed: u32,
state: &Arc<BandwidthState>,
send_status: bool,
) { ) {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let packet = vec![0u8; tx_size]; // TCP data is all zeros (no 0x07 header) let packet = vec![0u8; tx_size];
let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16); let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16);
let mut next_send = Instant::now(); let mut next_send = Instant::now();
let mut next_status = Instant::now() + Duration::from_secs(1);
let mut status_seq: u32 = 0;
while state.running.load(Ordering::Relaxed) { while state.running.load(Ordering::Relaxed) {
// Inject status message every ~1 second if in bidirectional mode
if send_status && Instant::now() >= next_status {
status_seq += 1;
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage {
seq: status_seq,
bytes_received: rx_bytes as u32,
};
if writer.write_all(&status.serialize()).await.is_err() {
state.running.store(false, Ordering::SeqCst);
break;
}
bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None);
next_status = Instant::now() + Duration::from_secs(1);
}
if writer.write_all(&packet).await.is_err() { if writer.write_all(&packet).await.is_err() {
state.running.store(false, Ordering::SeqCst); state.running.store(false, Ordering::SeqCst);
break; break;