Fix TCP multi-connection: TX/RX on ALL streams, not just primary
All checks were successful
CI / test (push) Successful in 1m19s
All checks were successful
CI / test (push) Successful in 1m19s
pcap analysis showed MikroTik sends/receives data across all 20 TCP connections, but we only used the primary. Now all streams get their own TX and RX tasks, distributing bandwidth across all connections. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -274,15 +274,15 @@ async fn handle_client(
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut all_streams = vec![stream];
|
||||||
|
all_streams.extend(extra_streams);
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"TCP multi-connection: starting with {} total streams",
|
"TCP multi-connection: starting with {} total streams",
|
||||||
1 + extra_streams.len(),
|
all_streams.len(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Run test - primary stream handles data, extras provide parallel TCP bandwidth
|
run_tcp_multiconn_server(all_streams, cmd).await
|
||||||
// For now just use the primary; extras keep the connection alive
|
|
||||||
let _extra_keepalive = extra_streams;
|
|
||||||
run_tcp_test_server(stream, cmd).await
|
|
||||||
} else {
|
} else {
|
||||||
run_tcp_test_server(stream, cmd).await
|
run_tcp_test_server(stream, cmd).await
|
||||||
}
|
}
|
||||||
@@ -331,6 +331,55 @@ async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TCP multi-connection: distribute TX across all streams, aggregate RX from all.
|
||||||
|
async fn run_tcp_multiconn_server(streams: Vec<TcpStream>, cmd: Command) -> Result<()> {
|
||||||
|
let state = BandwidthState::new();
|
||||||
|
let tx_size = cmd.tx_size as usize;
|
||||||
|
let server_should_tx = cmd.server_tx();
|
||||||
|
let server_should_rx = cmd.server_rx();
|
||||||
|
let tx_speed = cmd.remote_tx_speed;
|
||||||
|
|
||||||
|
let mut tx_handles = Vec::new();
|
||||||
|
let mut rx_handles = Vec::new();
|
||||||
|
let mut _writer_keepalives: Vec<tokio::net::tcp::OwnedWriteHalf> = Vec::new();
|
||||||
|
let mut _reader_keepalives: Vec<tokio::net::tcp::OwnedReadHalf> = Vec::new();
|
||||||
|
|
||||||
|
for tcp_stream in streams {
|
||||||
|
let (reader, writer) = tcp_stream.into_split();
|
||||||
|
|
||||||
|
if server_should_tx {
|
||||||
|
let st = state.clone();
|
||||||
|
tx_handles.push(tokio::spawn(async move {
|
||||||
|
tcp_tx_loop(writer, tx_size, tx_speed, st).await
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
_writer_keepalives.push(writer);
|
||||||
|
}
|
||||||
|
|
||||||
|
if server_should_rx {
|
||||||
|
let st = state.clone();
|
||||||
|
rx_handles.push(tokio::spawn(async move {
|
||||||
|
tcp_rx_loop(reader, st).await
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
_reader_keepalives.push(reader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"TCP multi-conn: {} TX tasks, {} RX tasks",
|
||||||
|
tx_handles.len(),
|
||||||
|
rx_handles.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
status_report_loop(&cmd, &state).await;
|
||||||
|
|
||||||
|
state.running.store(false, Ordering::SeqCst);
|
||||||
|
for h in tx_handles { let _ = h.await; }
|
||||||
|
for h in rx_handles { let _ = h.await; }
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn tcp_tx_loop(
|
async fn tcp_tx_loop(
|
||||||
mut writer: tokio::net::tcp::OwnedWriteHalf,
|
mut writer: tokio::net::tcp::OwnedWriteHalf,
|
||||||
tx_size: usize,
|
tx_size: usize,
|
||||||
|
|||||||
Reference in New Issue
Block a user