Fix disconnect detection: TX/RX loops set running=false on EOF
All checks were successful
CI / test (push) Successful in 1m20s
All checks were successful
CI / test (push) Successful in 1m20s
When a TCP connection closes (EOF or write error), the loop now sets the shared running flag to false, which stops the status report loop and all other tasks. Adds "test ended" log messages. The TCP multi-conn "MikroTik shows 0 on send" is a separate issue requiring TCP-level status exchange (MikroTik sends 12-byte status messages on TCP connections, not just a data stream). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -377,6 +377,7 @@ async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Resu
|
|||||||
state.running.store(false, Ordering::SeqCst);
|
state.running.store(false, Ordering::SeqCst);
|
||||||
for h in tx_handles { let _ = h.await; }
|
for h in tx_handles { let _ = h.await; }
|
||||||
for h in rx_handles { let _ = h.await; }
|
for h in rx_handles { let _ = h.await; }
|
||||||
|
tracing::info!("TCP multi-connection test ended");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -395,6 +396,7 @@ async fn tcp_tx_loop(
|
|||||||
|
|
||||||
while state.running.load(Ordering::Relaxed) {
|
while state.running.load(Ordering::Relaxed) {
|
||||||
if writer.write_all(&packet).await.is_err() {
|
if writer.write_all(&packet).await.is_err() {
|
||||||
|
state.running.store(false, Ordering::SeqCst);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed);
|
state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed);
|
||||||
@@ -425,7 +427,10 @@ async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc<Band
|
|||||||
let mut buf = vec![0u8; 65536];
|
let mut buf = vec![0u8; 65536];
|
||||||
while state.running.load(Ordering::Relaxed) {
|
while state.running.load(Ordering::Relaxed) {
|
||||||
match reader.read(&mut buf).await {
|
match reader.read(&mut buf).await {
|
||||||
Ok(0) | Err(_) => break,
|
Ok(0) | Err(_) => {
|
||||||
|
state.running.store(false, Ordering::SeqCst);
|
||||||
|
break;
|
||||||
|
}
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user