diff --git a/src/server.rs b/src/server.rs index e1abcc5..211535a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -293,53 +293,83 @@ async fn handle_client( async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; + let server_should_tx = cmd.server_tx(); + let server_should_rx = cmd.server_rx(); let tx_speed = cmd.remote_tx_speed; let (reader, writer) = stream.into_split(); - // TCP mode: ALWAYS start both TX and RX threads. - // MikroTik expects bidirectional data flow on TCP regardless of direction. - // The direction only controls what gets measured/reported. + let mut _writer_keepalive = None; + let mut _reader_keepalive = None; + let state_tx = state.clone(); - let tx_handle = tokio::spawn(async move { - tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await - }); + let tx_handle = if server_should_tx { + Some(tokio::spawn(async move { + tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await + })) + } else if server_should_rx { + // Server receives: use the writer to send periodic status messages + // back to the client (MikroTik needs this to show speed) + let st = state.clone(); + Some(tokio::spawn(async move { + tcp_status_sender(writer, st).await + })) + } else { + _writer_keepalive = Some(writer); + None + }; let state_rx = state.clone(); - let rx_handle = tokio::spawn(async move { - tcp_rx_loop(reader, state_rx).await - }); + let rx_handle = if server_should_rx { + Some(tokio::spawn(async move { + tcp_rx_loop(reader, state_rx).await + })) + } else { + _reader_keepalive = Some(reader); + None + }; status_report_loop(&cmd, &state).await; state.running.store(false, Ordering::SeqCst); - let _ = tx_handle.await; - let _ = rx_handle.await; + if let Some(h) = tx_handle { let _ = h.await; } + if let Some(h) = rx_handle { let _ = h.await; } Ok(()) } -/// TCP multi-connection: always TX+RX on all streams. +/// TCP multi-connection. async fn run_tcp_multiconn_server(streams: Vec, cmd: Command) -> Result<()> { let state = BandwidthState::new(); let tx_size = cmd.tx_size as usize; + let server_should_tx = cmd.server_tx(); + let server_should_rx = cmd.server_rx(); let tx_speed = cmd.remote_tx_speed; let mut tx_handles = Vec::new(); let mut rx_handles = Vec::new(); + let mut _writer_keepalives: Vec = Vec::new(); + let mut _reader_keepalives: Vec = Vec::new(); - // TCP: always start both TX and RX on every stream for tcp_stream in streams { let (reader, writer) = tcp_stream.into_split(); - let st = state.clone(); - tx_handles.push(tokio::spawn(async move { - tcp_tx_loop(writer, tx_size, tx_speed, st).await - })); + if server_should_tx { + let st = state.clone(); + tx_handles.push(tokio::spawn(async move { + tcp_tx_loop(writer, tx_size, tx_speed, st).await + })); + } else { + _writer_keepalives.push(writer); + } - let st = state.clone(); - rx_handles.push(tokio::spawn(async move { - tcp_rx_loop(reader, st).await - })); + if server_should_rx { + let st = state.clone(); + rx_handles.push(tokio::spawn(async move { + tcp_rx_loop(reader, st).await + })); + } else { + _reader_keepalives.push(reader); + } } tracing::info!( @@ -413,6 +443,38 @@ async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc, +) { + let mut seq: u32 = 0; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.tick().await; // consume first immediate tick + + while state.running.load(Ordering::Relaxed) { + interval.tick().await; + if !state.running.load(Ordering::Relaxed) { + break; + } + + seq += 1; + let rx_bytes = state.rx_bytes.load(Ordering::Relaxed); + + let status = StatusMessage { + seq, + bytes_received: rx_bytes as u32, + }; + + if writer.write_all(&status.serialize()).await.is_err() { + state.running.store(false, Ordering::SeqCst); + break; + } + let _ = writer.flush().await; + } +} + // --- UDP Test Server --- async fn run_udp_test_server(