feat(p2p): Phase 5 — single-socket architecture (Nebula-style)
Before Phase 5, WarzonePhone used THREE separate UDP sockets per
client:
1. Signal endpoint (register_signal, client-only)
2. Reflect probe endpoints (one fresh socket per relay probe)
3. Dual-path race endpoint (fresh per call setup)
This broke two things in production on port-preserving NATs
(MikroTik masquerade, most consumer routers):
a. Phase 2 NAT detection was WRONG. Each probe used a fresh
internal port, so MikroTik mapped each one to a different
external port, and the classifier saw "different port per
relay" and labeled it SymmetricPort. The real NAT was
cone-like but measurement via fresh sockets hid that.
b. Phase 3.5 dual-path P2P race was BROKEN. The reflex addr
we advertised in DirectCallOffer was the one observed for
the signal endpoint's socket. The actual dual-path race
listened on a DIFFERENT fresh socket, on a different
internal (and
therefore external) port. Peers dialed the advertised addr
and hit MikroTik's mapping for the signal socket, which
forwarded to the signal endpoint — a client-only endpoint
that doesn't accept incoming connections. Direct path
silently failed, relay always won the race.
Nebula-style fix: one socket for everything. The signal endpoint
is now dual-purpose (client + server_config), and both the
reflect probes and the dual-path race reuse it instead of
creating fresh ones. MikroTik's port-preservation then gives us
a stable external port across all flows → classifier correctly
sees Cone NAT → advertised reflex addr is the actual listening
port → direct dials from peers land on the right socket →
`endpoint.accept()` in the A-role branch of the dual-path race
picks up the incoming connection.
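The measurement artifact and its fix can be sketched with a toy model of a port-preserving NAT. Everything here (the `PreservingNat` type, the port numbers, the two-way `classify`) is an illustrative simplification, not code from the repo:

```rust
use std::collections::HashMap;

/// Toy port-preserving NAT: each distinct internal source port gets one
/// stable external port, reused for every destination (cone behavior).
struct PreservingNat {
    map: HashMap<u16, u16>, // internal port -> external port
    next_ext: u16,
}

impl PreservingNat {
    fn new() -> Self {
        Self { map: HashMap::new(), next_ext: 40_000 }
    }

    /// External port any relay observes for a flow sourced from `internal`.
    fn translate(&mut self, internal: u16) -> u16 {
        let next = &mut self.next_ext;
        *self.map.entry(internal).or_insert_with(|| {
            let p = *next;
            *next += 1;
            p
        })
    }
}

/// Simplified classifier: one external port across all relays looks
/// cone-like; a different port per relay looks symmetric.
fn classify(observed: &[u16]) -> &'static str {
    if observed.windows(2).all(|w| w[0] == w[1]) {
        "Cone"
    } else {
        "SymmetricPort"
    }
}

fn main() {
    let mut nat = PreservingNat::new();

    // Pre-Phase-5: a fresh socket (fresh internal port) per relay probe,
    // so the NAT hands out a different external port per probe.
    let fresh: Vec<u16> = [50_001, 50_002, 50_003]
        .iter()
        .map(|&p| nat.translate(p))
        .collect();
    assert_eq!(classify(&fresh), "SymmetricPort"); // false positive

    // Phase 5: every probe sources from the one signal socket.
    let shared: Vec<u16> = (0..3).map(|_| nat.translate(50_001)).collect();
    assert_eq!(classify(&shared), "Cone"); // the NAT's real behavior
}
```

The same mapping logic explains problem (b): the peer dials the external port mapped to the signal socket's internal port, so only that socket can receive the dial.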
## Changes
### `register_signal` (desktop/src-tauri/src/lib.rs)
- Endpoint now created with `Some(server_config())` instead of
`None`. The socket can now accept incoming QUIC connections as
well as dial outbound.
- Every code path that previously read `sig.endpoint` for
  relay-dial reuse now benefits automatically — the same socket
  is ALSO listening for peer dials.
### `probe_reflect_addr` (wzp-client/src/reflect.rs)
- New `existing_endpoint: Option<Endpoint>` arg. `Some` reuses
the caller's socket (production: pass the signal endpoint).
`None` creates a fresh one (tests + pre-registration).
- Removed the `drop(endpoint)` at the end — was correct for
fresh endpoints (explicit early socket close) but incorrect
for shared ones. End-of-scope drop does the right thing in
both cases via Arc semantics.
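The end-of-scope-drop reasoning rests on reference-counted clone semantics (quinn's `Endpoint` is cheaply cloneable with shared internal state). A std-only sketch with a hypothetical `Socket`/`Endpoint` stand-in:

```rust
use std::sync::Arc;

// Hypothetical stand-in for a socket-owning endpoint; the real quinn
// Endpoint shares its state between clones the same way.
struct Socket;
type Endpoint = Arc<Socket>;

fn main() {
    // Shared case: the signal loop holds one handle...
    let signal: Endpoint = Arc::new(Socket);
    {
        // ...and a probe borrows the socket by cloning the handle.
        let probe = Arc::clone(&signal);
        assert_eq!(Arc::strong_count(&signal), 2);
        drop(probe); // decrements the ref count; socket stays open
    }
    assert_eq!(Arc::strong_count(&signal), 1); // signal loop still owns it

    // Fresh case: the probe's handle is the only reference, so the
    // end-of-scope drop is the last ref and the socket closes promptly.
    let fresh: Endpoint = Arc::new(Socket);
    drop(fresh);
}
```

An explicit `drop(endpoint)` on a shared clone is harmless in itself, but it encodes the wrong intent: it reads as "close the socket now" while only decrementing a count.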
### `detect_nat_type` (wzp-client/src/reflect.rs)
- New `shared_endpoint: Option<Endpoint>` arg, forwarded to
every probe in the JoinSet fan-out. One shared socket means
the classifier sees the true NAT type.
### `detect_nat_type` Tauri command (desktop/src-tauri/src/lib.rs)
- Reads `state.signal.endpoint` and passes it as the shared
endpoint. Falls back to None when not registered. NAT detection
now produces accurate classifications against MikroTik / most
consumer NATs.
### `dual_path::race` (wzp-client/src/dual_path.rs)
- New `shared_endpoint: Option<Endpoint>` arg.
- A-role: when `Some`, reuses it for `accept()`. This is the
critical change — the reflex addr advertised to peers is now
the address listening for incoming direct dials.
- D-role: when `Some`, reuses it for the outbound direct dial.
MikroTik keeps the same external port for the dial as for
the signal flow → direct dial through a cone-mapped NAT.
- Relay path: also reuses the shared endpoint so MikroTik has
a single consistent mapping across the whole call (saves one
extra external port and makes firewall traces cleaner).
- When `None`, falls back to fresh per-role endpoints as before.
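All the call sites above follow the same `Some`-reuses / `None`-creates shape, which reduces to one pattern. Sketch with a hypothetical `Endpoint` type and constructor, for illustration only:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Endpoint {
    port: u16,
}

// Stand-in for `wzp_transport::create_endpoint`; port 0 stands for
// "let the OS pick an ephemeral port" as in the real socket API.
fn create_endpoint(port: u16) -> Result<Endpoint, String> {
    Ok(Endpoint { port })
}

/// Reuse the caller's endpoint when given one, else fall back to a
/// fresh ephemeral-port endpoint (the pre-Phase-5 behavior).
fn endpoint_for_role(shared: Option<Endpoint>) -> Result<Endpoint, String> {
    match shared {
        // Production: reuse the signal endpoint so every flow shares
        // one UDP source port (one stable NAT mapping).
        Some(ep) => Ok(ep),
        // Tests / pre-registration: fresh one-shot endpoint.
        None => create_endpoint(0),
    }
}

fn main() {
    let signal = Endpoint { port: 47_000 };
    assert_eq!(endpoint_for_role(Some(signal.clone())).unwrap(), signal);
    assert_eq!(endpoint_for_role(None).unwrap().port, 0);
}
```

Keeping the fallback arm (rather than making the endpoint mandatory) is what lets tests and the not-yet-registered path keep working unchanged.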
### `connect` Tauri command (desktop/src-tauri/src/lib.rs)
- Reads `state.signal.endpoint` once when acquiring own reflex
addr and passes it through to `dual_path::race`.
### Tests
- `wzp-client/tests/dual_path.rs` and
`wzp-relay/tests/multi_reflect.rs` updated to pass `None` for
the new endpoint arg — tests use fresh sockets and that's
fine because the loopback harness doesn't care about
port-preserving NAT behavior.
Full workspace test suite: 423 tests passing (no regressions).
## Expected behavior after this commit on real hardware
Behind MikroTik + Starlink-bypass (the reporter's setup):
- Phase 2 NAT detect → **Cone NAT** (was SymmetricPort — false
positive from the measurement artifact)
- Phase 3.5 direct-P2P dial → succeeds for both cone-cone and
cone-CGNAT cases where the remote side was previously blocked
by our own socket mismatch
- LTE ↔ LTE cross-carrier → still likely relay fallback; that's
genuinely strict symmetric and needs Phase 5.5 port prediction.
## Phase 5.5 (next, separate PRD)
Multi-candidate port prediction + ICE-style candidate aggregation
for truly strict symmetric NATs. Not needed for the 95% case —
Phase 5 alone fixes most consumer-router setups.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -59,6 +59,22 @@ pub async fn race(
     relay_addr: SocketAddr,
     room_sni: String,
     call_sni: String,
+    // Phase 5: when `Some`, reuse this endpoint for BOTH the
+    // direct-path branch AND the relay dial. This is critical
+    // for hole-punching through port-preserving NATs — the
+    // advertised reflex addr only matches what peers can dial if
+    // the listening socket is the SAME one that registered with
+    // the relay. Pass the signal endpoint here.
+    //
+    // The endpoint MUST have been created with a server config
+    // (`create_endpoint(bind, Some(server_config()))`) if the
+    // A-role branch is going to run, otherwise `accept()` will
+    // return None immediately.
+    //
+    // When `None`, falls back to the pre-Phase-5 behavior of
+    // creating fresh endpoints per role. Used by tests and by
+    // paths where we're not registered to a relay.
+    shared_endpoint: Option<wzp_transport::Endpoint>,
 ) -> anyhow::Result<(Arc<QuinnTransport>, WinningPath)> {
     // Rustls provider must be installed before any quinn endpoint
     // is created. Install attempt is idempotent.
@@ -75,18 +91,37 @@ pub async fn race(
 
     match role {
         Role::Acceptor => {
-            let (sc, _cert_der) = wzp_transport::server_config();
-            let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
-            let ep = wzp_transport::create_endpoint(bind, Some(sc))?;
-            tracing::info!(
-                local_addr = ?ep.local_addr().ok(),
-                "dual_path: A-role endpoint up, awaiting peer dial"
-            );
+            let ep = match shared_endpoint.clone() {
+                Some(ep) => {
+                    tracing::info!(
+                        local_addr = ?ep.local_addr().ok(),
+                        "dual_path: A-role reusing shared endpoint for accept"
+                    );
+                    ep
+                }
+                None => {
+                    let (sc, _cert_der) = wzp_transport::server_config();
+                    let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
+                    let fresh = wzp_transport::create_endpoint(bind, Some(sc))?;
+                    tracing::info!(
+                        local_addr = ?fresh.local_addr().ok(),
+                        "dual_path: A-role fresh endpoint up, awaiting peer dial"
+                    );
+                    fresh
+                }
+            };
             let ep_for_fut = ep.clone();
             direct_fut = Box::pin(async move {
                 // `wzp_transport::accept` wraps the same
                 // `endpoint.accept().await?.await?` dance we want
                 // and maps errors into TransportError for us.
+                //
+                // If `ep_for_fut` is the shared signal endpoint,
+                // this accept pulls the NEXT incoming connection
+                // — normally that's the peer's direct-P2P dial.
+                // Signal recv is done via the existing signal
+                // CONNECTION (accept_bi), not the endpoint, so
+                // there's no conflict.
                 let conn = wzp_transport::accept(&ep_for_fut)
                     .await
                     .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
@@ -95,13 +130,26 @@ pub async fn race(
             direct_ep = ep;
         }
         Role::Dialer => {
-            let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
-            let ep = wzp_transport::create_endpoint(bind, None)?;
-            tracing::info!(
-                local_addr = ?ep.local_addr().ok(),
-                %peer_direct_addr,
-                "dual_path: D-role endpoint up, dialing peer"
-            );
+            let ep = match shared_endpoint.clone() {
+                Some(ep) => {
+                    tracing::info!(
+                        local_addr = ?ep.local_addr().ok(),
+                        %peer_direct_addr,
+                        "dual_path: D-role reusing shared endpoint to dial peer"
+                    );
+                    ep
+                }
+                None => {
+                    let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
+                    let fresh = wzp_transport::create_endpoint(bind, None)?;
+                    tracing::info!(
+                        local_addr = ?fresh.local_addr().ok(),
+                        %peer_direct_addr,
+                        "dual_path: D-role fresh endpoint up, dialing peer"
+                    );
+                    fresh
+                }
+            };
             let ep_for_fut = ep.clone();
             let client_cfg = wzp_transport::client_config();
             let sni = call_sni.clone();
@@ -116,9 +164,17 @@ pub async fn race(
         }
     }
 
-    // Relay path: classic dial to the relay's media room.
-    let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
-    let relay_ep = wzp_transport::create_endpoint(relay_bind, None)?;
+    // Relay path: classic dial to the relay's media room. Phase 5:
+    // reuse the shared endpoint here too so MikroTik-style NATs
+    // keep a stable external port across all flows from this
+    // client. Falls back to a fresh endpoint when not shared.
+    let relay_ep = match shared_endpoint.clone() {
+        Some(ep) => ep,
+        None => {
+            let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
+            wzp_transport::create_endpoint(relay_bind, None)?
+        }
+    };
     let relay_ep_for_fut = relay_ep.clone();
     let relay_client_cfg = wzp_transport::client_config();
     let relay_sni = room_sni.clone();
@@ -185,11 +241,16 @@ pub async fn race(
         }
     };
 
-    // Drop both endpoints once the winner is stored in result. The
-    // winning transport owns its own connection so dropping the
-    // endpoint won't kill it.
-    drop(direct_ep);
-    drop(relay_ep);
+    // Let both endpoint clones drop at end-of-scope. With the
+    // Phase 5 shared-endpoint path, these clones are Arc<Endpoint>
+    // clones of the signal endpoint — dropping them just decrements
+    // the ref count, the socket stays alive for the signal loop +
+    // any further direct-P2P attempts. With the fresh-endpoint
+    // fallback, the drops are the last refs so the sockets close
+    // promptly. Either way the winning transport already owns its
+    // own quinn::Connection reference which is independent of the
+    // Endpoint lifetime.
+    let _ = (direct_ep, relay_ep);
 
     result
 }
@@ -67,22 +67,45 @@ pub enum NatType {
     Unknown,
 }
 
-/// Probe a single relay with a throwaway QUIC connection.
+/// Probe a single relay with a QUIC connection.
 ///
-/// Each call creates a fresh `quinn::Endpoint` so the OS hands out a
-/// fresh ephemeral source port — essential for NAT-type detection
-/// because a shared socket would produce the same mapping against
-/// every relay and mask symmetric NAT.
+/// # Endpoint reuse (Phase 5 — Nebula-style architecture)
+///
+/// If `existing_endpoint` is `Some`, the probe uses that socket
+/// instead of creating a fresh one. This is the desired mode in
+/// production: a port-preserving NAT (MikroTik masquerade, most
+/// consumer routers) gives a **stable** external port for the
+/// one socket, so the reflex addr observed by ANY relay is the
+/// SAME addr and matches what a peer would see on a direct dial.
+/// Pass the signal endpoint here.
+///
+/// If `None`, creates a fresh one-shot endpoint. Kept for:
+/// - tests that spin up isolated probes
+/// - the "I'm not registered yet" case where there's no signal
+///   endpoint to reuse
+///
+/// NOTE on NAT-type detection: the pre-Phase-5 behavior of
+/// forcing a fresh endpoint per probe was wrong — it made every
+/// port-preserving NAT look symmetric because the classifier saw
+/// a different external port for each fresh source port. With
+/// one shared socket, the classifier reflects the REAL NAT
+/// behavior.
 pub async fn probe_reflect_addr(
     relay: SocketAddr,
     timeout_ms: u64,
+    existing_endpoint: Option<wzp_transport::Endpoint>,
 ) -> Result<(SocketAddr, u32), String> {
     // Install rustls provider idempotently — a second install on the
     // same thread is a no-op.
     let _ = rustls::crypto::ring::default_provider().install_default();
 
-    let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
-    let endpoint = create_endpoint(bind, None).map_err(|e| format!("endpoint: {e}"))?;
+    let endpoint = match existing_endpoint {
+        Some(ep) => ep,
+        None => {
+            let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
+            create_endpoint(bind, None).map_err(|e| format!("endpoint: {e}"))?
+        }
+    };
 
     let start = Instant::now();
     let probe = async {
@@ -153,9 +176,10 @@ pub async fn probe_reflect_addr(
         .await
         .map_err(|_| format!("probe timeout ({timeout_ms}ms)"))??;
 
-    // Drop the endpoint explicitly AFTER the probe finishes so the
-    // UDP socket is released before we return.
-    drop(endpoint);
+    // `endpoint` is a quinn::Endpoint clone — an Arc under the
+    // hood. Letting it drop at end-of-scope is correct whether it
+    // was fresh (last ref → socket closes) or shared (ref count
+    // decrements, socket stays alive for the signal loop).
 
     Ok(out)
 }
@@ -163,17 +187,32 @@ pub async fn probe_reflect_addr(
 /// classifying the returned addresses. Never errors — failing
 /// probes surface via `NatProbeResult.error`; aggregate is always
 /// returned.
+///
+/// # Endpoint reuse (Phase 5)
+///
+/// If `shared_endpoint` is `Some`, every probe reuses it. This is
+/// the PRODUCTION behavior: all probes source from the same UDP
+/// port, so port-preserving NATs map them to the same external
+/// port, and the classifier reflects the real NAT type. Pass the
+/// signal endpoint.
+///
+/// If `None`, each probe creates its own fresh endpoint — useful
+/// in tests that don't have a signal endpoint, but produces
+/// spurious `SymmetricPort` classifications against NATs that
+/// would otherwise look cone-like.
 pub async fn detect_nat_type(
     relays: Vec<(String, SocketAddr)>,
     timeout_ms: u64,
+    shared_endpoint: Option<wzp_transport::Endpoint>,
 ) -> NatDetection {
     // Parallel probes via tokio::task::JoinSet so the wall-clock is
     // bounded by the slowest probe, not the sum. JoinSet keeps the
     // dep surface at just tokio — we already depend on it.
     let mut set = tokio::task::JoinSet::new();
     for (name, addr) in relays {
+        let ep = shared_endpoint.clone();
         set.spawn(async move {
-            let result = probe_reflect_addr(addr, timeout_ms).await;
+            let result = probe_reflect_addr(addr, timeout_ms, ep).await;
            (name, addr, result)
         });
     }
@@ -114,6 +114,7 @@ async fn dual_path_direct_wins_on_loopback() {
         relay_addr,
         "test-room".into(),
         "call-test".into(),
+        None, // Phase 5: tests use fresh endpoints (no shared signal)
     )
     .await
     .expect("race must succeed");
@@ -151,6 +152,7 @@ async fn dual_path_relay_wins_when_direct_is_dead() {
         relay_addr,
         "test-room".into(),
         "call-test".into(),
+        None, // Phase 5: tests use fresh endpoints (no shared signal)
     )
     .await
     .expect("race must succeed via relay fallback");
@@ -184,6 +186,7 @@ async fn dual_path_errors_cleanly_when_both_paths_dead() {
         dead_relay,
         "test-room".into(),
         "call-test".into(),
+        None, // Phase 5: tests use fresh endpoints (no shared signal)
     )
     .await;
     let elapsed = start.elapsed();
@@ -97,7 +97,7 @@ async fn probe_reflect_addr_happy_path() {
 
     let (observed, latency_ms) = tokio::time::timeout(
         Duration::from_secs(3),
-        probe_reflect_addr(relay_addr, 2000),
+        probe_reflect_addr(relay_addr, 2000, None),
     )
     .await
     .expect("probe must complete within 3s")
@@ -138,6 +138,7 @@ async fn detect_nat_type_two_loopback_relays_probes_work_but_classify_unknown()
            ("RelayB".into(), addr_b),
         ],
         2000,
+        None,
     )
     .await;
 
@@ -195,6 +196,7 @@ async fn detect_nat_type_dead_relay_is_unknown() {
            ("Dead".into(), dead_addr),
         ],
         600, // tight timeout so the dead probe fails fast
+        None,
     )
     .await;
 
@@ -360,7 +360,10 @@ async fn connect(
     // own_reflex_addr, unparseable addrs, equal addrs), we skip
     // the race entirely and fall back to the pure-relay path —
     // identical to Phase 0 behavior.
-    let own_reflex_addr = state.signal.lock().await.own_reflex_addr.clone();
+    let (own_reflex_addr, signal_endpoint_for_race) = {
+        let sig = state.signal.lock().await;
+        (sig.own_reflex_addr.clone(), sig.endpoint.clone())
+    };
     let peer_addr_parsed: Option<std::net::SocketAddr> = peer_direct_addr
         .as_deref()
         .and_then(|s| s.parse().ok());
@@ -389,7 +392,20 @@ async fn connect(
     }));
     let room_sni = room.clone();
     let call_sni = format!("call-{room}");
-    match wzp_client::dual_path::race(r, peer_addr, relay_sockaddr, room_sni, call_sni).await {
+    // Phase 5: pass the signal endpoint so the race
+    // reuses ONE socket for listen + dial + relay.
+    // The advertised reflex addr then matches the
+    // actual listening port and peers can reach us.
+    match wzp_client::dual_path::race(
+        r,
+        peer_addr,
+        relay_sockaddr,
+        room_sni,
+        call_sni,
+        signal_endpoint_for_race.clone(),
+    )
+    .await
+    {
         Ok((transport, path)) => {
             tracing::info!(?path, "connect: dual-path race resolved");
             emit_call_debug(&app, "connect:dual_path_race_won", serde_json::json!({
@@ -760,8 +776,23 @@ fn do_register_signal(
     let identity_pub = *pub_id.signing.as_bytes();
     emit_call_debug(&app, "register_signal:identity_loaded", serde_json::json!({ "fingerprint": fp }));
 
+    // Phase 5: single-socket Nebula-style architecture. The signal
+    // endpoint is dual-purpose (client + server config). Every outbound
+    // flow — signal, reflect probes, relay media dials, direct-P2P
+    // dials — uses this same socket, so port-preserving NATs (MikroTik
+    // masquerade is the big one) give us a stable external port that
+    // peers can actually dial. The same socket also accepts incoming
+    // direct-P2P connections during the dual-path race.
+    //
+    // Was `None` before Phase 5 — that produced a client-only endpoint
+    // with a different internal port than later reflect / dual-path
+    // endpoints, which made MikroTik look symmetric and broke direct
+    // P2P because the advertised reflex port was not the listening
+    // port.
     let bind: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap();
-    let endpoint = wzp_transport::create_endpoint(bind, None).map_err(|e| format!("{e}"))?;
+    let (server_cfg, _cert_der) = wzp_transport::server_config();
+    let endpoint = wzp_transport::create_endpoint(bind, Some(server_cfg))
+        .map_err(|e| format!("{e}"))?;
     emit_call_debug(&app, "register_signal:endpoint_created", serde_json::json!({ "bind": bind.to_string() }));
     let conn = wzp_transport::connect(&endpoint, addr, "_signal", wzp_transport::client_config())
         .await
@@ -1384,6 +1415,7 @@ async fn get_reflected_address(
 /// in; Rust side just does the network work.
 #[tauri::command]
 async fn detect_nat_type(
+    state: tauri::State<'_, Arc<AppState>>,
     relays: Vec<RelayArg>,
 ) -> Result<serde_json::Value, String> {
     // Parse relay args up front so a single malformed entry fails
@@ -1398,10 +1430,18 @@ async fn detect_nat_type(
         parsed.push((r.name, addr));
     }
 
+    // Phase 5: share the signal endpoint across all probes so
+    // they emit from the same source port. Port-preserving NATs
+    // (MikroTik, most consumer routers) give a stable external
+    // port → classifier correctly sees cone instead of falsely
+    // labeling SymmetricPort. Falls back to None (per-probe fresh
+    // endpoint) when not registered.
+    let shared_endpoint = state.signal.lock().await.endpoint.clone();
+
     // 1500ms per probe is generous: a same-host probe is < 10ms,
     // a cross-continent probe is typically < 300ms, and we want
     // to tolerate a one-off packet loss during connect.
-    let detection = wzp_client::reflect::detect_nat_type(parsed, 1500).await;
+    let detection = wzp_client::reflect::detect_nat_type(parsed, 1500, shared_endpoint).await;
     serde_json::to_value(&detection).map_err(|e| format!("serialize: {e}"))
 }