T4.7: PLI suppression at SFU — 200 ms dedup window per (room, stream_id)

This commit is contained in:
Siavash Sameni
2026-05-12 11:04:14 +04:00
parent 828fbea2ea
commit 36b0421d68
5 changed files with 259 additions and 27 deletions

View File

@@ -2028,20 +2028,29 @@ async fn main() -> anyhow::Result<()> {
(None, None)
};
room::run_participant(
let media_handle = tokio::spawn(room::run_participant(
room_mgr.clone(),
room_name,
room_name.clone(),
participant_id,
transport.clone(),
metrics.clone(),
&session_id_str,
session_id_str.clone(),
trunking_enabled,
debug_tap,
federation_tx,
federation_room_hash,
authenticated_fp.is_some(),
)
.await;
));
let signal_handle = tokio::spawn(room::run_participant_signals(
room_mgr.clone(),
room_name.clone(),
participant_id,
transport.clone(),
));
tokio::select! {
_ = media_handle => {},
_ = signal_handle => {},
}
// Participant disconnected — clean up presence + per-session metrics
if let Some(ref fp) = authenticated_fp {

View File

@@ -11,7 +11,7 @@ use std::time::Duration;
use bytes::Bytes;
use dashmap::DashMap;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use wzp_proto::packet::TrunkFrame;
use wzp_proto::quality::{AdaptiveQualityController, Tier};
@@ -404,6 +404,11 @@ struct KeyframeBuffer {
total_bytes: usize,
}
/// Suppression window for PictureLossIndication per (room, stream_id).
struct PliState {
last_pli: std::time::Instant,
}
/// Manages all rooms on the relay.
///
/// Uses `DashMap` for per-room sharded locking -- rooms are independently
@@ -428,6 +433,11 @@ pub struct RoomManager {
keyframe_cache: DashMap<(String, ParticipantId, u8), KeyframeCacheEntry>,
/// Per `(room, sender, stream)` buffer for a keyframe currently being received.
keyframe_buffer: DashMap<(String, ParticipantId, u8), KeyframeBuffer>,
/// Per `(room, stream_id)` last PLI timestamp for suppression.
pli_state: DashMap<(String, ParticipantId, u8), PliState>,
/// Maps `(room, stream_id)` -> participant_id of the sender currently
/// publishing on that stream. Updated on every non-repair media packet.
stream_owner: DashMap<(String, u8), ParticipantId>,
}
impl RoomManager {
@@ -439,6 +449,8 @@ impl RoomManager {
event_tx,
keyframe_cache: DashMap::new(),
keyframe_buffer: DashMap::new(),
pli_state: DashMap::new(),
stream_owner: DashMap::new(),
}
}
@@ -451,6 +463,8 @@ impl RoomManager {
event_tx,
keyframe_cache: DashMap::new(),
keyframe_buffer: DashMap::new(),
pli_state: DashMap::new(),
stream_owner: DashMap::new(),
}
}
@@ -597,7 +611,7 @@ impl RoomManager {
drop(room); // release room lock
drop(arc); // release DashMap guard
self.rooms.remove(room_name);
self.clear_keyframes_for_room(room_name);
self.clear_room_state(room_name);
let _ = self.event_tx.send(RoomEvent::LocalLeave {
room: room_name.to_string(),
});
@@ -689,12 +703,40 @@ impl RoomManager {
.collect()
}
/// Remove all keyframe state for a room when it is closed.
fn clear_keyframes_for_room(&self, room_name: &str) {
/// Remove all per-room state when a room is closed.
fn clear_room_state(&self, room_name: &str) {
self.keyframe_cache
.retain(|k, _| k.0 != room_name);
self.keyframe_buffer
.retain(|k, _| k.0 != room_name);
self.pli_state
.retain(|k, _| k.0 != room_name);
self.stream_owner
.retain(|k, _| k.0 != room_name);
}
/// PLI suppression window (PRD-video-v1 T4.7).
const PLI_SUPPRESS_MS: u64 = 200;
/// Returns `true` if this PLI should be forwarded upstream.
///
/// Suppresses duplicate PLIs for the same `(room, sender, stream_id)`
/// within 200 ms. Looks up the current owner of `stream_id` in the room
/// and uses `(owner, stream)` as the suppression key.
pub fn should_forward_pli(&self, room_name: &str, stream_id: u8) -> Option<ParticipantId> {
let owner = self.stream_owner.get(&(room_name.to_string(), stream_id))?;
let sender_id = *owner;
drop(owner);
let key = (room_name.to_string(), sender_id, stream_id);
let now = std::time::Instant::now();
if let Some(entry) = self.pli_state.get(&key) {
let elapsed = entry.last_pli.elapsed().as_millis() as u64;
if elapsed < Self::PLI_SUPPRESS_MS {
return None;
}
}
self.pli_state.insert(key, PliState { last_pli: now });
Some(sender_id)
}
/// Get senders for all OTHER participants in a room.
@@ -848,6 +890,83 @@ impl TrunkedForwarder {
}
}
// ---------------------------------------------------------------------------
// Signal handling for room-mode participants
// ---------------------------------------------------------------------------
/// Receive signal loop for one participant in a room.
///
/// Currently handles `PictureLossIndication` suppression (T4.7): if multiple
/// receivers PLI the same stream within 200 ms, only the first is forwarded
/// upstream.
pub async fn run_participant_signals(
room_mgr: Arc<RoomManager>,
room_name: String,
participant_id: ParticipantId,
transport: Arc<wzp_transport::QuinnTransport>,
) {
let addr = transport.connection().remote_address();
info!(
room = %room_name,
participant = participant_id,
%addr,
"signal loop started"
);
loop {
match transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::PictureLossIndication { stream_id, .. })) => {
match room_mgr.should_forward_pli(&room_name, stream_id) {
Some(_target_id) => {
// Forward PLI to the specific sender that owns this stream.
let others = room_mgr.others(&room_name, participant_id);
for sender in &others {
if let ParticipantSender::Quic(t) = sender {
let msg = wzp_proto::SignalMessage::PictureLossIndication {
version: default_signal_version(),
stream_id,
};
if let Err(e) = t.send_signal(&msg).await {
warn!(
room = %room_name,
participant = participant_id,
peer = %t.connection().remote_address(),
"PLI forward error: {e}"
);
}
}
}
}
None => {
debug!(
room = %room_name,
participant = participant_id,
stream_id,
"PLI suppressed (within 200 ms window)"
);
}
}
}
Ok(Some(_other)) => {
// Other signals are not handled in room mode yet.
}
Ok(None) => {
info!(%addr, participant = participant_id, "signal stream closed");
break;
}
Err(e) => {
let msg = e.to_string();
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
info!(%addr, participant = participant_id, "signal connection closed: {e}");
} else {
error!(%addr, participant = participant_id, "signal recv error: {e}");
}
break;
}
}
}
}
// ---------------------------------------------------------------------------
// run_participant — the hot-path forwarding loop
// ---------------------------------------------------------------------------
@@ -864,7 +983,7 @@ pub async fn run_participant(
participant_id: ParticipantId,
transport: Arc<wzp_transport::QuinnTransport>,
metrics: Arc<RelayMetrics>,
session_id: &str,
session_id: String,
trunking_enabled: bool,
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
@@ -906,7 +1025,7 @@ async fn run_participant_plain(
participant_id: ParticipantId,
transport: Arc<wzp_transport::QuinnTransport>,
metrics: Arc<RelayMetrics>,
session_id: &str,
session_id: String,
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
federation_room_hash: Option<[u8; 8]>,
@@ -937,7 +1056,7 @@ async fn run_participant_plain(
room = %room_name,
participant = participant_id,
%addr,
session = session_id,
session = %session_id,
"forwarding loop started (plain)"
);
@@ -961,6 +1080,13 @@ async fn run_participant_plain(
// Cache keyframe packets for fast join-to-first-frame replay.
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
// Register this participant as the owner of this stream for PLI routing.
if !pkt.header.is_repair() {
room_mgr.stream_owner.insert(
(room_name.clone(), pkt.header.stream_id),
participant_id,
);
}
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
last_recv_instant = std::time::Instant::now();
@@ -996,7 +1122,7 @@ async fn run_participant_plain(
// Update per-session quality metrics if a quality report is present
if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report);
metrics.update_session_quality(&session_id, report);
}
// Get current list of other participants + check quality directive
@@ -1152,7 +1278,7 @@ async fn run_participant_trunked(
participant_id: ParticipantId,
transport: Arc<wzp_transport::QuinnTransport>,
metrics: Arc<RelayMetrics>,
session_id: &str,
session_id: String,
_is_authenticated: bool,
) {
use std::collections::HashMap;
@@ -1171,7 +1297,7 @@ async fn run_participant_trunked(
room = %room_name,
participant = participant_id,
%addr,
session = session_id,
session = %session_id,
"forwarding loop started (trunked)"
);
@@ -1181,7 +1307,7 @@ async fn run_participant_trunked(
let mut forwarders: HashMap<std::net::SocketAddr, TrunkedForwarder> = HashMap::new();
// Derive a 2-byte session tag from the session_id hex string.
let sid_bytes: [u8; 2] = parse_session_id_bytes(session_id);
let sid_bytes: [u8; 2] = parse_session_id_bytes(&session_id);
let mut flush_interval = tokio::time::interval(Duration::from_millis(5));
// Don't let missed ticks pile up — skip them and move on.
@@ -1206,6 +1332,13 @@ async fn run_participant_trunked(
// Cache keyframe packets for fast join-to-first-frame replay.
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
// Register this participant as the owner of this stream for PLI routing.
if !pkt.header.is_repair() {
room_mgr.stream_owner.insert(
(room_name.clone(), pkt.header.stream_id),
participant_id,
);
}
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
last_recv_instant = std::time::Instant::now();
@@ -1239,7 +1372,7 @@ async fn run_participant_trunked(
}
if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report);
metrics.update_session_quality(&session_id, report);
}
let lock_start = std::time::Instant::now();