feat(p2p): Phase 6 — ICE-style path negotiation
Before Phase 6, each side's dual-path race ran independently and
committed to whichever transport completed first. When one side
picked Direct and the other picked Relay, they sent media to
different places: TX > 0 but RX = 0 on both sides, a completely
silent call.
Phase 6 adds a negotiation step: after the local race completes,
each side sends a MediaPathReport { call_id, direct_ok, winner }
to the peer through the relay. Both wait for the other's report
before committing a transport to the CallEngine. The decision
rule is simple: if BOTH report direct_ok = true, use direct; if
EITHER reports false, BOTH use relay.
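The decision rule amounts to a logical AND over the two reports. A minimal standalone sketch (the names `NegotiatedPath` and `negotiate` are illustrative, not the actual code, where the decision is inlined in the connect command):

```rust
/// Phase 6 decision rule as a pure function (illustrative sketch).
#[derive(Debug, PartialEq)]
enum NegotiatedPath {
    Direct,
    Relay,
}

fn negotiate(local_direct_ok: bool, peer_direct_ok: bool) -> NegotiatedPath {
    // Direct only when BOTH sides proved their direct QUIC path works;
    // any single failure forces both sides onto the relay.
    if local_direct_ok && peer_direct_ok {
        NegotiatedPath::Direct
    } else {
        NegotiatedPath::Relay
    }
}
```

Because the rule is symmetric and depends only on the two exchanged booleans, both peers necessarily compute the same path.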
## Wire protocol
New `SignalMessage::MediaPathReport { call_id, direct_ok,
race_winner }`. The relay forwards it to the call peer via the
same signal_hub routing used for DirectCallOffer/Answer. The
cross-relay dispatcher also forwards it.
## dual_path::race restructured
Returns `RaceResult` instead of `(Arc<QuinnTransport>, WinningPath)`:
- `direct_transport: Option<Arc<QuinnTransport>>`
- `relay_transport: Option<Arc<QuinnTransport>>`
- `local_winner: WinningPath`
Both paths are run as spawned tasks. After the first completes,
a 1s grace period lets the loser also finish. The connect
command gets BOTH transports (when available) and picks the
right one based on the negotiation outcome. The unused transport
is dropped.
## connect command flow (revised)
1. Run race() → RaceResult with both transports
2. Send MediaPathReport to relay with our direct_ok
3. Install oneshot; wait for peer's report (3s timeout)
4. Decision: both direct_ok → use direct; else → use relay
5. Start CallEngine with the agreed transport
If the peer never responds (old build or timeout), the command
falls back to relay, so the change is backward compatible.
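Steps 3-4 boil down to a wait-with-timeout where "no report in time" is treated exactly like "peer said `direct_ok = false`". A runnable sketch of that pattern, substituting std's `mpsc` channel for the tokio oneshot so it stands alone (the helper name `await_peer_direct_ok` is hypothetical):

```rust
use std::sync::mpsc;
use std::time::Duration;

// Sketch of the step-3/4 wait: a timeout or a dropped sender yields
// `false`, which the Phase 6 AND rule turns into the relay fallback.
// The real code awaits a tokio oneshot with a 3s timeout; std mpsc
// stands in here so the sketch runs without an async runtime.
fn await_peer_direct_ok(rx: mpsc::Receiver<bool>, timeout: Duration) -> bool {
    match rx.recv_timeout(timeout) {
        Ok(peer_direct_ok) => peer_direct_ok, // peer's report arrived
        Err(_) => false, // timeout or disconnect → relay
    }
}
```

Mapping "peer unreachable" onto `false` is what makes the fallback safe against peers that predate Phase 6.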
## Relay forwarding
MediaPathReport is forwarded like DirectCallOffer/Answer: via
signal_hub.send_to(peer_fp) for same-relay calls, and via
cross-relay dispatcher for federated calls.
## Debug log events
- `connect:dual_path_race_done` — local race result
- `connect:path_report_sent` — our report to the peer
- `connect:peer_report_received` — peer's report
- `connect:peer_report_timeout` — peer didn't respond (3s)
- `connect:path_negotiated` — final agreed path with reasons
Full workspace test: 423 passing (no regressions).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -38,6 +38,24 @@ pub enum WinningPath {
    Relay,
}

/// Phase 6: the race now returns BOTH transports (when available)
/// so the connect command can negotiate with the peer before
/// committing. The negotiation decides which transport to use
/// based on whether BOTH sides report `direct_ok = true`.
pub struct RaceResult {
    /// The direct P2P transport, if the direct path completed.
    /// `None` if the direct dial/accept failed or timed out.
    pub direct_transport: Option<Arc<QuinnTransport>>,
    /// The relay transport, if the relay dial completed.
    /// `None` if the relay dial failed (shouldn't happen in
    /// practice since relay is always reachable).
    pub relay_transport: Option<Arc<QuinnTransport>>,
    /// Which future completed first in the local race.
    /// Informational — the actual path used is decided by the
    /// Phase 6 negotiation after both sides exchange reports.
    pub local_winner: WinningPath,
}

/// Attempt a direct QUIC connection to the peer in parallel with
/// the relay dial and return the winning `QuinnTransport`.
///
@@ -113,7 +131,7 @@ pub async fn race(
    // When `None`, falls back to fresh endpoints per role.
    // Used by tests.
    shared_endpoint: Option<wzp_transport::Endpoint>,
) -> anyhow::Result<(Arc<QuinnTransport>, WinningPath)> {
) -> anyhow::Result<RaceResult> {
    // Rustls provider must be installed before any quinn endpoint
    // is created. Install attempt is idempotent.
    let _ = rustls::crypto::ring::default_provider().install_default();
@@ -322,76 +340,135 @@ pub async fn race(
    Ok::<_, anyhow::Error>(QuinnTransport::new(conn))
};

// Race the two with a shared 2s ceiling on the direct attempt.
// Pin both so we can poll them from multiple branches of the
// select without moving the futures — the "direct failed, wait
// for relay" and "relay failed, wait for direct" fallback paths
// below need to await the OPPOSITE future after the winning
// branch fires. Without pinning, tokio::select! moves the
// future out and we can't touch it again.
// Phase 6: run both paths concurrently via tokio::spawn and
// collect BOTH results. The old tokio::select! approach dropped
// the loser, which meant the connect command couldn't negotiate
// with the peer — it had to commit to whichever path won locally.
//
// Now we spawn both as tasks, wait for the first to complete
// (that determines `local_winner`), then give the loser a short
// grace period to also complete. The connect command gets a
// RaceResult with both transports (when available) and uses the
// Phase 6 MediaPathReport exchange to decide which one to
// actually use for media.
tracing::info!(
    ?role,
    candidates = ?peer_candidates.dial_order(),
    %relay_addr,
    "dual_path: racing direct vs relay"
);
let direct_timed = tokio::time::timeout(Duration::from_secs(2), direct_fut);
tokio::pin!(direct_timed, relay_fut);

let result = tokio::select! {
    biased; // prefer direct win if both arrive in the same tick
    direct_result = &mut direct_timed => {
        match direct_result {
            Ok(Ok(transport)) => {
                tracing::info!("dual_path: direct WON");
                Ok((Arc::new(transport), WinningPath::Direct))
let mut direct_task = tokio::spawn(
    tokio::time::timeout(Duration::from_secs(2), direct_fut),
);
let mut relay_task = tokio::spawn(async move {
    // Keep the 500ms head start so direct has a chance
    tokio::time::sleep(Duration::from_millis(500)).await;
    tokio::time::timeout(Duration::from_secs(5), relay_fut).await
});

// Wait for the first one to complete. This tells us the
// local_winner — but we DON'T commit to it yet. Phase 6
// negotiation decides the actual path.
let (mut direct_result, mut relay_result): (
    Option<anyhow::Result<QuinnTransport>>,
    Option<anyhow::Result<QuinnTransport>>,
) = (None, None);

let local_winner;

tokio::select! {
    biased;
    d = &mut direct_task => {
        match d {
            Ok(Ok(Ok(t))) => {
                tracing::info!("dual_path: direct completed first");
                direct_result = Some(Ok(t));
                local_winner = WinningPath::Direct;
            }
            Ok(Err(e)) => {
                // Direct failed — fall back to waiting for relay.
                tracing::warn!(error = %e, "dual_path: direct failed, awaiting relay");
                match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await {
                    Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)),
                    Ok(Err(e2)) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay={e2}")),
                    Err(_) => Err(anyhow::anyhow!("both paths failed: direct={e}, relay=timeout(5s)")),
                }
            Ok(Ok(Err(e))) => {
                tracing::warn!(error = %e, "dual_path: direct failed");
                direct_result = Some(Err(anyhow::anyhow!("{e}")));
                local_winner = WinningPath::Relay; // direct failed → relay is our only hope
            }
            Err(_elapsed) => {
                tracing::warn!("dual_path: direct timed out (2s), awaiting relay");
                match tokio::time::timeout(Duration::from_secs(5), &mut relay_fut).await {
                    Ok(Ok(transport)) => Ok((Arc::new(transport), WinningPath::Relay)),
                    Ok(Err(e2)) => Err(anyhow::anyhow!("direct timeout + relay failed: {e2}")),
                    Err(_) => Err(anyhow::anyhow!("direct timeout + relay timeout")),
                }
            }
        }
    }
    relay_result = &mut relay_fut => {
        match relay_result {
            Ok(transport) => {
                tracing::info!("dual_path: relay WON (direct still pending)");
                Ok((Arc::new(transport), WinningPath::Relay))
            Ok(Err(_)) => {
                tracing::warn!("dual_path: direct timed out (2s)");
                direct_result = Some(Err(anyhow::anyhow!("direct timeout")));
                local_winner = WinningPath::Relay;
            }
            Err(e) => {
                tracing::warn!(error = %e, "dual_path: relay failed, awaiting direct remainder");
                match tokio::time::timeout(Duration::from_millis(1500), &mut direct_timed).await {
                    Ok(Ok(Ok(transport))) => Ok((Arc::new(transport), WinningPath::Direct)),
                    _ => Err(anyhow::anyhow!("relay failed + direct unavailable: {e}")),
                }
                tracing::warn!(error = %e, "dual_path: direct task panicked");
                direct_result = Some(Err(anyhow::anyhow!("direct task panic")));
                local_winner = WinningPath::Relay;
            }
        }
    }
};
    r = &mut relay_task => {
        match r {
            Ok(Ok(Ok(t))) => {
                tracing::info!("dual_path: relay completed first");
                relay_result = Some(Ok(t));
                local_winner = WinningPath::Relay;
            }
            Ok(Ok(Err(e))) => {
                tracing::warn!(error = %e, "dual_path: relay failed");
                relay_result = Some(Err(anyhow::anyhow!("{e}")));
                local_winner = WinningPath::Direct;
            }
            Ok(Err(_)) => {
                tracing::warn!("dual_path: relay timed out");
                relay_result = Some(Err(anyhow::anyhow!("relay timeout")));
                local_winner = WinningPath::Direct;
            }
            Err(e) => {
                relay_result = Some(Err(anyhow::anyhow!("relay task panic: {e}")));
                local_winner = WinningPath::Direct;
            }
        }
    }
}

// Give the loser a short grace period (1s) to also complete.
// If it does, we have both transports for Phase 6 negotiation.
// If it doesn't, we still proceed with just the winner.
if direct_result.is_none() {
    match tokio::time::timeout(Duration::from_secs(1), direct_task).await {
        Ok(Ok(Ok(Ok(t)))) => { direct_result = Some(Ok(t)); }
        Ok(Ok(Ok(Err(e)))) => { direct_result = Some(Err(anyhow::anyhow!("{e}"))); }
        _ => { direct_result = Some(Err(anyhow::anyhow!("direct: no result in grace period"))); }
    }
}
if relay_result.is_none() {
    match tokio::time::timeout(Duration::from_secs(1), relay_task).await {
        Ok(Ok(Ok(Ok(t)))) => { relay_result = Some(Ok(t)); }
        Ok(Ok(Ok(Err(e)))) => { relay_result = Some(Err(anyhow::anyhow!("{e}"))); }
        _ => { relay_result = Some(Err(anyhow::anyhow!("relay: no result in grace period"))); }
    }
}

let direct_ok = direct_result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
let relay_ok = relay_result.as_ref().map(|r| r.is_ok()).unwrap_or(false);

tracing::info!(
    ?local_winner,
    direct_ok,
    relay_ok,
    "dual_path: race finished, both results collected for Phase 6 negotiation"
);

if !direct_ok && !relay_ok {
    return Err(anyhow::anyhow!("both paths failed: no media transport available"));
}

// Let both endpoint clones drop at end-of-scope. With the
// Phase 5 shared-endpoint path, these clones are Arc<Endpoint>
// clones of the signal endpoint — dropping them just decrements
// the ref count, the socket stays alive for the signal loop +
// any further direct-P2P attempts. With the fresh-endpoint
// fallback, the drops are the last refs so the sockets close
// promptly. Either way the winning transport already owns its
// own quinn::Connection reference which is independent of the
// Endpoint lifetime.
let _ = (direct_ep, relay_ep);

result
Ok(RaceResult {
    direct_transport: direct_result
        .and_then(|r| r.ok())
        .map(|t| Arc::new(t)),
    relay_transport: relay_result
        .and_then(|r| r.ok())
        .map(|t| Arc::new(t)),
    local_winner,
})
}
@@ -130,6 +130,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
        // relay-to-relay message, never rides the featherChat
        // bridge. Catch-all mapping for completeness.
        SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer,
        SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane
    }
}

@@ -122,7 +122,8 @@ async fn dual_path_direct_wins_on_loopback() {
    .await
    .expect("race must succeed");

assert_eq!(result.1, WinningPath::Direct, "direct should win on loopback");
assert!(result.direct_transport.is_some(), "direct transport should be available");
assert_eq!(result.local_winner, WinningPath::Direct, "direct should win on loopback");

// Cancel the acceptor accept task so the test finishes.
acceptor_accept_task.abort();
@@ -163,8 +164,9 @@ async fn dual_path_relay_wins_when_direct_is_dead() {
    .await
    .expect("race must succeed via relay fallback");

assert!(result.relay_transport.is_some(), "relay transport should be available");
assert_eq!(
    result.1,
    result.local_winner,
    WinningPath::Relay,
    "relay should win when direct dial has nowhere to land"
);

@@ -846,6 +846,31 @@ pub enum SignalMessage {
        observed_addr: String,
    },

    // ── Phase 6: ICE-style path negotiation ─────────────────────

    /// Phase 6: each side reports the result of its local dual-
    /// path race to the other side through the relay. Both peers
    /// send this after their race completes; both wait for the
    /// other's report before committing a transport to the
    /// CallEngine.
    ///
    /// The decision rule is: if BOTH sides report `direct_ok =
    /// true`, use the direct P2P connection. If EITHER reports
    /// `direct_ok = false`, BOTH fall back to relay. This
    /// eliminates the race condition where one side picks Direct
    /// and the other picks Relay — they now agree on the path
    /// before any media flows.
    MediaPathReport {
        call_id: String,
        /// Did the direct QUIC connection (P2P dial or accept)
        /// complete successfully on this side?
        direct_ok: bool,
        /// Which future won the local tokio::select race?
        /// "Direct" or "Relay" — informational for debug logs.
        #[serde(default)]
        race_winner: String,
    },

    // ── Phase 4: cross-relay direct-call signaling ────────────────────

    /// Phase 4: relay-to-relay envelope for forwarding direct-call
@@ -687,6 +687,23 @@ async fn main() -> anyhow::Result<()> {
    }
}

// Phase 6: MediaPathReport forwarded across
// federation — deliver to the local participant
// of the matching call.
SignalMessage::MediaPathReport { ref call_id, .. } => {
    // Deliver to the local caller (the cross-relay
    // dispatcher only handles calls where the caller
    // is local and the callee is remote, or vice versa)
    let caller_fp = {
        let reg = call_registry_d.lock().await;
        reg.get(call_id).map(|c| c.caller_fingerprint.clone())
    };
    if let Some(fp) = caller_fp {
        let hub = signal_hub_d.lock().await;
        let _ = hub.send_to(&fp, &inner).await;
    }
}

SignalMessage::Hangup { .. } => {
    // Best-effort: broadcast the hangup to every
    // local participant of any call that currently
@@ -1294,6 +1311,21 @@ async fn main() -> anyhow::Result<()> {
    }
}

// Phase 6: forward MediaPathReport to the
// call peer so both sides can negotiate
// the media path before committing.
SignalMessage::MediaPathReport { ref call_id, .. } => {
    let peer_fp = {
        let reg = call_registry.lock().await;
        reg.peer_fingerprint(call_id, &client_fp)
            .map(|s| s.to_string())
    };
    if let Some(fp) = peer_fp {
        let hub = signal_hub.lock().await;
        let _ = hub.send_to(&fp, &msg).await;
    }
}

SignalMessage::Ping { timestamp_ms } => {
    let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
}

@@ -454,8 +454,6 @@ async fn connect(
}));
let room_sni = room.clone();
let call_sni = format!("call-{room}");
// Phase 5: pass the signal endpoint so the race
// reuses ONE socket for listen + dial + relay.
match wzp_client::dual_path::race(
    r,
    candidates,
@@ -466,20 +464,107 @@ async fn connect(
)
.await
{
    Ok((transport, path)) => {
        tracing::info!(?path, "connect: dual-path race resolved");
        emit_call_debug(&app, "connect:dual_path_race_won", serde_json::json!({
            "path": format!("{:?}", path),
    Ok(race_result) => {
        let local_direct_ok = race_result.direct_transport.is_some();
        let local_winner = race_result.local_winner;
        tracing::info!(
            ?local_winner,
            local_direct_ok,
            has_relay = race_result.relay_transport.is_some(),
            "connect: race finished, starting Phase 6 negotiation"
        );
        emit_call_debug(&app, "connect:dual_path_race_done", serde_json::json!({
            "local_winner": format!("{:?}", local_winner),
            "local_direct_ok": local_direct_ok,
            "has_relay": race_result.relay_transport.is_some(),
        }));
        Some(transport)

        // Phase 6: send our report to the peer and
        // wait for theirs before committing. Both
        // sides must agree on the same path to
        // prevent the one-picks-Direct-other-picks-
        // Relay race condition that causes TX>0 RX=0
        // on both sides.
        //
        // Extract call_id from the room name
        // ("call-<id>" → "<id>").
        let call_id_for_report = room.strip_prefix("call-")
            .unwrap_or(&room)
            .to_string();

        // Install the oneshot for receiving the peer's report
        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
        let peer_direct_ok = {
            let transport_for_report = {
                let mut sig = state.signal.lock().await;
                sig.pending_path_report = Some(tx);
                sig.transport.as_ref().cloned()
            };
            // Send our report
            if let Some(ref t) = transport_for_report {
                let report = wzp_proto::SignalMessage::MediaPathReport {
                    call_id: call_id_for_report.clone(),
                    direct_ok: local_direct_ok,
                    race_winner: format!("{:?}", local_winner),
                };
                let _ = t.send_signal(&report).await;
                emit_call_debug(&app, "connect:path_report_sent", serde_json::json!({
                    "direct_ok": local_direct_ok,
                    "race_winner": format!("{:?}", local_winner),
                }));
            }
            // Wait for peer's report (3s timeout)
            match tokio::time::timeout(
                std::time::Duration::from_secs(3),
                rx,
            ).await {
                Ok(Ok(peer_ok)) => {
                    emit_call_debug(&app, "connect:peer_report_received", serde_json::json!({
                        "peer_direct_ok": peer_ok,
                    }));
                    peer_ok
                }
                _ => {
                    // Timeout or channel error — peer
                    // may be on an old build without
                    // Phase 6. Fall back to relay.
                    emit_call_debug(&app, "connect:peer_report_timeout", serde_json::json!({}));
                    let mut sig = state.signal.lock().await;
                    sig.pending_path_report = None;
                    false
                }
            }
        };

        // Phase 6 decision: BOTH must agree on direct
        let use_direct = local_direct_ok && peer_direct_ok;
        let chosen_path = if use_direct {
            wzp_client::dual_path::WinningPath::Direct
        } else {
            wzp_client::dual_path::WinningPath::Relay
        };
        emit_call_debug(&app, "connect:path_negotiated", serde_json::json!({
            "use_direct": use_direct,
            "local_direct_ok": local_direct_ok,
            "peer_direct_ok": peer_direct_ok,
            "chosen_path": format!("{:?}", chosen_path),
        }));
        tracing::info!(
            ?chosen_path,
            use_direct,
            local_direct_ok,
            peer_direct_ok,
            "connect: Phase 6 path agreed"
        );

        // Pick the agreed transport
        if use_direct {
            race_result.direct_transport
        } else {
            race_result.relay_transport
        }
    }
    Err(e) => {
        // Both paths failed — surface to the user.
        // CallEngine::start below with None will try
        // the relay once more using the old code path
        // (which reuses the signal endpoint and has a
        // longer timeout) so we don't unconditionally
        // fail the call on a transient race blip.
        tracing::warn!(error = %e, "connect: dual-path race failed, falling back to classic relay connect");
        emit_call_debug(&app, "connect:dual_path_race_failed", serde_json::json!({
            "error": e.to_string(),
@@ -744,6 +829,11 @@ struct SignalState {
    /// connection. Prevents duplicate supervisors from spawning
    /// (recv loop exit races with a manual register_signal call).
    reconnect_in_progress: bool,
    /// Phase 6: pending MediaPathReport from the peer. When the
    /// connect command sends its own report and waits for the
    /// peer's, it installs a oneshot sender here. The recv loop
    /// fires it when MediaPathReport arrives.
    pending_path_report: Option<tokio::sync::oneshot::Sender<bool>>,
}

#[tauri::command]
@@ -981,6 +1071,27 @@ fn do_register_signal(
    let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None;
    let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
}
Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => {
    // Phase 6: the peer is telling us whether
    // their direct path succeeded. Fire the
    // pending oneshot so the connect command can
    // make the agreed decision.
    tracing::info!(
        %call_id,
        direct_ok,
        %race_winner,
        "signal: MediaPathReport from peer"
    );
    emit_call_debug(&app_clone, "recv:MediaPathReport", serde_json::json!({
        "call_id": call_id,
        "peer_direct_ok": direct_ok,
        "peer_race_winner": race_winner,
    }));
    let mut sig = signal_state.lock().await;
    if let Some(tx) = sig.pending_path_report.take() {
        let _ = tx.send(direct_ok);
    }
}
Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => {
    // "STUN for QUIC" response — the relay told us our
    // own server-reflexive address. If a Tauri command
@@ -1665,6 +1776,7 @@ pub fn run() {
    own_reflex_addr: None,
    desired_relay_addr: None,
    reconnect_in_progress: false,
    pending_path_report: None,
})),
});