wz-phone/crates/wzp-relay/tests/multi_reflect.rs
Siavash Sameni 1618ff6c9d feat(p2p): Phase 5 — single-socket architecture (Nebula-style)
Before Phase 5 WarzonePhone used THREE separate UDP sockets per
client:

  1. Signal endpoint         (register_signal, client-only)
  2. Reflect probe endpoints (one fresh socket per relay probe)
  3. Dual-path race endpoint (fresh per call setup)

This broke two things in production on port-preserving NATs
(MikroTik masquerade, most consumer routers):

  a. Phase 2 NAT detection was WRONG. Each probe used a fresh
     internal port, so MikroTik mapped each one to a different
     external port, and the classifier saw "different port per
     relay" and labeled it SymmetricPort. The real NAT was
     cone-like but measurement via fresh sockets hid that.

  b. Phase 3.5 dual-path P2P race was BROKEN. The reflex addr
     we advertised in DirectCallOffer was observed by the signal
     endpoint's socket. The actual dual-path race listened on a
     DIFFERENT fresh socket, on a different internal (and
     therefore external) port. Peers dialed the advertised addr
     and hit MikroTik's mapping for the signal socket, which
     forwarded to the signal endpoint — a client-only endpoint
     that doesn't accept incoming connections. Direct path
     silently failed, relay always won the race.
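
The measurement artifact in (a) can be reproduced with a toy model. This is only a sketch: `port_preserving_nat`, `observe`, `classify` and `Verdict` are hypothetical names made up for illustration, and the real classifier in `wzp-client/src/reflect.rs` may well use different rules.

```rust
use std::collections::HashSet;

/// Hypothetical model of a port-preserving NAT: the external port
/// simply equals the internal source port.
fn port_preserving_nat(internal_port: u16) -> u16 {
    internal_port
}

#[derive(Debug, PartialEq)]
enum Verdict {
    Cone,
    SymmetricPort,
    Unknown,
}

/// What each relay would observe, given the internal port used for
/// the probe sent to that relay.
fn observe(internal_ports: &[u16]) -> Vec<u16> {
    internal_ports
        .iter()
        .map(|&p| port_preserving_nat(p))
        .collect()
}

/// Toy classifier (an assumption, not the project's code): the same
/// external port at every relay means Cone, differing ports mean
/// SymmetricPort, and fewer than two observations mean Unknown.
fn classify(observed_ports: &[u16]) -> Verdict {
    if observed_ports.len() < 2 {
        return Verdict::Unknown;
    }
    let distinct: HashSet<u16> = observed_ports.iter().copied().collect();
    if distinct.len() == 1 {
        Verdict::Cone
    } else {
        Verdict::SymmetricPort
    }
}
```

With a fresh socket per probe (say internal ports 50001 and 50002) the toy classifier reports `SymmetricPort` even though the NAT model is as cone-like as it gets. With one shared socket (50001 for both probes) it reports `Cone`.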

Nebula-style fix: one socket for everything. The signal endpoint
is now dual-purpose (client + server_config), and both the
reflect probes and the dual-path race reuse it instead of
creating fresh ones. MikroTik's port-preservation then gives us
a stable external port across all flows → classifier correctly
sees Cone NAT → advertised reflex addr is the actual listening
port → direct dials from peers land on the right socket →
`endpoint.accept()` in the A-role branch of the dual-path race
picks up the incoming connection.
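
The core property (the address a relay observes is the address peers can dial) can be sketched with plain UDP sockets on loopback. This is an illustration under simplifying assumptions, not the project's code: it uses `std::net::UdpSocket` instead of QUIC endpoints, there is no NAT in between, and `bind_loopback` and `reflex_roundtrip` are hypothetical helper names.

```rust
use std::io;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use std::time::Duration;

/// Bind a loopback UDP socket on an OS-assigned port, with a read
/// timeout so a lost datagram cannot hang the example.
fn bind_loopback() -> io::Result<UdpSocket> {
    let sock = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
    sock.set_read_timeout(Some(Duration::from_secs(2)))?;
    Ok(sock)
}

/// Returns (reflex addr observed by the "relay", addr the client
/// socket is actually listening on).
fn reflex_roundtrip() -> io::Result<(SocketAddr, SocketAddr)> {
    // One client socket used for both the outbound signal flow and
    // for receiving the peer's direct dial.
    let client = bind_loopback()?;
    let relay = bind_loopback()?;
    let peer = bind_loopback()?;

    // Signal flow: the relay records the source address it sees.
    client.send_to(b"reflect", relay.local_addr()?)?;
    let mut buf = [0u8; 64];
    let (_, reflex_addr) = relay.recv_from(&mut buf)?;

    // The peer dials the advertised reflex addr; because the client
    // listens on the same socket, the datagram arrives there.
    peer.send_to(b"direct-dial", reflex_addr)?;
    let (n, _from) = client.recv_from(&mut buf)?;
    assert_eq!(&buf[..n], b"direct-dial");

    Ok((reflex_addr, client.local_addr()?))
}
```

Had the client used a second, fresh socket for listening, the peer's datagram would still have gone to the first socket's address, which is the pre-Phase-5 failure mode described in (b).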

## Changes

### `register_signal` (desktop/src-tauri/src/lib.rs)
- Endpoint now created with `Some(server_config())` instead of
  `None`. The socket can now accept incoming QUIC connections as
  well as dial outbound.
- Every code path that already reused `sig.endpoint` for relay
  dials benefits automatically: the same socket is now ALSO
  listening for peer dials.

### `probe_reflect_addr` (wzp-client/src/reflect.rs)
- New `existing_endpoint: Option<Endpoint>` arg. `Some` reuses
  the caller's socket (production: pass the signal endpoint).
  `None` creates a fresh one (tests + pre-registration).
- Removed the `drop(endpoint)` at the end — was correct for
  fresh endpoints (explicit early socket close) but incorrect
  for shared ones. End-of-scope drop does the right thing in
  both cases via Arc semantics.
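
A minimal sketch of the reuse-or-create pattern and why end-of-scope drop is safe, assuming the endpoint handle behaves like a reference-counted handle. Here `Arc<UdpSocket>` stands in for the real endpoint type, and `SharedSocket` and `probe_local_port` are hypothetical names, not the actual `probe_reflect_addr` signature.

```rust
use std::io;
use std::net::{Ipv4Addr, UdpSocket};
use std::sync::Arc;

/// Hypothetical stand-in for a cloneable endpoint handle.
type SharedSocket = Arc<UdpSocket>;

/// Returns the local port a probe would be sent from.
/// `Some` reuses the caller's socket, `None` binds a fresh one.
fn probe_local_port(existing: Option<SharedSocket>) -> io::Result<u16> {
    let socket = match existing {
        Some(shared) => shared,
        None => Arc::new(UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?),
    };
    let port = socket.local_addr()?.port();
    // No explicit drop: when `socket` goes out of scope, a fresh
    // socket (refcount 1) is closed, while a shared one only loses
    // this clone and stays open for its other holders.
    Ok(port)
}
```

Calling it twice with clones of the same handle yields the same local port both times, and the caller's handle remains usable afterwards. Calling it with `None` binds a new OS-assigned port on each call.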

### `detect_nat_type` (wzp-client/src/reflect.rs)
- New `shared_endpoint: Option<Endpoint>` arg, forwarded to
  every probe in the JoinSet fan-out. One shared socket means
  the classifier sees the true NAT type.

### `detect_nat_type` Tauri command (desktop/src-tauri/src/lib.rs)
- Reads `state.signal.endpoint` and passes it as the shared
  endpoint. Falls back to None when not registered. NAT detection
  now produces accurate classifications against MikroTik / most
  consumer NATs.

### `dual_path::race` (wzp-client/src/dual_path.rs)
- New `shared_endpoint: Option<Endpoint>` arg.
- A-role: when `Some`, reuses it for `accept()`. This is the
  critical change — the reflex addr advertised to peers is now
  the address listening for incoming direct dials.
- D-role: when `Some`, reuses it for the outbound direct dial.
  MikroTik keeps the same external port for the dial as for
  the signal flow → direct dial through a cone-mapped NAT.
- Relay path: also reuses the shared endpoint so MikroTik has
  a single consistent mapping across the whole call (saves one
  extra external port and makes firewall traces cleaner).
- When `None`, falls back to fresh per-role endpoints as before.

### `connect` Tauri command (desktop/src-tauri/src/lib.rs)
- Reads `state.signal.endpoint` once when acquiring own reflex
  addr and passes it through to `dual_path::race`.

### Tests
- `wzp-client/tests/dual_path.rs` and
  `wzp-relay/tests/multi_reflect.rs` updated to pass `None` for
  the new endpoint arg — tests use fresh sockets and that's
  fine because the loopback harness doesn't care about
  port-preserving NAT behavior.

Full workspace test: 423 passing (no regressions).

## Expected behavior after this commit on real hardware

Behind MikroTik + Starlink-bypass (the reporter's setup):
- Phase 2 NAT detect → **Cone NAT** (was SymmetricPort — false
  positive from the measurement artifact)
- Phase 3.5 direct-P2P dial → succeeds for both cone-cone and
  cone-CGNAT cases where the remote side was previously blocked
  by our own socket mismatch
- LTE ↔ LTE cross-carrier → still likely relay fallback; that's
  genuinely strict symmetric and needs Phase 5.5 port prediction.

## Phase 5.5 (next, separate PRD)

Multi-candidate port prediction + ICE-style candidate aggregation
for truly strict symmetric NATs. Not needed for the 95% case —
Phase 5 alone fixes most consumer-router setups.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 19:47:20 +04:00


//! Phase 2 integration tests for multi-relay NAT reflection
//! (PRD: .taskmaster/docs/prd_multi_relay_reflect.txt).
//!
//! These spin up one or two mock relays that implement the full
//! pre-reflect dance — RegisterPresence → RegisterPresenceAck →
//! Reflect → ReflectResponse — which is what the transient
//! probe helper in `wzp_client::reflect::probe_reflect_addr` does
//! against a real relay.
//!
//! Test matrix:
//!   1. `probe_reflect_addr_happy_path`
//!      Single mock relay; asserts the probe helper returns the
//!      observed addr as 127.0.0.1:<client ephemeral port>.
//!   2. `detect_nat_type_two_loopback_relays_probes_work_but_classify_unknown`
//!      Two mock relays, one client. Both probes succeed and report
//!      127.0.0.1, but loopback reflex addrs are filtered out before
//!      classification, so the classifier returns `Unknown` with no
//!      consensus addr.
//!   3. `detect_nat_type_dead_relay_is_unknown`
//!      One alive relay + one dead address; aggregator returns
//!      `Unknown` with a non-empty `error` field on the failed
//!      probe.
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use wzp_client::reflect::{detect_nat_type, probe_reflect_addr, NatType};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::{create_endpoint, server_config, QuinnTransport};
/// Minimal mock relay that loops accepting connections, handles
/// RegisterPresence + Reflect, and responds correctly. Mirrors the
/// two match arms from `wzp-relay/src/main.rs` that matter here.
///
/// Each accepted connection gets its own inner task so multiple
/// simultaneous probes work.
async fn spawn_mock_relay() -> (SocketAddr, tokio::task::JoinHandle<()>) {
    let _ = rustls::crypto::ring::default_provider().install_default();
    let (sc, _cert_der) = server_config();
    let bind: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
    let endpoint = create_endpoint(bind, Some(sc)).expect("server endpoint");
    let listen_addr = endpoint.local_addr().expect("local_addr");

    let handle = tokio::spawn(async move {
        loop {
            // Accept the next incoming connection. `wzp_transport::accept`
            // returns the established `quinn::Connection`.
            let conn = match wzp_transport::accept(&endpoint).await {
                Ok(c) => c,
                Err(_) => break, // endpoint closed
            };
            let observed_addr = conn.remote_address();
            let transport = Arc::new(QuinnTransport::new(conn));

            // Per-connection handler. Keep servicing messages until
            // the peer closes so one probe connection can do
            // RegisterPresence → Ack → Reflect → Response without
            // racing other incoming connections.
            let t = transport;
            tokio::spawn(async move {
                loop {
                    match t.recv_signal().await {
                        Ok(Some(SignalMessage::RegisterPresence { .. })) => {
                            let _ = t
                                .send_signal(&SignalMessage::RegisterPresenceAck {
                                    success: true,
                                    error: None,
                                })
                                .await;
                        }
                        Ok(Some(SignalMessage::Reflect)) => {
                            let _ = t
                                .send_signal(&SignalMessage::ReflectResponse {
                                    observed_addr: observed_addr.to_string(),
                                })
                                .await;
                        }
                        Ok(Some(_other)) => { /* ignore */ }
                        Ok(None) => break,
                        Err(_) => break,
                    }
                }
            });
        }
    });

    (listen_addr, handle)
}

// -----------------------------------------------------------------------
// Test 1: probe_reflect_addr against a single mock relay
// -----------------------------------------------------------------------
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn probe_reflect_addr_happy_path() {
    let (relay_addr, _relay_handle) = spawn_mock_relay().await;

    let (observed, latency_ms) = tokio::time::timeout(
        Duration::from_secs(3),
        probe_reflect_addr(relay_addr, 2000, None),
    )
    .await
    .expect("probe must complete within 3s")
    .expect("probe must succeed");

    assert_eq!(
        observed.ip().to_string(),
        "127.0.0.1",
        "loopback test should see 127.0.0.1"
    );
    assert_ne!(observed.port(), 0, "observed port must be non-zero");

    // Latency on same host is dominated by the handshake, so generously
    // allow up to 2s (the timeout) rather than picking a tight number
    // that would be flaky on busy CI runners.
    assert!(latency_ms < 2000, "latency {latency_ms}ms too high");
}

// -----------------------------------------------------------------------
// Test 2: two loopback relays → probes succeed, classification is Unknown
// -----------------------------------------------------------------------
//
// With the private-IP filter added in the NAT classifier, loopback
// reflex addrs (127.0.0.1) are dropped before classification —
// they can't possibly indicate public-internet NAT state. So the
// test now asserts:
// - both probes succeed end-to-end (wire plumbing works)
// - both return 127.0.0.1 (same-host is visible)
// - the aggregated verdict is Unknown (no public probes)
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn detect_nat_type_two_loopback_relays_probes_work_but_classify_unknown() {
    let (addr_a, _h_a) = spawn_mock_relay().await;
    let (addr_b, _h_b) = spawn_mock_relay().await;

    let detection = detect_nat_type(
        vec![
            ("RelayA".into(), addr_a),
            ("RelayB".into(), addr_b),
        ],
        2000,
        None,
    )
    .await;

    assert_eq!(detection.probes.len(), 2);
    for p in &detection.probes {
        assert!(
            p.observed_addr.is_some(),
            "probe {:?} failed: {:?}",
            p.relay_name,
            p.error
        );
    }

    let observed_ips: Vec<String> = detection
        .probes
        .iter()
        .map(|p| {
            p.observed_addr
                .as_ref()
                .and_then(|s| s.parse::<SocketAddr>().ok())
                .map(|a| a.ip().to_string())
                .unwrap_or_default()
        })
        .collect();
    assert_eq!(observed_ips[0], "127.0.0.1");
    assert_eq!(observed_ips[1], "127.0.0.1");

    // Classification: loopback probes are filtered out of the
    // public-NAT classifier, so with 0 public probes the result
    // is Unknown.
    assert_eq!(
        detection.nat_type,
        NatType::Unknown,
        "loopback-only probes must not contribute to public NAT classification"
    );
    assert!(detection.consensus_addr.is_none());
}

// -----------------------------------------------------------------------
// Test 3: one alive relay + one dead address → Unknown
// -----------------------------------------------------------------------
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn detect_nat_type_dead_relay_is_unknown() {
    let (alive_addr, _alive_handle) = spawn_mock_relay().await;

    // Dead relay: a port that nothing is listening on. The OS will drop
    // the packets, so the probe should time out within the 600ms budget
    // we give it. Pick a port unlikely to be in use: port 1 on
    // loopback works on every OS I care about.
    let dead_addr: SocketAddr = "127.0.0.1:1".parse().unwrap();

    let detection = detect_nat_type(
        vec![
            ("Alive".into(), alive_addr),
            ("Dead".into(), dead_addr),
        ],
        600, // tight timeout so the dead probe fails fast
        None,
    )
    .await;

    assert_eq!(detection.probes.len(), 2);

    // Find the alive and dead probes by name (order of JoinSet
    // completions is not guaranteed).
    let alive = detection.probes.iter().find(|p| p.relay_name == "Alive").unwrap();
    let dead = detection.probes.iter().find(|p| p.relay_name == "Dead").unwrap();

    assert!(
        alive.observed_addr.is_some(),
        "alive probe must succeed: {:?}",
        alive.error
    );
    assert!(
        dead.observed_addr.is_none(),
        "dead probe must fail, got addr {:?}",
        dead.observed_addr
    );
    assert!(
        dead.error.is_some(),
        "dead probe must surface an error string"
    );

    // With only 1 successful probe, the classifier returns Unknown.
    assert_eq!(detection.nat_type, NatType::Unknown);
    assert!(detection.consensus_addr.is_none());
}