Fix multi-connection: don't connect() UDP socket when conn_count > 1
Some checks failed
CI / test (push) Has been cancelled

Root cause found via pcap analysis: MikroTik with connection-count=N
sends UDP from N different source ports (2257, 2258, 2259, ...) all
to our single server port 2001. A connect()'d UDP socket only accepts
packets from the one connected address, silently dropping ~75% of
traffic with conn_count=4.

Fix: when tcp_conn_count > 0, leave the UDP socket unconnected and
use send_to()/recv_from() instead of send()/recv(). This accepts
packets from all MikroTik source ports.

This bug also exists in the original C btest-opensource.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-31 15:27:33 +04:00
parent 4dddf21f2f
commit 28e553bc5f
2 changed files with 109 additions and 7 deletions

View File

@@ -50,7 +50,7 @@ async fn handle_client(
let cmd = recv_command(&mut stream).await?;
tracing::info!(
"Client {} command: proto={} dir={} tx_size={} remote_speed={} local_speed={}",
"Client {} command: proto={} dir={} conn_count={} tx_size={} remote_speed={} local_speed={}",
peer,
if cmd.is_udp() { "UDP" } else { "TCP" },
match cmd.direction {
@@ -59,6 +59,7 @@ async fn handle_client(
CMD_DIR_BOTH => "BOTH",
_ => "?",
},
cmd.tcp_conn_count,
cmd.tx_size,
cmd.remote_tx_speed,
cmd.local_tx_speed,
@@ -197,7 +198,23 @@ async fn run_udp_test_server(
let udp = UdpSocket::bind(format!("0.0.0.0:{}", server_udp_port)).await?;
let client_udp_addr: SocketAddr =
format!("{}:{}", peer.ip(), client_udp_port).parse().unwrap();
udp.connect(client_udp_addr).await?;
// When connection_count > 1, MikroTik sends UDP from MULTIPLE source ports
// (base_port, base_port+1, ..., base_port+N-1) all to our single server port.
// A connect()'d UDP socket only accepts from the one connected address,
// silently dropping packets from the other ports.
// So: only connect() for single-connection mode (enables send() without addr).
// For multi-connection, we leave the socket unconnected and use send_to()/recv_from().
let multi_conn = cmd.tcp_conn_count > 0;
if !multi_conn {
udp.connect(client_udp_addr).await?;
}
tracing::info!(
"UDP mode: conn_count={}, socket={}",
cmd.tcp_conn_count.max(1),
if multi_conn { "unconnected (multi-port RX)" } else { "connected" },
);
let state = BandwidthState::new();
let tx_size = cmd.tx_size as usize;
@@ -209,9 +226,11 @@ async fn run_udp_test_server(
let state_tx = state.clone();
let udp_tx = udp.clone();
let tx_target = client_udp_addr;
let is_multi = multi_conn;
let tx_handle = if server_should_tx {
Some(tokio::spawn(async move {
udp_tx_loop(&udp_tx, tx_size, tx_speed, state_tx).await
udp_tx_loop(&udp_tx, tx_size, tx_speed, state_tx, is_multi, tx_target).await
}))
} else {
None
@@ -241,6 +260,8 @@ async fn udp_tx_loop(
tx_size: usize,
initial_tx_speed: u32,
state: Arc<BandwidthState>,
multi_conn: bool,
target: SocketAddr,
) {
let mut seq: u32 = 0;
let mut packet = vec![0u8; tx_size];
@@ -251,7 +272,12 @@ async fn udp_tx_loop(
while state.running.load(Ordering::Relaxed) {
packet[0..4].copy_from_slice(&seq.to_be_bytes());
match socket.send(&packet).await {
let result = if multi_conn {
socket.send_to(&packet, target).await
} else {
socket.send(&packet).await
};
match result {
Ok(n) => {
seq = seq.wrapping_add(1);
state.tx_bytes.fetch_add(n as u64, Ordering::Relaxed);
@@ -263,7 +289,6 @@ async fn udp_tx_loop(
tracing::warn!("UDP TX: too many consecutive send errors, stopping");
break;
}
// Back off on ENOBUFS/EAGAIN
tokio::time::sleep(Duration::from_micros(200)).await;
continue;
}
@@ -302,8 +327,10 @@ async fn udp_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut last_seq: Option<u32> = None;
while state.running.load(Ordering::Relaxed) {
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await {
Ok(Ok(n)) if n >= 4 => {
// Use recv_from to accept packets from any source port
// (multi-connection MikroTik sends from multiple ports)
match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
Ok(Ok((n, _src))) if n >= 4 => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);