diff --git a/crates/wzp-client/src/dual_path.rs b/crates/wzp-client/src/dual_path.rs index e8478ac..ab3a40a 100644 --- a/crates/wzp-client/src/dual_path.rs +++ b/crates/wzp-client/src/dual_path.rs @@ -194,43 +194,53 @@ pub async fn race( let ep_for_fut = ep.clone(); let v6_ep_for_accept = ipv6_endpoint.clone(); direct_fut = Box::pin(async move { - // Phase 7: accept on both IPv4 and IPv6 endpoints. - // First incoming connection on either wins. - match v6_ep_for_accept { - Some(v6_ep) => { - tracing::debug!("dual_path: A-role accepting on both v4 + v6 endpoints"); - tokio::select! { - v4 = wzp_transport::accept(&ep_for_fut) => { - let conn = v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?; - tracing::info!( - remote = %conn.remote_address(), - stable_id = conn.stable_id(), - "dual_path: A-role accepted on IPv4 endpoint" - ); - Ok(QuinnTransport::new(conn)) - } - v6 = wzp_transport::accept(&v6_ep) => { - let conn = v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?; - tracing::info!( - remote = %conn.remote_address(), - stable_id = conn.stable_id(), - "dual_path: A-role accepted on IPv6 endpoint" - ); - Ok(QuinnTransport::new(conn)) + // Accept loop: retry if we get a stale/closed + // connection from a previous call. Between rapid + // successive calls, quinn's accept queue may + // contain connections that the peer has already + // dropped. Verify the connection is alive via + // max_datagram_size() before returning it. + loop { + let conn = match &v6_ep_for_accept { + Some(v6_ep) => { + tokio::select! { + v4 = wzp_transport::accept(&ep_for_fut) => { + v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))? + } + v6 = wzp_transport::accept(v6_ep) => { + v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))? + } } } - } - None => { - let conn = wzp_transport::accept(&ep_for_fut) - .await - .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?; - tracing::info!( + None => { + wzp_transport::accept(&ep_for_fut) + .await + .map_err(|e| anyhow::anyhow!("direct accept: {e}"))? + } + }; + + // Validate the connection is alive. A stale + // connection from a previous call will report + // close_reason = Some(...) immediately. + if let Some(reason) = conn.close_reason() { + tracing::warn!( remote = %conn.remote_address(), stable_id = conn.stable_id(), - "dual_path: A-role accepted (v4-only)" + ?reason, + "dual_path: A-role skipping stale connection, re-accepting" ); - Ok(QuinnTransport::new(conn)) + continue; } + + let has_dgram = conn.max_datagram_size().is_some(); + tracing::info!( + remote = %conn.remote_address(), + stable_id = conn.stable_id(), + has_dgram, + "dual_path: A-role accepted direct connection" + ); + + break Ok(QuinnTransport::new(conn)); } }); direct_ep = ep; diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 02fc7ce..911f793 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -588,8 +588,17 @@ async fn connect( } let app_clone = app.clone(); + // Log transport details for debugging direct P2P media issues + let transport_info = pre_connected_transport.as_ref().map(|t| { + serde_json::json!({ + "remote": t.remote_address().to_string(), + "max_datagram": t.max_datagram_size(), + "close_reason": t.connection().close_reason().map(|r| format!("{r:?}")), + }) + }); emit_call_debug(&app, "connect:call_engine_starting", serde_json::json!({ "is_direct_p2p": is_direct_p2p_agreed, + "transport": transport_info, })); let app_for_engine = app.clone(); match CallEngine::start(relay, room, alias, os_aec, quality, reuse_endpoint, pre_connected_transport, is_direct_p2p_agreed, app_for_engine, move |event_kind, message| {