From 1618ff6c9d6402ec5fb693d80aa79bfb93df2210 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sat, 11 Apr 2026 19:47:20 +0400 Subject: [PATCH] =?UTF-8?q?feat(p2p):=20Phase=205=20=E2=80=94=20single-soc?= =?UTF-8?q?ket=20architecture=20(Nebula-style)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before Phase 5 WarzonePhone used THREE separate UDP sockets per client: 1. Signal endpoint (register_signal, client-only) 2. Reflect probe endpoints (one fresh socket per relay probe) 3. Dual-path race endpoint (fresh per call setup) This broke two things in production on port-preserving NATs (MikroTik masquerade, most consumer routers): a. Phase 2 NAT detection was WRONG. Each probe used a fresh internal port, so MikroTik mapped each one to a different external port, and the classifier saw "different port per relay" and labeled it SymmetricPort. The real NAT was cone-like but measurement via fresh sockets hid that. b. Phase 3.5 dual-path P2P race was BROKEN. The reflex addr we advertised in DirectCallOffer was observed by the signal endpoint's socket. The actual dual-path race listened on a DIFFERENT fresh socket, on a different internal (and therefore external) port. Peers dialed the advertised addr and hit MikroTik's mapping for the signal socket, which forwarded to the signal endpoint — a client-only endpoint that doesn't accept incoming connections. Direct path silently failed, relay always won the race. Nebula-style fix: one socket for everything. The signal endpoint is now dual-purpose (client + server_config), and both the reflect probes and the dual-path race reuse it instead of creating fresh ones. MikroTik's port-preservation then gives us a stable external port across all flows → classifier correctly sees Cone NAT → advertised reflex addr is the actual listening port → direct dials from peers land on the right socket → `endpoint.accept()` in the A-role branch of the dual-path race picks up the incoming connection. ## Changes ### `register_signal` (desktop/src-tauri/src/lib.rs) - Endpoint now created with `Some(server_config())` instead of `None`. The socket can now accept incoming QUIC connections as well as dial outbound. - Every code path that previously read `sig.endpoint` for the relay-dial reuse benefits automatically — same socket is now ALSO listening for peer dials. ### `probe_reflect_addr` (wzp-client/src/reflect.rs) - New `existing_endpoint: Option` arg. `Some` reuses the caller's socket (production: pass the signal endpoint). `None` creates a fresh one (tests + pre-registration). - Removed the `drop(endpoint)` at the end — was correct for fresh endpoints (explicit early socket close) but incorrect for shared ones. End-of-scope drop does the right thing in both cases via Arc semantics. ### `detect_nat_type` (wzp-client/src/reflect.rs) - New `shared_endpoint: Option` arg, forwarded to every probe in the JoinSet fan-out. One shared socket means the classifier sees the true NAT type. ### `detect_nat_type` Tauri command (desktop/src-tauri/src/lib.rs) - Reads `state.signal.endpoint` and passes it as the shared endpoint. Falls back to None when not registered. NAT detection now produces accurate classifications against MikroTik / most consumer NATs. ### `dual_path::race` (wzp-client/src/dual_path.rs) - New `shared_endpoint: Option` arg. - A-role: when `Some`, reuses it for `accept()`. This is the critical change — the reflex addr advertised to peers is now the address listening for incoming direct dials. - D-role: when `Some`, reuses it for the outbound direct dial. MikroTik keeps the same external port for the dial as for the signal flow → direct dial through a cone-mapped NAT. - Relay path: also reuses the shared endpoint so MikroTik has a single consistent mapping across the whole call (saves one extra external port and makes firewall traces cleaner). - When `None`, falls back to fresh per-role endpoints as before. ### `connect` Tauri command (desktop/src-tauri/src/lib.rs) - Reads `state.signal.endpoint` once when acquiring own reflex addr and passes it through to `dual_path::race`. ### Tests - `wzp-client/tests/dual_path.rs` and `wzp-relay/tests/multi_reflect.rs` updated to pass `None` for the new endpoint arg — tests use fresh sockets and that's fine because the loopback harness doesn't care about port-preserving NAT behavior. Full workspace test: 423 passing (no regressions). ## Expected behavior after this commit on real hardware Behind MikroTik + Starlink-bypass (the reporter's setup): - Phase 2 NAT detect → **Cone NAT** (was SymmetricPort — false positive from the measurement artifact) - Phase 3.5 direct-P2P dial → succeeds for both cone-cone and cone-CGNAT cases where the remote side was previously blocked by our own socket mismatch - LTE ↔ LTE cross-carrier → still likely relay fallback; that's genuinely strict symmetric and needs Phase 5.5 port prediction. ## Phase 5.5 (next, separate PRD) Multi-candidate port prediction + ICE-style candidate aggregation for truly strict symmetric NATs. Not needed for the 95% case — Phase 5 alone fixes most consumer-router setups. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/dual_path.rs | 105 +++++++++++++++++++----- crates/wzp-client/src/reflect.rs | 61 +++++++++++--- crates/wzp-client/tests/dual_path.rs | 3 + crates/wzp-relay/tests/multi_reflect.rs | 4 +- desktop/src-tauri/src/lib.rs | 48 ++++++++++- 5 files changed, 183 insertions(+), 38 deletions(-) diff --git a/crates/wzp-client/src/dual_path.rs b/crates/wzp-client/src/dual_path.rs index 188e1c0..d9355e9 100644 --- a/crates/wzp-client/src/dual_path.rs +++ b/crates/wzp-client/src/dual_path.rs @@ -59,6 +59,22 @@ pub async fn race( relay_addr: SocketAddr, room_sni: String, call_sni: String, + // Phase 5: when `Some`, reuse this endpoint for BOTH the + // direct-path branch AND the relay dial. This is critical + // for hole-punching through port-preserving NATs — the + // advertised reflex addr only matches what peers can dial if + // the listening socket is the SAME one that registered with + // the relay. Pass the signal endpoint here. + // + // The endpoint MUST have been created with a server config + // (`create_endpoint(bind, Some(server_config()))`) if the + // A-role branch is going to run, otherwise `accept()` will + // return None immediately. + // + // When `None`, falls back to the pre-Phase-5 behavior of + // creating fresh endpoints per role. Used by tests and by + // paths where we're not registered to a relay. + shared_endpoint: Option, ) -> anyhow::Result<(Arc, WinningPath)> { // Rustls provider must be installed before any quinn endpoint // is created. Install attempt is idempotent. @@ -75,18 +91,37 @@ pub async fn race( match role { Role::Acceptor => { - let (sc, _cert_der) = wzp_transport::server_config(); - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let ep = wzp_transport::create_endpoint(bind, Some(sc))?; - tracing::info!( - local_addr = ?ep.local_addr().ok(), - "dual_path: A-role endpoint up, awaiting peer dial" - ); + let ep = match shared_endpoint.clone() { + Some(ep) => { + tracing::info!( + local_addr = ?ep.local_addr().ok(), + "dual_path: A-role reusing shared endpoint for accept" + ); + ep + } + None => { + let (sc, _cert_der) = wzp_transport::server_config(); + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let fresh = wzp_transport::create_endpoint(bind, Some(sc))?; + tracing::info!( + local_addr = ?fresh.local_addr().ok(), + "dual_path: A-role fresh endpoint up, awaiting peer dial" + ); + fresh + } + }; let ep_for_fut = ep.clone(); direct_fut = Box::pin(async move { // `wzp_transport::accept` wraps the same // `endpoint.accept().await?.await?` dance we want // and maps errors into TransportError for us. + // + // If `ep_for_fut` is the shared signal endpoint, + // this accept pulls the NEXT incoming connection + // — normally that's the peer's direct-P2P dial. + // Signal recv is done via the existing signal + // CONNECTION (accept_bi), not the endpoint, so + // there's no conflict. let conn = wzp_transport::accept(&ep_for_fut) .await .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?; @@ -95,13 +130,26 @@ pub async fn race( direct_ep = ep; } Role::Dialer => { - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let ep = wzp_transport::create_endpoint(bind, None)?; - tracing::info!( - local_addr = ?ep.local_addr().ok(), - %peer_direct_addr, - "dual_path: D-role endpoint up, dialing peer" - ); + let ep = match shared_endpoint.clone() { + Some(ep) => { + tracing::info!( + local_addr = ?ep.local_addr().ok(), + %peer_direct_addr, + "dual_path: D-role reusing shared endpoint to dial peer" + ); + ep + } + None => { + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let fresh = wzp_transport::create_endpoint(bind, None)?; + tracing::info!( + local_addr = ?fresh.local_addr().ok(), + %peer_direct_addr, + "dual_path: D-role fresh endpoint up, dialing peer" + ); + fresh + } + }; let ep_for_fut = ep.clone(); let client_cfg = wzp_transport::client_config(); let sni = call_sni.clone(); @@ -116,9 +164,17 @@ pub async fn race( } } - // Relay path: classic dial to the relay's media room. - let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let relay_ep = wzp_transport::create_endpoint(relay_bind, None)?; + // Relay path: classic dial to the relay's media room. Phase 5: + // reuse the shared endpoint here too so MikroTik-style NATs + // keep a stable external port across all flows from this + // client. Falls back to a fresh endpoint when not shared. + let relay_ep = match shared_endpoint.clone() { + Some(ep) => ep, + None => { + let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + wzp_transport::create_endpoint(relay_bind, None)? + } + }; let relay_ep_for_fut = relay_ep.clone(); let relay_client_cfg = wzp_transport::client_config(); let relay_sni = room_sni.clone(); @@ -185,11 +241,16 @@ pub async fn race( } }; - // Drop both endpoints once the winner is stored in result. The - // winning transport owns its own connection so dropping the - // endpoint won't kill it. - drop(direct_ep); - drop(relay_ep); + // Let both endpoint clones drop at end-of-scope. With the + // Phase 5 shared-endpoint path, these clones are Arc + // clones of the signal endpoint — dropping them just decrements + // the ref count, the socket stays alive for the signal loop + + // any further direct-P2P attempts. With the fresh-endpoint + // fallback, the drops are the last refs so the sockets close + // promptly. Either way the winning transport already owns its + // own quinn::Connection reference which is independent of the + // Endpoint lifetime. + let _ = (direct_ep, relay_ep); result } diff --git a/crates/wzp-client/src/reflect.rs b/crates/wzp-client/src/reflect.rs index 26577b2..f9083e1 100644 --- a/crates/wzp-client/src/reflect.rs +++ b/crates/wzp-client/src/reflect.rs @@ -67,22 +67,45 @@ pub enum NatType { Unknown, } -/// Probe a single relay with a throwaway QUIC connection. +/// Probe a single relay with a QUIC connection. /// -/// Each call creates a fresh `quinn::Endpoint` so the OS hands out a -/// fresh ephemeral source port — essential for NAT-type detection -/// because a shared socket would produce the same mapping against -/// every relay and mask symmetric NAT. +/// # Endpoint reuse (Phase 5 — Nebula-style architecture) +/// +/// If `existing_endpoint` is `Some`, the probe uses that socket +/// instead of creating a fresh one. This is the desired mode in +/// production: a port-preserving NAT (MikroTik masquerade, most +/// consumer routers) gives a **stable** external port for the +/// one socket, so the reflex addr observed by ANY relay is the +/// SAME addr and matches what a peer would see on a direct dial. +/// Pass the signal endpoint here. +/// +/// If `None`, creates a fresh one-shot endpoint. Kept for: +/// - tests that spin up isolated probes +/// - the "I'm not registered yet" case where there's no signal +/// endpoint to reuse +/// +/// NOTE on NAT-type detection: the pre-Phase-5 behavior of +/// forcing a fresh endpoint per probe was wrong — it made every +/// port-preserving NAT look symmetric because the classifier saw +/// a different external port for each fresh source port. With +/// one shared socket, the classifier reflects the REAL NAT +/// behavior. pub async fn probe_reflect_addr( relay: SocketAddr, timeout_ms: u64, + existing_endpoint: Option, ) -> Result<(SocketAddr, u32), String> { // Install rustls provider idempotently — a second install on the // same thread is a no-op. let _ = rustls::crypto::ring::default_provider().install_default(); - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = create_endpoint(bind, None).map_err(|e| format!("endpoint: {e}"))?; + let endpoint = match existing_endpoint { + Some(ep) => ep, + None => { + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + create_endpoint(bind, None).map_err(|e| format!("endpoint: {e}"))? + } + }; let start = Instant::now(); let probe = async { @@ -153,9 +176,10 @@ pub async fn probe_reflect_addr( .await .map_err(|_| format!("probe timeout ({timeout_ms}ms)"))??; - // Drop the endpoint explicitly AFTER the probe finishes so the - // UDP socket is released before we return. - drop(endpoint); + // `endpoint` is a quinn::Endpoint clone — an Arc under the + // hood. Letting it drop at end-of-scope is correct whether it + // was fresh (last ref → socket closes) or shared (ref count + // decrements, socket stays alive for the signal loop). Ok(out) } @@ -163,17 +187,32 @@ pub async fn probe_reflect_addr( /// classifying the returned addresses. Never errors — failing /// probes surface via `NatProbeResult.error`; aggregate is always /// returned. +/// +/// # Endpoint reuse (Phase 5) +/// +/// If `shared_endpoint` is `Some`, every probe reuses it. This is +/// the PRODUCTION behavior: all probes source from the same UDP +/// port, so port-preserving NATs map them to the same external +/// port, and the classifier reflects the real NAT type. Pass the +/// signal endpoint. +/// +/// If `None`, each probe creates its own fresh endpoint — useful +/// in tests that don't have a signal endpoint, but produces +/// spurious `SymmetricPort` classifications against NATs that +/// would otherwise look cone-like. pub async fn detect_nat_type( relays: Vec<(String, SocketAddr)>, timeout_ms: u64, + shared_endpoint: Option, ) -> NatDetection { // Parallel probes via tokio::task::JoinSet so the wall-clock is // bounded by the slowest probe, not the sum. JoinSet keeps the // dep surface at just tokio — we already depend on it. let mut set = tokio::task::JoinSet::new(); for (name, addr) in relays { + let ep = shared_endpoint.clone(); set.spawn(async move { - let result = probe_reflect_addr(addr, timeout_ms).await; + let result = probe_reflect_addr(addr, timeout_ms, ep).await; (name, addr, result) }); } diff --git a/crates/wzp-client/tests/dual_path.rs b/crates/wzp-client/tests/dual_path.rs index 55521af..7c66023 100644 --- a/crates/wzp-client/tests/dual_path.rs +++ b/crates/wzp-client/tests/dual_path.rs @@ -114,6 +114,7 @@ async fn dual_path_direct_wins_on_loopback() { relay_addr, "test-room".into(), "call-test".into(), + None, // Phase 5: tests use fresh endpoints (no shared signal) ) .await .expect("race must succeed"); @@ -151,6 +152,7 @@ async fn dual_path_relay_wins_when_direct_is_dead() { relay_addr, "test-room".into(), "call-test".into(), + None, // Phase 5: tests use fresh endpoints (no shared signal) ) .await .expect("race must succeed via relay fallback"); @@ -184,6 +186,7 @@ async fn dual_path_errors_cleanly_when_both_paths_dead() { dead_relay, "test-room".into(), "call-test".into(), + None, // Phase 5: tests use fresh endpoints (no shared signal) ) .await; let elapsed = start.elapsed(); diff --git a/crates/wzp-relay/tests/multi_reflect.rs b/crates/wzp-relay/tests/multi_reflect.rs index 1ad1600..49f0691 100644 --- a/crates/wzp-relay/tests/multi_reflect.rs +++ b/crates/wzp-relay/tests/multi_reflect.rs @@ -97,7 +97,7 @@ async fn probe_reflect_addr_happy_path() { let (observed, latency_ms) = tokio::time::timeout( Duration::from_secs(3), - probe_reflect_addr(relay_addr, 2000), + probe_reflect_addr(relay_addr, 2000, None), ) .await .expect("probe must complete within 3s") @@ -138,6 +138,7 @@ async fn detect_nat_type_two_loopback_relays_probes_work_but_classify_unknown() ("RelayB".into(), addr_b), ], 2000, + None, ) .await; @@ -195,6 +196,7 @@ async fn detect_nat_type_dead_relay_is_unknown() { ("Dead".into(), dead_addr), ], 600, // tight timeout so the dead probe fails fast + None, ) .await; diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 8ebd0e4..1677e9f 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -360,7 +360,10 @@ async fn connect( // own_reflex_addr, unparseable addrs, equal addrs), we skip // the race entirely and fall back to the pure-relay path — // identical to Phase 0 behavior. - let own_reflex_addr = state.signal.lock().await.own_reflex_addr.clone(); + let (own_reflex_addr, signal_endpoint_for_race) = { + let sig = state.signal.lock().await; + (sig.own_reflex_addr.clone(), sig.endpoint.clone()) + }; let peer_addr_parsed: Option = peer_direct_addr .as_deref() .and_then(|s| s.parse().ok()); @@ -389,7 +392,20 @@ async fn connect( })); let room_sni = room.clone(); let call_sni = format!("call-{room}"); - match wzp_client::dual_path::race(r, peer_addr, relay_sockaddr, room_sni, call_sni).await { + // Phase 5: pass the signal endpoint so the race + // reuses ONE socket for listen + dial + relay. + // The advertised reflex addr then matches the + // actual listening port and peers can reach us. + match wzp_client::dual_path::race( + r, + peer_addr, + relay_sockaddr, + room_sni, + call_sni, + signal_endpoint_for_race.clone(), + ) + .await + { Ok((transport, path)) => { tracing::info!(?path, "connect: dual-path race resolved"); emit_call_debug(&app, "connect:dual_path_race_won", serde_json::json!({ @@ -760,8 +776,23 @@ fn do_register_signal( let identity_pub = *pub_id.signing.as_bytes(); emit_call_debug(&app, "register_signal:identity_loaded", serde_json::json!({ "fingerprint": fp })); + // Phase 5: single-socket Nebula-style architecture. The signal + // endpoint is dual-purpose (client + server config). Every outbound + // flow — signal, reflect probes, relay media dials, direct-P2P + // dials — uses this same socket, so port-preserving NATs (MikroTik + // masquerade is the big one) give us a stable external port that + // peers can actually dial. The same socket also accepts incoming + // direct-P2P connections during the dual-path race. + // + // Was `None` before Phase 5 — that produced a client-only endpoint + // with a different internal port than later reflect / dual-path + // endpoints, which made MikroTik look symmetric and broke direct + // P2P because the advertised reflex port was not the listening + // port. let bind: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = wzp_transport::create_endpoint(bind, None).map_err(|e| format!("{e}"))?; + let (server_cfg, _cert_der) = wzp_transport::server_config(); + let endpoint = wzp_transport::create_endpoint(bind, Some(server_cfg)) + .map_err(|e| format!("{e}"))?; emit_call_debug(&app, "register_signal:endpoint_created", serde_json::json!({ "bind": bind.to_string() })); let conn = wzp_transport::connect(&endpoint, addr, "_signal", wzp_transport::client_config()) .await @@ -1384,6 +1415,7 @@ async fn get_reflected_address( /// in; Rust side just does the network work. #[tauri::command] async fn detect_nat_type( + state: tauri::State<'_, Arc>, relays: Vec, ) -> Result { // Parse relay args up front so a single malformed entry fails @@ -1398,10 +1430,18 @@ async fn detect_nat_type( parsed.push((r.name, addr)); } + // Phase 5: share the signal endpoint across all probes so + // they emit from the same source port. Port-preserving NATs + // (MikroTik, most consumer routers) give a stable external + // port → classifier correctly sees cone instead of falsely + // labeling SymmetricPort. Falls back to None (per-probe fresh + // endpoint) when not registered. + let shared_endpoint = state.signal.lock().await.endpoint.clone(); + // 1500ms per probe is generous: a same-host probe is < 10ms, // a cross-continent probe is typically < 300ms, and we want // to tolerate a one-off packet loss during connect. - let detection = wzp_client::reflect::detect_nat_type(parsed, 1500).await; + let detection = wzp_client::reflect::detect_nat_type(parsed, 1500, shared_endpoint).await; serde_json::to_value(&detection).map_err(|e| format!("serialize: {e}")) }