fix(direct): validate A-role accepted connection, skip stale ones
The Acceptor's accept() on the shared signal endpoint can dequeue a stale QUIC connection from a previous call that the Dialer has already dropped. This results in "connection lost" errors when media datagrams are sent — 100% packet drops on both sides.

Fix: after accepting a connection, check close_reason(). If the connection is already closed, log a warning and re-accept. Also check whether max_datagram_size() is available and log the result (has_dgram) before returning the connection.

Additionally: emit transport details (remote addr, max_datagram, close_reason) in the call_engine_starting debug event so stale connection issues are visible in the user-facing debug log.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -194,43 +194,53 @@ pub async fn race(
|
|||||||
let ep_for_fut = ep.clone();
|
let ep_for_fut = ep.clone();
|
||||||
let v6_ep_for_accept = ipv6_endpoint.clone();
|
let v6_ep_for_accept = ipv6_endpoint.clone();
|
||||||
direct_fut = Box::pin(async move {
|
direct_fut = Box::pin(async move {
|
||||||
// Phase 7: accept on both IPv4 and IPv6 endpoints.
|
// Accept loop: retry if we get a stale/closed
|
||||||
// First incoming connection on either wins.
|
// connection from a previous call. Between rapid
|
||||||
match v6_ep_for_accept {
|
// successive calls, quinn's accept queue may
|
||||||
Some(v6_ep) => {
|
// contain connections that the peer has already
|
||||||
tracing::debug!("dual_path: A-role accepting on both v4 + v6 endpoints");
|
// dropped. Verify the connection is alive via
|
||||||
tokio::select! {
|
// max_datagram_size() before returning it.
|
||||||
v4 = wzp_transport::accept(&ep_for_fut) => {
|
loop {
|
||||||
let conn = v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?;
|
let conn = match &v6_ep_for_accept {
|
||||||
tracing::info!(
|
Some(v6_ep) => {
|
||||||
remote = %conn.remote_address(),
|
tokio::select! {
|
||||||
stable_id = conn.stable_id(),
|
v4 = wzp_transport::accept(&ep_for_fut) => {
|
||||||
"dual_path: A-role accepted on IPv4 endpoint"
|
v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?
|
||||||
);
|
}
|
||||||
Ok(QuinnTransport::new(conn))
|
v6 = wzp_transport::accept(v6_ep) => {
|
||||||
}
|
v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?
|
||||||
v6 = wzp_transport::accept(&v6_ep) => {
|
}
|
||||||
let conn = v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?;
|
|
||||||
tracing::info!(
|
|
||||||
remote = %conn.remote_address(),
|
|
||||||
stable_id = conn.stable_id(),
|
|
||||||
"dual_path: A-role accepted on IPv6 endpoint"
|
|
||||||
);
|
|
||||||
Ok(QuinnTransport::new(conn))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
None => {
|
||||||
None => {
|
wzp_transport::accept(&ep_for_fut)
|
||||||
let conn = wzp_transport::accept(&ep_for_fut)
|
.await
|
||||||
.await
|
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?
|
||||||
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
|
}
|
||||||
tracing::info!(
|
};
|
||||||
|
|
||||||
|
// Validate the connection is alive. A stale
|
||||||
|
// connection from a previous call will report
|
||||||
|
// close_reason = Some(...) immediately.
|
||||||
|
if let Some(reason) = conn.close_reason() {
|
||||||
|
tracing::warn!(
|
||||||
remote = %conn.remote_address(),
|
remote = %conn.remote_address(),
|
||||||
stable_id = conn.stable_id(),
|
stable_id = conn.stable_id(),
|
||||||
"dual_path: A-role accepted (v4-only)"
|
?reason,
|
||||||
|
"dual_path: A-role skipping stale connection, re-accepting"
|
||||||
);
|
);
|
||||||
Ok(QuinnTransport::new(conn))
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let has_dgram = conn.max_datagram_size().is_some();
|
||||||
|
tracing::info!(
|
||||||
|
remote = %conn.remote_address(),
|
||||||
|
stable_id = conn.stable_id(),
|
||||||
|
has_dgram,
|
||||||
|
"dual_path: A-role accepted direct connection"
|
||||||
|
);
|
||||||
|
|
||||||
|
break Ok(QuinnTransport::new(conn));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
direct_ep = ep;
|
direct_ep = ep;
|
||||||
|
|||||||
@@ -588,8 +588,17 @@ async fn connect(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let app_clone = app.clone();
|
let app_clone = app.clone();
|
||||||
|
// Log transport details for debugging direct P2P media issues
|
||||||
|
let transport_info = pre_connected_transport.as_ref().map(|t| {
|
||||||
|
serde_json::json!({
|
||||||
|
"remote": t.remote_address().to_string(),
|
||||||
|
"max_datagram": t.max_datagram_size(),
|
||||||
|
"close_reason": t.connection().close_reason().map(|r| format!("{r:?}")),
|
||||||
|
})
|
||||||
|
});
|
||||||
emit_call_debug(&app, "connect:call_engine_starting", serde_json::json!({
|
emit_call_debug(&app, "connect:call_engine_starting", serde_json::json!({
|
||||||
"is_direct_p2p": is_direct_p2p_agreed,
|
"is_direct_p2p": is_direct_p2p_agreed,
|
||||||
|
"transport": transport_info,
|
||||||
}));
|
}));
|
||||||
let app_for_engine = app.clone();
|
let app_for_engine = app.clone();
|
||||||
match CallEngine::start(relay, room, alias, os_aec, quality, reuse_endpoint, pre_connected_transport, is_direct_p2p_agreed, app_for_engine, move |event_kind, message| {
|
match CallEngine::start(relay, room, alias, os_aec, quality, reuse_endpoint, pre_connected_transport, is_direct_p2p_agreed, app_for_engine, move |event_kind, message| {
|
||||||
|
|||||||
Reference in New Issue
Block a user