chore(relay): log video forwarding decisions in debug tap
Some checks failed
Mirror to GitHub / mirror (push) Failing after 27s
Build Release Binaries / build-amd64 (push) Failing after 3m41s

This commit is contained in:
Siavash Sameni
2026-05-25 20:42:24 +04:00
parent e1ca6ca6e6
commit 2d58bae9ba

View File

@@ -51,9 +51,13 @@ impl DebugTap {
dir = dir,
addr = %addr,
seq = h.seq,
media = ?h.media_type,
codec = ?h.codec_id,
stream_id = h.stream_id,
ts = h.timestamp,
fec_block = h.fec_block,
keyframe = h.is_keyframe(),
frame_end = h.is_frame_end(),
repair = h.is_repair(),
len = pkt.payload.len(),
fan_out,
@@ -61,6 +65,35 @@ impl DebugTap {
);
}
pub fn log_video_route(
&self,
room: &str,
addr: &std::net::SocketAddr,
peer_id: ParticipantId,
pkt: &wzp_proto::MediaPacket,
selected_layer: u8,
forwarded: bool,
reason: &str,
) {
let h = &pkt.header;
info!(
target: "debug_tap",
room = %room,
addr = %addr,
peer_id,
seq = h.seq,
stream_id = h.stream_id,
selected_layer,
codec = ?h.codec_id,
keyframe = h.is_keyframe(),
frame_end = h.is_frame_end(),
len = pkt.payload.len(),
forwarded,
reason,
"TAP VIDEO ROUTE"
);
}
pub fn log_signal(&self, room: &str, signal: &wzp_proto::SignalMessage) {
match signal {
wzp_proto::SignalMessage::RoomUpdate {
@@ -1142,6 +1175,7 @@ pub async fn run_participant(
transport,
metrics,
session_id,
debug_tap,
is_authenticated,
)
.await;
@@ -1324,20 +1358,11 @@ async fn run_participant_plain(
broadcast_signal(&all_senders, &directive).await;
}
// Debug tap: log packet metadata + record stats
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_packet(&room_name, "in", &addr, &pkt, others.len());
}
}
if let Some(ref mut ts) = tap_stats {
ts.record_in(&pkt, others.len());
}
// Forward to all others, applying simulcast layer selection for video.
let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64;
let is_video = pkt.header.media_type == wzp_proto::MediaType::Video;
let mut actual_fan_out = 0usize;
for (other_id, other) in &others {
// Simulcast layer selection (T5.6): video packets are filtered
// by the receiver's selected layer. Audio and non-simulcast
@@ -1345,8 +1370,34 @@ async fn run_participant_plain(
if is_video {
let selected = room_mgr.selected_layer(&room_name, *other_id);
if pkt.header.stream_id != selected {
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_video_route(
&room_name,
&addr,
*other_id,
&pkt,
selected,
false,
"simulcast_layer_mismatch",
);
}
}
continue;
}
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_video_route(
&room_name,
&addr,
*other_id,
&pkt,
selected,
true,
"selected_layer",
);
}
}
}
match other {
ParticipantSender::Quic(t) => {
@@ -1361,14 +1412,28 @@ async fn run_participant_plain(
"send_media error: {e}"
);
}
} else {
actual_fan_out += 1;
}
}
ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await;
actual_fan_out += 1;
}
}
}
// Debug tap: log packet metadata + record stats after forwarding so
// fan_out reflects actual sends after video layer filtering.
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_packet(&room_name, "in", &addr, &pkt, actual_fan_out);
}
}
if let Some(ref mut ts) = tap_stats {
ts.record_in(&pkt, actual_fan_out);
}
// Federation: forward to active peer relays via channel
if let Some(ref fed_tx) = federation_tx {
let data = pkt.to_bytes();
@@ -1394,7 +1459,7 @@ async fn run_participant_plain(
);
}
let fan_out = others.len() as u64;
let fan_out = actual_fan_out as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
@@ -1457,6 +1522,7 @@ async fn run_participant_trunked(
transport: Arc<wzp_transport::QuinnTransport>,
metrics: Arc<RelayMetrics>,
session_id: String,
debug_tap: Option<DebugTap>,
_is_authenticated: bool,
) {
use std::collections::HashMap;
@@ -1472,6 +1538,11 @@ async fn run_participant_trunked(
ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session());
let mut video_scorer_trunked = VideoScorer::new();
let mut last_bwe_kbps_trunked: Option<u32> = None;
let mut tap_stats = if debug_tap.as_ref().map_or(false, |t| t.matches(&room_name)) {
Some(TapStats::new())
} else {
None
};
info!(
room = %room_name,
@@ -1605,12 +1676,39 @@ async fn run_participant_trunked(
let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64;
let is_video = pkt.header.media_type == wzp_proto::MediaType::Video;
let mut actual_fan_out = 0usize;
for (other_id, other) in &others {
if is_video {
let selected = room_mgr.selected_layer(&room_name, *other_id);
if pkt.header.stream_id != selected {
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_video_route(
&room_name,
&addr,
*other_id,
&pkt,
selected,
false,
"simulcast_layer_mismatch",
);
}
}
continue;
}
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_video_route(
&room_name,
&addr,
*other_id,
&pkt,
selected,
true,
"selected_layer",
);
}
}
}
match other {
ParticipantSender::Quic(t) => {
@@ -1629,13 +1727,24 @@ async fn run_participant_trunked(
"trunked send error: {e}"
);
}
} else {
actual_fan_out += 1;
}
}
ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await;
actual_fan_out += 1;
}
}
}
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_packet(&room_name, "in", &addr, &pkt, actual_fan_out);
}
}
if let Some(ref mut ts) = tap_stats {
ts.record_in(&pkt, actual_fan_out);
}
let fwd_ms = fwd_start.elapsed().as_millis() as u64;
if fwd_ms > max_forward_ms {
max_forward_ms = fwd_ms;
@@ -1645,12 +1754,12 @@ async fn run_participant_trunked(
room = %room_name,
participant = participant_id,
fwd_ms,
fan_out = others.len(),
fan_out = actual_fan_out,
"slow forward (trunked)"
);
}
let fan_out = others.len() as u64;
let fan_out = actual_fan_out as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
@@ -1669,6 +1778,10 @@ async fn run_participant_trunked(
send_errors,
"participant stats (trunked)"
);
if let (Some(tap), Some(ts)) = (&debug_tap, &mut tap_stats) {
tap.log_stats(&room_name, ts);
ts.reset_period();
}
max_recv_gap_ms = 0;
max_forward_ms = 0;
last_log_instant = std::time::Instant::now();