fix(video): stabilize relay streams and remote rendering
Some checks failed
Mirror to GitHub / mirror (push) Failing after 31s
Build Release Binaries / build-amd64 (push) Failing after 3m2s

This commit is contained in:
Siavash Sameni
2026-05-26 07:18:22 +04:00
parent f6ace54556
commit a08a37b5eb
4 changed files with 594 additions and 20 deletions

View File

@@ -2028,7 +2028,7 @@ async fn main() -> anyhow::Result<()> {
(None, None)
};
let media_handle = tokio::spawn(room::run_participant(
let mut media_handle = tokio::spawn(room::run_participant(
room_mgr.clone(),
room_name.clone(),
participant_id,
@@ -2041,15 +2041,38 @@ async fn main() -> anyhow::Result<()> {
federation_room_hash,
authenticated_fp.is_some(),
));
let signal_handle = tokio::spawn(room::run_participant_signals(
let mut signal_handle = tokio::spawn(room::run_participant_signals(
room_mgr.clone(),
room_name.clone(),
participant_id,
transport.clone(),
));
tokio::select! {
_ = media_handle => {},
_ = signal_handle => {},
_ = &mut media_handle => {
signal_handle.abort();
let _ = signal_handle.await;
},
_ = &mut signal_handle => {
close_transport(&*transport, "signal-loop-ended").await;
match tokio::time::timeout(Duration::from_secs(2), &mut media_handle).await {
Ok(_) => {}
Err(_) => {
warn!(
%addr,
room = %room_name,
participant = participant_id,
"media loop did not exit after signal close; forcing room leave"
);
media_handle.abort();
let _ = media_handle.await;
if let Some((update, senders)) =
room_mgr.leave(&room_name, participant_id)
{
room::broadcast_signal(&senders, &update).await;
}
}
}
},
}
// Participant disconnected — clean up presence + per-session metrics

View File

@@ -354,6 +354,24 @@ fn next_id() -> ParticipantId {
NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed)
}
fn outbound_video_stream_id(participant_id: ParticipantId) -> u8 {
// Reserve stream 0 for the sender's local/simulcast layer id. Forwarded
// room video needs a sender-distinct stream id so receivers and analyzers
// do not merge independent H264 access-unit sequences.
((participant_id.saturating_sub(1) % 250) + 1) as u8
}
fn with_outbound_video_stream_id(
pkt: &wzp_proto::MediaPacket,
participant_id: ParticipantId,
) -> wzp_proto::MediaPacket {
let mut out = pkt.clone();
if out.header.media_type == wzp_proto::MediaType::Video {
out.header.stream_id = outbound_video_stream_id(participant_id);
}
out
}
/// Events emitted by RoomManager for federation to observe.
#[derive(Clone, Debug)]
pub enum RoomEvent {
@@ -488,6 +506,25 @@ impl Room {
);
}
fn remove_by_fingerprint(&mut self, fingerprint: &str) -> Vec<ParticipantId> {
let mut removed = Vec::new();
self.participants.retain(|p| {
let matches = p.fingerprint.as_deref() == Some(fingerprint);
if matches {
removed.push(p.id);
}
!matches
});
for id in &removed {
self.qualities.remove(id);
}
removed
}
fn contains(&self, id: ParticipantId) -> bool {
self.participants.iter().any(|p| p.id == id)
}
fn others(&self, exclude_id: ParticipantId) -> Vec<ParticipantSender> {
self.participants
.iter()
@@ -682,6 +719,18 @@ impl RoomManager {
.entry(room_name.to_string())
.or_insert_with(|| Arc::new(RwLock::new(Room::new())));
let mut room = arc.write().unwrap();
if let Some(fp) = fingerprint {
let removed = room.remove_by_fingerprint(fp);
for old_id in removed {
warn!(
room = room_name,
participant = old_id,
fingerprint = fp,
"replacing existing participant with same fingerprint"
);
self.clear_participant_state(room_name, old_id);
}
}
let id = room.add(
addr,
sender,
@@ -758,6 +807,7 @@ impl RoomManager {
let mut room = arc.write().unwrap();
room.qualities.remove(&participant_id);
room.remove(participant_id);
self.clear_participant_state(room_name, participant_id);
if room.is_empty() {
drop(room); // release room lock
drop(arc); // release DashMap guard
@@ -849,7 +899,14 @@ impl RoomManager {
self.keyframe_cache
.iter()
.filter(|e| e.key().0 == room_name)
.map(|e| e.value().packets.clone())
.map(|e| {
let sender_id = e.key().1;
e.value()
.packets
.iter()
.map(|pkt| with_outbound_video_stream_id(pkt, sender_id))
.collect()
})
.collect()
}
@@ -859,6 +916,27 @@ impl RoomManager {
self.keyframe_buffer.retain(|k, _| k.0 != room_name);
self.pli_state.retain(|k, _| k.0 != room_name);
self.stream_owner.retain(|k, _| k.0 != room_name);
self.receiver_states.retain(|k, _| k.0 != room_name);
}
fn clear_participant_state(&self, room_name: &str, participant_id: ParticipantId) {
self.keyframe_cache
.retain(|k, _| !(k.0 == room_name && k.1 == participant_id));
self.keyframe_buffer
.retain(|k, _| !(k.0 == room_name && k.1 == participant_id));
self.pli_state
.retain(|k, _| !(k.0 == room_name && k.1 == participant_id));
self.stream_owner
.retain(|k, owner| !(k.0 == room_name && *owner == participant_id));
self.receiver_states
.retain(|k, _| !(k.0 == room_name && k.1 == participant_id));
}
pub fn contains_participant(&self, room_name: &str, participant_id: ParticipantId) -> bool {
self.rooms
.get(room_name)
.map(|arc| arc.read().unwrap().contains(participant_id))
.unwrap_or(false)
}
/// PLI suppression window (PRD-video-v1 T4.7).
@@ -1276,6 +1354,16 @@ async fn run_participant_plain(
}
};
if !room_mgr.contains_participant(&room_name, participant_id) {
info!(
room = %room_name,
participant = participant_id,
forwarded = packets_forwarded,
"stale participant loop stopped"
);
break;
}
// Cache keyframe packets for fast join-to-first-frame replay.
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
// Register this participant as the owner of this stream for PLI routing.
@@ -1283,6 +1371,12 @@ async fn run_participant_plain(
room_mgr
.stream_owner
.insert((room_name.clone(), pkt.header.stream_id), participant_id);
if pkt.header.media_type == wzp_proto::MediaType::Video {
room_mgr.stream_owner.insert(
(room_name.clone(), outbound_video_stream_id(participant_id)),
participant_id,
);
}
}
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
@@ -1326,9 +1420,8 @@ async fn run_participant_plain(
room = %room_name,
participant = participant_id,
seq = pkt.header.seq,
"VideoScorer: Abusive verdict — dropping packet"
"VideoScorer: Abusive verdict — observe-only"
);
continue;
}
}
@@ -1419,7 +1512,12 @@ async fn run_participant_plain(
}
match other {
ParticipantSender::Quic(t) => {
if let Err(e) = t.send_media(&pkt).await {
let outbound_pkt = if is_video {
with_outbound_video_stream_id(&pkt, participant_id)
} else {
pkt.clone()
};
if let Err(e) = t.send_media(&outbound_pkt).await {
send_errors += 1;
if send_errors <= 5 || send_errors % 100 == 0 {
warn!(
@@ -1599,6 +1697,16 @@ async fn run_participant_trunked(
}
};
if !room_mgr.contains_participant(&room_name, participant_id) {
info!(
room = %room_name,
participant = participant_id,
forwarded = packets_forwarded,
"stale participant loop stopped (trunked)"
);
break;
}
// Cache keyframe packets for fast join-to-first-frame replay.
room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt);
// Register this participant as the owner of this stream for PLI routing.
@@ -1607,6 +1715,15 @@ async fn run_participant_trunked(
(room_name.clone(), pkt.header.stream_id),
participant_id,
);
if pkt.header.media_type == wzp_proto::MediaType::Video {
room_mgr.stream_owner.insert(
(
room_name.clone(),
outbound_video_stream_id(participant_id),
),
participant_id,
);
}
}
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
@@ -1649,9 +1766,8 @@ async fn run_participant_trunked(
room = %room_name,
participant = participant_id,
seq = pkt.header.seq,
"VideoScorer: Abusive verdict — dropping packet (trunked)"
"VideoScorer: Abusive verdict — observe-only (trunked)"
);
continue;
}
}
@@ -1735,7 +1851,12 @@ async fn run_participant_trunked(
let fwd = forwarders
.entry(peer_addr)
.or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes));
if let Err(e) = fwd.send(&pkt).await {
let outbound_pkt = if is_video {
with_outbound_video_stream_id(&pkt, participant_id)
} else {
pkt.clone()
};
if let Err(e) = fwd.send(&outbound_pkt).await {
send_errors += 1;
if send_errors <= 5 || send_errors % 100 == 0 {
warn!(
@@ -1859,6 +1980,72 @@ mod tests {
assert!(mgr.list().is_empty());
}
#[test]
fn join_replaces_existing_fingerprint_in_same_room() {
let mgr = RoomManager::new();
let addr: std::net::SocketAddr = "127.0.0.1:10000".parse().unwrap();
let (tx1, _rx1) = tokio::sync::mpsc::channel(1);
let (tx2, _rx2) = tokio::sync::mpsc::channel(1);
let (first_id, _, _, _) = mgr
.join(
"room",
addr,
ParticipantSender::WebSocket(tx1),
Some("fp-a"),
Some("old"),
)
.unwrap();
let (second_id, update, _, _) = mgr
.join(
"room",
addr,
ParticipantSender::WebSocket(tx2),
Some("fp-a"),
Some("new"),
)
.unwrap();
assert_ne!(first_id, second_id);
assert!(!mgr.contains_participant("room", first_id));
assert!(mgr.contains_participant("room", second_id));
assert_eq!(mgr.room_size("room"), 1);
if let wzp_proto::SignalMessage::RoomUpdate {
count,
participants,
..
} = update
{
assert_eq!(count, 1);
assert_eq!(participants[0].fingerprint, "fp-a");
assert_eq!(participants[0].alias.as_deref(), Some("new"));
} else {
panic!("expected RoomUpdate");
}
}
#[test]
fn outbound_video_stream_ids_are_sender_distinct_and_nonzero() {
assert_eq!(outbound_video_stream_id(1), 1);
assert_eq!(outbound_video_stream_id(2), 2);
assert_eq!(outbound_video_stream_id(250), 250);
assert_eq!(outbound_video_stream_id(251), 1);
}
#[test]
fn rewrite_only_changes_video_stream_id() {
let mut video = make_test_packet(b"video");
video.header.media_type = wzp_proto::MediaType::Video;
video.header.stream_id = 0;
let rewritten = with_outbound_video_stream_id(&video, 42);
assert_eq!(rewritten.header.stream_id, 42);
assert_eq!(video.header.stream_id, 0);
let audio = make_test_packet(b"audio");
let rewritten_audio = with_outbound_video_stream_id(&audio, 42);
assert_eq!(rewritten_audio.header.stream_id, audio.header.stream_id);
}
#[test]
fn acl_open_mode_allows_all() {
let mgr = RoomManager::new();