feat(nat): wire hard NAT port prediction into call flow (#85)
Some checks failed
Mirror to GitHub / mirror (push) Failing after 28s
Build Release Binaries / build-amd64 (push) Failing after 3m27s

End-to-end integration of sequential port prediction:

- place_call: spawns background detect_port_allocation() + sends
  HardNatProbe signal after offer (doesn't delay call setup)
- answer_call: same for AcceptTrusted answers (privacy mode skips)
- Signal recv loop: stashes HardNatProbe in SignalState.peer_hard_nat_probe
- connect: reads peer's probe, if Sequential{delta} runs predict_ports()
  and adds predicted addrs to PeerCandidates.local for the dual-path race
- parse_sequential_delta() helper for "sequential(delta=N)" strings

The full flow: both peers independently detect their NAT's port
allocation, exchange HardNatProbe via relay, and the connect command
uses the peer's sequence to predict which ports to dial — all before
the dual-path race starts.

588 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-14 11:39:40 +04:00
parent 7b4bce69d5
commit e3e63a40a0

View File

@@ -404,9 +404,45 @@ async fn connect(
let peer_mapped_parsed: Option<std::net::SocketAddr> = peer_mapped_addr
.as_deref()
.and_then(|s| s.parse().ok());
// Phase 8.6: if peer sent a HardNatProbe with sequential
// allocation, predict their next ports and add as candidates.
let mut predicted_addrs: Vec<std::net::SocketAddr> = Vec::new();
{
let sig = state.signal.lock().await;
if let Some(ref probe) = sig.peer_hard_nat_probe {
if let Some(delta) = parse_sequential_delta(&probe.allocation) {
if let Some(&last_port) = probe.port_sequence.first() {
let predicted = wzp_client::stun::predict_ports(
last_port, delta, 1, 3,
);
for p in predicted {
predicted_addrs.push(
std::net::SocketAddr::new(probe.external_ip, p)
);
}
tracing::info!(
delta,
last_port,
predicted_count = predicted_addrs.len(),
"connect: added predicted ports from HardNatProbe"
);
emit_call_debug(&app, "connect:hard_nat_predicted", serde_json::json!({
"delta": delta,
"last_port": last_port,
"predicted": predicted_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
}));
}
}
}
}
let mut all_local = peer_local_parsed.clone();
all_local.extend(predicted_addrs);
let candidates = wzp_client::dual_path::PeerCandidates {
reflexive: peer_addr_parsed,
local: peer_local_parsed.clone(),
local: all_local,
mapped: peer_mapped_parsed,
};
tracing::info!(
@@ -952,6 +988,25 @@ struct SignalState {
/// peer's, it installs a oneshot sender here. The recv loop
/// fires it when MediaPathReport arrives.
pending_path_report: Option<tokio::sync::oneshot::Sender<bool>>,
/// Phase 8.6: peer's HardNatProbe data, if received. The connect
/// command reads this to generate predicted port candidates for
/// sequential NATs.
peer_hard_nat_probe: Option<PeerHardNatInfo>,
}
/// Parsed data from a peer's HardNatProbe signal.
#[derive(Debug, Clone)]
struct PeerHardNatInfo {
external_ip: std::net::IpAddr,
port_sequence: Vec<u16>,
allocation: String,
}
/// Parse "sequential(delta=N)" allocation string into the delta value.
fn parse_sequential_delta(allocation: &str) -> Option<i16> {
let s = allocation.strip_prefix("sequential(delta=")?;
let s = s.strip_suffix(')')?;
s.parse().ok()
}
#[tauri::command]
@@ -1255,6 +1310,31 @@ fn do_register_signal(
// TODO Phase 8: use IceAgent.apply_peer_update() +
// race_upgrade() to attempt transport hot-swap
}
Ok(Some(SignalMessage::HardNatProbe { call_id, port_sequence, allocation, probe_time_ms, external_ip })) => {
tracing::info!(
%call_id,
%allocation,
ports = ?port_sequence,
%external_ip,
probe_time_ms,
"signal: HardNatProbe from peer"
);
emit_call_debug(&app_clone, "recv:HardNatProbe", serde_json::json!({
"call_id": call_id,
"allocation": allocation,
"port_sequence": port_sequence,
"external_ip": external_ip,
}));
// Stash for the connect command to use in port prediction
if let Ok(ip) = external_ip.parse::<std::net::IpAddr>() {
let mut sig = signal_state.lock().await;
sig.peer_hard_nat_probe = Some(PeerHardNatInfo {
external_ip: ip,
port_sequence,
allocation,
});
}
}
Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => {
// "STUN for QUIC" response — the relay told us our
// own server-reflexive address. If a Tauri command
@@ -1606,6 +1686,48 @@ async fn place_call(
"target_fp": target_fp,
"caller_reflexive_addr": own_reflex,
}));
// Phase 8.6: spawn background port allocation detection + HardNatProbe.
// This runs AFTER the offer is sent so it doesn't delay call setup.
// The probe result arrives at the peer before or during the connect
// command, giving both sides time to compute predicted ports.
{
let state_bg = (*state).clone();
let call_id_bg = call_id.clone();
tokio::spawn(async move {
let stun_config = wzp_client::stun::StunConfig {
servers: vec![
"stun.l.google.com:19302".into(),
"stun1.l.google.com:19302".into(),
"stun.cloudflare.com:3478".into(),
],
timeout: std::time::Duration::from_secs(2),
};
let result = wzp_client::stun::detect_port_allocation(&stun_config).await;
let alloc_str = result.allocation.to_string();
tracing::info!(
allocation = %alloc_str,
ports = ?result.observed_ports,
"place_call: port allocation detected, sending HardNatProbe"
);
let sig = state_bg.signal.lock().await;
if let Some(ref t) = sig.transport {
let _ = t.send_signal(&SignalMessage::HardNatProbe {
call_id: call_id_bg,
port_sequence: result.observed_ports,
allocation: alloc_str,
probe_time_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
external_ip: result.external_ip
.map(|ip| ip.to_string())
.unwrap_or_default(),
}).await;
}
});
}
history::log(call_id, target_fp, None, history::CallDirection::Placed);
let _ = app.emit("history-changed", ());
Ok(())
@@ -1763,6 +1885,46 @@ async fn answer_call(
if mode != 0 && history::mark_received_if_pending(&call_id) {
let _ = app.emit("history-changed", ());
}
// Phase 8.6: send HardNatProbe (AcceptTrusted only — same
// privacy gate as reflexive addr).
if accept_mode == wzp_proto::CallAcceptMode::AcceptTrusted {
let state_bg = (*state).clone();
let call_id_bg = call_id.clone();
tokio::spawn(async move {
let stun_config = wzp_client::stun::StunConfig {
servers: vec![
"stun.l.google.com:19302".into(),
"stun1.l.google.com:19302".into(),
"stun.cloudflare.com:3478".into(),
],
timeout: std::time::Duration::from_secs(2),
};
let result = wzp_client::stun::detect_port_allocation(&stun_config).await;
let alloc_str = result.allocation.to_string();
tracing::info!(
allocation = %alloc_str,
ports = ?result.observed_ports,
"answer_call: port allocation detected, sending HardNatProbe"
);
let sig = state_bg.signal.lock().await;
if let Some(ref t) = sig.transport {
let _ = t.send_signal(&wzp_proto::SignalMessage::HardNatProbe {
call_id: call_id_bg,
port_sequence: result.observed_ports,
allocation: alloc_str,
probe_time_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
external_ip: result.external_ip
.map(|ip| ip.to_string())
.unwrap_or_default(),
}).await;
}
});
}
Ok(())
}
@@ -2086,6 +2248,7 @@ pub fn run() {
desired_relay_addr: None,
reconnect_in_progress: false,
pending_path_report: None,
peer_hard_nat_probe: None,
})),
});