diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 5367b68..fa32eb6 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -206,6 +206,91 @@ fn weakest_tier<'a>(qualities: impl Iterator) -> .unwrap_or(Tier::Good) } +// --------------------------------------------------------------------------- +// Simulcast receiver state (T5.6) +// --------------------------------------------------------------------------- + +/// Layer-selection thresholds (kbps). +const SIMULCAST_HIGH_THRESHOLD_KBPS: u32 = 3000; +const SIMULCAST_MID_THRESHOLD_KBPS: u32 = 750; + +/// Hysteresis duration before promoting a candidate layer. +const LAYER_SWITCH_HYSTERESIS_MS: u64 = 3000; + +/// Per-receiver simulcast layer state. +/// +/// Tracks the receiver's observed bandwidth and loss, and applies +/// hysteresis before switching layers so that transient dips don't +/// cause visible flicker. +#[derive(Clone, Debug)] +pub struct ReceiverState { + pub bwe_kbps: u32, + pub loss_pct: u8, + pub selected_layer: u8, + candidate_layer: u8, + candidate_since: std::time::Instant, +} + +impl ReceiverState { + pub fn new() -> Self { + Self { + bwe_kbps: 0, + loss_pct: 0, + selected_layer: 0, + candidate_layer: 0, + candidate_since: std::time::Instant::now(), + } + } + + /// Update state from a quality report and recompute the selected layer. + pub fn update(&mut self, bwe_kbps: u32, loss_pct: u8, now: std::time::Instant) { + let is_first = self.bwe_kbps == 0; + self.bwe_kbps = bwe_kbps; + self.loss_pct = loss_pct; + + let suggested = Self::suggest_layer(bwe_kbps, loss_pct); + + if suggested == self.selected_layer { + // Already on the right layer — reset candidate. + self.candidate_layer = suggested; + self.candidate_since = now; + return; + } + + // First measurement ever — apply immediately so the receiver starts + // on the correct layer without waiting for hysteresis. + if is_first { + self.selected_layer = suggested; + self.candidate_layer = suggested; + self.candidate_since = now; + return; + } + + if suggested != self.candidate_layer { + // New suggestion — start hysteresis timer. + self.candidate_layer = suggested; + self.candidate_since = now; + return; + } + + // Same candidate — check if hysteresis elapsed. + let elapsed = now.saturating_duration_since(self.candidate_since).as_millis() as u64; + if elapsed >= LAYER_SWITCH_HYSTERESIS_MS { + self.selected_layer = suggested; + } + } + + fn suggest_layer(bwe_kbps: u32, loss_pct: u8) -> u8 { + if bwe_kbps > SIMULCAST_HIGH_THRESHOLD_KBPS && loss_pct < 2 { + 2 // high + } else if bwe_kbps > SIMULCAST_MID_THRESHOLD_KBPS { + 1 // mid + } else { + 0 // low + } + } +} + /// Unique participant ID within a room. pub type ParticipantId = u64; @@ -357,6 +442,14 @@ impl Room { .collect() } + fn others_with_id(&self, exclude_id: ParticipantId) -> Vec<(ParticipantId, ParticipantSender)> { + self.participants + .iter() + .filter(|p| p.id != exclude_id) + .map(|p| (p.id, p.sender.clone())) + .collect() + } + /// Build a RoomUpdate participant list. fn participant_list(&self) -> Vec { self.participants @@ -438,6 +531,8 @@ pub struct RoomManager { /// Maps `(room, stream_id)` -> participant_id of the sender currently /// publishing on that stream. Updated on every non-repair media packet. stream_owner: DashMap<(String, u8), ParticipantId>, + /// Per-receiver simulcast state: `(room, receiver_id)` -> `ReceiverState`. + receiver_states: DashMap<(String, ParticipantId), ReceiverState>, } impl RoomManager { @@ -451,6 +546,7 @@ impl RoomManager { keyframe_buffer: DashMap::new(), pli_state: DashMap::new(), stream_owner: DashMap::new(), + receiver_states: DashMap::new(), } } @@ -465,6 +561,7 @@ impl RoomManager { keyframe_buffer: DashMap::new(), pli_state: DashMap::new(), stream_owner: DashMap::new(), + receiver_states: DashMap::new(), } } @@ -748,6 +845,51 @@ impl RoomManager { .unwrap_or_default() } + /// Get `(id, sender)` pairs for all OTHER participants in a room. + pub fn others_with_id( + &self, + room_name: &str, + participant_id: ParticipantId, + ) -> Vec<(ParticipantId, ParticipantSender)> { + self.rooms + .get(room_name) + .map(|arc| arc.read().unwrap().others_with_id(participant_id)) + .unwrap_or_default() + } + + /// Update a receiver's simulcast state from observed network metrics. + /// + /// Called when a quality report arrives from the receiver (or from + /// transport feedback carrying the receiver's BWE estimate). + pub fn update_receiver_state( + &self, + room_name: &str, + receiver_id: ParticipantId, + bwe_kbps: u32, + loss_pct: u8, + ) { + let key = (room_name.to_string(), receiver_id); + let mut entry = self + .receiver_states + .entry(key) + .or_insert_with(ReceiverState::new); + entry.update(bwe_kbps, loss_pct, std::time::Instant::now()); + } + + /// Return the selected simulcast layer (0/1/2) for a receiver. + /// + /// Defaults to layer 0 (low) if no state has been recorded yet. + pub fn selected_layer( + &self, + room_name: &str, + receiver_id: ParticipantId, + ) -> u8 { + self.receiver_states + .get(&(room_name.to_string(), receiver_id)) + .map(|s| s.selected_layer) + .unwrap_or(0) + } + /// Get room size. pub fn room_size(&self, room_name: &str) -> usize { self.rooms @@ -1126,6 +1268,12 @@ async fn run_participant_plain( metrics.update_session_quality(&session_id, report); } + // Update receiver state from this participant's quality report (if present). + if let Some(ref report) = pkt.quality_report { + let bwe_kbps = report.bitrate_cap_kbps as u32; + room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct); + } + // Get current list of other participants + check quality directive let lock_start = std::time::Instant::now(); let (others, quality_directive) = { @@ -1134,7 +1282,7 @@ async fn run_participant_plain( } else { None }; - let o = room_mgr.others(&room_name, participant_id); + let o = room_mgr.others_with_id(&room_name, participant_id); (o, directive) }; let lock_ms = lock_start.elapsed().as_millis() as u64; @@ -1167,10 +1315,20 @@ async fn run_participant_plain( ts.record_in(&pkt, others.len()); } - // Forward to all others + // Forward to all others, applying simulcast layer selection for video. let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; - for other in &others { + let is_video = pkt.header.media_type == wzp_proto::MediaType::Video; + for (other_id, other) in &others { + // Simulcast layer selection (T5.6): video packets are filtered + // by the receiver's selected layer. Audio and non-simulcast + // traffic pass through unchanged. + if is_video { + let selected = room_mgr.selected_layer(&room_name, *other_id); + if pkt.header.stream_id != selected { + continue; + } + } match other { ParticipantSender::Quic(t) => { if let Err(e) = t.send_media(&pkt).await { @@ -1372,6 +1530,12 @@ async fn run_participant_trunked( ); } + // Update receiver state from this participant's quality report. + if let Some(ref report) = pkt.quality_report { + let bwe_kbps = report.bitrate_cap_kbps as u32; + room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct); + } + if let Some(ref report) = pkt.quality_report { metrics.update_session_quality(&session_id, report); } @@ -1383,7 +1547,7 @@ async fn run_participant_trunked( } else { None }; - let o = room_mgr.others(&room_name, participant_id); + let o = room_mgr.others_with_id(&room_name, participant_id); (o, directive) }; let lock_ms = lock_start.elapsed().as_millis() as u64; @@ -1403,7 +1567,14 @@ async fn run_participant_trunked( let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; - for other in &others { + let is_video = pkt.header.media_type == wzp_proto::MediaType::Video; + for (other_id, other) in &others { + if is_video { + let selected = room_mgr.selected_layer(&room_name, *other_id); + if pkt.header.stream_id != selected { + continue; + } + } match other { ParticipantSender::Quic(t) => { let peer_addr = t.connection().remote_address(); @@ -1776,4 +1947,82 @@ mod tests { "PLI for a stream with no mapped owner should return None" ); } + + // ---- Simulcast receiver state (T5.6) ---- + + #[test] + fn receiver_state_defaults_to_layer_zero() { + let rs = ReceiverState::new(); + assert_eq!(rs.selected_layer, 0); + assert_eq!(rs.bwe_kbps, 0); + assert_eq!(rs.loss_pct, 0); + } + + #[test] + fn receiver_state_selects_high_on_good_link() { + let mut rs = ReceiverState::new(); + let t0 = std::time::Instant::now(); + rs.update(4000, 0, t0); + assert_eq!(rs.selected_layer, 2, ">3 Mbps + 0% loss → high layer immediately"); + } + + #[test] + fn receiver_state_selects_mid_on_medium_link() { + let mut rs = ReceiverState::new(); + let t0 = std::time::Instant::now(); + rs.update(1000, 5, t0); + assert_eq!(rs.selected_layer, 1, ">750 kbps → mid layer immediately"); + } + + #[test] + fn receiver_state_hysteresis_delays_switch() { + let mut rs = ReceiverState::new(); + let t0 = std::time::Instant::now(); + // Start on high layer + rs.update(4000, 0, t0); + assert_eq!(rs.selected_layer, 2); + + // Drop to low-bandwidth — should not switch immediately + let t1 = t0 + std::time::Duration::from_millis(100); + rs.update(100, 0, t1); + assert_eq!(rs.selected_layer, 2, "hysteresis prevents immediate downgrade"); + + // After 3 s — switch should happen + let t2 = t0 + std::time::Duration::from_millis(3100); + rs.update(100, 0, t2); + assert_eq!(rs.selected_layer, 0, "after 3 s hysteresis, downgrade occurs"); + } + + #[test] + fn receiver_state_loss_blocks_high_layer() { + let mut rs = ReceiverState::new(); + let t0 = std::time::Instant::now(); + // High BWE but high loss → mid, not high + rs.update(4000, 5, t0); + assert_eq!(rs.selected_layer, 1, "high loss blocks high layer"); + } + + #[test] + fn room_manager_selected_layer_defaults_to_zero() { + let mgr = RoomManager::new(); + assert_eq!(mgr.selected_layer("room", 42), 0); + } + + #[test] + fn room_manager_updates_receiver_state() { + let mgr = RoomManager::new(); + let now = std::time::Instant::now(); + mgr.update_receiver_state("room", 1, 4000, 0); + // State is updated; we can verify via selected_layer + assert_eq!(mgr.selected_layer("room", 1), 2); + } + + #[test] + fn room_manager_receiver_states_are_isolated_by_room() { + let mgr = RoomManager::new(); + mgr.update_receiver_state("room-a", 1, 4000, 0); + mgr.update_receiver_state("room-b", 1, 100, 0); + assert_eq!(mgr.selected_layer("room-a", 1), 2); + assert_eq!(mgr.selected_layer("room-b", 1), 0); + } }