T5.6: Per-receiver layer selection at SFU — ReceiverState + hysteresis + forwarding filter
This commit is contained in:
@@ -206,6 +206,91 @@ fn weakest_tier<'a>(qualities: impl Iterator<Item = &'a ParticipantQuality>) ->
|
||||
.unwrap_or(Tier::Good)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Simulcast receiver state (T5.6)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Layer-selection thresholds (kbps).
|
||||
const SIMULCAST_HIGH_THRESHOLD_KBPS: u32 = 3000;
|
||||
const SIMULCAST_MID_THRESHOLD_KBPS: u32 = 750;
|
||||
|
||||
/// Hysteresis duration before promoting a candidate layer.
|
||||
const LAYER_SWITCH_HYSTERESIS_MS: u64 = 3000;
|
||||
|
||||
/// Per-receiver simulcast layer state.
|
||||
///
|
||||
/// Tracks the receiver's observed bandwidth and loss, and applies
|
||||
/// hysteresis before switching layers so that transient dips don't
|
||||
/// cause visible flicker.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReceiverState {
|
||||
pub bwe_kbps: u32,
|
||||
pub loss_pct: u8,
|
||||
pub selected_layer: u8,
|
||||
candidate_layer: u8,
|
||||
candidate_since: std::time::Instant,
|
||||
}
|
||||
|
||||
impl ReceiverState {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
bwe_kbps: 0,
|
||||
loss_pct: 0,
|
||||
selected_layer: 0,
|
||||
candidate_layer: 0,
|
||||
candidate_since: std::time::Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update state from a quality report and recompute the selected layer.
|
||||
pub fn update(&mut self, bwe_kbps: u32, loss_pct: u8, now: std::time::Instant) {
|
||||
let is_first = self.bwe_kbps == 0;
|
||||
self.bwe_kbps = bwe_kbps;
|
||||
self.loss_pct = loss_pct;
|
||||
|
||||
let suggested = Self::suggest_layer(bwe_kbps, loss_pct);
|
||||
|
||||
if suggested == self.selected_layer {
|
||||
// Already on the right layer — reset candidate.
|
||||
self.candidate_layer = suggested;
|
||||
self.candidate_since = now;
|
||||
return;
|
||||
}
|
||||
|
||||
// First measurement ever — apply immediately so the receiver starts
|
||||
// on the correct layer without waiting for hysteresis.
|
||||
if is_first {
|
||||
self.selected_layer = suggested;
|
||||
self.candidate_layer = suggested;
|
||||
self.candidate_since = now;
|
||||
return;
|
||||
}
|
||||
|
||||
if suggested != self.candidate_layer {
|
||||
// New suggestion — start hysteresis timer.
|
||||
self.candidate_layer = suggested;
|
||||
self.candidate_since = now;
|
||||
return;
|
||||
}
|
||||
|
||||
// Same candidate — check if hysteresis elapsed.
|
||||
let elapsed = now.saturating_duration_since(self.candidate_since).as_millis() as u64;
|
||||
if elapsed >= LAYER_SWITCH_HYSTERESIS_MS {
|
||||
self.selected_layer = suggested;
|
||||
}
|
||||
}
|
||||
|
||||
fn suggest_layer(bwe_kbps: u32, loss_pct: u8) -> u8 {
|
||||
if bwe_kbps > SIMULCAST_HIGH_THRESHOLD_KBPS && loss_pct < 2 {
|
||||
2 // high
|
||||
} else if bwe_kbps > SIMULCAST_MID_THRESHOLD_KBPS {
|
||||
1 // mid
|
||||
} else {
|
||||
0 // low
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Unique participant ID within a room.
|
||||
pub type ParticipantId = u64;
|
||||
|
||||
@@ -357,6 +442,14 @@ impl Room {
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn others_with_id(&self, exclude_id: ParticipantId) -> Vec<(ParticipantId, ParticipantSender)> {
|
||||
self.participants
|
||||
.iter()
|
||||
.filter(|p| p.id != exclude_id)
|
||||
.map(|p| (p.id, p.sender.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Build a RoomUpdate participant list.
|
||||
fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> {
|
||||
self.participants
|
||||
@@ -438,6 +531,8 @@ pub struct RoomManager {
|
||||
/// Maps `(room, stream_id)` -> participant_id of the sender currently
|
||||
/// publishing on that stream. Updated on every non-repair media packet.
|
||||
stream_owner: DashMap<(String, u8), ParticipantId>,
|
||||
/// Per-receiver simulcast state: `(room, receiver_id)` -> `ReceiverState`.
|
||||
receiver_states: DashMap<(String, ParticipantId), ReceiverState>,
|
||||
}
|
||||
|
||||
impl RoomManager {
|
||||
@@ -451,6 +546,7 @@ impl RoomManager {
|
||||
keyframe_buffer: DashMap::new(),
|
||||
pli_state: DashMap::new(),
|
||||
stream_owner: DashMap::new(),
|
||||
receiver_states: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,6 +561,7 @@ impl RoomManager {
|
||||
keyframe_buffer: DashMap::new(),
|
||||
pli_state: DashMap::new(),
|
||||
stream_owner: DashMap::new(),
|
||||
receiver_states: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -748,6 +845,51 @@ impl RoomManager {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get `(id, sender)` pairs for all OTHER participants in a room.
|
||||
pub fn others_with_id(
|
||||
&self,
|
||||
room_name: &str,
|
||||
participant_id: ParticipantId,
|
||||
) -> Vec<(ParticipantId, ParticipantSender)> {
|
||||
self.rooms
|
||||
.get(room_name)
|
||||
.map(|arc| arc.read().unwrap().others_with_id(participant_id))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Update a receiver's simulcast state from observed network metrics.
|
||||
///
|
||||
/// Called when a quality report arrives from the receiver (or from
|
||||
/// transport feedback carrying the receiver's BWE estimate).
|
||||
pub fn update_receiver_state(
|
||||
&self,
|
||||
room_name: &str,
|
||||
receiver_id: ParticipantId,
|
||||
bwe_kbps: u32,
|
||||
loss_pct: u8,
|
||||
) {
|
||||
let key = (room_name.to_string(), receiver_id);
|
||||
let mut entry = self
|
||||
.receiver_states
|
||||
.entry(key)
|
||||
.or_insert_with(ReceiverState::new);
|
||||
entry.update(bwe_kbps, loss_pct, std::time::Instant::now());
|
||||
}
|
||||
|
||||
/// Return the selected simulcast layer (0/1/2) for a receiver.
|
||||
///
|
||||
/// Defaults to layer 0 (low) if no state has been recorded yet.
|
||||
pub fn selected_layer(
|
||||
&self,
|
||||
room_name: &str,
|
||||
receiver_id: ParticipantId,
|
||||
) -> u8 {
|
||||
self.receiver_states
|
||||
.get(&(room_name.to_string(), receiver_id))
|
||||
.map(|s| s.selected_layer)
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Get room size.
|
||||
pub fn room_size(&self, room_name: &str) -> usize {
|
||||
self.rooms
|
||||
@@ -1126,6 +1268,12 @@ async fn run_participant_plain(
|
||||
metrics.update_session_quality(&session_id, report);
|
||||
}
|
||||
|
||||
// Update receiver state from this participant's quality report (if present).
|
||||
if let Some(ref report) = pkt.quality_report {
|
||||
let bwe_kbps = report.bitrate_cap_kbps as u32;
|
||||
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
|
||||
}
|
||||
|
||||
// Get current list of other participants + check quality directive
|
||||
let lock_start = std::time::Instant::now();
|
||||
let (others, quality_directive) = {
|
||||
@@ -1134,7 +1282,7 @@ async fn run_participant_plain(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let o = room_mgr.others(&room_name, participant_id);
|
||||
let o = room_mgr.others_with_id(&room_name, participant_id);
|
||||
(o, directive)
|
||||
};
|
||||
let lock_ms = lock_start.elapsed().as_millis() as u64;
|
||||
@@ -1167,10 +1315,20 @@ async fn run_participant_plain(
|
||||
ts.record_in(&pkt, others.len());
|
||||
}
|
||||
|
||||
// Forward to all others
|
||||
// Forward to all others, applying simulcast layer selection for video.
|
||||
let fwd_start = std::time::Instant::now();
|
||||
let pkt_bytes = pkt.payload.len() as u64;
|
||||
for other in &others {
|
||||
let is_video = pkt.header.media_type == wzp_proto::MediaType::Video;
|
||||
for (other_id, other) in &others {
|
||||
// Simulcast layer selection (T5.6): video packets are filtered
|
||||
// by the receiver's selected layer. Audio and non-simulcast
|
||||
// traffic pass through unchanged.
|
||||
if is_video {
|
||||
let selected = room_mgr.selected_layer(&room_name, *other_id);
|
||||
if pkt.header.stream_id != selected {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
match other {
|
||||
ParticipantSender::Quic(t) => {
|
||||
if let Err(e) = t.send_media(&pkt).await {
|
||||
@@ -1372,6 +1530,12 @@ async fn run_participant_trunked(
|
||||
);
|
||||
}
|
||||
|
||||
// Update receiver state from this participant's quality report.
|
||||
if let Some(ref report) = pkt.quality_report {
|
||||
let bwe_kbps = report.bitrate_cap_kbps as u32;
|
||||
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
|
||||
}
|
||||
|
||||
if let Some(ref report) = pkt.quality_report {
|
||||
metrics.update_session_quality(&session_id, report);
|
||||
}
|
||||
@@ -1383,7 +1547,7 @@ async fn run_participant_trunked(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let o = room_mgr.others(&room_name, participant_id);
|
||||
let o = room_mgr.others_with_id(&room_name, participant_id);
|
||||
(o, directive)
|
||||
};
|
||||
let lock_ms = lock_start.elapsed().as_millis() as u64;
|
||||
@@ -1403,7 +1567,14 @@ async fn run_participant_trunked(
|
||||
|
||||
let fwd_start = std::time::Instant::now();
|
||||
let pkt_bytes = pkt.payload.len() as u64;
|
||||
for other in &others {
|
||||
let is_video = pkt.header.media_type == wzp_proto::MediaType::Video;
|
||||
for (other_id, other) in &others {
|
||||
if is_video {
|
||||
let selected = room_mgr.selected_layer(&room_name, *other_id);
|
||||
if pkt.header.stream_id != selected {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
match other {
|
||||
ParticipantSender::Quic(t) => {
|
||||
let peer_addr = t.connection().remote_address();
|
||||
@@ -1776,4 +1947,82 @@ mod tests {
|
||||
"PLI for a stream with no mapped owner should return None"
|
||||
);
|
||||
}
|
||||
|
||||
// ---- Simulcast receiver state (T5.6) ----
|
||||
|
||||
#[test]
|
||||
fn receiver_state_defaults_to_layer_zero() {
|
||||
let rs = ReceiverState::new();
|
||||
assert_eq!(rs.selected_layer, 0);
|
||||
assert_eq!(rs.bwe_kbps, 0);
|
||||
assert_eq!(rs.loss_pct, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receiver_state_selects_high_on_good_link() {
|
||||
let mut rs = ReceiverState::new();
|
||||
let t0 = std::time::Instant::now();
|
||||
rs.update(4000, 0, t0);
|
||||
assert_eq!(rs.selected_layer, 2, ">3 Mbps + 0% loss → high layer immediately");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receiver_state_selects_mid_on_medium_link() {
|
||||
let mut rs = ReceiverState::new();
|
||||
let t0 = std::time::Instant::now();
|
||||
rs.update(1000, 5, t0);
|
||||
assert_eq!(rs.selected_layer, 1, ">750 kbps → mid layer immediately");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receiver_state_hysteresis_delays_switch() {
|
||||
let mut rs = ReceiverState::new();
|
||||
let t0 = std::time::Instant::now();
|
||||
// Start on high layer
|
||||
rs.update(4000, 0, t0);
|
||||
assert_eq!(rs.selected_layer, 2);
|
||||
|
||||
// Drop to low-bandwidth — should not switch immediately
|
||||
let t1 = t0 + std::time::Duration::from_millis(100);
|
||||
rs.update(100, 0, t1);
|
||||
assert_eq!(rs.selected_layer, 2, "hysteresis prevents immediate downgrade");
|
||||
|
||||
// After 3 s — switch should happen
|
||||
let t2 = t0 + std::time::Duration::from_millis(3100);
|
||||
rs.update(100, 0, t2);
|
||||
assert_eq!(rs.selected_layer, 0, "after 3 s hysteresis, downgrade occurs");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receiver_state_loss_blocks_high_layer() {
|
||||
let mut rs = ReceiverState::new();
|
||||
let t0 = std::time::Instant::now();
|
||||
// High BWE but high loss → mid, not high
|
||||
rs.update(4000, 5, t0);
|
||||
assert_eq!(rs.selected_layer, 1, "high loss blocks high layer");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn room_manager_selected_layer_defaults_to_zero() {
|
||||
let mgr = RoomManager::new();
|
||||
assert_eq!(mgr.selected_layer("room", 42), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn room_manager_updates_receiver_state() {
|
||||
let mgr = RoomManager::new();
|
||||
let now = std::time::Instant::now();
|
||||
mgr.update_receiver_state("room", 1, 4000, 0);
|
||||
// State is updated; we can verify via selected_layer
|
||||
assert_eq!(mgr.selected_layer("room", 1), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn room_manager_receiver_states_are_isolated_by_room() {
|
||||
let mgr = RoomManager::new();
|
||||
mgr.update_receiver_state("room-a", 1, 4000, 0);
|
||||
mgr.update_receiver_state("room-b", 1, 100, 0);
|
||||
assert_eq!(mgr.selected_layer("room-a", 1), 2);
|
||||
assert_eq!(mgr.selected_layer("room-b", 1), 0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user