T4.7 rework: make should_forward_pli take now: Instant + 6 unit tests

- Refactor should_forward_pli(room, stream_id) -> should_forward_pli(room, stream_id, now: Instant)
  so the 200 ms dedup window is deterministically testable.
- Update the one caller in run_participant_signals to pass Instant::now().
- Add 6 PLI unit tests covering:
  * first PLI forwards
  * duplicate within 200 ms suppressed
  * after 200 ms forwards again
  * different streams independent
  * different rooms independent
  * no stream owner returns None

Addresses reviewer CR on T4.7 (line drawn at T4.6 — stateful relay features must
have state-transition tests).

wzp-relay tests: 93 -> 99 pass.
This commit is contained in:
Siavash Sameni
2026-05-12 11:39:35 +04:00
parent 36b0421d68
commit 001d94f9ae

View File

@@ -651,7 +651,10 @@ impl RoomManager {
let key = (room_name.to_string(), sender_id, h.stream_id); let key = (room_name.to_string(), sender_id, h.stream_id);
if h.is_keyframe() { if h.is_keyframe() {
let mut entry = self.keyframe_buffer.entry(key.clone()).or_insert_with(|| KeyframeBuffer { let mut entry =
self.keyframe_buffer
.entry(key.clone())
.or_insert_with(|| KeyframeBuffer {
packets: Vec::new(), packets: Vec::new(),
sequence_first: h.seq, sequence_first: h.seq,
timestamp_ms: h.timestamp, timestamp_ms: h.timestamp,
@@ -678,8 +681,7 @@ impl RoomManager {
timestamp_ms: entry.timestamp_ms, timestamp_ms: entry.timestamp_ms,
total_bytes: entry.total_bytes, total_bytes: entry.total_bytes,
}; };
self.keyframe_cache self.keyframe_cache.insert(key.clone(), completed);
.insert(key.clone(), completed);
entry.total_bytes = 0; entry.total_bytes = 0;
} }
} else { } else {
@@ -692,10 +694,7 @@ impl RoomManager {
/// ///
/// Used to replay keyframes to a newly-joined participant before live /// Used to replay keyframes to a newly-joined participant before live
/// forwarding starts. /// forwarding starts.
pub fn cached_keyframes_for_room( pub fn cached_keyframes_for_room(&self, room_name: &str) -> Vec<Vec<wzp_proto::MediaPacket>> {
&self,
room_name: &str,
) -> Vec<Vec<wzp_proto::MediaPacket>> {
self.keyframe_cache self.keyframe_cache
.iter() .iter()
.filter(|e| e.key().0 == room_name) .filter(|e| e.key().0 == room_name)
@@ -705,32 +704,34 @@ impl RoomManager {
/// Remove all per-room state when a room is closed. /// Remove all per-room state when a room is closed.
fn clear_room_state(&self, room_name: &str) { fn clear_room_state(&self, room_name: &str) {
self.keyframe_cache self.keyframe_cache.retain(|k, _| k.0 != room_name);
.retain(|k, _| k.0 != room_name); self.keyframe_buffer.retain(|k, _| k.0 != room_name);
self.keyframe_buffer self.pli_state.retain(|k, _| k.0 != room_name);
.retain(|k, _| k.0 != room_name); self.stream_owner.retain(|k, _| k.0 != room_name);
self.pli_state
.retain(|k, _| k.0 != room_name);
self.stream_owner
.retain(|k, _| k.0 != room_name);
} }
/// PLI suppression window (PRD-video-v1 T4.7). /// PLI suppression window (PRD-video-v1 T4.7).
const PLI_SUPPRESS_MS: u64 = 200; const PLI_SUPPRESS_MS: u64 = 200;
/// Returns `true` if this PLI should be forwarded upstream. /// Returns `Some(sender_id)` if this PLI should be forwarded upstream,
/// or `None` if it is suppressed (duplicate within 200 ms) or no sender
/// is mapped to the given stream.
/// ///
/// Suppresses duplicate PLIs for the same `(room, sender, stream_id)` /// Suppresses duplicate PLIs for the same `(room, sender, stream_id)`
/// within 200 ms. Looks up the current owner of `stream_id` in the room /// within 200 ms. `now` is taken as a parameter so the dedup window can
/// and uses `(owner, stream)` as the suppression key. /// be exercised deterministically by tests.
pub fn should_forward_pli(&self, room_name: &str, stream_id: u8) -> Option<ParticipantId> { pub fn should_forward_pli(
&self,
room_name: &str,
stream_id: u8,
now: std::time::Instant,
) -> Option<ParticipantId> {
let owner = self.stream_owner.get(&(room_name.to_string(), stream_id))?; let owner = self.stream_owner.get(&(room_name.to_string(), stream_id))?;
let sender_id = *owner; let sender_id = *owner;
drop(owner); drop(owner);
let key = (room_name.to_string(), sender_id, stream_id); let key = (room_name.to_string(), sender_id, stream_id);
let now = std::time::Instant::now();
if let Some(entry) = self.pli_state.get(&key) { if let Some(entry) = self.pli_state.get(&key) {
let elapsed = entry.last_pli.elapsed().as_millis() as u64; let elapsed = now.saturating_duration_since(entry.last_pli).as_millis() as u64;
if elapsed < Self::PLI_SUPPRESS_MS { if elapsed < Self::PLI_SUPPRESS_MS {
return None; return None;
} }
@@ -916,7 +917,8 @@ pub async fn run_participant_signals(
loop { loop {
match transport.recv_signal().await { match transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::PictureLossIndication { stream_id, .. })) => { Ok(Some(wzp_proto::SignalMessage::PictureLossIndication { stream_id, .. })) => {
match room_mgr.should_forward_pli(&room_name, stream_id) { match room_mgr.should_forward_pli(&room_name, stream_id, std::time::Instant::now())
{
Some(_target_id) => { Some(_target_id) => {
// Forward PLI to the specific sender that owns this stream. // Forward PLI to the specific sender that owns this stream.
let others = room_mgr.others(&room_name, participant_id); let others = room_mgr.others(&room_name, participant_id);
@@ -1082,10 +1084,9 @@ async fn run_participant_plain(
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt); room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
// Register this participant as the owner of this stream for PLI routing. // Register this participant as the owner of this stream for PLI routing.
if !pkt.header.is_repair() { if !pkt.header.is_repair() {
room_mgr.stream_owner.insert( room_mgr
(room_name.clone(), pkt.header.stream_id), .stream_owner
participant_id, .insert((room_name.clone(), pkt.header.stream_id), participant_id);
);
} }
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
@@ -1680,4 +1681,99 @@ mod tests {
"weakest should not be Good when one participant is bad" "weakest should not be Good when one participant is bad"
); );
} }
// PLI suppression tests (T4.7 rework).
//
// `should_forward_pli` takes `now: Instant` as a parameter so we can
// drive the dedup window deterministically. Each test uses a base
// `Instant::now()` and offsets via `+ Duration::from_millis(N)`.
fn seed_stream_owner(mgr: &RoomManager, room: &str, stream_id: u8, owner: ParticipantId) {
mgr.stream_owner
.insert((room.to_string(), stream_id), owner);
}
#[test]
fn pli_first_forwards() {
let mgr = RoomManager::new();
let owner: ParticipantId = 1;
seed_stream_owner(&mgr, "room", 0, owner);
let t0 = std::time::Instant::now();
assert_eq!(
mgr.should_forward_pli("room", 0, t0),
Some(owner),
"first PLI for a stream should be forwarded"
);
}
#[test]
fn pli_within_window_suppressed() {
let mgr = RoomManager::new();
let owner: ParticipantId = 1;
seed_stream_owner(&mgr, "room", 0, owner);
let t0 = std::time::Instant::now();
assert!(mgr.should_forward_pli("room", 0, t0).is_some());
let t1 = t0 + std::time::Duration::from_millis(100);
assert_eq!(
mgr.should_forward_pli("room", 0, t1),
None,
"PLI within the 200 ms suppression window must be dropped"
);
}
#[test]
fn pli_after_window_forwards() {
let mgr = RoomManager::new();
let owner: ParticipantId = 1;
seed_stream_owner(&mgr, "room", 0, owner);
let t0 = std::time::Instant::now();
assert!(mgr.should_forward_pli("room", 0, t0).is_some());
let t1 = t0 + std::time::Duration::from_millis(300);
assert_eq!(
mgr.should_forward_pli("room", 0, t1),
Some(owner),
"PLI after the suppression window should forward again"
);
}
#[test]
fn pli_different_streams_independent() {
let mgr = RoomManager::new();
let owner_a: ParticipantId = 1;
let owner_b: ParticipantId = 2;
seed_stream_owner(&mgr, "room", 0, owner_a);
seed_stream_owner(&mgr, "room", 1, owner_b);
let t0 = std::time::Instant::now();
assert!(mgr.should_forward_pli("room", 0, t0).is_some());
assert!(
mgr.should_forward_pli("room", 1, t0).is_some(),
"PLI on a different stream within the window must not be suppressed"
);
}
#[test]
fn pli_different_rooms_independent() {
let mgr = RoomManager::new();
let owner_a: ParticipantId = 1;
let owner_b: ParticipantId = 2;
seed_stream_owner(&mgr, "room-a", 0, owner_a);
seed_stream_owner(&mgr, "room-b", 0, owner_b);
let t0 = std::time::Instant::now();
assert!(mgr.should_forward_pli("room-a", 0, t0).is_some());
assert!(
mgr.should_forward_pli("room-b", 0, t0).is_some(),
"PLI in a different room within the window must not be suppressed"
);
}
#[test]
fn pli_no_owner_returns_none() {
let mgr = RoomManager::new();
let t0 = std::time::Instant::now();
assert_eq!(
mgr.should_forward_pli("room", 0, t0),
None,
"PLI for a stream with no mapped owner should return None"
);
}
} }