T4.6: SFU keyframe cache — per-(room,sender,stream) I-frame replay on join

This commit is contained in:
Siavash Sameni
2026-05-12 10:54:04 +04:00
parent cc5aef2534
commit 828fbea2ea
5 changed files with 239 additions and 12 deletions

View File

@@ -1936,9 +1936,21 @@ async fn main() -> anyhow::Result<()> {
Some(&participant_fp),
caller_alias.as_deref(),
) {
Ok((id, update, senders)) => {
Ok((id, update, senders, cached_keyframes)) => {
metrics.active_rooms.set(room_mgr.list().len() as i64);
// Replay cached keyframes to the new participant before live
// traffic starts. This eliminates black-screen-on-join when
// the cache is warm.
for kf in cached_keyframes {
for pkt in kf {
if let Err(e) = transport.send_media(&pkt).await {
warn!(%addr, participant = id, "keyframe replay send error: {e}");
break;
}
}
}
// Merge federated participants into RoomUpdate if this is a global room
let merged_update = if let Some(ref fm) = federation_mgr {
if fm.is_global_room(&room_name) {

View File

@@ -383,6 +383,27 @@ impl Room {
}
}
/// Maximum bytes to cache per `(room, sender, stream)` keyframe.
const KEYFRAME_CACHE_MAX_BYTES: usize = 200_000;
/// Cached complete keyframe for fast join-to-first-frame replay.
#[derive(Clone)]
#[allow(dead_code)]
struct KeyframeCacheEntry {
packets: Vec<wzp_proto::MediaPacket>,
sequence_first: u32,
timestamp_ms: u32,
total_bytes: usize,
}
/// In-progress keyframe buffer while accumulating packets.
struct KeyframeBuffer {
packets: Vec<wzp_proto::MediaPacket>,
sequence_first: u32,
timestamp_ms: u32,
total_bytes: usize,
}
/// Manages all rooms on the relay.
///
/// Uses `DashMap` for per-room sharded locking -- rooms are independently
@@ -403,6 +424,10 @@ pub struct RoomManager {
acl: Option<std::sync::Mutex<HashMap<String, HashSet<String>>>>,
/// Channel for room lifecycle events (federation subscribes).
event_tx: tokio::sync::broadcast::Sender<RoomEvent>,
/// Per `(room, sender, stream)` cache of the most recent complete keyframe.
keyframe_cache: DashMap<(String, ParticipantId, u8), KeyframeCacheEntry>,
/// Per `(room, sender, stream)` buffer for a keyframe currently being received.
keyframe_buffer: DashMap<(String, ParticipantId, u8), KeyframeBuffer>,
}
impl RoomManager {
@@ -412,6 +437,8 @@ impl RoomManager {
rooms: DashMap::new(),
acl: None,
event_tx,
keyframe_cache: DashMap::new(),
keyframe_buffer: DashMap::new(),
}
}
@@ -422,6 +449,8 @@ impl RoomManager {
rooms: DashMap::new(),
acl: Some(std::sync::Mutex::new(HashMap::new())),
event_tx,
keyframe_cache: DashMap::new(),
keyframe_buffer: DashMap::new(),
}
}
@@ -458,7 +487,7 @@ impl RoomManager {
}
}
/// Join a room. Returns (participant_id, room_update_msg, all_senders) for broadcasting.
/// Join a room. Returns (participant_id, room_update_msg, all_senders, cached_keyframes) for broadcasting.
pub fn join(
&self,
room_name: &str,
@@ -471,6 +500,7 @@ impl RoomManager {
ParticipantId,
wzp_proto::SignalMessage,
Vec<ParticipantSender>,
Vec<Vec<wzp_proto::MediaPacket>>,
),
String,
> {
@@ -506,7 +536,8 @@ impl RoomManager {
room: room_name.to_string(),
});
}
Ok((id, update, senders))
let keyframes = self.cached_keyframes_for_room(room_name);
Ok((id, update, senders, keyframes))
}
/// Join a room via WebSocket. Convenience wrapper around `join()`.
@@ -517,7 +548,7 @@ impl RoomManager {
sender: tokio::sync::mpsc::Sender<Bytes>,
fingerprint: Option<&str>,
) -> Result<ParticipantId, String> {
let (id, _update, _senders) = self.join(
let (id, _update, _senders, _keyframes) = self.join(
room_name,
addr,
ParticipantSender::WebSocket(sender),
@@ -566,6 +597,7 @@ impl RoomManager {
drop(room); // release room lock
drop(arc); // release DashMap guard
self.rooms.remove(room_name);
self.clear_keyframes_for_room(room_name);
let _ = self.event_tx.send(RoomEvent::LocalLeave {
room: room_name.to_string(),
});
@@ -586,6 +618,85 @@ impl RoomManager {
result
}
/// Update the keyframe cache from an incoming media packet.
///
/// Called from the forwarding hot-path. If the packet belongs to a
/// keyframe we buffer it; when the frame-end flag arrives we store the
/// complete keyframe. Non-keyframe packets flush any stale partial buffer.
pub fn update_keyframe_cache(
&self,
room_name: &str,
sender_id: ParticipantId,
pkt: &wzp_proto::MediaPacket,
) {
let h = &pkt.header;
if h.is_repair() {
// Never cache repair packets.
return;
}
let key = (room_name.to_string(), sender_id, h.stream_id);
if h.is_keyframe() {
let mut entry = self.keyframe_buffer.entry(key.clone()).or_insert_with(|| KeyframeBuffer {
packets: Vec::new(),
sequence_first: h.seq,
timestamp_ms: h.timestamp,
total_bytes: 0,
});
let pkt_bytes = pkt.payload.len();
// If this would overflow the per-stream cap, drop the partial buffer
// and start fresh.
if entry.total_bytes + pkt_bytes > KEYFRAME_CACHE_MAX_BYTES {
entry.packets.clear();
entry.total_bytes = 0;
entry.sequence_first = h.seq;
entry.timestamp_ms = h.timestamp;
}
entry.packets.push(pkt.clone());
entry.total_bytes += pkt_bytes;
if h.is_frame_end() {
let completed = KeyframeCacheEntry {
packets: std::mem::take(&mut entry.packets),
sequence_first: entry.sequence_first,
timestamp_ms: entry.timestamp_ms,
total_bytes: entry.total_bytes,
};
self.keyframe_cache
.insert(key.clone(), completed);
entry.total_bytes = 0;
}
} else {
// Non-keyframe packet: discard any partial buffer for this stream.
self.keyframe_buffer.remove(&key);
}
}
/// Return a copy of all completed keyframes for a given room.
///
/// Used to replay keyframes to a newly-joined participant before live
/// forwarding starts.
pub fn cached_keyframes_for_room(
&self,
room_name: &str,
) -> Vec<Vec<wzp_proto::MediaPacket>> {
self.keyframe_cache
.iter()
.filter(|e| e.key().0 == room_name)
.map(|e| e.value().packets.clone())
.collect()
}
/// Remove all keyframe state for a room when it is closed.
fn clear_keyframes_for_room(&self, room_name: &str) {
self.keyframe_cache
.retain(|k, _| k.0 != room_name);
self.keyframe_buffer
.retain(|k, _| k.0 != room_name);
}
/// Get senders for all OTHER participants in a room.
pub fn others(&self, room_name: &str, participant_id: ParticipantId) -> Vec<ParticipantSender> {
self.rooms
@@ -848,6 +959,9 @@ async fn run_participant_plain(
}
};
// Cache keyframe packets for fast join-to-first-frame replay.
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
last_recv_instant = std::time::Instant::now();
if recv_gap_ms > max_recv_gap_ms {
@@ -1090,6 +1204,9 @@ async fn run_participant_trunked(
}
};
// Cache keyframe packets for fast join-to-first-frame replay.
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
last_recv_instant = std::time::Instant::now();
if recv_gap_ms > max_recv_gap_ms {