From 1904b19d058a625d41b6608bc1d6fceacbff17d4 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sun, 12 Apr 2026 13:50:21 +0400 Subject: [PATCH] fix(direct): validate A-role accepted connection, skip stale ones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Acceptor's accept() on the shared signal endpoint can dequeue a stale QUIC connection from a previous call that the Dialer has already dropped. This results in "connection lost" errors when media datagrams are sent — 100% drops on both sides. Fix: after accepting a connection, check close_reason(). If the connection is already closed, log a warning and re-accept. Also log whether max_datagram_size() is available (has_dgram) before returning; this value is reported for diagnostics only and does not cause a re-accept. Additionally: emit transport details (remote addr, max_datagram, close_reason) in the call_engine_starting debug event so stale connection issues are visible in the user-facing debug log. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/dual_path.rs | 72 +++++++++++++++++------------- desktop/src-tauri/src/lib.rs | 9 ++++ 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/crates/wzp-client/src/dual_path.rs b/crates/wzp-client/src/dual_path.rs index e8478ac..ab3a40a 100644 --- a/crates/wzp-client/src/dual_path.rs +++ b/crates/wzp-client/src/dual_path.rs @@ -194,43 +194,53 @@ pub async fn race( let ep_for_fut = ep.clone(); let v6_ep_for_accept = ipv6_endpoint.clone(); direct_fut = Box::pin(async move { - // Phase 7: accept on both IPv4 and IPv6 endpoints. - // First incoming connection on either wins. - match v6_ep_for_accept { - Some(v6_ep) => { - tracing::debug!("dual_path: A-role accepting on both v4 + v6 endpoints"); - tokio::select! 
{ - v4 = wzp_transport::accept(&ep_for_fut) => { - let conn = v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?; - tracing::info!( - remote = %conn.remote_address(), - stable_id = conn.stable_id(), - "dual_path: A-role accepted on IPv4 endpoint" - ); - Ok(QuinnTransport::new(conn)) - } - v6 = wzp_transport::accept(&v6_ep) => { - let conn = v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?; - tracing::info!( - remote = %conn.remote_address(), - stable_id = conn.stable_id(), - "dual_path: A-role accepted on IPv6 endpoint" - ); - Ok(QuinnTransport::new(conn)) + // Accept loop: retry if we get a stale/closed + // connection from a previous call. Between rapid + // successive calls, quinn's accept queue may + // contain connections that the peer has already + // dropped. Verify the connection is alive via + // max_datagram_size() before returning it. + loop { + let conn = match &v6_ep_for_accept { + Some(v6_ep) => { + tokio::select! { + v4 = wzp_transport::accept(&ep_for_fut) => { + v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))? + } + v6 = wzp_transport::accept(v6_ep) => { + v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))? + } } } - } - None => { - let conn = wzp_transport::accept(&ep_for_fut) - .await - .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?; - tracing::info!( + None => { + wzp_transport::accept(&ep_for_fut) + .await + .map_err(|e| anyhow::anyhow!("direct accept: {e}"))? + } + }; + + // Validate the connection is alive. A stale + // connection from a previous call will report + // close_reason = Some(...) immediately. 
+ if let Some(reason) = conn.close_reason() { + tracing::warn!( remote = %conn.remote_address(), stable_id = conn.stable_id(), - "dual_path: A-role accepted (v4-only)" + ?reason, + "dual_path: A-role skipping stale connection, re-accepting" ); - Ok(QuinnTransport::new(conn)) + continue; } + + let has_dgram = conn.max_datagram_size().is_some(); + tracing::info!( + remote = %conn.remote_address(), + stable_id = conn.stable_id(), + has_dgram, + "dual_path: A-role accepted direct connection" + ); + + break Ok(QuinnTransport::new(conn)); } }); direct_ep = ep; diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 02fc7ce..911f793 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -588,8 +588,17 @@ async fn connect( } let app_clone = app.clone(); + // Log transport details for debugging direct P2P media issues + let transport_info = pre_connected_transport.as_ref().map(|t| { + serde_json::json!({ + "remote": t.remote_address().to_string(), + "max_datagram": t.max_datagram_size(), + "close_reason": t.connection().close_reason().map(|r| format!("{r:?}")), + }) + }); emit_call_debug(&app, "connect:call_engine_starting", serde_json::json!({ "is_direct_p2p": is_direct_p2p_agreed, + "transport": transport_info, })); let app_for_engine = app.clone(); match CallEngine::start(relay, room, alias, os_aec, quality, reuse_endpoint, pre_connected_transport, is_direct_p2p_agreed, app_for_engine, move |event_kind, message| {