T5.6: Per-receiver layer selection at SFU — ReceiverState + hysteresis + forwarding filter

This commit is contained in:
Siavash Sameni
2026-05-12 15:05:32 +04:00
parent 2f1a9f74d5
commit 2bbb664df4

View File

@@ -206,6 +206,91 @@ fn weakest_tier<'a>(qualities: impl Iterator<Item = &'a ParticipantQuality>) ->
.unwrap_or(Tier::Good) .unwrap_or(Tier::Good)
} }
// ---------------------------------------------------------------------------
// Simulcast receiver state (T5.6)
// ---------------------------------------------------------------------------
/// Layer-selection thresholds (kbps).
const SIMULCAST_HIGH_THRESHOLD_KBPS: u32 = 3000;
const SIMULCAST_MID_THRESHOLD_KBPS: u32 = 750;
/// Hysteresis duration before promoting a candidate layer.
const LAYER_SWITCH_HYSTERESIS_MS: u64 = 3000;
/// Per-receiver simulcast layer state.
///
/// Tracks the receiver's observed bandwidth and loss, and applies
/// hysteresis before switching layers so that transient dips don't
/// cause visible flicker.
#[derive(Clone, Debug)]
pub struct ReceiverState {
pub bwe_kbps: u32,
pub loss_pct: u8,
pub selected_layer: u8,
candidate_layer: u8,
candidate_since: std::time::Instant,
}
impl ReceiverState {
pub fn new() -> Self {
Self {
bwe_kbps: 0,
loss_pct: 0,
selected_layer: 0,
candidate_layer: 0,
candidate_since: std::time::Instant::now(),
}
}
/// Update state from a quality report and recompute the selected layer.
pub fn update(&mut self, bwe_kbps: u32, loss_pct: u8, now: std::time::Instant) {
let is_first = self.bwe_kbps == 0;
self.bwe_kbps = bwe_kbps;
self.loss_pct = loss_pct;
let suggested = Self::suggest_layer(bwe_kbps, loss_pct);
if suggested == self.selected_layer {
// Already on the right layer — reset candidate.
self.candidate_layer = suggested;
self.candidate_since = now;
return;
}
// First measurement ever — apply immediately so the receiver starts
// on the correct layer without waiting for hysteresis.
if is_first {
self.selected_layer = suggested;
self.candidate_layer = suggested;
self.candidate_since = now;
return;
}
if suggested != self.candidate_layer {
// New suggestion — start hysteresis timer.
self.candidate_layer = suggested;
self.candidate_since = now;
return;
}
// Same candidate — check if hysteresis elapsed.
let elapsed = now.saturating_duration_since(self.candidate_since).as_millis() as u64;
if elapsed >= LAYER_SWITCH_HYSTERESIS_MS {
self.selected_layer = suggested;
}
}
fn suggest_layer(bwe_kbps: u32, loss_pct: u8) -> u8 {
if bwe_kbps > SIMULCAST_HIGH_THRESHOLD_KBPS && loss_pct < 2 {
2 // high
} else if bwe_kbps > SIMULCAST_MID_THRESHOLD_KBPS {
1 // mid
} else {
0 // low
}
}
}
/// Unique participant ID within a room. /// Unique participant ID within a room.
pub type ParticipantId = u64; pub type ParticipantId = u64;
@@ -357,6 +442,14 @@ impl Room {
.collect() .collect()
} }
fn others_with_id(&self, exclude_id: ParticipantId) -> Vec<(ParticipantId, ParticipantSender)> {
self.participants
.iter()
.filter(|p| p.id != exclude_id)
.map(|p| (p.id, p.sender.clone()))
.collect()
}
/// Build a RoomUpdate participant list. /// Build a RoomUpdate participant list.
fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> { fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> {
self.participants self.participants
@@ -438,6 +531,8 @@ pub struct RoomManager {
/// Maps `(room, stream_id)` -> participant_id of the sender currently /// Maps `(room, stream_id)` -> participant_id of the sender currently
/// publishing on that stream. Updated on every non-repair media packet. /// publishing on that stream. Updated on every non-repair media packet.
stream_owner: DashMap<(String, u8), ParticipantId>, stream_owner: DashMap<(String, u8), ParticipantId>,
/// Per-receiver simulcast state: `(room, receiver_id)` -> `ReceiverState`.
receiver_states: DashMap<(String, ParticipantId), ReceiverState>,
} }
impl RoomManager { impl RoomManager {
@@ -451,6 +546,7 @@ impl RoomManager {
keyframe_buffer: DashMap::new(), keyframe_buffer: DashMap::new(),
pli_state: DashMap::new(), pli_state: DashMap::new(),
stream_owner: DashMap::new(), stream_owner: DashMap::new(),
receiver_states: DashMap::new(),
} }
} }
@@ -465,6 +561,7 @@ impl RoomManager {
keyframe_buffer: DashMap::new(), keyframe_buffer: DashMap::new(),
pli_state: DashMap::new(), pli_state: DashMap::new(),
stream_owner: DashMap::new(), stream_owner: DashMap::new(),
receiver_states: DashMap::new(),
} }
} }
@@ -748,6 +845,51 @@ impl RoomManager {
.unwrap_or_default() .unwrap_or_default()
} }
/// Get `(id, sender)` pairs for all OTHER participants in a room.
pub fn others_with_id(
&self,
room_name: &str,
participant_id: ParticipantId,
) -> Vec<(ParticipantId, ParticipantSender)> {
self.rooms
.get(room_name)
.map(|arc| arc.read().unwrap().others_with_id(participant_id))
.unwrap_or_default()
}
/// Update a receiver's simulcast state from observed network metrics.
///
/// Called when a quality report arrives from the receiver (or from
/// transport feedback carrying the receiver's BWE estimate).
pub fn update_receiver_state(
&self,
room_name: &str,
receiver_id: ParticipantId,
bwe_kbps: u32,
loss_pct: u8,
) {
let key = (room_name.to_string(), receiver_id);
let mut entry = self
.receiver_states
.entry(key)
.or_insert_with(ReceiverState::new);
entry.update(bwe_kbps, loss_pct, std::time::Instant::now());
}
/// Return the selected simulcast layer (0/1/2) for a receiver.
///
/// Defaults to layer 0 (low) if no state has been recorded yet.
pub fn selected_layer(
&self,
room_name: &str,
receiver_id: ParticipantId,
) -> u8 {
self.receiver_states
.get(&(room_name.to_string(), receiver_id))
.map(|s| s.selected_layer)
.unwrap_or(0)
}
/// Get room size. /// Get room size.
pub fn room_size(&self, room_name: &str) -> usize { pub fn room_size(&self, room_name: &str) -> usize {
self.rooms self.rooms
@@ -1126,6 +1268,12 @@ async fn run_participant_plain(
metrics.update_session_quality(&session_id, report); metrics.update_session_quality(&session_id, report);
} }
// Update receiver state from this participant's quality report (if present).
if let Some(ref report) = pkt.quality_report {
let bwe_kbps = report.bitrate_cap_kbps as u32;
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
}
// Get current list of other participants + check quality directive // Get current list of other participants + check quality directive
let lock_start = std::time::Instant::now(); let lock_start = std::time::Instant::now();
let (others, quality_directive) = { let (others, quality_directive) = {
@@ -1134,7 +1282,7 @@ async fn run_participant_plain(
} else { } else {
None None
}; };
let o = room_mgr.others(&room_name, participant_id); let o = room_mgr.others_with_id(&room_name, participant_id);
(o, directive) (o, directive)
}; };
let lock_ms = lock_start.elapsed().as_millis() as u64; let lock_ms = lock_start.elapsed().as_millis() as u64;
@@ -1167,10 +1315,20 @@ async fn run_participant_plain(
ts.record_in(&pkt, others.len()); ts.record_in(&pkt, others.len());
} }
// Forward to all others // Forward to all others, applying simulcast layer selection for video.
let fwd_start = std::time::Instant::now(); let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64; let pkt_bytes = pkt.payload.len() as u64;
for other in &others { let is_video = pkt.header.media_type == wzp_proto::MediaType::Video;
for (other_id, other) in &others {
// Simulcast layer selection (T5.6): video packets are filtered
// by the receiver's selected layer. Audio and non-simulcast
// traffic pass through unchanged.
if is_video {
let selected = room_mgr.selected_layer(&room_name, *other_id);
if pkt.header.stream_id != selected {
continue;
}
}
match other { match other {
ParticipantSender::Quic(t) => { ParticipantSender::Quic(t) => {
if let Err(e) = t.send_media(&pkt).await { if let Err(e) = t.send_media(&pkt).await {
@@ -1372,6 +1530,12 @@ async fn run_participant_trunked(
); );
} }
// Update receiver state from this participant's quality report.
if let Some(ref report) = pkt.quality_report {
let bwe_kbps = report.bitrate_cap_kbps as u32;
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
}
if let Some(ref report) = pkt.quality_report { if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(&session_id, report); metrics.update_session_quality(&session_id, report);
} }
@@ -1383,7 +1547,7 @@ async fn run_participant_trunked(
} else { } else {
None None
}; };
let o = room_mgr.others(&room_name, participant_id); let o = room_mgr.others_with_id(&room_name, participant_id);
(o, directive) (o, directive)
}; };
let lock_ms = lock_start.elapsed().as_millis() as u64; let lock_ms = lock_start.elapsed().as_millis() as u64;
@@ -1403,7 +1567,14 @@ async fn run_participant_trunked(
let fwd_start = std::time::Instant::now(); let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64; let pkt_bytes = pkt.payload.len() as u64;
for other in &others { let is_video = pkt.header.media_type == wzp_proto::MediaType::Video;
for (other_id, other) in &others {
if is_video {
let selected = room_mgr.selected_layer(&room_name, *other_id);
if pkt.header.stream_id != selected {
continue;
}
}
match other { match other {
ParticipantSender::Quic(t) => { ParticipantSender::Quic(t) => {
let peer_addr = t.connection().remote_address(); let peer_addr = t.connection().remote_address();
@@ -1776,4 +1947,82 @@ mod tests {
"PLI for a stream with no mapped owner should return None" "PLI for a stream with no mapped owner should return None"
); );
} }
// ---- Simulcast receiver state (T5.6) ----
#[test]
fn receiver_state_defaults_to_layer_zero() {
let rs = ReceiverState::new();
assert_eq!(rs.selected_layer, 0);
assert_eq!(rs.bwe_kbps, 0);
assert_eq!(rs.loss_pct, 0);
}
#[test]
fn receiver_state_selects_high_on_good_link() {
let mut rs = ReceiverState::new();
let t0 = std::time::Instant::now();
rs.update(4000, 0, t0);
assert_eq!(rs.selected_layer, 2, ">3 Mbps + 0% loss → high layer immediately");
}
#[test]
fn receiver_state_selects_mid_on_medium_link() {
let mut rs = ReceiverState::new();
let t0 = std::time::Instant::now();
rs.update(1000, 5, t0);
assert_eq!(rs.selected_layer, 1, ">750 kbps → mid layer immediately");
}
#[test]
fn receiver_state_hysteresis_delays_switch() {
let mut rs = ReceiverState::new();
let t0 = std::time::Instant::now();
// Start on high layer
rs.update(4000, 0, t0);
assert_eq!(rs.selected_layer, 2);
// Drop to low-bandwidth — should not switch immediately
let t1 = t0 + std::time::Duration::from_millis(100);
rs.update(100, 0, t1);
assert_eq!(rs.selected_layer, 2, "hysteresis prevents immediate downgrade");
// After 3 s — switch should happen
let t2 = t0 + std::time::Duration::from_millis(3100);
rs.update(100, 0, t2);
assert_eq!(rs.selected_layer, 0, "after 3 s hysteresis, downgrade occurs");
}
#[test]
fn receiver_state_loss_blocks_high_layer() {
let mut rs = ReceiverState::new();
let t0 = std::time::Instant::now();
// High BWE but high loss → mid, not high
rs.update(4000, 5, t0);
assert_eq!(rs.selected_layer, 1, "high loss blocks high layer");
}
#[test]
fn room_manager_selected_layer_defaults_to_zero() {
let mgr = RoomManager::new();
assert_eq!(mgr.selected_layer("room", 42), 0);
}
#[test]
fn room_manager_updates_receiver_state() {
let mgr = RoomManager::new();
let now = std::time::Instant::now();
mgr.update_receiver_state("room", 1, 4000, 0);
// State is updated; we can verify via selected_layer
assert_eq!(mgr.selected_layer("room", 1), 2);
}
#[test]
fn room_manager_receiver_states_are_isolated_by_room() {
let mgr = RoomManager::new();
mgr.update_receiver_state("room-a", 1, 4000, 0);
mgr.update_receiver_state("room-b", 1, 100, 0);
assert_eq!(mgr.selected_layer("room-a", 1), 2);
assert_eq!(mgr.selected_layer("room-b", 1), 0);
}
} }