feat(nat): smart candidate filtering + acceptor NAT tickle + 4s timeout
Some checks failed
Mirror to GitHub / mirror (push) Failing after 24s
Build Release Binaries / build-amd64 (push) Failing after 3m33s

Major P2P improvements for cross-network calls:

Smart candidate filtering (smart_dial_order):
- Strip LAN candidates when peer's public IP differs from ours
  (172.16.x.x is unreachable from a different network)
- Strip all IPv6 candidates (Phase 7 disabled, wastes dial slots)
- Only keep mapped + reflexive for cross-network calls
- LAN candidates preserved when both peers share the same public IP

Acceptor NAT tickle:
- A-role sends a 1-byte UDP packet to each peer candidate BEFORE
  accepting. This opens the NAT pinhole for return traffic from
  the Dialer's IP — critical for address-restricted NATs that only
  allow inbound from IPs they've seen outbound traffic to.
- Uses SO_REUSEADDR on the same port as the quinn endpoint.

Direct timeout increased from 2s to 4s:
- Cross-network QUIC handshakes through CGNAT can take 2-3s
- 2s was too aggressive for 5G/LTE networks

Diagnostic fix:
- Record "timeout:4s" for candidates still in-flight when the
  timeout fires (previously these had no diagnostic entry)

5 new tests for smart_dial_order edge cases.
593 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-14 15:42:02 +04:00
parent c478224d67
commit bc6d327ebb
3 changed files with 202 additions and 4 deletions

View File

@@ -130,6 +130,50 @@ impl PeerCandidates {
out out
} }
/// Smart dial order: filters out candidates that can't possibly
/// work given our own reflexive address.
///
/// - **LAN candidates**: only included if peer's public IP
/// matches ours (same network). Private IPs are unreachable
/// cross-network.
/// - **IPv6 candidates**: stripped entirely (Phase 7 disabled).
/// - **Reflexive + mapped**: always included.
pub fn smart_dial_order(&self, own_reflexive: Option<&SocketAddr>) -> Vec<SocketAddr> {
let own_public_ip = own_reflexive.map(|a| a.ip());
let peer_public_ip = self.reflexive.map(|a| a.ip());
let same_network = match (own_public_ip, peer_public_ip) {
(Some(a), Some(b)) => a == b,
_ => false,
};
let mut out = Vec::with_capacity(self.local.len() + 2);
// LAN candidates only when on the same network.
if same_network {
for addr in &self.local {
if !addr.is_ipv6() {
out.push(*addr);
}
}
}
// Port-mapped (always useful — it's a public addr).
if let Some(a) = self.mapped {
if !a.is_ipv6() && !out.contains(&a) {
out.push(a);
}
}
// Reflexive (always useful — it's the peer's public addr).
if let Some(a) = self.reflexive {
if !a.is_ipv6() && !out.contains(&a) {
out.push(a);
}
}
out
}
/// Is there anything for the D-role to dial? If not, the /// Is there anything for the D-role to dial? If not, the
/// race reduces to relay-only. /// race reduces to relay-only.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
@@ -144,6 +188,9 @@ pub async fn race(
relay_addr: SocketAddr, relay_addr: SocketAddr,
room_sni: String, room_sni: String,
call_sni: String, call_sni: String,
// Our own reflexive address — used to filter LAN candidates
// that can't work cross-network.
own_reflexive: Option<SocketAddr>,
// Phase 5: when `Some`, reuse this endpoint for BOTH the // Phase 5: when `Some`, reuse this endpoint for BOTH the
// direct-path branch AND the relay dial. Pass the signal // direct-path branch AND the relay dial. Pass the signal
// endpoint. The endpoint MUST be server-capable (created // endpoint. The endpoint MUST be server-capable (created
@@ -222,7 +269,49 @@ pub async fn race(
// as dial — IPv6 connections die on datagram send). // as dial — IPv6 connections die on datagram send).
// Accept on IPv4 shared endpoint only. // Accept on IPv4 shared endpoint only.
let _v6_ep_unused = ipv6_endpoint.clone(); let _v6_ep_unused = ipv6_endpoint.clone();
// Collect peer addrs for NAT tickle (Acceptor-side).
let tickle_addrs: Vec<SocketAddr> = peer_candidates
.smart_dial_order(own_reflexive.as_ref())
.into_iter()
.filter(|a| !a.ip().is_loopback() && !a.ip().is_unspecified())
.collect();
direct_fut = Box::pin(async move { direct_fut = Box::pin(async move {
// NAT tickle: send a small UDP packet to each of the
// Dialer's candidate addresses FROM our shared endpoint.
// This opens our NAT's pinhole for return traffic from
// those IPs — critical for address-restricted NATs that
// only allow inbound from IPs they've seen outbound
// traffic to. Without this, the Dialer's QUIC Initial
// gets dropped by our NAT.
if !tickle_addrs.is_empty() {
if let Ok(local_addr) = ep_for_fut.local_addr() {
// We can't send raw UDP on the quinn endpoint,
// so we use a fresh socket on the SAME port
// (SO_REUSEADDR). This makes the NAT see
// outbound traffic from our port to the peer,
// opening the pinhole.
let bind = SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
local_addr.port(),
);
if let Ok(tickle_sock) = tokio::net::UdpSocket::bind(bind).await {
for addr in &tickle_addrs {
// Send a minimal QUIC-like packet (version
// negotiation bait). The content doesn't
// matter — we just need the NAT to see
// outbound traffic from our port to this IP.
let tickle_bytes = [0u8; 1];
let _ = tickle_sock.send_to(&tickle_bytes, addr).await;
tracing::info!(
%addr,
local_port = local_addr.port(),
"dual_path: A-role sent NAT tickle"
);
}
}
}
}
// Accept loop: retry if we get a stale/closed // Accept loop: retry if we get a stale/closed
// connection from a previous call. Max 3 retries // connection from a previous call. Max 3 retries
// to avoid spinning until the race timeout. // to avoid spinning until the race timeout.
@@ -296,7 +385,7 @@ pub async fn race(
}; };
let ep_for_fut = ep.clone(); let ep_for_fut = ep.clone();
let _v6_ep_for_dial = ipv6_endpoint.clone(); let _v6_ep_for_dial = ipv6_endpoint.clone();
let dial_order = peer_candidates.dial_order(); let dial_order = peer_candidates.smart_dial_order(own_reflexive.as_ref());
let sni = call_sni.clone(); let sni = call_sni.clone();
let diags = diags_collector.clone(); let diags = diags_collector.clone();
direct_fut = Box::pin(async move { direct_fut = Box::pin(async move {
@@ -478,15 +567,18 @@ pub async fn race(
// RaceResult with both transports (when available) and uses the // RaceResult with both transports (when available) and uses the
// Phase 6 MediaPathReport exchange to decide which one to // Phase 6 MediaPathReport exchange to decide which one to
// actually use for media. // actually use for media.
let smart_order = peer_candidates.smart_dial_order(own_reflexive.as_ref());
tracing::info!( tracing::info!(
?role, ?role,
candidates = ?peer_candidates.dial_order(), raw_candidates = ?peer_candidates.dial_order(),
filtered_candidates = ?smart_order,
?own_reflexive,
%relay_addr, %relay_addr,
"dual_path: racing direct vs relay" "dual_path: racing direct vs relay"
); );
let mut direct_task = tokio::spawn( let mut direct_task = tokio::spawn(
tokio::time::timeout(Duration::from_secs(2), direct_fut), tokio::time::timeout(Duration::from_secs(4), direct_fut),
); );
let mut relay_task = tokio::spawn(async move { let mut relay_task = tokio::spawn(async move {
// Keep the 500ms head start so direct has a chance // Keep the 500ms head start so direct has a chance
@@ -519,9 +611,25 @@ pub async fn race(
local_winner = WinningPath::Relay; // direct failed → relay is our only hope local_winner = WinningPath::Relay; // direct failed → relay is our only hope
} }
Ok(Err(_)) => { Ok(Err(_)) => {
tracing::warn!("dual_path: direct timed out (2s)"); tracing::warn!("dual_path: direct timed out (4s)");
direct_result = Some(Err(anyhow::anyhow!("direct timeout"))); direct_result = Some(Err(anyhow::anyhow!("direct timeout")));
local_winner = WinningPath::Relay; local_winner = WinningPath::Relay;
// Record timeout diag for candidates that were
// still in-flight when the timeout fired.
if let Ok(mut d) = diags_collector.lock() {
let recorded_indices: std::collections::HashSet<usize> =
d.iter().map(|diag| diag.index).collect();
for (idx, addr) in smart_order.iter().enumerate() {
if !recorded_indices.contains(&idx) {
d.push(CandidateDiag {
index: idx,
addr: addr.to_string(),
result: "timeout:4s".into(),
elapsed_ms: Some(4000),
});
}
}
}
} }
Err(e) => { Err(e) => {
tracing::warn!(error = %e, "dual_path: direct task panicked"); tracing::warn!(error = %e, "dual_path: direct task panicked");
@@ -721,4 +829,88 @@ mod tests {
let _ = format!("{:?}", WinningPath::Direct); let _ = format!("{:?}", WinningPath::Direct);
let _ = format!("{:?}", WinningPath::Relay); let _ = format!("{:?}", WinningPath::Relay);
} }
// ── smart_dial_order tests ─────────────────────────────────
#[test]
fn smart_dial_order_same_network_includes_lan() {
let candidates = PeerCandidates {
reflexive: Some("203.0.113.5:4433".parse().unwrap()),
local: vec![
"192.168.1.10:4433".parse().unwrap(),
"10.0.0.5:4433".parse().unwrap(),
],
mapped: None,
};
let own: SocketAddr = "203.0.113.5:12345".parse().unwrap();
let order = candidates.smart_dial_order(Some(&own));
// Same public IP → LAN candidates included
assert!(order.contains(&"192.168.1.10:4433".parse().unwrap()));
assert!(order.contains(&"10.0.0.5:4433".parse().unwrap()));
assert!(order.contains(&"203.0.113.5:4433".parse().unwrap()));
}
#[test]
fn smart_dial_order_different_network_strips_lan() {
let candidates = PeerCandidates {
reflexive: Some("150.228.49.65:4433".parse().unwrap()),
local: vec![
"172.16.81.126:4433".parse().unwrap(),
"10.0.0.5:4433".parse().unwrap(),
],
mapped: None,
};
// Different public IP → LAN candidates stripped
let own: SocketAddr = "185.115.4.212:12345".parse().unwrap();
let order = candidates.smart_dial_order(Some(&own));
assert!(!order.contains(&"172.16.81.126:4433".parse().unwrap()));
assert!(!order.contains(&"10.0.0.5:4433".parse().unwrap()));
// Reflexive still included
assert!(order.contains(&"150.228.49.65:4433".parse().unwrap()));
}
#[test]
fn smart_dial_order_strips_ipv6() {
let candidates = PeerCandidates {
reflexive: Some("150.228.49.65:4433".parse().unwrap()),
local: vec![
"[2a0d:3344:692c::1]:4433".parse().unwrap(),
"172.16.81.126:4433".parse().unwrap(),
],
mapped: None,
};
// Same network, but IPv6 should be stripped
let own: SocketAddr = "150.228.49.65:5555".parse().unwrap();
let order = candidates.smart_dial_order(Some(&own));
assert!(!order.iter().any(|a| a.is_ipv6()));
assert!(order.contains(&"172.16.81.126:4433".parse().unwrap()));
}
#[test]
fn smart_dial_order_no_own_reflexive_strips_lan() {
let candidates = PeerCandidates {
reflexive: Some("150.228.49.65:4433".parse().unwrap()),
local: vec!["172.16.81.126:4433".parse().unwrap()],
mapped: Some("198.51.100.42:12345".parse().unwrap()),
};
// No own reflexive → can't determine same network → strip LAN
let order = candidates.smart_dial_order(None);
assert!(!order.contains(&"172.16.81.126:4433".parse().unwrap()));
assert!(order.contains(&"198.51.100.42:12345".parse().unwrap()));
assert!(order.contains(&"150.228.49.65:4433".parse().unwrap()));
}
#[test]
fn smart_dial_order_mapped_always_included() {
let candidates = PeerCandidates {
reflexive: Some("150.228.49.65:4433".parse().unwrap()),
local: vec![],
mapped: Some("198.51.100.42:12345".parse().unwrap()),
};
let own: SocketAddr = "185.115.4.212:12345".parse().unwrap();
let order = candidates.smart_dial_order(Some(&own));
assert_eq!(order.len(), 2); // mapped + reflexive
assert!(order.contains(&"198.51.100.42:12345".parse().unwrap()));
assert!(order.contains(&"150.228.49.65:4433".parse().unwrap()));
}
} }

View File

@@ -118,6 +118,7 @@ async fn dual_path_direct_wins_on_loopback() {
relay_addr, relay_addr,
"test-room".into(), "test-room".into(),
"call-test".into(), "call-test".into(),
None, // own_reflexive: not needed in tests
None, // Phase 5: tests use fresh endpoints (no shared signal) None, // Phase 5: tests use fresh endpoints (no shared signal)
None, // Phase 7: no IPv6 endpoint in tests None, // Phase 7: no IPv6 endpoint in tests
) )
@@ -162,6 +163,7 @@ async fn dual_path_relay_wins_when_direct_is_dead() {
relay_addr, relay_addr,
"test-room".into(), "test-room".into(),
"call-test".into(), "call-test".into(),
None, // own_reflexive: not needed in tests
None, // Phase 5: tests use fresh endpoints (no shared signal) None, // Phase 5: tests use fresh endpoints (no shared signal)
None, // Phase 7: no IPv6 endpoint in tests None, // Phase 7: no IPv6 endpoint in tests
) )
@@ -202,6 +204,7 @@ async fn dual_path_errors_cleanly_when_both_paths_dead() {
dead_relay, dead_relay,
"test-room".into(), "test-room".into(),
"call-test".into(), "call-test".into(),
None, // own_reflexive: not needed in tests
None, // Phase 5: tests use fresh endpoints (no shared signal) None, // Phase 5: tests use fresh endpoints (no shared signal)
None, // Phase 7: no IPv6 endpoint in tests None, // Phase 7: no IPv6 endpoint in tests
) )

View File

@@ -476,12 +476,15 @@ async fn connect(
let room_sni = room.clone(); let room_sni = room.clone();
let call_sni = format!("call-{room}"); let call_sni = format!("call-{room}");
let own_reflex_parsed: Option<std::net::SocketAddr> =
own_reflex_addr.as_deref().and_then(|s| s.parse().ok());
match wzp_client::dual_path::race( match wzp_client::dual_path::race(
r, r,
candidates, candidates,
relay_sockaddr, relay_sockaddr,
room_sni, room_sni,
call_sni, call_sni,
own_reflex_parsed,
signal_endpoint_for_race.clone(), signal_endpoint_for_race.clone(),
ipv6_endpoint_for_race.clone(), ipv6_endpoint_for_race.clone(),
) )