fix(p2p): connection cleanup — 4 fixes for stale/dead connections
PRD 4: Disable IPv6 direct dial/accept temporarily. IPv6 QUIC
handshakes succeed but connections die immediately on datagram
send ("connection lost"). IPv4 candidates work reliably. IPv6
candidates still gathered but filtered at dial time.
PRD 1: Close losing transport after Phase 6 negotiation. The
non-selected transport now gets an explicit QUIC close frame
instead of silently dropping after 30s idle timeout. Prevents
phantom connections from polluting future accept() calls.
PRD 2: Harden accept loop with max 3 stale retries. Stale
connections are explicitly closed (conn.close) and counted.
After 3 stale connections, the accept loop aborts instead of
spinning until the race timeout.
PRD 3: Resource cleanup — close old IPv6 endpoint before
creating a new one in place_call/answer_call. Add Drop impl
to CallEngine so tasks are signalled to stop on ungraceful
shutdown.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -192,43 +192,38 @@ pub async fn race(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let ep_for_fut = ep.clone();
|
let ep_for_fut = ep.clone();
|
||||||
let v6_ep_for_accept = ipv6_endpoint.clone();
|
// Phase 7: IPv6 accept temporarily disabled (same reason
|
||||||
|
// as dial — IPv6 connections die on datagram send).
|
||||||
|
// Accept on IPv4 shared endpoint only.
|
||||||
|
let _v6_ep_unused = ipv6_endpoint.clone();
|
||||||
direct_fut = Box::pin(async move {
|
direct_fut = Box::pin(async move {
|
||||||
// Accept loop: retry if we get a stale/closed
|
// Accept loop: retry if we get a stale/closed
|
||||||
// connection from a previous call. Between rapid
|
// connection from a previous call. Max 3 retries
|
||||||
// successive calls, quinn's accept queue may
|
// to avoid spinning until the race timeout.
|
||||||
// contain connections that the peer has already
|
const MAX_STALE: usize = 3;
|
||||||
// dropped. Verify the connection is alive via
|
let mut stale_count: usize = 0;
|
||||||
// max_datagram_size() before returning it.
|
|
||||||
loop {
|
loop {
|
||||||
let conn = match &v6_ep_for_accept {
|
let conn = wzp_transport::accept(&ep_for_fut)
|
||||||
Some(v6_ep) => {
|
.await
|
||||||
tokio::select! {
|
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
|
||||||
v4 = wzp_transport::accept(&ep_for_fut) => {
|
|
||||||
v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?
|
|
||||||
}
|
|
||||||
v6 = wzp_transport::accept(v6_ep) => {
|
|
||||||
v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
wzp_transport::accept(&ep_for_fut)
|
|
||||||
.await
|
|
||||||
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Validate the connection is alive. A stale
|
|
||||||
// connection from a previous call will report
|
|
||||||
// close_reason = Some(...) immediately.
|
|
||||||
if let Some(reason) = conn.close_reason() {
|
if let Some(reason) = conn.close_reason() {
|
||||||
|
// Explicitly close so the peer gets a
|
||||||
|
// close frame instead of idle timeout.
|
||||||
|
conn.close(0u32.into(), b"stale");
|
||||||
|
stale_count += 1;
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
remote = %conn.remote_address(),
|
remote = %conn.remote_address(),
|
||||||
stable_id = conn.stable_id(),
|
stable_id = conn.stable_id(),
|
||||||
|
stale_count,
|
||||||
?reason,
|
?reason,
|
||||||
"dual_path: A-role skipping stale connection, re-accepting"
|
"dual_path: A-role skipping stale connection"
|
||||||
);
|
);
|
||||||
|
if stale_count >= MAX_STALE {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"A-role: {stale_count} stale connections, aborting"
|
||||||
|
));
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -274,7 +269,7 @@ pub async fn race(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let ep_for_fut = ep.clone();
|
let ep_for_fut = ep.clone();
|
||||||
let v6_ep_for_dial = ipv6_endpoint.clone();
|
let _v6_ep_for_dial = ipv6_endpoint.clone();
|
||||||
let dial_order = peer_candidates.dial_order();
|
let dial_order = peer_candidates.dial_order();
|
||||||
let sni = call_sni.clone();
|
let sni = call_sni.clone();
|
||||||
direct_fut = Box::pin(async move {
|
direct_fut = Box::pin(async move {
|
||||||
@@ -297,21 +292,22 @@ pub async fn race(
|
|||||||
// Phase 7: route each candidate to the
|
// Phase 7: route each candidate to the
|
||||||
// endpoint matching its address family.
|
// endpoint matching its address family.
|
||||||
let candidate = *candidate;
|
let candidate = *candidate;
|
||||||
let ep = if candidate.is_ipv6() {
|
// Phase 7: IPv6 dials temporarily disabled.
|
||||||
match &v6_ep_for_dial {
|
// IPv6 QUIC handshakes succeed but the
|
||||||
Some(v6) => v6.clone(),
|
// connection dies immediately on datagram
|
||||||
None => {
|
// send ("connection lost"). Root cause is
|
||||||
tracing::debug!(
|
// likely router-level IPv6 UDP filtering.
|
||||||
%candidate,
|
// Re-enable once IPv6 datagram delivery is
|
||||||
candidate_idx = idx,
|
// verified on target networks.
|
||||||
"dual_path: skipping IPv6 candidate, no v6 endpoint"
|
if candidate.is_ipv6() {
|
||||||
);
|
tracing::debug!(
|
||||||
continue;
|
%candidate,
|
||||||
}
|
candidate_idx = idx,
|
||||||
}
|
"dual_path: skipping IPv6 candidate (disabled)"
|
||||||
} else {
|
);
|
||||||
ep_for_fut.clone()
|
continue;
|
||||||
};
|
}
|
||||||
|
let ep = ep_for_fut.clone();
|
||||||
let client_cfg = wzp_transport::client_config();
|
let client_cfg = wzp_transport::client_config();
|
||||||
let sni = sni.clone();
|
let sni = sni.clone();
|
||||||
set.spawn(async move {
|
set.spawn(async move {
|
||||||
|
|||||||
@@ -1493,3 +1493,12 @@ impl CallEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for CallEngine {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// Safety net: if stop() was never called (crash, app
|
||||||
|
// backgrounding), signal tasks to exit so they don't
|
||||||
|
// spin on a dropped transport.
|
||||||
|
self.running.store(false, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -544,8 +544,21 @@ async fn connect(
|
|||||||
// it for participant authentication.
|
// it for participant authentication.
|
||||||
is_direct_p2p_agreed = use_direct;
|
is_direct_p2p_agreed = use_direct;
|
||||||
if use_direct {
|
if use_direct {
|
||||||
|
// Close losing relay transport so the
|
||||||
|
// relay sees a clean disconnect instead
|
||||||
|
// of waiting 30s for idle timeout.
|
||||||
|
if let Some(loser) = race_result.relay_transport.as_ref() {
|
||||||
|
loser.connection().close(0u32.into(), b"not-selected");
|
||||||
|
}
|
||||||
race_result.direct_transport
|
race_result.direct_transport
|
||||||
} else {
|
} else {
|
||||||
|
// Close losing direct transport so the
|
||||||
|
// peer's endpoint doesn't retain a
|
||||||
|
// phantom connection that pollutes
|
||||||
|
// future accept() calls.
|
||||||
|
if let Some(loser) = race_result.direct_transport.as_ref() {
|
||||||
|
loser.connection().close(0u32.into(), b"not-selected");
|
||||||
|
}
|
||||||
race_result.relay_transport
|
race_result.relay_transport
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1358,7 +1371,11 @@ async fn place_call(
|
|||||||
.map(|la| la.port())
|
.map(|la| la.port())
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
// Phase 7: create IPv6 endpoint, trying same port as v4
|
// Phase 7: create IPv6 endpoint, trying same port as v4.
|
||||||
|
// Close any leftover from a previous call first.
|
||||||
|
if let Some(old) = sig.ipv6_endpoint.take() {
|
||||||
|
old.close(0u32.into(), b"new-call");
|
||||||
|
}
|
||||||
let (sc, _) = wzp_transport::server_config();
|
let (sc, _) = wzp_transport::server_config();
|
||||||
let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok();
|
let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok();
|
||||||
let v6_port = v6_ep.as_ref()
|
let v6_port = v6_ep.as_ref()
|
||||||
@@ -1477,7 +1494,10 @@ async fn answer_call(
|
|||||||
.map(|la| la.port())
|
.map(|la| la.port())
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
// Phase 7: create IPv6 endpoint
|
// Phase 7: create IPv6 endpoint. Close leftover first.
|
||||||
|
if let Some(old) = sig.ipv6_endpoint.take() {
|
||||||
|
old.close(0u32.into(), b"new-call");
|
||||||
|
}
|
||||||
let (sc, _) = wzp_transport::server_config();
|
let (sc, _) = wzp_transport::server_config();
|
||||||
let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok();
|
let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok();
|
||||||
let v6_port = v6_ep.as_ref()
|
let v6_port = v6_ep.as_ref()
|
||||||
|
|||||||
Reference in New Issue
Block a user