diff --git a/crates/wzp-client/src/dual_path.rs b/crates/wzp-client/src/dual_path.rs index 188e1c0..d9355e9 100644 --- a/crates/wzp-client/src/dual_path.rs +++ b/crates/wzp-client/src/dual_path.rs @@ -59,6 +59,22 @@ pub async fn race( relay_addr: SocketAddr, room_sni: String, call_sni: String, + // Phase 5: when `Some`, reuse this endpoint for BOTH the + // direct-path branch AND the relay dial. This is critical + // for hole-punching through port-preserving NATs — the + // advertised reflex addr only matches what peers can dial if + // the listening socket is the SAME one that registered with + // the relay. Pass the signal endpoint here. + // + // The endpoint MUST have been created with a server config + // (`create_endpoint(bind, Some(server_config()))`) if the + // A-role branch is going to run, otherwise `accept()` will + // return None immediately. + // + // When `None`, falls back to the pre-Phase-5 behavior of + // creating fresh endpoints per role. Used by tests and by + // paths where we're not registered to a relay. + shared_endpoint: Option, ) -> anyhow::Result<(Arc, WinningPath)> { // Rustls provider must be installed before any quinn endpoint // is created. Install attempt is idempotent. @@ -75,18 +91,37 @@ pub async fn race( match role { Role::Acceptor => { - let (sc, _cert_der) = wzp_transport::server_config(); - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let ep = wzp_transport::create_endpoint(bind, Some(sc))?; - tracing::info!( - local_addr = ?ep.local_addr().ok(), - "dual_path: A-role endpoint up, awaiting peer dial" - ); + let ep = match shared_endpoint.clone() { + Some(ep) => { + tracing::info!( + local_addr = ?ep.local_addr().ok(), + "dual_path: A-role reusing shared endpoint for accept" + ); + ep + } + None => { + let (sc, _cert_der) = wzp_transport::server_config(); + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let fresh = wzp_transport::create_endpoint(bind, Some(sc))?; + tracing::info!( + local_addr = ?fresh.local_addr().ok(), + "dual_path: A-role fresh endpoint up, awaiting peer dial" + ); + fresh + } + }; let ep_for_fut = ep.clone(); direct_fut = Box::pin(async move { // `wzp_transport::accept` wraps the same // `endpoint.accept().await?.await?` dance we want // and maps errors into TransportError for us. + // + // If `ep_for_fut` is the shared signal endpoint, + // this accept pulls the NEXT incoming connection + // — normally that's the peer's direct-P2P dial. + // Signal recv is done via the existing signal + // CONNECTION (accept_bi), not the endpoint, so + // there's no conflict. let conn = wzp_transport::accept(&ep_for_fut) .await .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?; @@ -95,13 +130,26 @@ pub async fn race( direct_ep = ep; } Role::Dialer => { - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let ep = wzp_transport::create_endpoint(bind, None)?; - tracing::info!( - local_addr = ?ep.local_addr().ok(), - %peer_direct_addr, - "dual_path: D-role endpoint up, dialing peer" - ); + let ep = match shared_endpoint.clone() { + Some(ep) => { + tracing::info!( + local_addr = ?ep.local_addr().ok(), + %peer_direct_addr, + "dual_path: D-role reusing shared endpoint to dial peer" + ); + ep + } + None => { + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let fresh = wzp_transport::create_endpoint(bind, None)?; + tracing::info!( + local_addr = ?fresh.local_addr().ok(), + %peer_direct_addr, + "dual_path: D-role fresh endpoint up, dialing peer" + ); + fresh + } + }; let ep_for_fut = ep.clone(); let client_cfg = wzp_transport::client_config(); let sni = call_sni.clone(); @@ -116,9 +164,17 @@ pub async fn race( } } - // Relay path: classic dial to the relay's media room. - let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let relay_ep = wzp_transport::create_endpoint(relay_bind, None)?; + // Relay path: classic dial to the relay's media room. Phase 5: + // reuse the shared endpoint here too so MikroTik-style NATs + // keep a stable external port across all flows from this + // client. Falls back to a fresh endpoint when not shared. + let relay_ep = match shared_endpoint.clone() { + Some(ep) => ep, + None => { + let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + wzp_transport::create_endpoint(relay_bind, None)? + } + }; let relay_ep_for_fut = relay_ep.clone(); let relay_client_cfg = wzp_transport::client_config(); let relay_sni = room_sni.clone(); @@ -185,11 +241,16 @@ pub async fn race( } }; - // Drop both endpoints once the winner is stored in result. The - // winning transport owns its own connection so dropping the - // endpoint won't kill it. - drop(direct_ep); - drop(relay_ep); + // Let both endpoint clones drop at end-of-scope. With the + // Phase 5 shared-endpoint path, these clones are Arc + // clones of the signal endpoint — dropping them just decrements + // the ref count, the socket stays alive for the signal loop + + // any further direct-P2P attempts. With the fresh-endpoint + // fallback, the drops are the last refs so the sockets close + // promptly. Either way the winning transport already owns its + // own quinn::Connection reference which is independent of the + // Endpoint lifetime. + let _ = (direct_ep, relay_ep); result } diff --git a/crates/wzp-client/src/reflect.rs b/crates/wzp-client/src/reflect.rs index 26577b2..f9083e1 100644 --- a/crates/wzp-client/src/reflect.rs +++ b/crates/wzp-client/src/reflect.rs @@ -67,22 +67,45 @@ pub enum NatType { Unknown, } -/// Probe a single relay with a throwaway QUIC connection. +/// Probe a single relay with a QUIC connection. /// -/// Each call creates a fresh `quinn::Endpoint` so the OS hands out a -/// fresh ephemeral source port — essential for NAT-type detection -/// because a shared socket would produce the same mapping against -/// every relay and mask symmetric NAT. +/// # Endpoint reuse (Phase 5 — Nebula-style architecture) +/// +/// If `existing_endpoint` is `Some`, the probe uses that socket +/// instead of creating a fresh one. This is the desired mode in +/// production: a port-preserving NAT (MikroTik masquerade, most +/// consumer routers) gives a **stable** external port for the +/// one socket, so the reflex addr observed by ANY relay is the +/// SAME addr and matches what a peer would see on a direct dial. +/// Pass the signal endpoint here. +/// +/// If `None`, creates a fresh one-shot endpoint. Kept for: +/// - tests that spin up isolated probes +/// - the "I'm not registered yet" case where there's no signal +/// endpoint to reuse +/// +/// NOTE on NAT-type detection: the pre-Phase-5 behavior of +/// forcing a fresh endpoint per probe was wrong — it made every +/// port-preserving NAT look symmetric because the classifier saw +/// a different external port for each fresh source port. With +/// one shared socket, the classifier reflects the REAL NAT +/// behavior. pub async fn probe_reflect_addr( relay: SocketAddr, timeout_ms: u64, + existing_endpoint: Option, ) -> Result<(SocketAddr, u32), String> { // Install rustls provider idempotently — a second install on the // same thread is a no-op. let _ = rustls::crypto::ring::default_provider().install_default(); - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = create_endpoint(bind, None).map_err(|e| format!("endpoint: {e}"))?; + let endpoint = match existing_endpoint { + Some(ep) => ep, + None => { + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + create_endpoint(bind, None).map_err(|e| format!("endpoint: {e}"))? + } + }; let start = Instant::now(); let probe = async { @@ -153,9 +176,10 @@ pub async fn probe_reflect_addr( .await .map_err(|_| format!("probe timeout ({timeout_ms}ms)"))??; - // Drop the endpoint explicitly AFTER the probe finishes so the - // UDP socket is released before we return. - drop(endpoint); + // `endpoint` is a quinn::Endpoint clone — an Arc under the + // hood. Letting it drop at end-of-scope is correct whether it + // was fresh (last ref → socket closes) or shared (ref count + // decrements, socket stays alive for the signal loop). Ok(out) } @@ -163,17 +187,32 @@ pub async fn probe_reflect_addr( /// classifying the returned addresses. Never errors — failing /// probes surface via `NatProbeResult.error`; aggregate is always /// returned. +/// +/// # Endpoint reuse (Phase 5) +/// +/// If `shared_endpoint` is `Some`, every probe reuses it. This is +/// the PRODUCTION behavior: all probes source from the same UDP +/// port, so port-preserving NATs map them to the same external +/// port, and the classifier reflects the real NAT type. Pass the +/// signal endpoint. +/// +/// If `None`, each probe creates its own fresh endpoint — useful +/// in tests that don't have a signal endpoint, but produces +/// spurious `SymmetricPort` classifications against NATs that +/// would otherwise look cone-like. pub async fn detect_nat_type( relays: Vec<(String, SocketAddr)>, timeout_ms: u64, + shared_endpoint: Option, ) -> NatDetection { // Parallel probes via tokio::task::JoinSet so the wall-clock is // bounded by the slowest probe, not the sum. JoinSet keeps the // dep surface at just tokio — we already depend on it. let mut set = tokio::task::JoinSet::new(); for (name, addr) in relays { + let ep = shared_endpoint.clone(); set.spawn(async move { - let result = probe_reflect_addr(addr, timeout_ms).await; + let result = probe_reflect_addr(addr, timeout_ms, ep).await; (name, addr, result) }); } diff --git a/crates/wzp-client/tests/dual_path.rs b/crates/wzp-client/tests/dual_path.rs index 55521af..7c66023 100644 --- a/crates/wzp-client/tests/dual_path.rs +++ b/crates/wzp-client/tests/dual_path.rs @@ -114,6 +114,7 @@ async fn dual_path_direct_wins_on_loopback() { relay_addr, "test-room".into(), "call-test".into(), + None, // Phase 5: tests use fresh endpoints (no shared signal) ) .await .expect("race must succeed"); @@ -151,6 +152,7 @@ async fn dual_path_relay_wins_when_direct_is_dead() { relay_addr, "test-room".into(), "call-test".into(), + None, // Phase 5: tests use fresh endpoints (no shared signal) ) .await .expect("race must succeed via relay fallback"); @@ -184,6 +186,7 @@ async fn dual_path_errors_cleanly_when_both_paths_dead() { dead_relay, "test-room".into(), "call-test".into(), + None, // Phase 5: tests use fresh endpoints (no shared signal) ) .await; let elapsed = start.elapsed(); diff --git a/crates/wzp-relay/tests/multi_reflect.rs b/crates/wzp-relay/tests/multi_reflect.rs index 1ad1600..49f0691 100644 --- a/crates/wzp-relay/tests/multi_reflect.rs +++ b/crates/wzp-relay/tests/multi_reflect.rs @@ -97,7 +97,7 @@ async fn probe_reflect_addr_happy_path() { let (observed, latency_ms) = tokio::time::timeout( Duration::from_secs(3), - probe_reflect_addr(relay_addr, 2000), + probe_reflect_addr(relay_addr, 2000, None), ) .await .expect("probe must complete within 3s") @@ -138,6 +138,7 @@ async fn detect_nat_type_two_loopback_relays_probes_work_but_classify_unknown() ("RelayB".into(), addr_b), ], 2000, + None, ) .await; @@ -195,6 +196,7 @@ async fn detect_nat_type_dead_relay_is_unknown() { ("Dead".into(), dead_addr), ], 600, // tight timeout so the dead probe fails fast + None, ) .await; diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 8ebd0e4..1677e9f 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -360,7 +360,10 @@ async fn connect( // own_reflex_addr, unparseable addrs, equal addrs), we skip // the race entirely and fall back to the pure-relay path — // identical to Phase 0 behavior. - let own_reflex_addr = state.signal.lock().await.own_reflex_addr.clone(); + let (own_reflex_addr, signal_endpoint_for_race) = { + let sig = state.signal.lock().await; + (sig.own_reflex_addr.clone(), sig.endpoint.clone()) + }; let peer_addr_parsed: Option = peer_direct_addr .as_deref() .and_then(|s| s.parse().ok()); @@ -389,7 +392,20 @@ async fn connect( })); let room_sni = room.clone(); let call_sni = format!("call-{room}"); - match wzp_client::dual_path::race(r, peer_addr, relay_sockaddr, room_sni, call_sni).await { + // Phase 5: pass the signal endpoint so the race + // reuses ONE socket for listen + dial + relay. + // The advertised reflex addr then matches the + // actual listening port and peers can reach us. + match wzp_client::dual_path::race( + r, + peer_addr, + relay_sockaddr, + room_sni, + call_sni, + signal_endpoint_for_race.clone(), + ) + .await + { Ok((transport, path)) => { tracing::info!(?path, "connect: dual-path race resolved"); emit_call_debug(&app, "connect:dual_path_race_won", serde_json::json!({ @@ -760,8 +776,23 @@ fn do_register_signal( let identity_pub = *pub_id.signing.as_bytes(); emit_call_debug(&app, "register_signal:identity_loaded", serde_json::json!({ "fingerprint": fp })); + // Phase 5: single-socket Nebula-style architecture. The signal + // endpoint is dual-purpose (client + server config). Every outbound + // flow — signal, reflect probes, relay media dials, direct-P2P + // dials — uses this same socket, so port-preserving NATs (MikroTik + // masquerade is the big one) give us a stable external port that + // peers can actually dial. The same socket also accepts incoming + // direct-P2P connections during the dual-path race. + // + // Was `None` before Phase 5 — that produced a client-only endpoint + // with a different internal port than later reflect / dual-path + // endpoints, which made MikroTik look symmetric and broke direct + // P2P because the advertised reflex port was not the listening + // port. let bind: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = wzp_transport::create_endpoint(bind, None).map_err(|e| format!("{e}"))?; + let (server_cfg, _cert_der) = wzp_transport::server_config(); + let endpoint = wzp_transport::create_endpoint(bind, Some(server_cfg)) + .map_err(|e| format!("{e}"))?; emit_call_debug(&app, "register_signal:endpoint_created", serde_json::json!({ "bind": bind.to_string() })); let conn = wzp_transport::connect(&endpoint, addr, "_signal", wzp_transport::client_config()) .await @@ -1384,6 +1415,7 @@ async fn get_reflected_address( /// in; Rust side just does the network work. #[tauri::command] async fn detect_nat_type( + state: tauri::State<'_, Arc>, relays: Vec, ) -> Result { // Parse relay args up front so a single malformed entry fails @@ -1398,10 +1430,18 @@ async fn detect_nat_type( parsed.push((r.name, addr)); } + // Phase 5: share the signal endpoint across all probes so + // they emit from the same source port. Port-preserving NATs + // (MikroTik, most consumer routers) give a stable external + // port → classifier correctly sees cone instead of falsely + // labeling SymmetricPort. Falls back to None (per-probe fresh + // endpoint) when not registered. + let shared_endpoint = state.signal.lock().await.endpoint.clone(); + // 1500ms per probe is generous: a same-host probe is < 10ms, // a cross-continent probe is typically < 300ms, and we want // to tolerate a one-off packet loss during connect. - let detection = wzp_client::reflect::detect_nat_type(parsed, 1500).await; + let detection = wzp_client::reflect::detect_nat_type(parsed, 1500, shared_endpoint).await; serde_json::to_value(&detection).map_err(|e| format!("serialize: {e}")) }