debug(media): add connection diagnostics for direct P2P drops
When direct P2P calls show 100% datagram drops, we need to know
WHY send_media() fails. This commit adds:
- Remote address + stable_id logging on A-role accept and D-role
dial success (dual_path.rs) — tells us which candidate won
- Remote address + max_datagram_size on engine transport init —
verifies datagrams are negotiated
- last_send_err in send heartbeat — captures the actual error
from send_datagram() failures
- QuinnTransport::remote_address() helper
Also fixes UI badge: was looking for wrong event name
("dual_path_race_won" → "path_negotiated").
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -202,12 +202,20 @@ pub async fn race(
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
v4 = wzp_transport::accept(&ep_for_fut) => {
|
v4 = wzp_transport::accept(&ep_for_fut) => {
|
||||||
let conn = v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?;
|
let conn = v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?;
|
||||||
tracing::info!("dual_path: A-role accepted on IPv4 endpoint");
|
tracing::info!(
|
||||||
|
remote = %conn.remote_address(),
|
||||||
|
stable_id = conn.stable_id(),
|
||||||
|
"dual_path: A-role accepted on IPv4 endpoint"
|
||||||
|
);
|
||||||
Ok(QuinnTransport::new(conn))
|
Ok(QuinnTransport::new(conn))
|
||||||
}
|
}
|
||||||
v6 = wzp_transport::accept(&v6_ep) => {
|
v6 = wzp_transport::accept(&v6_ep) => {
|
||||||
let conn = v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?;
|
let conn = v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?;
|
||||||
tracing::info!("dual_path: A-role accepted on IPv6 endpoint");
|
tracing::info!(
|
||||||
|
remote = %conn.remote_address(),
|
||||||
|
stable_id = conn.stable_id(),
|
||||||
|
"dual_path: A-role accepted on IPv6 endpoint"
|
||||||
|
);
|
||||||
Ok(QuinnTransport::new(conn))
|
Ok(QuinnTransport::new(conn))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -216,6 +224,11 @@ pub async fn race(
|
|||||||
let conn = wzp_transport::accept(&ep_for_fut)
|
let conn = wzp_transport::accept(&ep_for_fut)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
|
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
|
||||||
|
tracing::info!(
|
||||||
|
remote = %conn.remote_address(),
|
||||||
|
stable_id = conn.stable_id(),
|
||||||
|
"dual_path: A-role accepted (v4-only)"
|
||||||
|
);
|
||||||
Ok(QuinnTransport::new(conn))
|
Ok(QuinnTransport::new(conn))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -316,6 +329,8 @@ pub async fn race(
|
|||||||
tracing::info!(
|
tracing::info!(
|
||||||
%candidate,
|
%candidate,
|
||||||
candidate_idx = idx,
|
candidate_idx = idx,
|
||||||
|
remote = %conn.remote_address(),
|
||||||
|
stable_id = conn.stable_id(),
|
||||||
"dual_path: direct dial succeeded on candidate"
|
"dual_path: direct dial succeeded on candidate"
|
||||||
);
|
);
|
||||||
// Abort the remaining in-flight
|
// Abort the remaining in-flight
|
||||||
|
|||||||
@@ -33,6 +33,11 @@ impl QuinnTransport {
|
|||||||
&self.connection
|
&self.connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remote address of the peer on this connection.
|
||||||
|
pub fn remote_address(&self) -> std::net::SocketAddr {
|
||||||
|
self.connection.remote_address()
|
||||||
|
}
|
||||||
|
|
||||||
/// Send raw bytes as a QUIC datagram (no MediaPacket framing).
|
/// Send raw bytes as a QUIC datagram (no MediaPacket framing).
|
||||||
pub fn send_raw_datagram(&self, data: &[u8]) -> Result<(), TransportError> {
|
pub fn send_raw_datagram(&self, data: &[u8]) -> Result<(), TransportError> {
|
||||||
self.connection
|
self.connection
|
||||||
|
|||||||
@@ -477,6 +477,7 @@ impl CallEngine {
|
|||||||
let send_fs = frames_sent.clone();
|
let send_fs = frames_sent.clone();
|
||||||
let send_level = audio_level.clone();
|
let send_level = audio_level.clone();
|
||||||
let send_drops = Arc::new(AtomicU64::new(0));
|
let send_drops = Arc::new(AtomicU64::new(0));
|
||||||
|
let send_last_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||||
let send_quality = quality.clone();
|
let send_quality = quality.clone();
|
||||||
let send_tx_codec = tx_codec.clone();
|
let send_tx_codec = tx_codec.clone();
|
||||||
let send_t0 = call_t0;
|
let send_t0 = call_t0;
|
||||||
@@ -561,9 +562,14 @@ impl CallEngine {
|
|||||||
last_pkt_bytes = pkt.payload.len();
|
last_pkt_bytes = pkt.payload.len();
|
||||||
if let Err(e) = send_t.send_media(pkt).await {
|
if let Err(e) = send_t.send_media(pkt).await {
|
||||||
send_drops.fetch_add(1, Ordering::Relaxed);
|
send_drops.fetch_add(1, Ordering::Relaxed);
|
||||||
if send_drops.load(Ordering::Relaxed) <= 3 {
|
let count = send_drops.load(Ordering::Relaxed);
|
||||||
|
if count <= 3 {
|
||||||
tracing::warn!("send_media error (dropping packet): {e}");
|
tracing::warn!("send_media error (dropping packet): {e}");
|
||||||
}
|
}
|
||||||
|
// Latch last error for heartbeat
|
||||||
|
if count == 1 {
|
||||||
|
*send_last_err.lock().await = Some(format!("{e}"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let before = send_fs.fetch_add(1, Ordering::Relaxed);
|
let before = send_fs.fetch_add(1, Ordering::Relaxed);
|
||||||
@@ -605,6 +611,7 @@ impl CallEngine {
|
|||||||
// you capture/mic is broken; a live one with
|
// you capture/mic is broken; a live one with
|
||||||
// no peer recv tells you outbound is being
|
// no peer recv tells you outbound is being
|
||||||
// dropped somewhere in the media path.
|
// dropped somewhere in the media path.
|
||||||
|
let err_str = send_last_err.lock().await.clone();
|
||||||
crate::emit_call_debug(
|
crate::emit_call_debug(
|
||||||
&send_app,
|
&send_app,
|
||||||
"media:send_heartbeat",
|
"media:send_heartbeat",
|
||||||
@@ -614,6 +621,7 @@ impl CallEngine {
|
|||||||
"last_pkt_bytes": last_pkt_bytes,
|
"last_pkt_bytes": last_pkt_bytes,
|
||||||
"short_reads": short_reads,
|
"short_reads": short_reads,
|
||||||
"drops": drops,
|
"drops": drops,
|
||||||
|
"last_send_err": err_str,
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
heartbeat = std::time::Instant::now();
|
heartbeat = std::time::Instant::now();
|
||||||
@@ -1105,7 +1113,12 @@ impl CallEngine {
|
|||||||
|
|
||||||
// Transport source: either pre-connected or fresh.
|
// Transport source: either pre-connected or fresh.
|
||||||
let transport = if let Some(t) = pre_connected_transport {
|
let transport = if let Some(t) = pre_connected_transport {
|
||||||
info!(is_direct_p2p, "using pre-connected transport");
|
info!(
|
||||||
|
is_direct_p2p,
|
||||||
|
remote = %t.remote_address(),
|
||||||
|
max_datagram = ?t.max_datagram_size(),
|
||||||
|
"using pre-connected transport"
|
||||||
|
);
|
||||||
t
|
t
|
||||||
} else {
|
} else {
|
||||||
// Connect — reuse the signal endpoint if the direct-call path gave
|
// Connect — reuse the signal endpoint if the direct-call path gave
|
||||||
|
|||||||
Reference in New Issue
Block a user