From 2d58bae9ba2e76b9b623fbcc766c2dbd7a20345f Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 25 May 2026 20:42:24 +0400 Subject: [PATCH] chore(relay): log video forwarding decisions in debug tap --- crates/wzp-relay/src/room.rs | 139 +++++++++++++++++++++++++++++++---- 1 file changed, 126 insertions(+), 13 deletions(-) diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 4afcb55..7e5b662 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -51,9 +51,13 @@ impl DebugTap { dir = dir, addr = %addr, seq = h.seq, + media = ?h.media_type, codec = ?h.codec_id, + stream_id = h.stream_id, ts = h.timestamp, fec_block = h.fec_block, + keyframe = h.is_keyframe(), + frame_end = h.is_frame_end(), repair = h.is_repair(), len = pkt.payload.len(), fan_out, @@ -61,6 +65,35 @@ impl DebugTap { ); } + pub fn log_video_route( + &self, + room: &str, + addr: &std::net::SocketAddr, + peer_id: ParticipantId, + pkt: &wzp_proto::MediaPacket, + selected_layer: u8, + forwarded: bool, + reason: &str, + ) { + let h = &pkt.header; + info!( + target: "debug_tap", + room = %room, + addr = %addr, + peer_id, + seq = h.seq, + stream_id = h.stream_id, + selected_layer, + codec = ?h.codec_id, + keyframe = h.is_keyframe(), + frame_end = h.is_frame_end(), + len = pkt.payload.len(), + forwarded, + reason, + "TAP VIDEO ROUTE" + ); + } + pub fn log_signal(&self, room: &str, signal: &wzp_proto::SignalMessage) { match signal { wzp_proto::SignalMessage::RoomUpdate { @@ -1142,6 +1175,7 @@ pub async fn run_participant( transport, metrics, session_id, + debug_tap, is_authenticated, ) .await; @@ -1324,20 +1358,11 @@ async fn run_participant_plain( broadcast_signal(&all_senders, &directive).await; } - // Debug tap: log packet metadata + record stats - if let Some(ref tap) = debug_tap { - if tap.matches(&room_name) { - tap.log_packet(&room_name, "in", &addr, &pkt, others.len()); - } - } - if let Some(ref mut ts) = tap_stats { - ts.record_in(&pkt, others.len()); - } - // Forward to all others, applying simulcast layer selection for video. let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; let is_video = pkt.header.media_type == wzp_proto::MediaType::Video; + let mut actual_fan_out = 0usize; for (other_id, other) in &others { // Simulcast layer selection (T5.6): video packets are filtered // by the receiver's selected layer. Audio and non-simulcast @@ -1345,8 +1370,34 @@ async fn run_participant_plain( if is_video { let selected = room_mgr.selected_layer(&room_name, *other_id); if pkt.header.stream_id != selected { + if let Some(ref tap) = debug_tap { + if tap.matches(&room_name) { + tap.log_video_route( + &room_name, + &addr, + *other_id, + &pkt, + selected, + false, + "simulcast_layer_mismatch", + ); + } + } continue; } + if let Some(ref tap) = debug_tap { + if tap.matches(&room_name) { + tap.log_video_route( + &room_name, + &addr, + *other_id, + &pkt, + selected, + true, + "selected_layer", + ); + } + } } match other { ParticipantSender::Quic(t) => { @@ -1361,14 +1412,28 @@ async fn run_participant_plain( "send_media error: {e}" ); } + } else { + actual_fan_out += 1; } } ParticipantSender::WebSocket(_) => { let _ = other.send_raw(&pkt.payload).await; + actual_fan_out += 1; } } } + // Debug tap: log packet metadata + record stats after forwarding so + // fan_out reflects actual sends after video layer filtering. + if let Some(ref tap) = debug_tap { + if tap.matches(&room_name) { + tap.log_packet(&room_name, "in", &addr, &pkt, actual_fan_out); + } + } + if let Some(ref mut ts) = tap_stats { + ts.record_in(&pkt, actual_fan_out); + } + // Federation: forward to active peer relays via channel if let Some(ref fed_tx) = federation_tx { let data = pkt.to_bytes(); @@ -1394,7 +1459,7 @@ async fn run_participant_plain( ); } - let fan_out = others.len() as u64; + let fan_out = actual_fan_out as u64; metrics.packets_forwarded.inc_by(fan_out); metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); packets_forwarded += 1; @@ -1457,6 +1522,7 @@ async fn run_participant_trunked( transport: Arc, metrics: Arc, session_id: String, + debug_tap: Option, _is_authenticated: bool, ) { use std::collections::HashMap; @@ -1472,6 +1538,11 @@ async fn run_participant_trunked( ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session()); let mut video_scorer_trunked = VideoScorer::new(); let mut last_bwe_kbps_trunked: Option = None; + let mut tap_stats = if debug_tap.as_ref().map_or(false, |t| t.matches(&room_name)) { + Some(TapStats::new()) + } else { + None + }; info!( room = %room_name, @@ -1605,12 +1676,39 @@ async fn run_participant_trunked( let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; let is_video = pkt.header.media_type == wzp_proto::MediaType::Video; + let mut actual_fan_out = 0usize; for (other_id, other) in &others { if is_video { let selected = room_mgr.selected_layer(&room_name, *other_id); if pkt.header.stream_id != selected { + if let Some(ref tap) = debug_tap { + if tap.matches(&room_name) { + tap.log_video_route( + &room_name, + &addr, + *other_id, + &pkt, + selected, + false, + "simulcast_layer_mismatch", + ); + } + } continue; } + if let Some(ref tap) = debug_tap { + if tap.matches(&room_name) { + tap.log_video_route( + &room_name, + &addr, + *other_id, + &pkt, + selected, + true, + "selected_layer", + ); + } + } } match other { ParticipantSender::Quic(t) => { @@ -1629,13 +1727,24 @@ async fn run_participant_trunked( "trunked send error: {e}" ); } + } else { + actual_fan_out += 1; } } ParticipantSender::WebSocket(_) => { let _ = other.send_raw(&pkt.payload).await; + actual_fan_out += 1; } } } + if let Some(ref tap) = debug_tap { + if tap.matches(&room_name) { + tap.log_packet(&room_name, "in", &addr, &pkt, actual_fan_out); + } + } + if let Some(ref mut ts) = tap_stats { + ts.record_in(&pkt, actual_fan_out); + } let fwd_ms = fwd_start.elapsed().as_millis() as u64; if fwd_ms > max_forward_ms { max_forward_ms = fwd_ms; @@ -1645,12 +1754,12 @@ async fn run_participant_trunked( room = %room_name, participant = participant_id, fwd_ms, - fan_out = others.len(), + fan_out = actual_fan_out, "slow forward (trunked)" ); } - let fan_out = others.len() as u64; + let fan_out = actual_fan_out as u64; metrics.packets_forwarded.inc_by(fan_out); metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); packets_forwarded += 1; @@ -1669,6 +1778,10 @@ async fn run_participant_trunked( send_errors, "participant stats (trunked)" ); + if let (Some(tap), Some(ts)) = (&debug_tap, &mut tap_stats) { + tap.log_stats(&room_name, ts); + ts.reset_period(); + } max_recv_gap_ms = 0; max_forward_ms = 0; last_log_instant = std::time::Instant::now();