diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index ca50926..646744a 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -132,6 +132,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer, SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane SignalMessage::CandidateUpdate { .. } => CallSignalType::IceCandidate, // mid-call re-gather + SignalMessage::HardNatProbe { .. } => CallSignalType::IceCandidate, // hard NAT coordination SignalMessage::QualityDirective { .. } => CallSignalType::Offer, // relay-initiated } } diff --git a/crates/wzp-client/src/netcheck.rs b/crates/wzp-client/src/netcheck.rs index 3a8a80d..7255199 100644 --- a/crates/wzp-client/src/netcheck.rs +++ b/crates/wzp-client/src/netcheck.rs @@ -54,6 +54,8 @@ pub struct NetcheckReport { pub duration_ms: u32, /// Individual STUN probe results. pub stun_probes: Vec, + /// NAT port allocation pattern (sequential vs random). + pub port_allocation: Option, } /// Latency to a specific relay. @@ -108,9 +110,10 @@ pub async fn run_netcheck(config: &NetcheckConfig) -> NetcheckReport { let portmap_fut = probe_portmap(config.test_portmap, config.local_port); let gateway_fut = portmap::default_gateway(); let ipv6_fut = test_ipv6(config.test_ipv6, config.timeout); + let port_alloc_fut = stun::detect_port_allocation(&config.stun_config); - let (stun_probes, relay_latencies, portmap_result, gateway_result, ipv6_reachable) = - tokio::join!(stun_fut, relay_fut, portmap_fut, gateway_result_fut(gateway_fut), ipv6_fut); + let (stun_probes, relay_latencies, portmap_result, gateway_result, ipv6_reachable, port_alloc_result) = + tokio::join!(stun_fut, relay_fut, portmap_fut, gateway_result_fut(gateway_fut), ipv6_fut, port_alloc_fut); // Classify NAT from STUN probes. let (nat_type, consensus_addr) = reflect::classify_nat(&stun_probes); @@ -168,6 +171,7 @@ pub async fn run_netcheck(config: &NetcheckConfig) -> NetcheckReport { gateway, duration_ms: start.elapsed().as_millis() as u32, stun_probes, + port_allocation: Some(port_alloc_result.allocation), } } @@ -293,6 +297,12 @@ pub fn format_report(report: &NetcheckReport) -> String { report.gateway.as_deref().unwrap_or("(unknown)") )); + if let Some(ref alloc) = report.port_allocation { + out.push_str(&format!( + "Port Alloc: {alloc}\n" + )); + } + out.push_str(&format!("\n--- Port Mapping ---\n")); out.push_str(&format!( "NAT-PMP: {} PCP: {} UPnP: {}\n", @@ -372,6 +382,7 @@ mod tests { gateway: Some("192.168.1.1".into()), duration_ms: 1500, stun_probes: vec![], + port_allocation: None, }; let text = format_report(&report); @@ -399,6 +410,7 @@ mod tests { gateway: Some("192.168.1.1".into()), duration_ms: 500, stun_probes: vec![], + port_allocation: Some(stun::PortAllocation::Sequential { delta: 1 }), }; let json = serde_json::to_string(&report).unwrap(); assert!(json.contains("Cone")); @@ -443,6 +455,7 @@ mod tests { gateway: None, duration_ms: 100, stun_probes: vec![], + port_allocation: None, }; let text = format_report(&report); assert!(text.contains("Unknown")); @@ -487,6 +500,7 @@ mod tests { latency_ms: Some(20), error: None, }], + port_allocation: Some(stun::PortAllocation::Random), }; let text = format_report(&report); assert!(text.contains("SymmetricPort")); diff --git a/crates/wzp-client/src/stun.rs b/crates/wzp-client/src/stun.rs index 73abfc6..983592b 100644 --- a/crates/wzp-client/src/stun.rs +++ b/crates/wzp-client/src/stun.rs @@ -541,6 +541,213 @@ pub async fn probe_stun_servers( results } +// ── Port allocation pattern detection ────────────────────────────── + +/// NAT port allocation pattern, detected by probing multiple STUN +/// servers from a single socket and analyzing the observed external +/// port sequence. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +pub enum PortAllocation { + /// Same external port for all destinations — cone-like NAT. + /// Standard hole-punching works; no hard NAT techniques needed. + PortPreserving, + /// Ports increment by a consistent delta per new flow. + /// Port prediction is viable: next_port = last_port + delta. + Sequential { delta: i16 }, + /// No discernible pattern — truly random allocation. + /// Only birthday attack or relay can traverse this. + Random, + /// Not enough data to classify (< 3 successful probes). + Unknown, +} + +impl std::fmt::Display for PortAllocation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::PortPreserving => write!(f, "port-preserving"), + Self::Sequential { delta } => write!(f, "sequential(delta={delta})"), + Self::Random => write!(f, "random"), + Self::Unknown => write!(f, "unknown"), + } + } +} + +/// Result of port allocation analysis. +#[derive(Debug, Clone, serde::Serialize)] +pub struct PortAllocationResult { + /// Detected allocation pattern. + pub allocation: PortAllocation, + /// Observed external ports (one per successful STUN probe), + /// in probe order. + pub observed_ports: Vec, + /// External IP (consensus from probes, if available). + pub external_ip: Option, +} + +/// Detect the NAT's port allocation pattern by sending STUN probes +/// to multiple servers from a **single socket**. +/// +/// Unlike `probe_stun_servers` (which creates one socket per server +/// for NAT type classification), this uses one socket so we see how +/// the NAT maps the SAME source port to different destinations. +/// +/// - Same external port for all → `PortPreserving` (cone-like) +/// - Consistent delta → `Sequential { delta }` +/// - No pattern → `Random` +/// +/// Requires at least 3 servers for reliable classification. +pub async fn detect_port_allocation( + config: &StunConfig, +) -> PortAllocationResult { + if config.servers.len() < 2 { + return PortAllocationResult { + allocation: PortAllocation::Unknown, + observed_ports: vec![], + external_ip: None, + }; + } + + // Single socket — all probes share the same source port. + let socket = match UdpSocket::bind("0.0.0.0:0").await { + Ok(s) => s, + Err(_) => { + return PortAllocationResult { + allocation: PortAllocation::Unknown, + observed_ports: vec![], + external_ip: None, + }; + } + }; + + // Probe servers SEQUENTIALLY (not parallel) so the NAT sees + // distinct flows in order. Parallel probes could arrive out-of- + // order and confuse sequential delta detection. + let mut observed: Vec = Vec::new(); + for server_str in &config.servers { + let resolved = match resolve_stun_server(server_str).await { + Ok(a) => a, + Err(_) => continue, + }; + match stun_reflect(&socket, resolved, config.timeout).await { + Ok(addr) => observed.push(addr), + Err(_) => continue, + } + } + + let ports: Vec = observed.iter().map(|a| a.port()).collect(); + let external_ip = observed.first().map(|a| a.ip()); + let allocation = classify_port_allocation(&ports); + + tracing::info!( + ?allocation, + ?ports, + external_ip = ?external_ip, + "stun: port allocation detected" + ); + + PortAllocationResult { + allocation, + observed_ports: ports, + external_ip, + } +} + +/// Pure-function classifier — split out for unit testing. +pub fn classify_port_allocation(ports: &[u16]) -> PortAllocation { + if ports.len() < 2 { + return PortAllocation::Unknown; + } + + // All same port? + if ports.iter().all(|&p| p == ports[0]) { + return PortAllocation::PortPreserving; + } + + if ports.len() < 3 { + // With only 2 different ports we can't distinguish + // sequential from random reliably. + return PortAllocation::Unknown; + } + + // Compute deltas between consecutive ports. + let deltas: Vec = ports + .windows(2) + .map(|w| w[1] as i32 - w[0] as i32) + .map(|d| { + // Handle wraparound: if delta is huge negative (e.g., + // 65535 -> 2 = -65533), treat as +3. And vice versa. + if d > 32768 { + (d - 65536) as i16 + } else if d < -32768 { + (d + 65536) as i16 + } else { + d as i16 + } + }) + .collect(); + + // Check if all deltas are the same (sequential pattern). + let first_delta = deltas[0]; + if first_delta == 0 { + // All same port was already handled above, this means + // mixed same/different — not sequential. + return PortAllocation::Random; + } + + // Allow small jitter: if all deltas are within ±1 of each other, + // consider it sequential with the median delta. + let all_close = deltas.iter().all(|&d| (d - first_delta).unsigned_abs() <= 1); + if all_close { + // Use the most common delta (mode). + let median_delta = first_delta; + return PortAllocation::Sequential { delta: median_delta }; + } + + // Check for consistent delta with occasional skip (some NATs + // skip a port when another flow grabs it concurrently). + // If most deltas (>= 60%) agree on the same value, call it + // sequential. + let mut delta_counts = std::collections::HashMap::new(); + for &d in &deltas { + *delta_counts.entry(d).or_insert(0u32) += 1; + } + if let Some((&most_common, &count)) = delta_counts.iter().max_by_key(|(_, v)| *v) { + let threshold = (deltas.len() as f64 * 0.6).ceil() as u32; + if count >= threshold && most_common != 0 { + return PortAllocation::Sequential { delta: most_common }; + } + } + + PortAllocation::Random +} + +/// Predict the next N external ports for a sequential NAT. +/// +/// Given the last observed port and the delta, returns a range of +/// predicted ports centered around the most likely next value. +/// The `offset` parameter accounts for additional flows that may +/// open between the probe and the actual connection attempt. +pub fn predict_ports( + last_port: u16, + delta: i16, + offset: u16, + spread: u16, +) -> Vec { + let base = last_port as i32 + (delta as i32 * (offset as i32 + 1)); + let mut ports = Vec::with_capacity((spread * 2 + 1) as usize); + for i in -(spread as i32)..=(spread as i32) { + let p = base + (i * delta as i32); + // Wrap to valid port range (1..=65535) + let p = ((p % 65536) + 65536) % 65536; + if p > 0 && p <= 65535 { + ports.push(p as u16); + } + } + ports.sort(); + ports.dedup(); + ports +} + // ── Tests ────────────────────────────────────────────────────────── #[cfg(test)] @@ -1032,6 +1239,165 @@ mod tests { assert!(matches!(err, StunError::Io(_))); } + // ── Port allocation classification tests ──────────────────── + + #[test] + fn classify_port_preserving() { + let ports = vec![4433, 4433, 4433, 4433, 4433]; + assert_eq!(classify_port_allocation(&ports), PortAllocation::PortPreserving); + } + + #[test] + fn classify_sequential_delta_1() { + let ports = vec![40001, 40002, 40003, 40004, 40005]; + assert_eq!( + classify_port_allocation(&ports), + PortAllocation::Sequential { delta: 1 } + ); + } + + #[test] + fn classify_sequential_delta_2() { + let ports = vec![50000, 50002, 50004, 50006]; + assert_eq!( + classify_port_allocation(&ports), + PortAllocation::Sequential { delta: 2 } + ); + } + + #[test] + fn classify_sequential_negative_delta() { + // Some NATs decrement + let ports = vec![50000, 49999, 49998, 49997]; + assert_eq!( + classify_port_allocation(&ports), + PortAllocation::Sequential { delta: -1 } + ); + } + + #[test] + fn classify_random() { + let ports = vec![40001, 52847, 19432, 61203, 8847]; + assert_eq!(classify_port_allocation(&ports), PortAllocation::Random); + } + + #[test] + fn classify_too_few_ports() { + assert_eq!(classify_port_allocation(&[]), PortAllocation::Unknown); + assert_eq!(classify_port_allocation(&[4433]), PortAllocation::Unknown); + } + + #[test] + fn classify_two_same_is_preserving() { + let ports = vec![4433, 4433]; + assert_eq!(classify_port_allocation(&ports), PortAllocation::PortPreserving); + } + + #[test] + fn classify_two_different_is_unknown() { + // Can't distinguish sequential from random with only 2 points + let ports = vec![4433, 4434]; + assert_eq!(classify_port_allocation(&ports), PortAllocation::Unknown); + } + + #[test] + fn classify_sequential_with_jitter() { + // Delta is mostly 1 but one jump of 2 (concurrent flow grabbed a port) + let ports = vec![40001, 40002, 40004, 40005, 40006]; + // Deltas: [1, 2, 1, 1] — 3 out of 4 are delta=1, above 60% threshold + assert_eq!( + classify_port_allocation(&ports), + PortAllocation::Sequential { delta: 1 } + ); + } + + #[test] + fn classify_sequential_wraparound() { + // Port wraps from 65534 -> 65535 -> 1 -> 2 + let ports = vec![65534, 65535, 1, 2]; + // Deltas: [1, -65534(→+2), 1] — wraparound handling + let alloc = classify_port_allocation(&ports); + // Should detect as sequential with delta ~1 + assert!( + matches!(alloc, PortAllocation::Sequential { delta: 1 }), + "wraparound should be sequential, got: {alloc:?}" + ); + } + + #[test] + fn predict_ports_sequential() { + // Last port 40005, delta 1, offset 0, spread 2 + let predicted = predict_ports(40005, 1, 0, 2); + assert!(predicted.contains(&40006)); // most likely next + assert!(predicted.contains(&40004)); // spread -2 + assert!(predicted.contains(&40008)); // spread +2 + } + + #[test] + fn predict_ports_delta_2() { + let predicted = predict_ports(50000, 2, 0, 1); + assert!(predicted.contains(&50002)); // next + assert!(predicted.contains(&50000)); // spread -1*delta + assert!(predicted.contains(&50004)); // spread +1*delta + } + + #[test] + fn predict_ports_with_offset() { + // offset=2 means 2 extra flows will open before our dial, + // so prediction jumps further: 40005 + 1*(2+1) = 40008 + let predicted = predict_ports(40005, 1, 2, 1); + assert!(predicted.contains(&40008)); + } + + #[test] + fn predict_ports_wraparound() { + let predicted = predict_ports(65534, 1, 0, 2); + // Should handle the u16 wraparound gracefully + assert!(predicted.contains(&65535)); + assert!(!predicted.is_empty()); + } + + #[test] + fn port_allocation_display() { + assert_eq!(PortAllocation::PortPreserving.to_string(), "port-preserving"); + assert_eq!(PortAllocation::Sequential { delta: 1 }.to_string(), "sequential(delta=1)"); + assert_eq!(PortAllocation::Random.to_string(), "random"); + assert_eq!(PortAllocation::Unknown.to_string(), "unknown"); + } + + #[test] + fn port_allocation_serde() { + let alloc = PortAllocation::Sequential { delta: 3 }; + let json = serde_json::to_string(&alloc).unwrap(); + assert!(json.contains("Sequential")); + assert!(json.contains("3")); + } + + #[test] + fn port_allocation_result_serde() { + let result = PortAllocationResult { + allocation: PortAllocation::Sequential { delta: 1 }, + observed_ports: vec![40001, 40002, 40003], + external_ip: Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 5))), + }; + let json = serde_json::to_string(&result).unwrap(); + assert!(json.contains("Sequential")); + assert!(json.contains("40001")); + assert!(json.contains("203.0.113.5")); + } + + /// Integration test: detect port allocation on real network. + #[tokio::test] + #[ignore] + async fn integration_detect_port_allocation() { + let config = StunConfig::default(); + let result = detect_port_allocation(&config).await; + println!("Port allocation: {:?}", result.allocation); + println!("Observed ports: {:?}", result.observed_ports); + println!("External IP: {:?}", result.external_ip); + assert!(!result.observed_ports.is_empty()); + } + /// Integration test: actually query stun.l.google.com. /// Ignored by default since it requires network access. #[tokio::test] diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index e22d340..bcc08e8 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -947,6 +947,26 @@ pub enum SignalMessage { generation: u32, }, + // ── Hard NAT traversal (port prediction) ────────────────────── + + /// Hard NAT probe coordination — exchanged when both peers + /// detect symmetric NAT. Carries the port allocation pattern + /// and recent port sequence so the peer can predict which port + /// to dial. + HardNatProbe { + call_id: String, + /// Last observed external ports (most recent first). + /// Typically 3-5 entries from sequential STUN probes. + port_sequence: Vec, + /// Detected allocation pattern as string: + /// "sequential:N" (N=delta), "random", "preserving" + allocation: String, + /// Probe timestamp (ms since epoch) for synchronization. + probe_time_ms: u64, + /// External IP from STUN. + external_ip: String, + }, + // ── Phase 4: cross-relay direct-call signaling ──────────────────── /// Phase 4: relay-to-relay envelope for forwarding direct-call diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 82742d2..1de7db9 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -1443,6 +1443,37 @@ async fn main() -> anyhow::Result<()> { } } + // Hard NAT: forward HardNatProbe to call peer + // (same forwarding pattern as CandidateUpdate). + SignalMessage::HardNatProbe { ref call_id, .. } => { + let (peer_fp, peer_relay_fp) = { + let reg = call_registry.lock().await; + match reg.get(call_id) { + Some(c) => ( + reg.peer_fingerprint(call_id, &client_fp) + .map(|s| s.to_string()), + c.peer_relay_fp.clone(), + ), + None => (None, None), + } + }; + + if let Some(fp) = peer_fp { + if let Some(ref origin_fp) = peer_relay_fp { + if let Some(ref fm) = federation_mgr { + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(msg.clone()), + origin_relay_fp: tls_fp.clone(), + }; + let _ = fm.send_signal_to_peer(origin_fp, &forward).await; + } + } else { + let hub = signal_hub.lock().await; + let _ = hub.send_to(&fp, &msg).await; + } + } + } + SignalMessage::Ping { timestamp_ms } => { let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await; }