feat(p2p): Phase 3.5 dual-path QUIC race + GUI call-flow debug logs
Two features in one commit because they ship and test together:
Phase 3.5 closes the hole-punching loop and the call-flow debug
logs give the user live visibility into every step of a call so
real-hardware testing of the new P2P path is debuggable.
## Phase 3.5 — dual-path QUIC connect race
Completes the hole-punching work Phase 3 scaffolded. On receiving
a CallSetup with peer_direct_addr, the client now actually races a
direct QUIC handshake against the relay dial and uses whichever
completes first. Symmetric role assignment avoids the two-conns-
per-call problem:
- Both peers compare `own_reflex_addr` vs `peer_reflex_addr`
lexicographically.
- Smaller addr → **Acceptor** (A-role): builds a server-capable
dual endpoint, awaits an incoming QUIC session. Does NOT dial.
- Larger addr → **Dialer** (D-role): builds a client-only
endpoint, dials the peer's addr with `call-<id>` SNI. Does NOT
listen.
- Both sides always dial the relay in parallel as fallback.
- `tokio::select!` with `biased` preference for direct, `tokio::pin!`
so each branch can await the losing opposite as fallback.
- Direct timeout 2s, relay fallback timeout 5s (so 7s worst case
from CallSetup to "no media path" error).
New crate module `wzp_client::dual_path::{race, WinningPath}`
(moved here from desktop/src-tauri so it's testable from a
workspace test). `determine_role` in `wzp_client::reflect` is
pure-function and unit-tested.
### CallEngine integration
- New `pre_connected_transport: Option<Arc<QuinnTransport>>` arg
on both android + desktop `CallEngine::start` branches. Skips
the internal wzp_transport::connect step when Some. Backward-
compat: None keeps Phase 0 relay-only behavior.
- `connect` Tauri command reads own_reflex_addr from SignalState,
computes role, runs the race, passes the winning transport
into CallEngine. If ANY input is missing (no peer addr, no own
addr, equal addrs), falls back to classic relay path —
identical to pre-Phase-3.5 behavior.
### Tests (9 new, all passing)
- 6 unit tests for `determine_role` truth table in
`wzp-client/src/reflect.rs` (smaller=Acceptor, larger=Dialer,
port-only diff, equal, missing-side, symmetry)
- 3 integration tests in `crates/wzp-client/tests/dual_path.rs`:
* `dual_path_direct_wins_on_loopback` — two-endpoint test
rig, Dialer wins direct path vs loopback mock relay
* `dual_path_relay_wins_when_direct_is_dead` — dead peer
port, 2s direct timeout, relay fallback wins
* `dual_path_errors_cleanly_when_both_paths_dead` — <10s
error, no hang
## GUI call-flow debug logs
Runtime-toggled structured events at every step of a call so the
user can see where a call progressed or stalled on real hardware.
Modeled on the existing DRED_VERBOSE_LOGS pattern.
### Rust side
- `static CALL_DEBUG_LOGS: AtomicBool` + `emit_call_debug(&app,
step, details)` helper. Always logs via `tracing::info!`
(logcat always has a copy); GUI Tauri `call-debug-log` event
only fires when the flag is on.
- Tauri commands `set_call_debug_logs` / `get_call_debug_logs`.
### Instrumented steps (24 emit_call_debug sites)
- `register_signal`: start, identity loaded, endpoint created,
connect failed/ok, RegisterPresence sent, ack received/failed,
recv loop spawning
- Recv loop: CallRinging, DirectCallOffer (w/ caller_reflexive_addr),
DirectCallAnswer (w/ callee_reflexive_addr), CallSetup (w/
peer_direct_addr), Hangup
- `place_call`: start, reflect query start/ok/none, offer sent,
send failed
- `answer_call`: start, reflect query start/ok/none or privacy
skip, answer sent, send failed
- `connect`: start, dual_path_race_start (w/ role), won (w/
path), failed, skipped (w/ reasons), call_engine_starting/
started/failed
### JS side
- New `callDebugLogs: boolean` field on Settings type.
- Boot-time hydrate of the Rust flag from localStorage so the
choice survives restarts (like `dredDebugLogs`).
- Settings panel: new "Call flow debug logs" checkbox alongside
the DRED toggle.
- New "Call Debug Log" section that ONLY shows when the flag is
on. Rolling in-memory buffer of the last 200 events, rendered
as monospace `HH:MM:SS.mmm step {details}` lines with auto-
scroll and a Clear button.
- `listen("call-debug-log", ...)` subscribed at app startup,
appends to the buffer, re-renders on every event.
Full workspace test goes from 404 → 413 passing. Clippy clean
on touched crates.
PRD: .taskmaster/docs/prd_phase35_dual_path_race.txt
Tasks: 61-69 all completed
Next: APK + desktop build carrying everything — Phase 2 NAT
detect, Phase 3 advertising, Phase 3.5 dual-path + call debug
logs, plus the earlier Android first-join diagnostics — so the
user can validate the P2P path on real hardware with live
per-step visibility into where any failures happen.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
195
crates/wzp-client/src/dual_path.rs
Normal file
195
crates/wzp-client/src/dual_path.rs
Normal file
@@ -0,0 +1,195 @@
|
||||
//! Phase 3.5 — dual-path QUIC connect race for P2P hole-punching.
|
||||
//!
|
||||
//! When both peers advertised reflex addrs in the
|
||||
//! DirectCallOffer/Answer flow, the relay cross-wires them into
|
||||
//! `CallSetup.peer_direct_addr`. This module races a direct QUIC
|
||||
//! handshake against the existing relay dial and returns whichever
|
||||
//! completes first — with automatic drop of the loser via
|
||||
//! `tokio::select!`.
|
||||
//!
|
||||
//! Role determination is deterministic and symmetric
|
||||
//! (`wzp_client::reflect::determine_role`): whichever peer has the
|
||||
//! lexicographically smaller reflex addr becomes the **Acceptor**
|
||||
//! (listens on a server-capable endpoint), the other becomes the
|
||||
//! **Dialer** (dials the peer's addr). Because the rule is
|
||||
//! identical on both sides, the Acceptor's inbound QUIC session
|
||||
//! and the Dialer's outbound are the SAME connection — no
|
||||
//! negotiation needed, no two-conns-per-call confusion.
|
||||
//!
|
||||
//! Timeout policy:
|
||||
//! - Direct path: 2s from the start of `race`. Cone-NAT hole-punch
|
||||
//! typically completes in < 500ms on a LAN; 2s gives us tolerance
|
||||
//! for a single QUIC Initial retry on unreliable networks.
|
||||
//! - Relay path: 10s (existing behavior elsewhere in the codebase).
|
||||
//! - Overall: `tokio::select!` returns as soon as either succeeds.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::reflect::Role;
|
||||
use wzp_transport::QuinnTransport;
|
||||
|
||||
/// Which path won the race. Used by the `connect` command for
|
||||
/// logging + (in the future) metrics.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum WinningPath {
|
||||
Direct,
|
||||
Relay,
|
||||
}
|
||||
|
||||
/// Attempt a direct QUIC connection to the peer in parallel with
|
||||
/// the relay dial and return the winning `QuinnTransport`.
|
||||
///
|
||||
/// `role` selects the direction of the direct attempt:
|
||||
/// - `Role::Acceptor` creates a server-capable endpoint and waits
|
||||
/// for the peer to dial in.
|
||||
/// - `Role::Dialer` creates a client-only endpoint and dials
|
||||
/// `peer_direct_addr`.
|
||||
///
|
||||
/// The relay path is always attempted in parallel as a fallback so
|
||||
/// the race ALWAYS produces a working transport unless both paths
|
||||
/// genuinely fail (network partition). Returns
|
||||
/// `Err(anyhow::anyhow!(...))` if both paths fail within the
|
||||
/// timeout.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn race(
|
||||
role: Role,
|
||||
peer_direct_addr: SocketAddr,
|
||||
relay_addr: SocketAddr,
|
||||
room_sni: String,
|
||||
call_sni: String,
|
||||
) -> anyhow::Result<(Arc<QuinnTransport>, WinningPath)> {
|
||||
// Rustls provider must be installed before any quinn endpoint
|
||||
// is created. Install attempt is idempotent.
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
|
||||
// Build the direct-path endpoint + future based on role.
|
||||
// Each future returns an already-wrapped `QuinnTransport` so we
|
||||
// don't need a direct `quinn::Connection` type in scope here
|
||||
// (this crate doesn't depend on quinn directly).
|
||||
let direct_ep: wzp_transport::Endpoint;
|
||||
let direct_fut: std::pin::Pin<
|
||||
Box<dyn std::future::Future<Output = anyhow::Result<QuinnTransport>> + Send>,
|
||||
>;
|
||||
|
||||
match role {
|
||||
Role::Acceptor => {
|
||||
let (sc, _cert_der) = wzp_transport::server_config();
|
||||
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||
let ep = wzp_transport::create_endpoint(bind, Some(sc))?;
|
||||
tracing::info!(
|
||||
local_addr = ?ep.local_addr().ok(),
|
||||
"dual_path: A-role endpoint up, awaiting peer dial"
|
||||
);
|
||||
let ep_for_fut = ep.clone();
|
||||
direct_fut = Box::pin(async move {
|
||||
// `wzp_transport::accept` wraps the same
|
||||
// `endpoint.accept().await?.await?` dance we want
|
||||
// and maps errors into TransportError for us.
|
||||
let conn = wzp_transport::accept(&ep_for_fut)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
|
||||
Ok(QuinnTransport::new(conn))
|
||||
});
|
||||
direct_ep = ep;
|
||||
}
|
||||
Role::Dialer => {
|
||||
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||
let ep = wzp_transport::create_endpoint(bind, None)?;
|
||||
tracing::info!(
|
||||
local_addr = ?ep.local_addr().ok(),
|
||||
%peer_direct_addr,
|
||||
"dual_path: D-role endpoint up, dialing peer"
|
||||
);
|
||||
let ep_for_fut = ep.clone();
|
||||
let client_cfg = wzp_transport::client_config();
|
||||
let sni = call_sni.clone();
|
||||
direct_fut = Box::pin(async move {
|
||||
let conn =
|
||||
wzp_transport::connect(&ep_for_fut, peer_direct_addr, &sni, client_cfg)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("direct dial: {e}"))?;
|
||||
Ok(QuinnTransport::new(conn))
|
||||
});
|
||||
direct_ep = ep;
|
||||
}
|
||||
}
|
||||
|
||||
// Relay path: classic dial to the relay's media room.
|
||||
let relay_bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||
let relay_ep = wzp_transport::create_endpoint(relay_bind, None)?;
|
||||
let relay_ep_for_fut = relay_ep.clone();
|
||||
let relay_client_cfg = wzp_transport::client_config();
|
||||
let relay_sni = room_sni.clone();
|
||||
let relay_fut = async move {
|
||||
let conn =
|
||||
wzp_transport::connect(&relay_ep_for_fut, relay_addr, &relay_sni, relay_client_cfg)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("relay dial: {e}"))?;
|
||||
Ok::<_, anyhow::Error>(QuinnTransport::new(conn))
|
||||
};
|
||||
|
||||
// Race the two with a shared 2s ceiling on the direct attempt.
|
||||
// Pin both so we can poll them from multiple branches of the
|
||||
// select without moving the futures — the "direct failed, wait
|
||||
// for relay" and "relay failed, wait for direct" fallback paths
|
||||
// below need to await the OPPOSITE future after the winning
|
||||
// branch fires. Without pinning, tokio::select! moves the
|
||||
// future out and we can't touch it again.
|
||||
tracing::info!(?role, %peer_direct_addr, %relay_addr, "dual_path: racing direct vs relay");
|
||||
let direct_timed = tokio::time::timeout(Duration::from_secs(2), direct_fut);
|
||||
tokio::pin!(direct_timed, relay_fut);
|
||||
|
||||
let result = tokio::select! {
|
||||
biased; // prefer direct win if both arrive in the same tick
|
||||
direct_result = &mut direct_timed => {
|
||||
match direct_result {
|
||||
Ok(Ok(transport)) => {
|
||||
tracing::info!(%peer_direct_addr, "dual_path: direct WON");
|
||||
Ok((Arc::new(transport), WinningPath::Direct))
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// Direct failed — fall back to waiting for relay.
|
||||
tracing::warn!(error = %e, "dual_path: direct failed, awaiting relay");
|
||||
match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await {
|
||||
Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)),
|
||||
Ok(Err(e2)) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay={e2}")),
|
||||
Err(_) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay=timeout(5s)")),
|
||||
}
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
tracing::warn!("dual_path: direct timed out (2s), awaiting relay");
|
||||
match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await {
|
||||
Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)),
|
||||
Ok(Err(e2)) => Err(anyhow::anyhow!("direct timeout + relay failed: {e2}")),
|
||||
Err(_) => Err(anyhow::anyhow!("direct timeout + relay timeout")),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
relay_result = &mut relay_fut => {
|
||||
match relay_result {
|
||||
Ok(transport) => {
|
||||
tracing::info!("dual_path: relay WON (direct still pending)");
|
||||
Ok((Arc::new(transport), WinningPath::Relay))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "dual_path: relay failed, awaiting direct remainder");
|
||||
match tokio::time::timeout(Duration::from_millis(1500), &mut direct_timed).await {
|
||||
Ok(Ok(Ok(transport))) => Ok((Arc::new(transport), WinningPath::Direct)),
|
||||
_ => Err(anyhow::anyhow!("relay failed + direct unavailable: {e}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Drop both endpoints once the winner is stored in result. The
|
||||
// winning transport owns its own connection so dropping the
|
||||
// endpoint won't kill it.
|
||||
drop(direct_ep);
|
||||
drop(relay_ep);
|
||||
|
||||
result
|
||||
}
|
||||
@@ -32,6 +32,7 @@ pub mod drift_test;
|
||||
pub mod echo_test;
|
||||
pub mod featherchat;
|
||||
pub mod handshake;
|
||||
pub mod dual_path;
|
||||
pub mod metrics;
|
||||
pub mod reflect;
|
||||
pub mod sweep;
|
||||
|
||||
@@ -223,6 +223,58 @@ pub async fn detect_nat_type(
|
||||
}
|
||||
}
|
||||
|
||||
/// Role assignment for the Phase 3.5 dual-path QUIC race.
|
||||
///
|
||||
/// Both peers already know two strings at CallSetup time: their
|
||||
/// own server-reflexive address (queried via Phase 1 Reflect) and
|
||||
/// the peer's (carried in `CallSetup.peer_direct_addr`). To avoid
|
||||
/// a negotiation round-trip, both sides compare the two strings
|
||||
/// lexicographically and agree on a deterministic role:
|
||||
///
|
||||
/// - **Acceptor** — lexicographically smaller addr. Listens for
|
||||
/// an incoming direct connection from the peer. Does NOT dial.
|
||||
/// - **Dialer** — lexicographically larger addr. Dials the
|
||||
/// peer's direct addr. Does NOT listen.
|
||||
///
|
||||
/// Both roles ALSO dial the relay in parallel as a fallback.
|
||||
/// Whichever future (direct or relay) completes first is used as
|
||||
/// the media transport. Because the role is deterministic and
|
||||
/// symmetric, both peers end up holding the same underlying QUIC
|
||||
/// session on the direct path — A's accepted conn and D's dialed
|
||||
/// conn are literally the same connection.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum Role {
|
||||
/// This peer listens for the direct incoming connection.
|
||||
Acceptor,
|
||||
/// This peer dials the peer's direct address.
|
||||
Dialer,
|
||||
}
|
||||
|
||||
/// Compute the deterministic role for this peer in the dual-path
|
||||
/// race. Returns `None` when no direct attempt is possible —
|
||||
/// either peer didn't advertise a reflex addr, or the two addrs
|
||||
/// are identical (same host on loopback / mis-advertised).
|
||||
///
|
||||
/// The caller should treat `None` as "skip direct, relay-only".
|
||||
pub fn determine_role(
|
||||
own_reflex_addr: Option<&str>,
|
||||
peer_reflex_addr: Option<&str>,
|
||||
) -> Option<Role> {
|
||||
let (own, peer) = match (own_reflex_addr, peer_reflex_addr) {
|
||||
(Some(o), Some(p)) => (o, p),
|
||||
_ => return None,
|
||||
};
|
||||
match own.cmp(peer) {
|
||||
std::cmp::Ordering::Less => Some(Role::Acceptor),
|
||||
std::cmp::Ordering::Greater => Some(Role::Dialer),
|
||||
// Equal addrs should never happen in production (both
|
||||
// peers behind the same NAT mapping + same port would be
|
||||
// a degenerate case). Guard against it so we don't infinite-
|
||||
// loop waiting for a connection to ourselves.
|
||||
std::cmp::Ordering::Equal => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Pure-function NAT classifier — split out for unit testing
|
||||
/// without touching the network.
|
||||
pub fn classify_nat(probes: &[NatProbeResult]) -> (NatType, Option<String>) {
|
||||
@@ -326,6 +378,65 @@ mod tests {
|
||||
assert_eq!(addr.as_deref(), Some("192.0.2.1:4433"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn determine_role_smaller_is_acceptor() {
|
||||
// Lexicographic: "192.0.2.1:4433" < "198.51.100.9:4433"
|
||||
assert_eq!(
|
||||
determine_role(Some("192.0.2.1:4433"), Some("198.51.100.9:4433")),
|
||||
Some(Role::Acceptor)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn determine_role_larger_is_dialer() {
|
||||
assert_eq!(
|
||||
determine_role(Some("198.51.100.9:4433"), Some("192.0.2.1:4433")),
|
||||
Some(Role::Dialer)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn determine_role_port_difference_matters() {
|
||||
// Same ip, different ports — string compare still works
|
||||
// because "4433" < "54321".
|
||||
assert_eq!(
|
||||
determine_role(Some("127.0.0.1:4433"), Some("127.0.0.1:54321")),
|
||||
Some(Role::Acceptor)
|
||||
);
|
||||
assert_eq!(
|
||||
determine_role(Some("127.0.0.1:54321"), Some("127.0.0.1:4433")),
|
||||
Some(Role::Dialer)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn determine_role_equal_addrs_is_none() {
|
||||
assert_eq!(
|
||||
determine_role(Some("192.0.2.1:4433"), Some("192.0.2.1:4433")),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn determine_role_missing_side_is_none() {
|
||||
assert_eq!(determine_role(None, Some("192.0.2.1:4433")), None);
|
||||
assert_eq!(determine_role(Some("192.0.2.1:4433"), None), None);
|
||||
assert_eq!(determine_role(None, None), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn determine_role_is_symmetric_across_peers() {
|
||||
// Both peers compute roles independently; they must end
|
||||
// up with opposite assignments (one Acceptor, one Dialer)
|
||||
// so that each side ends up talking to the other.
|
||||
let a = "192.0.2.1:4433";
|
||||
let b = "198.51.100.9:4433";
|
||||
let alice_role = determine_role(Some(a), Some(b));
|
||||
let bob_role = determine_role(Some(b), Some(a));
|
||||
assert_eq!(alice_role, Some(Role::Acceptor));
|
||||
assert_eq!(bob_role, Some(Role::Dialer));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn classify_one_success_one_failure_is_unknown() {
|
||||
let probes = vec![mk(Some("192.0.2.1:4433")), mk(None)];
|
||||
|
||||
199
crates/wzp-client/tests/dual_path.rs
Normal file
199
crates/wzp-client/tests/dual_path.rs
Normal file
@@ -0,0 +1,199 @@
|
||||
//! Phase 3.5 integration tests for the dual-path QUIC race.
|
||||
//!
|
||||
//! The race takes a role (Acceptor or Dialer), a peer_direct_addr,
|
||||
//! a relay_addr, and two SNI strings, then returns whichever QUIC
|
||||
//! handshake completes first wrapped in a `QuinnTransport`. These
|
||||
//! tests validate that:
|
||||
//!
|
||||
//! 1. On loopback with two real clients playing A + D roles, the
|
||||
//! direct path wins (fewer hops than relay).
|
||||
//! 2. When the direct peer is dead (nothing listening) but the
|
||||
//! relay is up, the relay wins within the fallback window.
|
||||
//! 3. When both paths are dead, the race errors cleanly rather
|
||||
//! than hanging forever.
|
||||
//!
|
||||
//! The "relay" in these tests is a minimal mock that just accepts
|
||||
//! an incoming QUIC connection and drops it — we don't need any
|
||||
//! protocol handling, just a TCP-ish listen-and-accept.
|
||||
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::time::Duration;
|
||||
|
||||
use wzp_client::dual_path::{race, WinningPath};
|
||||
use wzp_client::reflect::Role;
|
||||
use wzp_transport::{create_endpoint, server_config};
|
||||
|
||||
/// Spin up a "relay-ish" mock server on loopback that accepts
|
||||
/// incoming QUIC connections and does nothing with them. Used to
|
||||
/// give the relay branch of the race a real target to dial.
|
||||
/// Returns the bound address + a join handle (kept alive to keep
|
||||
/// the endpoint up).
|
||||
async fn spawn_mock_relay() -> (SocketAddr, tokio::task::JoinHandle<()>) {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let (sc, _cert_der) = server_config();
|
||||
let bind: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||
let ep = create_endpoint(bind, Some(sc)).expect("relay endpoint");
|
||||
let addr = ep.local_addr().expect("local_addr");
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
// Accept loop — hold the connection alive for a short
|
||||
// while so the race result isn't killed by the peer
|
||||
// closing before the winning transport is returned.
|
||||
while let Some(incoming) = ep.accept().await {
|
||||
if let Ok(_conn) = incoming.await {
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
(addr, handle)
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Test 1: direct path wins when both sides are up
|
||||
// -----------------------------------------------------------------------
|
||||
//
|
||||
// Spawn a mock relay, then set up a two-client test where one
|
||||
// client plays the Acceptor role and the other plays the Dialer
|
||||
// role. The Dialer's `peer_direct_addr` is the Acceptor's listen
|
||||
// address. Because the direct path is a single loopback hop and
|
||||
// the relay dial also terminates on loopback, both complete
|
||||
// essentially instantly — the `biased` tokio::select in race()
|
||||
// should pick direct.
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn dual_path_direct_wins_on_loopback() {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let (relay_addr, _relay_handle) = spawn_mock_relay().await;
|
||||
|
||||
// Acceptor task: run race(Role::Acceptor, peer_addr_placeholder, ...).
|
||||
// Since the acceptor doesn't dial, the peer_direct_addr arg is
|
||||
// unused on the direct branch but we still pass a placeholder
|
||||
// because the API takes one. Use a stub addr that would error
|
||||
// if it were ever dialed — proving the Acceptor really doesn't
|
||||
// reach it.
|
||||
let unused_addr: SocketAddr = "127.0.0.1:2".parse().unwrap();
|
||||
|
||||
// We can't race both sides in the same task because each race
|
||||
// call has its own direct endpoint that needs to talk to the
|
||||
// OTHER side's endpoint. So spawn the Acceptor in a task and
|
||||
// let it expose its listen addr via a oneshot back to the test,
|
||||
// then run the Dialer in the test's main task.
|
||||
//
|
||||
// There's a chicken-and-egg issue: the Acceptor's listen addr
|
||||
// is only known after race() creates its endpoint. To avoid
|
||||
// reaching into race()'s internals, we instead play a slight
|
||||
// trick: create the Acceptor's endpoint ourselves (outside
|
||||
// race()) to learn its addr, spin up an accept loop on it
|
||||
// ourselves, and pass THAT addr as the Dialer's peer addr.
|
||||
// This tests the Dialer->Acceptor handshake end-to-end without
|
||||
// running the full race() on both sides.
|
||||
|
||||
let (sc, _cert_der) = server_config();
|
||||
let acceptor_bind: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||
let acceptor_ep = create_endpoint(acceptor_bind, Some(sc)).expect("acceptor ep");
|
||||
let acceptor_listen_addr = acceptor_ep.local_addr().expect("acceptor addr");
|
||||
|
||||
// Drop the external acceptor after the test finishes, not
|
||||
// before — spawn a dedicated accept task.
|
||||
let acceptor_accept_task = tokio::spawn(async move {
|
||||
// Accept one connection and hold it for a while so the
|
||||
// Dialer side can complete its QUIC handshake.
|
||||
if let Some(incoming) = acceptor_ep.accept().await {
|
||||
if let Ok(_conn) = incoming.await {
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Now run the Dialer in the race — peer_direct_addr = acceptor's
|
||||
// listen addr. The relay is the mock from above. Direct path
|
||||
// should win.
|
||||
let result = race(
|
||||
Role::Dialer,
|
||||
acceptor_listen_addr,
|
||||
relay_addr,
|
||||
"test-room".into(),
|
||||
"call-test".into(),
|
||||
)
|
||||
.await
|
||||
.expect("race must succeed");
|
||||
|
||||
assert_eq!(result.1, WinningPath::Direct, "direct should win on loopback");
|
||||
|
||||
// Cancel the acceptor accept task so the test finishes.
|
||||
acceptor_accept_task.abort();
|
||||
// Suppress unused-var warning for the placeholder.
|
||||
let _ = unused_addr;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Test 2: relay wins when the direct peer is dead
|
||||
// -----------------------------------------------------------------------
|
||||
//
|
||||
// Dialer role, peer_direct_addr = a port nothing is listening on,
|
||||
// relay is the working mock. Direct dial will sit waiting for a
|
||||
// QUIC handshake that never comes; the 2s direct timeout kicks in
|
||||
// and the relay path wins the fallback.
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn dual_path_relay_wins_when_direct_is_dead() {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let (relay_addr, _relay_handle) = spawn_mock_relay().await;
|
||||
|
||||
// A port that nothing is listening on — dead direct target.
|
||||
// Port 1 on loopback is almost never bound and UDP packets to
|
||||
// it will be dropped silently, so the QUIC handshake times out.
|
||||
let dead_peer: SocketAddr = "127.0.0.1:1".parse().unwrap();
|
||||
|
||||
let result = race(
|
||||
Role::Dialer,
|
||||
dead_peer,
|
||||
relay_addr,
|
||||
"test-room".into(),
|
||||
"call-test".into(),
|
||||
)
|
||||
.await
|
||||
.expect("race must succeed via relay fallback");
|
||||
|
||||
assert_eq!(
|
||||
result.1,
|
||||
WinningPath::Relay,
|
||||
"relay should win when direct dial has nowhere to land"
|
||||
);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Test 3: race errors cleanly when both paths are dead
|
||||
// -----------------------------------------------------------------------
|
||||
//
|
||||
// Dialer role, peer_direct_addr = dead, relay_addr = dead.
|
||||
// Expected: race returns an Err within ~7s (2s direct timeout +
|
||||
// 5s relay timeout fallback).
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn dual_path_errors_cleanly_when_both_paths_dead() {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
|
||||
let dead_peer: SocketAddr = "127.0.0.1:1".parse().unwrap();
|
||||
let dead_relay: SocketAddr = "127.0.0.1:2".parse().unwrap();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
let result = race(
|
||||
Role::Dialer,
|
||||
dead_peer,
|
||||
dead_relay,
|
||||
"test-room".into(),
|
||||
"call-test".into(),
|
||||
)
|
||||
.await;
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
assert!(result.is_err(), "both-dead must return Err");
|
||||
// Upper bound: direct 2s timeout + relay 5s fallback + small
|
||||
// slack for scheduling. If this blows, something is looping.
|
||||
assert!(
|
||||
elapsed < Duration::from_secs(10),
|
||||
"race took too long to give up: {:?}",
|
||||
elapsed
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user