feat(nat): hard NAT port allocation detection + prediction + HardNatProbe signal (#29)
Some checks failed
Mirror to GitHub / mirror (push) Failing after 31s
Build Release Binaries / build-amd64 (push) Failing after 3m30s

Phase A of hard NAT traversal (PRD-hard-nat.md):

- PortAllocation enum: PortPreserving / Sequential{delta} / Random / Unknown
- detect_port_allocation(): sequential STUN probes from single socket,
  analyzes port sequence for allocation pattern
- classify_port_allocation(): pure function with jitter tolerance,
  wraparound handling, 60% threshold for noisy sequences
- predict_ports(): generates target port range from last_port + delta
- HardNatProbe signal message: carries port_sequence, allocation
  pattern, external_ip for peer coordination
- Relay forwards HardNatProbe to call peer
- Netcheck gains port_allocation field + format_report display

588 tests pass (17 new), 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-14 11:29:35 +04:00
parent ee14862376
commit ec1bdf3cd5
5 changed files with 434 additions and 2 deletions

View File

@@ -132,6 +132,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer, SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer,
SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane
SignalMessage::CandidateUpdate { .. } => CallSignalType::IceCandidate, // mid-call re-gather SignalMessage::CandidateUpdate { .. } => CallSignalType::IceCandidate, // mid-call re-gather
SignalMessage::HardNatProbe { .. } => CallSignalType::IceCandidate, // hard NAT coordination
SignalMessage::QualityDirective { .. } => CallSignalType::Offer, // relay-initiated SignalMessage::QualityDirective { .. } => CallSignalType::Offer, // relay-initiated
} }
} }

View File

@@ -54,6 +54,8 @@ pub struct NetcheckReport {
pub duration_ms: u32, pub duration_ms: u32,
/// Individual STUN probe results. /// Individual STUN probe results.
pub stun_probes: Vec<reflect::NatProbeResult>, pub stun_probes: Vec<reflect::NatProbeResult>,
/// NAT port allocation pattern (sequential vs random).
pub port_allocation: Option<stun::PortAllocation>,
} }
/// Latency to a specific relay. /// Latency to a specific relay.
@@ -108,9 +110,10 @@ pub async fn run_netcheck(config: &NetcheckConfig) -> NetcheckReport {
let portmap_fut = probe_portmap(config.test_portmap, config.local_port); let portmap_fut = probe_portmap(config.test_portmap, config.local_port);
let gateway_fut = portmap::default_gateway(); let gateway_fut = portmap::default_gateway();
let ipv6_fut = test_ipv6(config.test_ipv6, config.timeout); let ipv6_fut = test_ipv6(config.test_ipv6, config.timeout);
let port_alloc_fut = stun::detect_port_allocation(&config.stun_config);
let (stun_probes, relay_latencies, portmap_result, gateway_result, ipv6_reachable) = let (stun_probes, relay_latencies, portmap_result, gateway_result, ipv6_reachable, port_alloc_result) =
tokio::join!(stun_fut, relay_fut, portmap_fut, gateway_result_fut(gateway_fut), ipv6_fut); tokio::join!(stun_fut, relay_fut, portmap_fut, gateway_result_fut(gateway_fut), ipv6_fut, port_alloc_fut);
// Classify NAT from STUN probes. // Classify NAT from STUN probes.
let (nat_type, consensus_addr) = reflect::classify_nat(&stun_probes); let (nat_type, consensus_addr) = reflect::classify_nat(&stun_probes);
@@ -168,6 +171,7 @@ pub async fn run_netcheck(config: &NetcheckConfig) -> NetcheckReport {
gateway, gateway,
duration_ms: start.elapsed().as_millis() as u32, duration_ms: start.elapsed().as_millis() as u32,
stun_probes, stun_probes,
port_allocation: Some(port_alloc_result.allocation),
} }
} }
@@ -293,6 +297,12 @@ pub fn format_report(report: &NetcheckReport) -> String {
report.gateway.as_deref().unwrap_or("(unknown)") report.gateway.as_deref().unwrap_or("(unknown)")
)); ));
if let Some(ref alloc) = report.port_allocation {
out.push_str(&format!(
"Port Alloc: {alloc}\n"
));
}
out.push_str(&format!("\n--- Port Mapping ---\n")); out.push_str(&format!("\n--- Port Mapping ---\n"));
out.push_str(&format!( out.push_str(&format!(
"NAT-PMP: {} PCP: {} UPnP: {}\n", "NAT-PMP: {} PCP: {} UPnP: {}\n",
@@ -372,6 +382,7 @@ mod tests {
gateway: Some("192.168.1.1".into()), gateway: Some("192.168.1.1".into()),
duration_ms: 1500, duration_ms: 1500,
stun_probes: vec![], stun_probes: vec![],
port_allocation: None,
}; };
let text = format_report(&report); let text = format_report(&report);
@@ -399,6 +410,7 @@ mod tests {
gateway: Some("192.168.1.1".into()), gateway: Some("192.168.1.1".into()),
duration_ms: 500, duration_ms: 500,
stun_probes: vec![], stun_probes: vec![],
port_allocation: Some(stun::PortAllocation::Sequential { delta: 1 }),
}; };
let json = serde_json::to_string(&report).unwrap(); let json = serde_json::to_string(&report).unwrap();
assert!(json.contains("Cone")); assert!(json.contains("Cone"));
@@ -443,6 +455,7 @@ mod tests {
gateway: None, gateway: None,
duration_ms: 100, duration_ms: 100,
stun_probes: vec![], stun_probes: vec![],
port_allocation: None,
}; };
let text = format_report(&report); let text = format_report(&report);
assert!(text.contains("Unknown")); assert!(text.contains("Unknown"));
@@ -487,6 +500,7 @@ mod tests {
latency_ms: Some(20), latency_ms: Some(20),
error: None, error: None,
}], }],
port_allocation: Some(stun::PortAllocation::Random),
}; };
let text = format_report(&report); let text = format_report(&report);
assert!(text.contains("SymmetricPort")); assert!(text.contains("SymmetricPort"));

View File

@@ -541,6 +541,213 @@ pub async fn probe_stun_servers(
results results
} }
// ── Port allocation pattern detection ──────────────────────────────
/// NAT port allocation pattern, detected by probing multiple STUN
/// servers from a single socket and analyzing the observed external
/// port sequence.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub enum PortAllocation {
/// Same external port for all destinations — cone-like NAT.
/// Standard hole-punching works; no hard NAT techniques needed.
PortPreserving,
/// Ports increment by a consistent delta per new flow.
/// Port prediction is viable: next_port = last_port + delta.
Sequential { delta: i16 },
/// No discernible pattern — truly random allocation.
/// Only birthday attack or relay can traverse this.
Random,
/// Not enough data to classify (< 3 successful probes).
Unknown,
}
impl std::fmt::Display for PortAllocation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::PortPreserving => write!(f, "port-preserving"),
Self::Sequential { delta } => write!(f, "sequential(delta={delta})"),
Self::Random => write!(f, "random"),
Self::Unknown => write!(f, "unknown"),
}
}
}
/// Result of port allocation analysis.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PortAllocationResult {
/// Detected allocation pattern.
pub allocation: PortAllocation,
/// Observed external ports (one per successful STUN probe),
/// in probe order.
pub observed_ports: Vec<u16>,
/// External IP (consensus from probes, if available).
pub external_ip: Option<IpAddr>,
}
/// Detect the NAT's port allocation pattern by sending STUN probes
/// to multiple servers from a **single socket**.
///
/// Unlike `probe_stun_servers` (which creates one socket per server
/// for NAT type classification), this uses one socket so we see how
/// the NAT maps the SAME source port to different destinations.
///
/// - Same external port for all → `PortPreserving` (cone-like)
/// - Consistent delta → `Sequential { delta }`
/// - No pattern → `Random`
///
/// Requires at least 3 servers for reliable classification.
pub async fn detect_port_allocation(
config: &StunConfig,
) -> PortAllocationResult {
if config.servers.len() < 2 {
return PortAllocationResult {
allocation: PortAllocation::Unknown,
observed_ports: vec![],
external_ip: None,
};
}
// Single socket — all probes share the same source port.
let socket = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(_) => {
return PortAllocationResult {
allocation: PortAllocation::Unknown,
observed_ports: vec![],
external_ip: None,
};
}
};
// Probe servers SEQUENTIALLY (not parallel) so the NAT sees
// distinct flows in order. Parallel probes could arrive out-of-
// order and confuse sequential delta detection.
let mut observed: Vec<SocketAddr> = Vec::new();
for server_str in &config.servers {
let resolved = match resolve_stun_server(server_str).await {
Ok(a) => a,
Err(_) => continue,
};
match stun_reflect(&socket, resolved, config.timeout).await {
Ok(addr) => observed.push(addr),
Err(_) => continue,
}
}
let ports: Vec<u16> = observed.iter().map(|a| a.port()).collect();
let external_ip = observed.first().map(|a| a.ip());
let allocation = classify_port_allocation(&ports);
tracing::info!(
?allocation,
?ports,
external_ip = ?external_ip,
"stun: port allocation detected"
);
PortAllocationResult {
allocation,
observed_ports: ports,
external_ip,
}
}
/// Pure-function classifier — split out for unit testing.
pub fn classify_port_allocation(ports: &[u16]) -> PortAllocation {
if ports.len() < 2 {
return PortAllocation::Unknown;
}
// All same port?
if ports.iter().all(|&p| p == ports[0]) {
return PortAllocation::PortPreserving;
}
if ports.len() < 3 {
// With only 2 different ports we can't distinguish
// sequential from random reliably.
return PortAllocation::Unknown;
}
// Compute deltas between consecutive ports.
let deltas: Vec<i16> = ports
.windows(2)
.map(|w| w[1] as i32 - w[0] as i32)
.map(|d| {
// Handle wraparound: if delta is huge negative (e.g.,
// 65535 -> 2 = -65533), treat as +3. And vice versa.
if d > 32768 {
(d - 65536) as i16
} else if d < -32768 {
(d + 65536) as i16
} else {
d as i16
}
})
.collect();
// Check if all deltas are the same (sequential pattern).
let first_delta = deltas[0];
if first_delta == 0 {
// All same port was already handled above, this means
// mixed same/different — not sequential.
return PortAllocation::Random;
}
// Allow small jitter: if all deltas are within ±1 of each other,
// consider it sequential with the median delta.
let all_close = deltas.iter().all(|&d| (d - first_delta).unsigned_abs() <= 1);
if all_close {
// Use the most common delta (mode).
let median_delta = first_delta;
return PortAllocation::Sequential { delta: median_delta };
}
// Check for consistent delta with occasional skip (some NATs
// skip a port when another flow grabs it concurrently).
// If most deltas (>= 60%) agree on the same value, call it
// sequential.
let mut delta_counts = std::collections::HashMap::new();
for &d in &deltas {
*delta_counts.entry(d).or_insert(0u32) += 1;
}
if let Some((&most_common, &count)) = delta_counts.iter().max_by_key(|(_, v)| *v) {
let threshold = (deltas.len() as f64 * 0.6).ceil() as u32;
if count >= threshold && most_common != 0 {
return PortAllocation::Sequential { delta: most_common };
}
}
PortAllocation::Random
}
/// Predict the next N external ports for a sequential NAT.
///
/// Given the last observed port and the delta, returns a range of
/// predicted ports centered around the most likely next value.
/// The `offset` parameter accounts for additional flows that may
/// open between the probe and the actual connection attempt.
pub fn predict_ports(
last_port: u16,
delta: i16,
offset: u16,
spread: u16,
) -> Vec<u16> {
let base = last_port as i32 + (delta as i32 * (offset as i32 + 1));
let mut ports = Vec::with_capacity((spread * 2 + 1) as usize);
for i in -(spread as i32)..=(spread as i32) {
let p = base + (i * delta as i32);
// Wrap to valid port range (1..=65535)
let p = ((p % 65536) + 65536) % 65536;
if p > 0 && p <= 65535 {
ports.push(p as u16);
}
}
ports.sort();
ports.dedup();
ports
}
// ── Tests ────────────────────────────────────────────────────────── // ── Tests ──────────────────────────────────────────────────────────
#[cfg(test)] #[cfg(test)]
@@ -1032,6 +1239,165 @@ mod tests {
assert!(matches!(err, StunError::Io(_))); assert!(matches!(err, StunError::Io(_)));
} }
// ── Port allocation classification tests ────────────────────
#[test]
fn classify_port_preserving() {
let ports = vec![4433, 4433, 4433, 4433, 4433];
assert_eq!(classify_port_allocation(&ports), PortAllocation::PortPreserving);
}
#[test]
fn classify_sequential_delta_1() {
let ports = vec![40001, 40002, 40003, 40004, 40005];
assert_eq!(
classify_port_allocation(&ports),
PortAllocation::Sequential { delta: 1 }
);
}
#[test]
fn classify_sequential_delta_2() {
let ports = vec![50000, 50002, 50004, 50006];
assert_eq!(
classify_port_allocation(&ports),
PortAllocation::Sequential { delta: 2 }
);
}
#[test]
fn classify_sequential_negative_delta() {
// Some NATs decrement
let ports = vec![50000, 49999, 49998, 49997];
assert_eq!(
classify_port_allocation(&ports),
PortAllocation::Sequential { delta: -1 }
);
}
#[test]
fn classify_random() {
let ports = vec![40001, 52847, 19432, 61203, 8847];
assert_eq!(classify_port_allocation(&ports), PortAllocation::Random);
}
#[test]
fn classify_too_few_ports() {
assert_eq!(classify_port_allocation(&[]), PortAllocation::Unknown);
assert_eq!(classify_port_allocation(&[4433]), PortAllocation::Unknown);
}
#[test]
fn classify_two_same_is_preserving() {
let ports = vec![4433, 4433];
assert_eq!(classify_port_allocation(&ports), PortAllocation::PortPreserving);
}
#[test]
fn classify_two_different_is_unknown() {
// Can't distinguish sequential from random with only 2 points
let ports = vec![4433, 4434];
assert_eq!(classify_port_allocation(&ports), PortAllocation::Unknown);
}
#[test]
fn classify_sequential_with_jitter() {
// Delta is mostly 1 but one jump of 2 (concurrent flow grabbed a port)
let ports = vec![40001, 40002, 40004, 40005, 40006];
// Deltas: [1, 2, 1, 1] — 3 out of 4 are delta=1, above 60% threshold
assert_eq!(
classify_port_allocation(&ports),
PortAllocation::Sequential { delta: 1 }
);
}
#[test]
fn classify_sequential_wraparound() {
// Port wraps from 65534 -> 65535 -> 1 -> 2
let ports = vec![65534, 65535, 1, 2];
// Deltas: [1, -65534(→+2), 1] — wraparound handling
let alloc = classify_port_allocation(&ports);
// Should detect as sequential with delta ~1
assert!(
matches!(alloc, PortAllocation::Sequential { delta: 1 }),
"wraparound should be sequential, got: {alloc:?}"
);
}
#[test]
fn predict_ports_sequential() {
// Last port 40005, delta 1, offset 0, spread 2
let predicted = predict_ports(40005, 1, 0, 2);
assert!(predicted.contains(&40006)); // most likely next
assert!(predicted.contains(&40004)); // spread -2
assert!(predicted.contains(&40008)); // spread +2
}
#[test]
fn predict_ports_delta_2() {
let predicted = predict_ports(50000, 2, 0, 1);
assert!(predicted.contains(&50002)); // next
assert!(predicted.contains(&50000)); // spread -1*delta
assert!(predicted.contains(&50004)); // spread +1*delta
}
#[test]
fn predict_ports_with_offset() {
// offset=2 means 2 extra flows will open before our dial,
// so prediction jumps further: 40005 + 1*(2+1) = 40008
let predicted = predict_ports(40005, 1, 2, 1);
assert!(predicted.contains(&40008));
}
#[test]
fn predict_ports_wraparound() {
let predicted = predict_ports(65534, 1, 0, 2);
// Should handle the u16 wraparound gracefully
assert!(predicted.contains(&65535));
assert!(!predicted.is_empty());
}
#[test]
fn port_allocation_display() {
assert_eq!(PortAllocation::PortPreserving.to_string(), "port-preserving");
assert_eq!(PortAllocation::Sequential { delta: 1 }.to_string(), "sequential(delta=1)");
assert_eq!(PortAllocation::Random.to_string(), "random");
assert_eq!(PortAllocation::Unknown.to_string(), "unknown");
}
#[test]
fn port_allocation_serde() {
let alloc = PortAllocation::Sequential { delta: 3 };
let json = serde_json::to_string(&alloc).unwrap();
assert!(json.contains("Sequential"));
assert!(json.contains("3"));
}
#[test]
fn port_allocation_result_serde() {
let result = PortAllocationResult {
allocation: PortAllocation::Sequential { delta: 1 },
observed_ports: vec![40001, 40002, 40003],
external_ip: Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 5))),
};
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("Sequential"));
assert!(json.contains("40001"));
assert!(json.contains("203.0.113.5"));
}
/// Integration test: detect port allocation on real network.
#[tokio::test]
#[ignore]
async fn integration_detect_port_allocation() {
let config = StunConfig::default();
let result = detect_port_allocation(&config).await;
println!("Port allocation: {:?}", result.allocation);
println!("Observed ports: {:?}", result.observed_ports);
println!("External IP: {:?}", result.external_ip);
assert!(!result.observed_ports.is_empty());
}
/// Integration test: actually query stun.l.google.com. /// Integration test: actually query stun.l.google.com.
/// Ignored by default since it requires network access. /// Ignored by default since it requires network access.
#[tokio::test] #[tokio::test]

View File

@@ -947,6 +947,26 @@ pub enum SignalMessage {
generation: u32, generation: u32,
}, },
// ── Hard NAT traversal (port prediction) ──────────────────────
/// Hard NAT probe coordination — exchanged when both peers
/// detect symmetric NAT. Carries the port allocation pattern
/// and recent port sequence so the peer can predict which port
/// to dial.
HardNatProbe {
call_id: String,
/// Last observed external ports (most recent first).
/// Typically 3-5 entries from sequential STUN probes.
port_sequence: Vec<u16>,
/// Detected allocation pattern as string:
/// "sequential:N" (N=delta), "random", "preserving"
allocation: String,
/// Probe timestamp (ms since epoch) for synchronization.
probe_time_ms: u64,
/// External IP from STUN.
external_ip: String,
},
// ── Phase 4: cross-relay direct-call signaling ──────────────────── // ── Phase 4: cross-relay direct-call signaling ────────────────────
/// Phase 4: relay-to-relay envelope for forwarding direct-call /// Phase 4: relay-to-relay envelope for forwarding direct-call

View File

@@ -1443,6 +1443,37 @@ async fn main() -> anyhow::Result<()> {
} }
} }
// Hard NAT: forward HardNatProbe to call peer
// (same forwarding pattern as CandidateUpdate).
SignalMessage::HardNatProbe { ref call_id, .. } => {
let (peer_fp, peer_relay_fp) = {
let reg = call_registry.lock().await;
match reg.get(call_id) {
Some(c) => (
reg.peer_fingerprint(call_id, &client_fp)
.map(|s| s.to_string()),
c.peer_relay_fp.clone(),
),
None => (None, None),
}
};
if let Some(fp) = peer_fp {
if let Some(ref origin_fp) = peer_relay_fp {
if let Some(ref fm) = federation_mgr {
let forward = SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
let _ = fm.send_signal_to_peer(origin_fp, &forward).await;
}
} else {
let hub = signal_hub.lock().await;
let _ = hub.send_to(&fp, &msg).await;
}
}
}
SignalMessage::Ping { timestamp_ms } => { SignalMessage::Ping { timestamp_ms } => {
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await; let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
} }