diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 17c3773..5367b68 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -651,12 +651,15 @@ impl RoomManager { let key = (room_name.to_string(), sender_id, h.stream_id); if h.is_keyframe() { - let mut entry = self.keyframe_buffer.entry(key.clone()).or_insert_with(|| KeyframeBuffer { - packets: Vec::new(), - sequence_first: h.seq, - timestamp_ms: h.timestamp, - total_bytes: 0, - }); + let mut entry = + self.keyframe_buffer + .entry(key.clone()) + .or_insert_with(|| KeyframeBuffer { + packets: Vec::new(), + sequence_first: h.seq, + timestamp_ms: h.timestamp, + total_bytes: 0, + }); let pkt_bytes = pkt.payload.len(); // If this would overflow the per-stream cap, drop the partial buffer @@ -678,8 +681,7 @@ impl RoomManager { timestamp_ms: entry.timestamp_ms, total_bytes: entry.total_bytes, }; - self.keyframe_cache - .insert(key.clone(), completed); + self.keyframe_cache.insert(key.clone(), completed); entry.total_bytes = 0; } } else { @@ -692,10 +694,7 @@ impl RoomManager { /// /// Used to replay keyframes to a newly-joined participant before live /// forwarding starts. - pub fn cached_keyframes_for_room( - &self, - room_name: &str, - ) -> Vec> { + pub fn cached_keyframes_for_room(&self, room_name: &str) -> Vec> { self.keyframe_cache .iter() .filter(|e| e.key().0 == room_name) @@ -705,32 +704,34 @@ impl RoomManager { /// Remove all per-room state when a room is closed. fn clear_room_state(&self, room_name: &str) { - self.keyframe_cache - .retain(|k, _| k.0 != room_name); - self.keyframe_buffer - .retain(|k, _| k.0 != room_name); - self.pli_state - .retain(|k, _| k.0 != room_name); - self.stream_owner - .retain(|k, _| k.0 != room_name); + self.keyframe_cache.retain(|k, _| k.0 != room_name); + self.keyframe_buffer.retain(|k, _| k.0 != room_name); + self.pli_state.retain(|k, _| k.0 != room_name); + self.stream_owner.retain(|k, _| k.0 != room_name); } /// PLI suppression window (PRD-video-v1 T4.7). const PLI_SUPPRESS_MS: u64 = 200; - /// Returns `true` if this PLI should be forwarded upstream. + /// Returns `Some(sender_id)` if this PLI should be forwarded upstream, + /// or `None` if it is suppressed (duplicate within 200 ms) or no sender + /// is mapped to the given stream. /// /// Suppresses duplicate PLIs for the same `(room, sender, stream_id)` - /// within 200 ms. Looks up the current owner of `stream_id` in the room - /// and uses `(owner, stream)` as the suppression key. - pub fn should_forward_pli(&self, room_name: &str, stream_id: u8) -> Option { + /// within 200 ms. `now` is taken as a parameter so the dedup window can + /// be exercised deterministically by tests. + pub fn should_forward_pli( + &self, + room_name: &str, + stream_id: u8, + now: std::time::Instant, + ) -> Option { let owner = self.stream_owner.get(&(room_name.to_string(), stream_id))?; let sender_id = *owner; drop(owner); let key = (room_name.to_string(), sender_id, stream_id); - let now = std::time::Instant::now(); if let Some(entry) = self.pli_state.get(&key) { - let elapsed = entry.last_pli.elapsed().as_millis() as u64; + let elapsed = now.saturating_duration_since(entry.last_pli).as_millis() as u64; if elapsed < Self::PLI_SUPPRESS_MS { return None; } @@ -916,7 +917,8 @@ pub async fn run_participant_signals( loop { match transport.recv_signal().await { Ok(Some(wzp_proto::SignalMessage::PictureLossIndication { stream_id, .. })) => { - match room_mgr.should_forward_pli(&room_name, stream_id) { + match room_mgr.should_forward_pli(&room_name, stream_id, std::time::Instant::now()) + { Some(_target_id) => { // Forward PLI to the specific sender that owns this stream. let others = room_mgr.others(&room_name, participant_id); @@ -1082,10 +1084,9 @@ async fn run_participant_plain( room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt); // Register this participant as the owner of this stream for PLI routing. if !pkt.header.is_repair() { - room_mgr.stream_owner.insert( - (room_name.clone(), pkt.header.stream_id), - participant_id, - ); + room_mgr + .stream_owner + .insert((room_name.clone(), pkt.header.stream_id), participant_id); } let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; @@ -1680,4 +1681,99 @@ mod tests { "weakest should not be Good when one participant is bad" ); } + + // PLI suppression tests (T4.7 rework). + // + // `should_forward_pli` takes `now: Instant` as a parameter so we can + // drive the dedup window deterministically. Each test uses a base + // `Instant::now()` and offsets via `+ Duration::from_millis(N)`. + + fn seed_stream_owner(mgr: &RoomManager, room: &str, stream_id: u8, owner: ParticipantId) { + mgr.stream_owner + .insert((room.to_string(), stream_id), owner); + } + + #[test] + fn pli_first_forwards() { + let mgr = RoomManager::new(); + let owner: ParticipantId = 1; + seed_stream_owner(&mgr, "room", 0, owner); + let t0 = std::time::Instant::now(); + assert_eq!( + mgr.should_forward_pli("room", 0, t0), + Some(owner), + "first PLI for a stream should be forwarded" + ); + } + + #[test] + fn pli_within_window_suppressed() { + let mgr = RoomManager::new(); + let owner: ParticipantId = 1; + seed_stream_owner(&mgr, "room", 0, owner); + let t0 = std::time::Instant::now(); + assert!(mgr.should_forward_pli("room", 0, t0).is_some()); + let t1 = t0 + std::time::Duration::from_millis(100); + assert_eq!( + mgr.should_forward_pli("room", 0, t1), + None, + "PLI within the 200 ms suppression window must be dropped" + ); + } + + #[test] + fn pli_after_window_forwards() { + let mgr = RoomManager::new(); + let owner: ParticipantId = 1; + seed_stream_owner(&mgr, "room", 0, owner); + let t0 = std::time::Instant::now(); + assert!(mgr.should_forward_pli("room", 0, t0).is_some()); + let t1 = t0 + std::time::Duration::from_millis(300); + assert_eq!( + mgr.should_forward_pli("room", 0, t1), + Some(owner), + "PLI after the suppression window should forward again" + ); + } + + #[test] + fn pli_different_streams_independent() { + let mgr = RoomManager::new(); + let owner_a: ParticipantId = 1; + let owner_b: ParticipantId = 2; + seed_stream_owner(&mgr, "room", 0, owner_a); + seed_stream_owner(&mgr, "room", 1, owner_b); + let t0 = std::time::Instant::now(); + assert!(mgr.should_forward_pli("room", 0, t0).is_some()); + assert!( + mgr.should_forward_pli("room", 1, t0).is_some(), + "PLI on a different stream within the window must not be suppressed" + ); + } + + #[test] + fn pli_different_rooms_independent() { + let mgr = RoomManager::new(); + let owner_a: ParticipantId = 1; + let owner_b: ParticipantId = 2; + seed_stream_owner(&mgr, "room-a", 0, owner_a); + seed_stream_owner(&mgr, "room-b", 0, owner_b); + let t0 = std::time::Instant::now(); + assert!(mgr.should_forward_pli("room-a", 0, t0).is_some()); + assert!( + mgr.should_forward_pli("room-b", 0, t0).is_some(), + "PLI in a different room within the window must not be suppressed" + ); + } + + #[test] + fn pli_no_owner_returns_none() { + let mgr = RoomManager::new(); + let t0 = std::time::Instant::now(); + assert_eq!( + mgr.should_forward_pli("room", 0, t0), + None, + "PLI for a stream with no mapped owner should return None" + ); + } }