From e3e63a40a08ed121c444af8c69ecbe73c2cf820b Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Tue, 14 Apr 2026 11:39:40 +0400 Subject: [PATCH] feat(nat): wire hard NAT port prediction into call flow (#85) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit End-to-end integration of sequential port prediction: - place_call: spawns background detect_port_allocation() + sends HardNatProbe signal after offer (doesn't delay call setup) - answer_call: same for AcceptTrusted answers (privacy mode skips) - Signal recv loop: stashes HardNatProbe in SignalState.peer_hard_nat_probe - connect: reads peer's probe, if Sequential{delta} runs predict_ports() and adds predicted addrs to PeerCandidates.local for the dual-path race - parse_sequential_delta() helper for "sequential(delta=N)" strings The full flow: both peers independently detect their NAT's port allocation, exchange HardNatProbe via relay, and the connect command uses the peer's sequence to predict which ports to dial — all before the dual-path race starts. 588 tests pass, 0 regressions. Co-Authored-By: Claude Opus 4.6 (1M context) --- desktop/src-tauri/src/lib.rs | 165 ++++++++++++++++++++++++++++++++++- 1 file changed, 164 insertions(+), 1 deletion(-) diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index b90d888..8632548 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -404,9 +404,45 @@ async fn connect( let peer_mapped_parsed: Option = peer_mapped_addr .as_deref() .and_then(|s| s.parse().ok()); + + // Phase 8.6: if peer sent a HardNatProbe with sequential + // allocation, predict their next ports and add as candidates. + let mut predicted_addrs: Vec = Vec::new(); + { + let sig = state.signal.lock().await; + if let Some(ref probe) = sig.peer_hard_nat_probe { + if let Some(delta) = parse_sequential_delta(&probe.allocation) { + if let Some(&last_port) = probe.port_sequence.first() { + let predicted = wzp_client::stun::predict_ports( + last_port, delta, 1, 3, + ); + for p in predicted { + predicted_addrs.push( + std::net::SocketAddr::new(probe.external_ip, p) + ); + } + tracing::info!( + delta, + last_port, + predicted_count = predicted_addrs.len(), + "connect: added predicted ports from HardNatProbe" + ); + emit_call_debug(&app, "connect:hard_nat_predicted", serde_json::json!({ + "delta": delta, + "last_port": last_port, + "predicted": predicted_addrs.iter().map(|a| a.to_string()).collect::>(), + })); + } + } + } + } + + let mut all_local = peer_local_parsed.clone(); + all_local.extend(predicted_addrs); + let candidates = wzp_client::dual_path::PeerCandidates { reflexive: peer_addr_parsed, - local: peer_local_parsed.clone(), + local: all_local, mapped: peer_mapped_parsed, }; tracing::info!( @@ -952,6 +988,25 @@ struct SignalState { /// peer's, it installs a oneshot sender here. The recv loop /// fires it when MediaPathReport arrives. pending_path_report: Option>, + /// Phase 8.6: peer's HardNatProbe data, if received. The connect + /// command reads this to generate predicted port candidates for + /// sequential NATs. + peer_hard_nat_probe: Option, +} + +/// Parsed data from a peer's HardNatProbe signal. +#[derive(Debug, Clone)] +struct PeerHardNatInfo { + external_ip: std::net::IpAddr, + port_sequence: Vec, + allocation: String, +} + +/// Parse "sequential(delta=N)" allocation string into the delta value. +fn parse_sequential_delta(allocation: &str) -> Option { + let s = allocation.strip_prefix("sequential(delta=")?; + let s = s.strip_suffix(')')?; + s.parse().ok() } #[tauri::command] @@ -1255,6 +1310,31 @@ fn do_register_signal( // TODO Phase 8: use IceAgent.apply_peer_update() + // race_upgrade() to attempt transport hot-swap } + Ok(Some(SignalMessage::HardNatProbe { call_id, port_sequence, allocation, probe_time_ms, external_ip })) => { + tracing::info!( + %call_id, + %allocation, + ports = ?port_sequence, + %external_ip, + probe_time_ms, + "signal: HardNatProbe from peer" + ); + emit_call_debug(&app_clone, "recv:HardNatProbe", serde_json::json!({ + "call_id": call_id, + "allocation": allocation, + "port_sequence": port_sequence, + "external_ip": external_ip, + })); + // Stash for the connect command to use in port prediction + if let Ok(ip) = external_ip.parse::() { + let mut sig = signal_state.lock().await; + sig.peer_hard_nat_probe = Some(PeerHardNatInfo { + external_ip: ip, + port_sequence, + allocation, + }); + } + } Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => { // "STUN for QUIC" response — the relay told us our // own server-reflexive address. If a Tauri command @@ -1606,6 +1686,48 @@ async fn place_call( "target_fp": target_fp, "caller_reflexive_addr": own_reflex, })); + + // Phase 8.6: spawn background port allocation detection + HardNatProbe. + // This runs AFTER the offer is sent so it doesn't delay call setup. + // The probe result arrives at the peer before or during the connect + // command, giving both sides time to compute predicted ports. + { + let state_bg = (*state).clone(); + let call_id_bg = call_id.clone(); + tokio::spawn(async move { + let stun_config = wzp_client::stun::StunConfig { + servers: vec![ + "stun.l.google.com:19302".into(), + "stun1.l.google.com:19302".into(), + "stun.cloudflare.com:3478".into(), + ], + timeout: std::time::Duration::from_secs(2), + }; + let result = wzp_client::stun::detect_port_allocation(&stun_config).await; + let alloc_str = result.allocation.to_string(); + tracing::info!( + allocation = %alloc_str, + ports = ?result.observed_ports, + "place_call: port allocation detected, sending HardNatProbe" + ); + let sig = state_bg.signal.lock().await; + if let Some(ref t) = sig.transport { + let _ = t.send_signal(&SignalMessage::HardNatProbe { + call_id: call_id_bg, + port_sequence: result.observed_ports, + allocation: alloc_str, + probe_time_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + external_ip: result.external_ip + .map(|ip| ip.to_string()) + .unwrap_or_default(), + }).await; + } + }); + } + history::log(call_id, target_fp, None, history::CallDirection::Placed); let _ = app.emit("history-changed", ()); Ok(()) @@ -1763,6 +1885,46 @@ async fn answer_call( if mode != 0 && history::mark_received_if_pending(&call_id) { let _ = app.emit("history-changed", ()); } + + // Phase 8.6: send HardNatProbe (AcceptTrusted only — same + // privacy gate as reflexive addr). + if accept_mode == wzp_proto::CallAcceptMode::AcceptTrusted { + let state_bg = (*state).clone(); + let call_id_bg = call_id.clone(); + tokio::spawn(async move { + let stun_config = wzp_client::stun::StunConfig { + servers: vec![ + "stun.l.google.com:19302".into(), + "stun1.l.google.com:19302".into(), + "stun.cloudflare.com:3478".into(), + ], + timeout: std::time::Duration::from_secs(2), + }; + let result = wzp_client::stun::detect_port_allocation(&stun_config).await; + let alloc_str = result.allocation.to_string(); + tracing::info!( + allocation = %alloc_str, + ports = ?result.observed_ports, + "answer_call: port allocation detected, sending HardNatProbe" + ); + let sig = state_bg.signal.lock().await; + if let Some(ref t) = sig.transport { + let _ = t.send_signal(&wzp_proto::SignalMessage::HardNatProbe { + call_id: call_id_bg, + port_sequence: result.observed_ports, + allocation: alloc_str, + probe_time_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + external_ip: result.external_ip + .map(|ip| ip.to_string()) + .unwrap_or_default(), + }).await; + } + }); + } + Ok(()) } @@ -2086,6 +2248,7 @@ pub fn run() { desired_relay_addr: None, reconnect_in_progress: false, pending_path_report: None, + peer_hard_nat_probe: None, })), });