feat(p2p): Phase 6 — ICE-style path negotiation

Before Phase 6, each side's dual-path race ran independently and
committed to whichever transport completed first. When one side
picked Direct and the other picked Relay, they sent media to
different places — TX > 0 with RX = 0 on both sides, a
completely silent call.

Phase 6 adds a negotiation step: after the local race completes,
each side sends a MediaPathReport { call_id, direct_ok, race_winner }
to the peer through the relay. Both wait for the other's report
before committing a transport to the CallEngine. The decision
rule is simple: if BOTH report direct_ok = true, use direct; if
EITHER reports false, BOTH use relay.
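
A minimal sketch of that rule (the commit keeps it inline in the
connect command; `negotiate_path` is an illustrative name, not a
helper in this change):

```rust
// Mirrors wzp_client::dual_path::WinningPath from the hunk below.
#[derive(Debug, PartialEq)]
enum WinningPath {
    Direct,
    Relay,
}

// Both sides feed the same two reports into the same rule, so they
// always commit to the same path.
fn negotiate_path(local_direct_ok: bool, peer_direct_ok: bool) -> WinningPath {
    if local_direct_ok && peer_direct_ok {
        WinningPath::Direct
    } else {
        // If EITHER side failed its direct QUIC attempt, BOTH use relay.
        WinningPath::Relay
    }
}
```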

## Wire protocol

New `SignalMessage::MediaPathReport { call_id, direct_ok,
race_winner }`. The relay forwards it to the call peer via the
same signal_hub routing used for DirectCallOffer/Answer. The
cross-relay dispatcher also forwards it.
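
Each side fills the report in from its own race result, roughly like
this (condensed from the connect-command hunk below):

```rust
// call_id is the room name with its "call-" prefix stripped;
// race_winner is informational only (debug logs).
let report = wzp_proto::SignalMessage::MediaPathReport {
    call_id: call_id_for_report.clone(),
    direct_ok: race_result.direct_transport.is_some(),
    race_winner: format!("{:?}", race_result.local_winner), // "Direct" / "Relay"
};
```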

## dual_path::race restructured

Returns `RaceResult` instead of `(Arc<QuinnTransport>, WinningPath)`:
- `direct_transport: Option<Arc<QuinnTransport>>`
- `relay_transport: Option<Arc<QuinnTransport>>`
- `local_winner: WinningPath`

Both paths are run as spawned tasks. After the first completes,
a 1s grace period lets the loser also finish. The connect
command gets BOTH transports (when available) and picks the
right one based on the negotiation outcome. The unused transport
is dropped.
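
A condensed sketch of the new shape, with `T` standing in for
`Arc<QuinnTransport>`; the per-path timeouts, the 500ms relay head
start, panic handling, and the Phase 5 shared-endpoint reuse of the
real function are omitted:

```rust
use std::{future::Future, time::Duration};

// Spawn both paths, record which one completes first, then give the
// loser a short grace period so the caller can receive BOTH transports
// when both paths work.
async fn race_sketch<T: Send + 'static>(
    direct_fut: impl Future<Output = Option<T>> + Send + 'static,
    relay_fut: impl Future<Output = Option<T>> + Send + 'static,
) -> (Option<T>, Option<T>, &'static str) {
    let mut direct = tokio::spawn(direct_fut);
    let mut relay = tokio::spawn(relay_fut);

    // Outer Option = "has this task finished yet", inner Option = its result.
    let (mut direct_done, mut relay_done): (Option<Option<T>>, Option<Option<T>>) =
        (None, None);
    let local_winner;
    tokio::select! {
        d = &mut direct => { direct_done = Some(d.unwrap_or(None)); local_winner = "Direct"; }
        r = &mut relay  => { relay_done  = Some(r.unwrap_or(None)); local_winner = "Relay"; }
    }

    // 1s grace period for the loser; a miss just means "path unavailable".
    if direct_done.is_none() {
        direct_done = Some(
            tokio::time::timeout(Duration::from_secs(1), &mut direct)
                .await
                .ok()
                .and_then(|j| j.unwrap_or(None)),
        );
    }
    if relay_done.is_none() {
        relay_done = Some(
            tokio::time::timeout(Duration::from_secs(1), &mut relay)
                .await
                .ok()
                .and_then(|j| j.unwrap_or(None)),
        );
    }

    (direct_done.flatten(), relay_done.flatten(), local_winner)
}
```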

## connect command flow (revised)

1. Run race() → RaceResult with both transports
2. Send MediaPathReport to relay with our direct_ok
3. Install oneshot; wait for peer's report (3s timeout)
4. Decision: both direct_ok → use direct; else → use relay
5. Start CallEngine with the agreed transport

If the peer never responds (old build, or the 3s timeout
fires), the connect command falls back to relay — backward
compatible.
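
Steps 2–4 boil down to a oneshot plus a timeout; a sketch with
illustrative names (`exchange_reports`, `send_report`, and
`install_waiter` are not helpers in this commit):

```rust
use std::time::Duration;
use tokio::sync::oneshot;

// `send_report` stands in for sending SignalMessage::MediaPathReport over
// the signal connection, `install_waiter` for stashing the oneshot where
// the signal recv loop can fire it when the peer's report arrives.
async fn exchange_reports(
    local_direct_ok: bool,
    send_report: impl std::future::Future<Output = ()>,
    install_waiter: impl FnOnce(oneshot::Sender<bool>),
) -> bool {
    let (tx, rx) = oneshot::channel::<bool>();
    install_waiter(tx); // must be in place BEFORE our report goes out
    send_report.await;  // our MediaPathReport travels via the relay

    // Wait up to 3s for the peer. An old build never answers, so the
    // timeout path conservatively treats the peer's direct path as down.
    let peer_direct_ok = match tokio::time::timeout(Duration::from_secs(3), rx).await {
        Ok(Ok(ok)) => ok,
        _ => false,
    };

    local_direct_ok && peer_direct_ok // true means commit the direct transport
}
```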

## Relay forwarding

MediaPathReport is forwarded like DirectCallOffer/Answer: via
signal_hub.send_to(peer_fp) for same-relay calls, and via
cross-relay dispatcher for federated calls.
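
The lookup behind both arms is simply "find the other participant of
this call"; a schematic stand-in (the real code uses the call
registry's peer_fingerprint and the signal hub, not a HashMap):

```rust
use std::collections::HashMap;

// Given who sent the report, return the other participant of the call;
// the relay then forwards the MediaPathReport to them verbatim,
// best-effort, exactly like DirectCallOffer/Answer.
fn peer_of<'a>(
    calls: &'a HashMap<String, (String, String)>, // call_id -> (caller_fp, callee_fp)
    call_id: &str,
    sender_fp: &str,
) -> Option<&'a str> {
    calls.get(call_id).map(|(caller, callee)| {
        if caller.as_str() == sender_fp { callee.as_str() } else { caller.as_str() }
    })
}
```

The federated arm in the cross-relay dispatcher resolves the local
participant the same way, via the caller fingerprint stored in its
call registry.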

## Debug log events

- `connect:dual_path_race_done` — local race result
- `connect:path_report_sent` — our report to the peer
- `connect:peer_report_received` — peer's report
- `connect:peer_report_timeout` — peer didn't respond (3s)
- `connect:path_negotiated` — final agreed path with reasons
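
For example, the final event's payload (illustrative values; the call
form matches the connect-command hunk below):

```rust
emit_call_debug(&app, "connect:path_negotiated", serde_json::json!({
    "use_direct": false,
    "local_direct_ok": true,   // our direct dial worked,
    "peer_direct_ok": false,   // but the peer's did not, so
    "chosen_path": "Relay",    // both sides fall back to relay
}));
```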

Full workspace test run: 423 tests passing (no regressions).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Siavash Sameni authored 2026-04-12 10:03:42 +04:00
commit f5542ef822 (parent de007ec2fd)
6 changed files with 320 additions and 71 deletions


@@ -38,6 +38,24 @@ pub enum WinningPath {
Relay,
}
/// Phase 6: the race now returns BOTH transports (when available)
/// so the connect command can negotiate with the peer before
/// committing. The negotiation decides which transport to use
/// based on whether BOTH sides report `direct_ok = true`.
pub struct RaceResult {
/// The direct P2P transport, if the direct path completed.
/// `None` if the direct dial/accept failed or timed out.
pub direct_transport: Option<Arc<QuinnTransport>>,
/// The relay transport, if the relay dial completed.
/// `None` if the relay dial failed (shouldn't happen in
/// practice since relay is always reachable).
pub relay_transport: Option<Arc<QuinnTransport>>,
/// Which future completed first in the local race.
/// Informational — the actual path used is decided by the
/// Phase 6 negotiation after both sides exchange reports.
pub local_winner: WinningPath,
}
/// Attempt a direct QUIC connection to the peer in parallel with
/// the relay dial and return the winning `QuinnTransport`.
///
@@ -113,7 +131,7 @@ pub async fn race(
// When `None`, falls back to fresh endpoints per role.
// Used by tests.
shared_endpoint: Option<wzp_transport::Endpoint>,
) -> anyhow::Result<(Arc<QuinnTransport>, WinningPath)> {
) -> anyhow::Result<RaceResult> {
// Rustls provider must be installed before any quinn endpoint
// is created. Install attempt is idempotent.
let _ = rustls::crypto::ring::default_provider().install_default();
@@ -322,76 +340,135 @@ pub async fn race(
Ok::<_, anyhow::Error>(QuinnTransport::new(conn))
};
// Race the two with a shared 2s ceiling on the direct attempt.
// Pin both so we can poll them from multiple branches of the
// select without moving the futures — the "direct failed, wait
// for relay" and "relay failed, wait for direct" fallback paths
// below need to await the OPPOSITE future after the winning
// branch fires. Without pinning, tokio::select! moves the
// future out and we can't touch it again.
// Phase 6: run both paths concurrently via tokio::spawn and
// collect BOTH results. The old tokio::select! approach dropped
// the loser, which meant the connect command couldn't negotiate
// with the peer — it had to commit to whichever path won locally.
//
// Now we spawn both as tasks, wait for the first to complete
// (that determines `local_winner`), then give the loser a short
// grace period to also complete. The connect command gets a
// RaceResult with both transports (when available) and uses the
// Phase 6 MediaPathReport exchange to decide which one to
// actually use for media.
tracing::info!(
?role,
candidates = ?peer_candidates.dial_order(),
%relay_addr,
"dual_path: racing direct vs relay"
);
let direct_timed = tokio::time::timeout(Duration::from_secs(2), direct_fut);
tokio::pin!(direct_timed, relay_fut);
let result = tokio::select! {
biased; // prefer direct win if both arrive in the same tick
direct_result = &mut direct_timed => {
match direct_result {
Ok(Ok(transport)) => {
tracing::info!("dual_path: direct WON");
Ok((Arc::new(transport), WinningPath::Direct))
let mut direct_task = tokio::spawn(
tokio::time::timeout(Duration::from_secs(2), direct_fut),
);
let mut relay_task = tokio::spawn(async move {
// Keep the 500ms head start so direct has a chance
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::timeout(Duration::from_secs(5), relay_fut).await
});
// Wait for the first one to complete. This tells us the
// local_winner — but we DON'T commit to it yet. Phase 6
// negotiation decides the actual path.
let (mut direct_result, mut relay_result): (
Option<anyhow::Result<QuinnTransport>>,
Option<anyhow::Result<QuinnTransport>>,
) = (None, None);
let local_winner;
tokio::select! {
biased;
d = &mut direct_task => {
match d {
Ok(Ok(Ok(t))) => {
tracing::info!("dual_path: direct completed first");
direct_result = Some(Ok(t));
local_winner = WinningPath::Direct;
}
Ok(Err(e)) => {
// Direct failed — fall back to waiting for relay.
tracing::warn!(error = %e, "dual_path: direct failed, awaiting relay");
match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await {
Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)),
Ok(Err(e2)) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay={e2}")),
Err(_) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay=timeout(5s)")),
}
Ok(Ok(Err(e))) => {
tracing::warn!(error = %e, "dual_path: direct failed");
direct_result = Some(Err(anyhow::anyhow!("{e}")));
local_winner = WinningPath::Relay; // direct failed → relay is our only hope
}
Err(_elapsed) => {
tracing::warn!("dual_path: direct timed out (2s), awaiting relay");
match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await {
Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)),
Ok(Err(e2)) => Err(anyhow::anyhow!("direct timeout + relay failed: {e2}")),
Err(_) => Err(anyhow::anyhow!("direct timeout + relay timeout")),
}
}
}
}
relay_result = &mut relay_fut => {
match relay_result {
Ok(transport) => {
tracing::info!("dual_path: relay WON (direct still pending)");
Ok((Arc::new(transport), WinningPath::Relay))
Ok(Err(_)) => {
tracing::warn!("dual_path: direct timed out (2s)");
direct_result = Some(Err(anyhow::anyhow!("direct timeout")));
local_winner = WinningPath::Relay;
}
Err(e) => {
tracing::warn!(error = %e, "dual_path: relay failed, awaiting direct remainder");
match tokio::time::timeout(Duration::from_millis(1500), &mut direct_timed).await {
Ok(Ok(Ok(transport))) => Ok((Arc::new(transport), WinningPath::Direct)),
_ => Err(anyhow::anyhow!("relay failed + direct unavailable: {e}")),
}
tracing::warn!(error = %e, "dual_path: direct task panicked");
direct_result = Some(Err(anyhow::anyhow!("direct task panic")));
local_winner = WinningPath::Relay;
}
}
}
};
r = &mut relay_task => {
match r {
Ok(Ok(Ok(t))) => {
tracing::info!("dual_path: relay completed first");
relay_result = Some(Ok(t));
local_winner = WinningPath::Relay;
}
Ok(Ok(Err(e))) => {
tracing::warn!(error = %e, "dual_path: relay failed");
relay_result = Some(Err(anyhow::anyhow!("{e}")));
local_winner = WinningPath::Direct;
}
Ok(Err(_)) => {
tracing::warn!("dual_path: relay timed out");
relay_result = Some(Err(anyhow::anyhow!("relay timeout")));
local_winner = WinningPath::Direct;
}
Err(e) => {
relay_result = Some(Err(anyhow::anyhow!("relay task panic: {e}")));
local_winner = WinningPath::Direct;
}
}
}
}
// Give the loser a short grace period (1s) to also complete.
// If it does, we have both transports for Phase 6 negotiation.
// If it doesn't, we still proceed with just the winner.
if direct_result.is_none() {
match tokio::time::timeout(Duration::from_secs(1), direct_task).await {
Ok(Ok(Ok(Ok(t)))) => { direct_result = Some(Ok(t)); }
Ok(Ok(Ok(Err(e)))) => { direct_result = Some(Err(anyhow::anyhow!("{e}"))); }
_ => { direct_result = Some(Err(anyhow::anyhow!("direct: no result in grace period"))); }
}
}
if relay_result.is_none() {
match tokio::time::timeout(Duration::from_secs(1), relay_task).await {
Ok(Ok(Ok(Ok(t)))) => { relay_result = Some(Ok(t)); }
Ok(Ok(Ok(Err(e)))) => { relay_result = Some(Err(anyhow::anyhow!("{e}"))); }
_ => { relay_result = Some(Err(anyhow::anyhow!("relay: no result in grace period"))); }
}
}
let direct_ok = direct_result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
let relay_ok = relay_result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
tracing::info!(
?local_winner,
direct_ok,
relay_ok,
"dual_path: race finished, both results collected for Phase 6 negotiation"
);
if !direct_ok && !relay_ok {
return Err(anyhow::anyhow!("both paths failed: no media transport available"));
}
// Let both endpoint clones drop at end-of-scope. With the
// Phase 5 shared-endpoint path, these clones are Arc<Endpoint>
// clones of the signal endpoint — dropping them just decrements
// the ref count, the socket stays alive for the signal loop +
// any further direct-P2P attempts. With the fresh-endpoint
// fallback, the drops are the last refs so the sockets close
// promptly. Either way the winning transport already owns its
// own quinn::Connection reference which is independent of the
// Endpoint lifetime.
let _ = (direct_ep, relay_ep);
result
Ok(RaceResult {
direct_transport: direct_result
.and_then(|r| r.ok())
.map(|t| Arc::new(t)),
relay_transport: relay_result
.and_then(|r| r.ok())
.map(|t| Arc::new(t)),
local_winner,
})
}


@@ -130,6 +130,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
// relay-to-relay message, never rides the featherChat
// bridge. Catch-all mapping for completeness.
SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer,
SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane
}
}


@@ -122,7 +122,8 @@ async fn dual_path_direct_wins_on_loopback() {
.await
.expect("race must succeed");
assert_eq!(result.1, WinningPath::Direct, "direct should win on loopback");
assert!(result.direct_transport.is_some(), "direct transport should be available");
assert_eq!(result.local_winner, WinningPath::Direct, "direct should win on loopback");
// Cancel the acceptor accept task so the test finishes.
acceptor_accept_task.abort();
@@ -163,8 +164,9 @@ async fn dual_path_relay_wins_when_direct_is_dead() {
.await
.expect("race must succeed via relay fallback");
assert!(result.relay_transport.is_some(), "relay transport should be available");
assert_eq!(
result.1,
result.local_winner,
WinningPath::Relay,
"relay should win when direct dial has nowhere to land"
);


@@ -846,6 +846,31 @@ pub enum SignalMessage {
observed_addr: String,
},
// ── Phase 6: ICE-style path negotiation ─────────────────────
/// Phase 6: each side reports the result of its local dual-
/// path race to the other side through the relay. Both peers
/// send this after their race completes; both wait for the
/// other's report before committing a transport to the
/// CallEngine.
///
/// The decision rule is: if BOTH sides report `direct_ok =
/// true`, use the direct P2P connection. If EITHER reports
/// `direct_ok = false`, BOTH fall back to relay. This
/// eliminates the race condition where one side picks Direct
/// and the other picks Relay — they now agree on the path
/// before any media flows.
MediaPathReport {
call_id: String,
/// Did the direct QUIC connection (P2P dial or accept)
/// complete successfully on this side?
direct_ok: bool,
/// Which future won the local tokio::select race?
/// "Direct" or "Relay" — informational for debug logs.
#[serde(default)]
race_winner: String,
},
// ── Phase 4: cross-relay direct-call signaling ────────────────────
/// Phase 4: relay-to-relay envelope for forwarding direct-call


@@ -687,6 +687,23 @@ async fn main() -> anyhow::Result<()> {
}
}
// Phase 6: MediaPathReport forwarded across
// federation — deliver to the local participant
// of the matching call.
SignalMessage::MediaPathReport { ref call_id, .. } => {
// Deliver to the local caller (the cross-relay
// dispatcher only handles calls where the caller
// is local and the callee is remote, or vice versa)
let caller_fp = {
let reg = call_registry_d.lock().await;
reg.get(call_id).map(|c| c.caller_fingerprint.clone())
};
if let Some(fp) = caller_fp {
let hub = signal_hub_d.lock().await;
let _ = hub.send_to(&fp, &inner).await;
}
}
SignalMessage::Hangup { .. } => {
// Best-effort: broadcast the hangup to every
// local participant of any call that currently
@@ -1294,6 +1311,21 @@ async fn main() -> anyhow::Result<()> {
}
}
// Phase 6: forward MediaPathReport to the
// call peer so both sides can negotiate
// the media path before committing.
SignalMessage::MediaPathReport { ref call_id, .. } => {
let peer_fp = {
let reg = call_registry.lock().await;
reg.peer_fingerprint(call_id, &client_fp)
.map(|s| s.to_string())
};
if let Some(fp) = peer_fp {
let hub = signal_hub.lock().await;
let _ = hub.send_to(&fp, &msg).await;
}
}
SignalMessage::Ping { timestamp_ms } => {
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
}


@@ -454,8 +454,6 @@ async fn connect(
}));
let room_sni = room.clone();
let call_sni = format!("call-{room}");
// Phase 5: pass the signal endpoint so the race
// reuses ONE socket for listen + dial + relay.
match wzp_client::dual_path::race(
r,
candidates,
@@ -466,20 +464,107 @@ async fn connect(
)
.await
{
Ok((transport, path)) => {
tracing::info!(?path, "connect: dual-path race resolved");
emit_call_debug(&app, "connect:dual_path_race_won", serde_json::json!({
"path": format!("{:?}", path),
Ok(race_result) => {
let local_direct_ok = race_result.direct_transport.is_some();
let local_winner = race_result.local_winner;
tracing::info!(
?local_winner,
local_direct_ok,
has_relay = race_result.relay_transport.is_some(),
"connect: race finished, starting Phase 6 negotiation"
);
emit_call_debug(&app, "connect:dual_path_race_done", serde_json::json!({
"local_winner": format!("{:?}", local_winner),
"local_direct_ok": local_direct_ok,
"has_relay": race_result.relay_transport.is_some(),
}));
Some(transport)
// Phase 6: send our report to the peer and
// wait for theirs before committing. Both
// sides must agree on the same path to
// prevent the one-picks-Direct-other-picks-
// Relay race condition that causes TX>0 RX=0
// on both sides.
//
// Extract call_id from the room name
// ("call-<id>" → "<id>").
let call_id_for_report = room.strip_prefix("call-")
.unwrap_or(&room)
.to_string();
// Install the oneshot for receiving the peer's report
let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
let peer_direct_ok = {
let transport_for_report = {
let mut sig = state.signal.lock().await;
sig.pending_path_report = Some(tx);
sig.transport.as_ref().cloned()
};
// Send our report
if let Some(ref t) = transport_for_report {
let report = wzp_proto::SignalMessage::MediaPathReport {
call_id: call_id_for_report.clone(),
direct_ok: local_direct_ok,
race_winner: format!("{:?}", local_winner),
};
let _ = t.send_signal(&report).await;
emit_call_debug(&app, "connect:path_report_sent", serde_json::json!({
"direct_ok": local_direct_ok,
"race_winner": format!("{:?}", local_winner),
}));
}
// Wait for peer's report (3s timeout)
match tokio::time::timeout(
std::time::Duration::from_secs(3),
rx,
).await {
Ok(Ok(peer_ok)) => {
emit_call_debug(&app, "connect:peer_report_received", serde_json::json!({
"peer_direct_ok": peer_ok,
}));
peer_ok
}
_ => {
// Timeout or channel error — peer
// may be on an old build without
// Phase 6. Fall back to relay.
emit_call_debug(&app, "connect:peer_report_timeout", serde_json::json!({}));
let mut sig = state.signal.lock().await;
sig.pending_path_report = None;
false
}
}
};
// Phase 6 decision: BOTH must agree on direct
let use_direct = local_direct_ok && peer_direct_ok;
let chosen_path = if use_direct {
wzp_client::dual_path::WinningPath::Direct
} else {
wzp_client::dual_path::WinningPath::Relay
};
emit_call_debug(&app, "connect:path_negotiated", serde_json::json!({
"use_direct": use_direct,
"local_direct_ok": local_direct_ok,
"peer_direct_ok": peer_direct_ok,
"chosen_path": format!("{:?}", chosen_path),
}));
tracing::info!(
?chosen_path,
use_direct,
local_direct_ok,
peer_direct_ok,
"connect: Phase 6 path agreed"
);
// Pick the agreed transport
if use_direct {
race_result.direct_transport
} else {
race_result.relay_transport
}
}
Err(e) => {
// Both paths failed — surface to the user.
// CallEngine::start below with None will try
// the relay once more using the old code path
// (which reuses the signal endpoint and has a
// longer timeout) so we don't unconditionally
// fail the call on a transient race blip.
tracing::warn!(error = %e, "connect: dual-path race failed, falling back to classic relay connect");
emit_call_debug(&app, "connect:dual_path_race_failed", serde_json::json!({
"error": e.to_string(),
@@ -744,6 +829,11 @@ struct SignalState {
/// connection. Prevents duplicate supervisors from spawning
/// (recv loop exit races with a manual register_signal call).
reconnect_in_progress: bool,
/// Phase 6: pending MediaPathReport from the peer. When the
/// connect command sends its own report and waits for the
/// peer's, it installs a oneshot sender here. The recv loop
/// fires it when MediaPathReport arrives.
pending_path_report: Option<tokio::sync::oneshot::Sender<bool>>,
}
#[tauri::command]
@@ -981,6 +1071,27 @@ fn do_register_signal(
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None;
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
}
Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => {
// Phase 6: the peer is telling us whether
// their direct path succeeded. Fire the
// pending oneshot so the connect command can
// make the agreed decision.
tracing::info!(
%call_id,
direct_ok,
%race_winner,
"signal: MediaPathReport from peer"
);
emit_call_debug(&app_clone, "recv:MediaPathReport", serde_json::json!({
"call_id": call_id,
"peer_direct_ok": direct_ok,
"peer_race_winner": race_winner,
}));
let mut sig = signal_state.lock().await;
if let Some(tx) = sig.pending_path_report.take() {
let _ = tx.send(direct_ok);
}
}
Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => {
// "STUN for QUIC" response — the relay told us our
// own server-reflexive address. If a Tauri command
@@ -1665,6 +1776,7 @@ pub fn run() {
own_reflex_addr: None,
desired_relay_addr: None,
reconnect_in_progress: false,
pending_path_report: None,
})),
});