From f5542ef8222bc1cc9624356bf7badecba92618be Mon Sep 17 00:00:00 2001
From: Siavash Sameni
Date: Sun, 12 Apr 2026 10:03:42 +0400
Subject: [PATCH] =?UTF-8?q?feat(p2p):=20Phase=206=20=E2=80=94=20ICE-style?=
 =?UTF-8?q?=20path=20negotiation?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Before Phase 6, each side's dual-path race ran independently and committed
to whichever transport completed first. When one side picked Direct and the
other picked Relay, they sent media to different places — TX > 0, RX = 0 on
both sides, a completely silent call.

Phase 6 adds a negotiation step: after the local race completes, each side
sends a MediaPathReport { call_id, direct_ok, race_winner } to the peer
through the relay. Both wait for the other's report before committing a
transport to the CallEngine. The decision rule is simple: if BOTH report
direct_ok = true, use direct; if EITHER reports false, BOTH use relay.

## Wire protocol

New `SignalMessage::MediaPathReport { call_id, direct_ok, race_winner }`.
The relay forwards it to the call peer via the same signal_hub routing used
for DirectCallOffer/Answer. The cross-relay dispatcher also forwards it.

## dual_path::race restructured

Returns `RaceResult` instead of `(Arc<QuinnTransport>, WinningPath)`:
- `direct_transport: Option<Arc<QuinnTransport>>`
- `relay_transport: Option<Arc<QuinnTransport>>`
- `local_winner: WinningPath`

Both paths are run as spawned tasks. After the first completes, a 1s grace
period lets the loser also finish. The connect command gets BOTH transports
(when available) and picks the right one based on the negotiation outcome.
The unused transport is dropped.

## connect command flow (revised)

1. Run race() → RaceResult with both transports
2. Send MediaPathReport to relay with our direct_ok
3. Install oneshot; wait for peer's report (3s timeout)
4. Decision: both direct_ok → use direct; else → use relay
5. Start CallEngine with the agreed transport

If the peer never responds (old build, timeout), the connect command falls
back to relay — backward compatible.

## Relay forwarding

MediaPathReport is forwarded like DirectCallOffer/Answer: via
signal_hub.send_to(peer_fp) for same-relay calls, and via the cross-relay
dispatcher for federated calls.

## Debug log events

- `connect:dual_path_race_done` — local race result
- `connect:path_report_sent` — our report to the peer
- `connect:peer_report_received` — peer's report
- `connect:peer_report_timeout` — peer didn't respond (3s)
- `connect:path_negotiated` — final agreed path with reasons

Full workspace test: 423 passing (no regressions).

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/wzp-client/src/dual_path.rs   | 189 +++++++++++++++++++--------
 crates/wzp-client/src/featherchat.rs |   1 +
 crates/wzp-client/tests/dual_path.rs |   6 +-
 crates/wzp-proto/src/packet.rs       |  25 ++++
 crates/wzp-relay/src/main.rs         |  32 +++++
 desktop/src-tauri/src/lib.rs         | 138 +++++++++++++++++--
 6 files changed, 320 insertions(+), 71 deletions(-)

diff --git a/crates/wzp-client/src/dual_path.rs b/crates/wzp-client/src/dual_path.rs
index 05c28de..061f99b 100644
--- a/crates/wzp-client/src/dual_path.rs
+++ b/crates/wzp-client/src/dual_path.rs
@@ -38,6 +38,24 @@ pub enum WinningPath {
     Relay,
 }
 
+/// Phase 6: the race now returns BOTH transports (when available)
+/// so the connect command can negotiate with the peer before
+/// committing. The negotiation decides which transport to use
+/// based on whether BOTH sides report `direct_ok = true`.
+pub struct RaceResult {
+    /// The direct P2P transport, if the direct path completed.
+    /// `None` if the direct dial/accept failed or timed out.
+    pub direct_transport: Option<Arc<QuinnTransport>>,
+    /// The relay transport, if the relay dial completed.
+    /// `None` if the relay dial failed (shouldn't happen in
+    /// practice since relay is always reachable).
+    pub relay_transport: Option<Arc<QuinnTransport>>,
+    /// Which future completed first in the local race.
+    /// Informational — the actual path used is decided by the
+    /// Phase 6 negotiation after both sides exchange reports.
+    pub local_winner: WinningPath,
+}
+
 /// Attempt a direct QUIC connection to the peer in parallel with
 /// the relay dial and return the winning `QuinnTransport`.
 ///
@@ -113,7 +131,7 @@ pub async fn race(
     // When `None`, falls back to fresh endpoints per role.
     // Used by tests.
     shared_endpoint: Option,
-) -> anyhow::Result<(Arc<QuinnTransport>, WinningPath)> {
+) -> anyhow::Result<RaceResult> {
     // Rustls provider must be installed before any quinn endpoint
     // is created. Install attempt is idempotent.
     let _ = rustls::crypto::ring::default_provider().install_default();
@@ -322,76 +340,135 @@ pub async fn race(
         Ok::<_, anyhow::Error>(QuinnTransport::new(conn))
     };
 
-    // Race the two with a shared 2s ceiling on the direct attempt.
-    // Pin both so we can poll them from multiple branches of the
-    // select without moving the futures — the "direct failed, wait
-    // for relay" and "relay failed, wait for direct" fallback paths
-    // below need to await the OPPOSITE future after the winning
-    // branch fires. Without pinning, tokio::select! moves the
-    // future out and we can't touch it again.
+    // Phase 6: run both paths concurrently via tokio::spawn and
+    // collect BOTH results. The old tokio::select! approach dropped
+    // the loser, which meant the connect command couldn't negotiate
+    // with the peer — it had to commit to whichever path won locally.
+    //
+    // Now we spawn both as tasks, wait for the first to complete
+    // (that determines `local_winner`), then give the loser a short
+    // grace period to also complete. The connect command gets a
+    // RaceResult with both transports (when available) and uses the
+    // Phase 6 MediaPathReport exchange to decide which one to
+    // actually use for media.
     tracing::info!(
         ?role,
         candidates = ?peer_candidates.dial_order(),
         %relay_addr,
         "dual_path: racing direct vs relay"
     );
-    let direct_timed = tokio::time::timeout(Duration::from_secs(2), direct_fut);
-    tokio::pin!(direct_timed, relay_fut);
-    let result = tokio::select! {
-        biased; // prefer direct win if both arrive in the same tick
-        direct_result = &mut direct_timed => {
-            match direct_result {
-                Ok(Ok(transport)) => {
-                    tracing::info!("dual_path: direct WON");
-                    Ok((Arc::new(transport), WinningPath::Direct))
+    let mut direct_task = tokio::spawn(
+        tokio::time::timeout(Duration::from_secs(2), direct_fut),
+    );
+    let mut relay_task = tokio::spawn(async move {
+        // Keep the 500ms head start so direct has a chance
+        tokio::time::sleep(Duration::from_millis(500)).await;
+        tokio::time::timeout(Duration::from_secs(5), relay_fut).await
+    });
+
+    // Wait for the first one to complete. This tells us the
+    // local_winner — but we DON'T commit to it yet. Phase 6
+    // negotiation decides the actual path.
+    let (mut direct_result, mut relay_result): (
+        Option<anyhow::Result<QuinnTransport>>,
+        Option<anyhow::Result<QuinnTransport>>,
+    ) = (None, None);
+
+    let local_winner;
+
+    tokio::select!
{ + biased; + d = &mut direct_task => { + match d { + Ok(Ok(Ok(t))) => { + tracing::info!("dual_path: direct completed first"); + direct_result = Some(Ok(t)); + local_winner = WinningPath::Direct; } - Ok(Err(e)) => { - // Direct failed — fall back to waiting for relay. - tracing::warn!(error = %e, "dual_path: direct failed, awaiting relay"); - match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await { - Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)), - Ok(Err(e2)) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay={e2}")), - Err(_) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay=timeout(5s)")), - } + Ok(Ok(Err(e))) => { + tracing::warn!(error = %e, "dual_path: direct failed"); + direct_result = Some(Err(anyhow::anyhow!("{e}"))); + local_winner = WinningPath::Relay; // direct failed → relay is our only hope } - Err(_elapsed) => { - tracing::warn!("dual_path: direct timed out (2s), awaiting relay"); - match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await { - Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)), - Ok(Err(e2)) => Err(anyhow::anyhow!("direct timeout + relay failed: {e2}")), - Err(_) => Err(anyhow::anyhow!("direct timeout + relay timeout")), - } - } - } - } - relay_result = &mut relay_fut => { - match relay_result { - Ok(transport) => { - tracing::info!("dual_path: relay WON (direct still pending)"); - Ok((Arc::new(transport), WinningPath::Relay)) + Ok(Err(_)) => { + tracing::warn!("dual_path: direct timed out (2s)"); + direct_result = Some(Err(anyhow::anyhow!("direct timeout"))); + local_winner = WinningPath::Relay; } Err(e) => { - tracing::warn!(error = %e, "dual_path: relay failed, awaiting direct remainder"); - match tokio::time::timeout(Duration::from_millis(1500), &mut direct_timed).await { - Ok(Ok(Ok(transport))) => Ok((Arc::new(transport), WinningPath::Direct)), - _ => Err(anyhow::anyhow!("relay failed + direct unavailable: {e}")), - } + tracing::warn!(error = %e, "dual_path: direct task panicked"); + direct_result = Some(Err(anyhow::anyhow!("direct task panic"))); + local_winner = WinningPath::Relay; } } } - }; + r = &mut relay_task => { + match r { + Ok(Ok(Ok(t))) => { + tracing::info!("dual_path: relay completed first"); + relay_result = Some(Ok(t)); + local_winner = WinningPath::Relay; + } + Ok(Ok(Err(e))) => { + tracing::warn!(error = %e, "dual_path: relay failed"); + relay_result = Some(Err(anyhow::anyhow!("{e}"))); + local_winner = WinningPath::Direct; + } + Ok(Err(_)) => { + tracing::warn!("dual_path: relay timed out"); + relay_result = Some(Err(anyhow::anyhow!("relay timeout"))); + local_winner = WinningPath::Direct; + } + Err(e) => { + relay_result = Some(Err(anyhow::anyhow!("relay task panic: {e}"))); + local_winner = WinningPath::Direct; + } + } + } + } + + // Give the loser a short grace period (1s) to also complete. + // If it does, we have both transports for Phase 6 negotiation. + // If it doesn't, we still proceed with just the winner. 
+ if direct_result.is_none() { + match tokio::time::timeout(Duration::from_secs(1), direct_task).await { + Ok(Ok(Ok(Ok(t)))) => { direct_result = Some(Ok(t)); } + Ok(Ok(Ok(Err(e)))) => { direct_result = Some(Err(anyhow::anyhow!("{e}"))); } + _ => { direct_result = Some(Err(anyhow::anyhow!("direct: no result in grace period"))); } + } + } + if relay_result.is_none() { + match tokio::time::timeout(Duration::from_secs(1), relay_task).await { + Ok(Ok(Ok(Ok(t)))) => { relay_result = Some(Ok(t)); } + Ok(Ok(Ok(Err(e)))) => { relay_result = Some(Err(anyhow::anyhow!("{e}"))); } + _ => { relay_result = Some(Err(anyhow::anyhow!("relay: no result in grace period"))); } + } + } + + let direct_ok = direct_result.as_ref().map(|r| r.is_ok()).unwrap_or(false); + let relay_ok = relay_result.as_ref().map(|r| r.is_ok()).unwrap_or(false); + + tracing::info!( + ?local_winner, + direct_ok, + relay_ok, + "dual_path: race finished, both results collected for Phase 6 negotiation" + ); + + if !direct_ok && !relay_ok { + return Err(anyhow::anyhow!("both paths failed: no media transport available")); + } - // Let both endpoint clones drop at end-of-scope. With the - // Phase 5 shared-endpoint path, these clones are Arc - // clones of the signal endpoint — dropping them just decrements - // the ref count, the socket stays alive for the signal loop + - // any further direct-P2P attempts. With the fresh-endpoint - // fallback, the drops are the last refs so the sockets close - // promptly. Either way the winning transport already owns its - // own quinn::Connection reference which is independent of the - // Endpoint lifetime. let _ = (direct_ep, relay_ep); - result + Ok(RaceResult { + direct_transport: direct_result + .and_then(|r| r.ok()) + .map(|t| Arc::new(t)), + relay_transport: relay_result + .and_then(|r| r.ok()) + .map(|t| Arc::new(t)), + local_winner, + }) } diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index 7433a5d..1166b4b 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -130,6 +130,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { // relay-to-relay message, never rides the featherChat // bridge. Catch-all mapping for completeness. SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer, + SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane } } diff --git a/crates/wzp-client/tests/dual_path.rs b/crates/wzp-client/tests/dual_path.rs index b7cc537..4b1f993 100644 --- a/crates/wzp-client/tests/dual_path.rs +++ b/crates/wzp-client/tests/dual_path.rs @@ -122,7 +122,8 @@ async fn dual_path_direct_wins_on_loopback() { .await .expect("race must succeed"); - assert_eq!(result.1, WinningPath::Direct, "direct should win on loopback"); + assert!(result.direct_transport.is_some(), "direct transport should be available"); + assert_eq!(result.local_winner, WinningPath::Direct, "direct should win on loopback"); // Cancel the acceptor accept task so the test finishes. 
acceptor_accept_task.abort(); @@ -163,8 +164,9 @@ async fn dual_path_relay_wins_when_direct_is_dead() { .await .expect("race must succeed via relay fallback"); + assert!(result.relay_transport.is_some(), "relay transport should be available"); assert_eq!( - result.1, + result.local_winner, WinningPath::Relay, "relay should win when direct dial has nowhere to land" ); diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index df50848..fe788cd 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -846,6 +846,31 @@ pub enum SignalMessage { observed_addr: String, }, + // ── Phase 6: ICE-style path negotiation ───────────────────── + + /// Phase 6: each side reports the result of its local dual- + /// path race to the other side through the relay. Both peers + /// send this after their race completes; both wait for the + /// other's report before committing a transport to the + /// CallEngine. + /// + /// The decision rule is: if BOTH sides report `direct_ok = + /// true`, use the direct P2P connection. If EITHER reports + /// `direct_ok = false`, BOTH fall back to relay. This + /// eliminates the race condition where one side picks Direct + /// and the other picks Relay — they now agree on the path + /// before any media flows. + MediaPathReport { + call_id: String, + /// Did the direct QUIC connection (P2P dial or accept) + /// complete successfully on this side? + direct_ok: bool, + /// Which future won the local tokio::select race? + /// "Direct" or "Relay" — informational for debug logs. + #[serde(default)] + race_winner: String, + }, + // ── Phase 4: cross-relay direct-call signaling ──────────────────── /// Phase 4: relay-to-relay envelope for forwarding direct-call diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 0899fd8..59a7053 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -687,6 +687,23 @@ async fn main() -> anyhow::Result<()> { } } + // Phase 6: MediaPathReport forwarded across + // federation — deliver to the local participant + // of the matching call. + SignalMessage::MediaPathReport { ref call_id, .. } => { + // Deliver to the local caller (the cross-relay + // dispatcher only handles calls where the caller + // is local and the callee is remote, or vice versa) + let caller_fp = { + let reg = call_registry_d.lock().await; + reg.get(call_id).map(|c| c.caller_fingerprint.clone()) + }; + if let Some(fp) = caller_fp { + let hub = signal_hub_d.lock().await; + let _ = hub.send_to(&fp, &inner).await; + } + } + SignalMessage::Hangup { .. } => { // Best-effort: broadcast the hangup to every // local participant of any call that currently @@ -1294,6 +1311,21 @@ async fn main() -> anyhow::Result<()> { } } + // Phase 6: forward MediaPathReport to the + // call peer so both sides can negotiate + // the media path before committing. + SignalMessage::MediaPathReport { ref call_id, .. 
} => {
+                            let peer_fp = {
+                                let reg = call_registry.lock().await;
+                                reg.peer_fingerprint(call_id, &client_fp)
+                                    .map(|s| s.to_string())
+                            };
+                            if let Some(fp) = peer_fp {
+                                let hub = signal_hub.lock().await;
+                                let _ = hub.send_to(&fp, &msg).await;
+                            }
+                        }
+
                         SignalMessage::Ping { timestamp_ms } => {
                             let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
                         }
diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs
index 2851506..35cd8d1 100644
--- a/desktop/src-tauri/src/lib.rs
+++ b/desktop/src-tauri/src/lib.rs
@@ -454,8 +454,6 @@ async fn connect(
                 }));
                 let room_sni = room.clone();
                 let call_sni = format!("call-{room}");
-                // Phase 5: pass the signal endpoint so the race
-                // reuses ONE socket for listen + dial + relay.
                 match wzp_client::dual_path::race(
                     r,
                     candidates,
@@ -466,20 +464,107 @@ async fn connect(
                 )
                 .await
                 {
-                    Ok((transport, path)) => {
-                        tracing::info!(?path, "connect: dual-path race resolved");
-                        emit_call_debug(&app, "connect:dual_path_race_won", serde_json::json!({
-                            "path": format!("{:?}", path),
+                    Ok(race_result) => {
+                        let local_direct_ok = race_result.direct_transport.is_some();
+                        let local_winner = race_result.local_winner;
+                        tracing::info!(
+                            ?local_winner,
+                            local_direct_ok,
+                            has_relay = race_result.relay_transport.is_some(),
+                            "connect: race finished, starting Phase 6 negotiation"
+                        );
+                        emit_call_debug(&app, "connect:dual_path_race_done", serde_json::json!({
+                            "local_winner": format!("{:?}", local_winner),
+                            "local_direct_ok": local_direct_ok,
+                            "has_relay": race_result.relay_transport.is_some(),
                         }));
-                        Some(transport)
+
+                        // Phase 6: send our report to the peer and
+                        // wait for theirs before committing. Both
+                        // sides must agree on the same path to
+                        // prevent the one-picks-Direct-other-picks-
+                        // Relay race condition that causes TX>0 RX=0
+                        // on both sides.
+                        //
+                        // Extract call_id from the room name
+                        // ("call-<id>" → "<id>").
+                        let call_id_for_report = room.strip_prefix("call-")
+                            .unwrap_or(&room)
+                            .to_string();
+
+                        // Install the oneshot for receiving the peer's report
+                        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
+                        let peer_direct_ok = {
+                            let transport_for_report = {
+                                let mut sig = state.signal.lock().await;
+                                sig.pending_path_report = Some(tx);
+                                sig.transport.as_ref().cloned()
+                            };
+                            // Send our report
+                            if let Some(ref t) = transport_for_report {
+                                let report = wzp_proto::SignalMessage::MediaPathReport {
+                                    call_id: call_id_for_report.clone(),
+                                    direct_ok: local_direct_ok,
+                                    race_winner: format!("{:?}", local_winner),
+                                };
+                                let _ = t.send_signal(&report).await;
+                                emit_call_debug(&app, "connect:path_report_sent", serde_json::json!({
+                                    "direct_ok": local_direct_ok,
+                                    "race_winner": format!("{:?}", local_winner),
+                                }));
+                            }
+                            // Wait for peer's report (3s timeout)
+                            match tokio::time::timeout(
+                                std::time::Duration::from_secs(3),
+                                rx,
+                            ).await {
+                                Ok(Ok(peer_ok)) => {
+                                    emit_call_debug(&app, "connect:peer_report_received", serde_json::json!({
+                                        "peer_direct_ok": peer_ok,
+                                    }));
+                                    peer_ok
+                                }
+                                _ => {
+                                    // Timeout or channel error — peer
+                                    // may be on an old build without
+                                    // Phase 6. Fall back to relay.
+ emit_call_debug(&app, "connect:peer_report_timeout", serde_json::json!({})); + let mut sig = state.signal.lock().await; + sig.pending_path_report = None; + false + } + } + }; + + // Phase 6 decision: BOTH must agree on direct + let use_direct = local_direct_ok && peer_direct_ok; + let chosen_path = if use_direct { + wzp_client::dual_path::WinningPath::Direct + } else { + wzp_client::dual_path::WinningPath::Relay + }; + emit_call_debug(&app, "connect:path_negotiated", serde_json::json!({ + "use_direct": use_direct, + "local_direct_ok": local_direct_ok, + "peer_direct_ok": peer_direct_ok, + "chosen_path": format!("{:?}", chosen_path), + })); + tracing::info!( + ?chosen_path, + use_direct, + local_direct_ok, + peer_direct_ok, + "connect: Phase 6 path agreed" + ); + + // Pick the agreed transport + if use_direct { + race_result.direct_transport + } else { + race_result.relay_transport + } } Err(e) => { - // Both paths failed — surface to the user. - // CallEngine::start below with None will try - // the relay once more using the old code path - // (which reuses the signal endpoint and has a - // longer timeout) so we don't unconditionally - // fail the call on a transient race blip. tracing::warn!(error = %e, "connect: dual-path race failed, falling back to classic relay connect"); emit_call_debug(&app, "connect:dual_path_race_failed", serde_json::json!({ "error": e.to_string(), @@ -744,6 +829,11 @@ struct SignalState { /// connection. Prevents duplicate supervisors from spawning /// (recv loop exit races with a manual register_signal call). reconnect_in_progress: bool, + /// Phase 6: pending MediaPathReport from the peer. When the + /// connect command sends its own report and waits for the + /// peer's, it installs a oneshot sender here. The recv loop + /// fires it when MediaPathReport arrives. + pending_path_report: Option>, } #[tauri::command] @@ -981,6 +1071,27 @@ fn do_register_signal( let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None; let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"})); } + Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => { + // Phase 6: the peer is telling us whether + // their direct path succeeded. Fire the + // pending oneshot so the connect command can + // make the agreed decision. + tracing::info!( + %call_id, + direct_ok, + %race_winner, + "signal: MediaPathReport from peer" + ); + emit_call_debug(&app_clone, "recv:MediaPathReport", serde_json::json!({ + "call_id": call_id, + "peer_direct_ok": direct_ok, + "peer_race_winner": race_winner, + })); + let mut sig = signal_state.lock().await; + if let Some(tx) = sig.pending_path_report.take() { + let _ = tx.send(direct_ok); + } + } Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => { // "STUN for QUIC" response — the relay told us our // own server-reflexive address. If a Tauri command @@ -1665,6 +1776,7 @@ pub fn run() { own_reflex_addr: None, desired_relay_addr: None, reconnect_in_progress: false, + pending_path_report: None, })), });