From 29cd23fe39f065f0ed78e7a51a8da0a6448954d3 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sun, 12 Apr 2026 15:11:50 +0400 Subject: [PATCH] =?UTF-8?q?fix(p2p):=20connection=20cleanup=20=E2=80=94=20?= =?UTF-8?q?4=20fixes=20for=20stale/dead=20connections?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PRD 4: Disable IPv6 direct dial/accept temporarily. IPv6 QUIC handshakes succeed but connections die immediately on datagram send ("connection lost"). IPv4 candidates work reliably. IPv6 candidates still gathered but filtered at dial time. PRD 1: Close losing transport after Phase 6 negotiation. The non-selected transport now gets an explicit QUIC close frame instead of silently dropping after 30s idle timeout. Prevents phantom connections from polluting future accept() calls. PRD 2: Harden accept loop with max 3 stale retries. Stale connections are explicitly closed (conn.close) and counted. After 3 stale connections, the accept loop aborts instead of spinning until the race timeout. PRD 3: Resource cleanup — close old IPv6 endpoint before creating a new one in place_call/answer_call. Add Drop impl to CallEngine so tasks are signalled to stop on ungraceful shutdown. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/dual_path.rs | 82 ++++++++++++++---------------- desktop/src-tauri/src/engine.rs | 9 ++++ desktop/src-tauri/src/lib.rs | 24 ++++++++- 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/crates/wzp-client/src/dual_path.rs b/crates/wzp-client/src/dual_path.rs index ab3a40a..4ea5cb6 100644 --- a/crates/wzp-client/src/dual_path.rs +++ b/crates/wzp-client/src/dual_path.rs @@ -192,43 +192,38 @@ pub async fn race( } }; let ep_for_fut = ep.clone(); - let v6_ep_for_accept = ipv6_endpoint.clone(); + // Phase 7: IPv6 accept temporarily disabled (same reason + // as dial — IPv6 connections die on datagram send). + // Accept on IPv4 shared endpoint only. + let _v6_ep_unused = ipv6_endpoint.clone(); direct_fut = Box::pin(async move { // Accept loop: retry if we get a stale/closed - // connection from a previous call. Between rapid - // successive calls, quinn's accept queue may - // contain connections that the peer has already - // dropped. Verify the connection is alive via - // max_datagram_size() before returning it. + // connection from a previous call. Max 3 retries + // to avoid spinning until the race timeout. + const MAX_STALE: usize = 3; + let mut stale_count: usize = 0; loop { - let conn = match &v6_ep_for_accept { - Some(v6_ep) => { - tokio::select! { - v4 = wzp_transport::accept(&ep_for_fut) => { - v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))? - } - v6 = wzp_transport::accept(v6_ep) => { - v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))? - } - } - } - None => { - wzp_transport::accept(&ep_for_fut) - .await - .map_err(|e| anyhow::anyhow!("direct accept: {e}"))? - } - }; + let conn = wzp_transport::accept(&ep_for_fut) + .await + .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?; - // Validate the connection is alive. A stale - // connection from a previous call will report - // close_reason = Some(...) immediately. if let Some(reason) = conn.close_reason() { + // Explicitly close so the peer gets a + // close frame instead of idle timeout. + conn.close(0u32.into(), b"stale"); + stale_count += 1; tracing::warn!( remote = %conn.remote_address(), stable_id = conn.stable_id(), + stale_count, ?reason, - "dual_path: A-role skipping stale connection, re-accepting" + "dual_path: A-role skipping stale connection" ); + if stale_count >= MAX_STALE { + return Err(anyhow::anyhow!( + "A-role: {stale_count} stale connections, aborting" + )); + } continue; } @@ -274,7 +269,7 @@ pub async fn race( } }; let ep_for_fut = ep.clone(); - let v6_ep_for_dial = ipv6_endpoint.clone(); + let _v6_ep_for_dial = ipv6_endpoint.clone(); let dial_order = peer_candidates.dial_order(); let sni = call_sni.clone(); direct_fut = Box::pin(async move { @@ -297,21 +292,22 @@ pub async fn race( // Phase 7: route each candidate to the // endpoint matching its address family. let candidate = *candidate; - let ep = if candidate.is_ipv6() { - match &v6_ep_for_dial { - Some(v6) => v6.clone(), - None => { - tracing::debug!( - %candidate, - candidate_idx = idx, - "dual_path: skipping IPv6 candidate, no v6 endpoint" - ); - continue; - } - } - } else { - ep_for_fut.clone() - }; + // Phase 7: IPv6 dials temporarily disabled. + // IPv6 QUIC handshakes succeed but the + // connection dies immediately on datagram + // send ("connection lost"). Root cause is + // likely router-level IPv6 UDP filtering. + // Re-enable once IPv6 datagram delivery is + // verified on target networks. + if candidate.is_ipv6() { + tracing::debug!( + %candidate, + candidate_idx = idx, + "dual_path: skipping IPv6 candidate (disabled)" + ); + continue; + } + let ep = ep_for_fut.clone(); let client_cfg = wzp_transport::client_config(); let sni = sni.clone(); set.spawn(async move { diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 33712a7..4cd18fe 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -1493,3 +1493,12 @@ impl CallEngine { } } } + +impl Drop for CallEngine { + fn drop(&mut self) { + // Safety net: if stop() was never called (crash, app + // backgrounding), signal tasks to exit so they don't + // spin on a dropped transport. + self.running.store(false, Ordering::SeqCst); + } +} diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 4b2c86d..faf3bc4 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -544,8 +544,21 @@ async fn connect( // it for participant authentication. is_direct_p2p_agreed = use_direct; if use_direct { + // Close losing relay transport so the + // relay sees a clean disconnect instead + // of waiting 30s for idle timeout. + if let Some(loser) = race_result.relay_transport.as_ref() { + loser.connection().close(0u32.into(), b"not-selected"); + } race_result.direct_transport } else { + // Close losing direct transport so the + // peer's endpoint doesn't retain a + // phantom connection that pollutes + // future accept() calls. + if let Some(loser) = race_result.direct_transport.as_ref() { + loser.connection().close(0u32.into(), b"not-selected"); + } race_result.relay_transport } } @@ -1358,7 +1371,11 @@ async fn place_call( .map(|la| la.port()) .unwrap_or(0); - // Phase 7: create IPv6 endpoint, trying same port as v4 + // Phase 7: create IPv6 endpoint, trying same port as v4. + // Close any leftover from a previous call first. + if let Some(old) = sig.ipv6_endpoint.take() { + old.close(0u32.into(), b"new-call"); + } let (sc, _) = wzp_transport::server_config(); let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok(); let v6_port = v6_ep.as_ref() @@ -1477,7 +1494,10 @@ async fn answer_call( .map(|la| la.port()) .unwrap_or(0); - // Phase 7: create IPv6 endpoint + // Phase 7: create IPv6 endpoint. Close leftover first. + if let Some(old) = sig.ipv6_endpoint.take() { + old.close(0u32.into(), b"new-call"); + } let (sc, _) = wzp_transport::server_config(); let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok(); let v6_port = v6_ep.as_ref()