From a08a37b5ebe73774d42ee85d60ec8df6fa1f886b Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Tue, 26 May 2026 07:18:22 +0400 Subject: [PATCH] fix(video): stabilize relay streams and remote rendering --- crates/wzp-client/src/analyzer.rs | 330 +++++++++++++++++++++++++++++- crates/wzp-relay/src/main.rs | 31 ++- crates/wzp-relay/src/room.rs | 201 +++++++++++++++++- desktop/src/main.ts | 52 ++++- 4 files changed, 594 insertions(+), 20 deletions(-) diff --git a/crates/wzp-client/src/analyzer.rs b/crates/wzp-client/src/analyzer.rs index 3f06319..fd1f4bd 100644 --- a/crates/wzp-client/src/analyzer.rs +++ b/crates/wzp-client/src/analyzer.rs @@ -15,7 +15,8 @@ use std::time::{Duration, Instant}; use clap::Parser; use tracing::info; -use wzp_proto::{CodecId, MediaPacket, MediaTransport, default_signal_version}; +use wzp_proto::{CodecId, MediaPacket, MediaTransport, MediaType, default_signal_version}; +use wzp_video::{VideoDecoder, create_video_decoder, transport::VideoReassembler}; // --------------------------------------------------------------------------- // CLI @@ -68,6 +69,14 @@ struct Args { // For now, header-only analysis provides loss%, jitter, codec stats. #[arg(long)] key: Option, + + /// Track video fragmentation, completed frames, keyframes, and decode health. + #[arg(long)] + video_probe: bool, + + /// Decode completed video frames in --video-probe mode. + #[arg(long)] + video_decode: bool, } // --------------------------------------------------------------------------- @@ -198,6 +207,295 @@ fn find_or_create_participant( id } +// --------------------------------------------------------------------------- +// Video probe +// --------------------------------------------------------------------------- + +#[derive(Default, Clone)] +struct PlaneSample { + min: u8, + max: u8, + mean: f64, +} + +#[derive(Default, Clone)] +struct I420Sample { + y: PlaneSample, + u: PlaneSample, + v: PlaneSample, + valid_i420: bool, +} + +struct VideoStreamProbe { + id: usize, + codec: CodecId, + wire_stream_id: u8, + packets: u64, + lost: u64, + last_seq: u32, + seq_initialized: bool, + frames: u64, + keyframes: u64, + bytes: u64, + max_frame_bytes: usize, + first_seen: Instant, + last_seen: Instant, + last_frame: Option, + reassembler: VideoReassembler, + decoder: Option>, + decode_ok: u64, + decode_pending: u64, + decode_err: u64, + last_decode_debug: Option, + last_i420_sample: Option, +} + +impl VideoStreamProbe { + fn new(id: usize, codec: CodecId, wire_stream_id: u8, decode: bool) -> Self { + let decoder = if decode { + create_video_decoder(codec, 1280, 720).ok() + } else { + None + }; + let now = Instant::now(); + Self { + id, + codec, + wire_stream_id, + packets: 0, + lost: 0, + last_seq: 0, + seq_initialized: false, + frames: 0, + keyframes: 0, + bytes: 0, + max_frame_bytes: 0, + first_seen: now, + last_seen: now, + last_frame: None, + reassembler: VideoReassembler::new(), + decoder, + decode_ok: 0, + decode_pending: 0, + decode_err: 0, + last_decode_debug: None, + last_i420_sample: None, + } + } + + fn ingest(&mut self, pkt: &MediaPacket, now: Instant) { + self.packets += 1; + self.last_seen = now; + if pkt.header.codec_id != self.codec { + self.codec = pkt.header.codec_id; + self.reassembler = VideoReassembler::new(); + self.decoder = self + .decoder + .is_some() + .then(|| create_video_decoder(self.codec, 1280, 720).ok()) + .flatten(); + } + if self.seq_initialized { + let expected = self.last_seq.wrapping_add(1); + let gap = pkt.header.seq.wrapping_sub(expected); + if gap > 0 && gap < 100 { + self.lost += gap as u64; + } + } + self.last_seq = pkt.header.seq; + self.seq_initialized = true; + + if let Some((codec, keyframe, frame)) = self.reassembler.push(pkt) { + self.frames += 1; + self.bytes += frame.len() as u64; + self.max_frame_bytes = self.max_frame_bytes.max(frame.len()); + self.last_frame = Some(now); + if keyframe { + self.keyframes += 1; + } + if codec != self.codec { + self.codec = codec; + } + if let Some(decoder) = self.decoder.as_mut() { + match decoder.decode(&frame) { + Ok(Some(decoded)) => { + self.decode_ok += 1; + self.last_decode_debug = decoder.debug_snapshot(); + self.last_i420_sample = + Some(sample_i420(&decoded.data, decoded.width, decoded.height)); + } + Ok(None) => { + self.decode_pending += 1; + self.last_decode_debug = decoder.debug_snapshot(); + } + Err(err) => { + self.decode_err += 1; + self.last_decode_debug = Some(err.to_string()); + } + } + } + } + } + + fn loss_percent(&self) -> f64 { + let total = self.packets + self.lost; + if total == 0 { + 0.0 + } else { + (self.lost as f64 / total as f64) * 100.0 + } + } + + fn avg_frame_bytes(&self) -> u64 { + if self.frames == 0 { + 0 + } else { + self.bytes / self.frames + } + } + + fn fps(&self) -> f64 { + let secs = self.last_seen.duration_since(self.first_seen).as_secs_f64(); + if secs <= 0.0 { + 0.0 + } else { + self.frames as f64 / secs + } + } +} + +struct VideoProbe { + streams: Vec, + decode: bool, +} + +impl VideoProbe { + fn new(decode: bool) -> Self { + Self { + streams: Vec::new(), + decode, + } + } + + fn ingest(&mut self, pkt: &MediaPacket, now: Instant) { + if pkt.header.media_type != MediaType::Video { + return; + } + let idx = self.find_or_create_stream(pkt); + self.streams[idx].ingest(pkt, now); + } + + fn find_or_create_stream(&mut self, pkt: &MediaPacket) -> usize { + for (i, s) in self.streams.iter().enumerate() { + if s.seq_initialized + && s.wire_stream_id == pkt.header.stream_id + && s.codec == pkt.header.codec_id + { + let delta = pkt.header.seq.wrapping_sub(s.last_seq); + if delta > 0 && delta < 80 { + return i; + } + } + } + let id = self.streams.len(); + self.streams.push(VideoStreamProbe::new( + id, + pkt.header.codec_id, + pkt.header.stream_id, + self.decode, + )); + id + } + + fn print(&self) { + if self.streams.is_empty() { + eprintln!(" video: no packets yet"); + return; + } + for s in &self.streams { + let age_ms = s + .last_frame + .map(|t| t.elapsed().as_millis() as u64) + .unwrap_or(u64::MAX); + let mut line = format!( + " video#{} wire_stream={} {:?}: {} pkts {:.1}% loss | {} frames ({:.1} fps), {} key, avg={}B max={}B, last_frame={}ms", + s.id, + s.wire_stream_id, + s.codec, + s.packets, + s.loss_percent(), + s.frames, + s.fps(), + s.keyframes, + s.avg_frame_bytes(), + s.max_frame_bytes, + if age_ms == u64::MAX { 0 } else { age_ms }, + ); + if s.decoder.is_some() || s.decode_ok > 0 || s.decode_err > 0 { + line.push_str(&format!( + " | dec ok={} pending={} err={}", + s.decode_ok, s.decode_pending, s.decode_err + )); + } + if let Some(sample) = &s.last_i420_sample { + line.push_str(&format!( + " | i420={} y={:.1}/{}/{} u={:.1}/{}/{} v={:.1}/{}/{}", + sample.valid_i420, + sample.y.mean, + sample.y.min, + sample.y.max, + sample.u.mean, + sample.u.min, + sample.u.max, + sample.v.mean, + sample.v.min, + sample.v.max, + )); + } + if let Some(debug) = &s.last_decode_debug { + line.push_str(&format!(" | {debug}")); + } + eprintln!("{line}"); + } + } +} + +fn sample_i420(data: &[u8], width: u32, height: u32) -> I420Sample { + let y_len = width as usize * height as usize; + let uv_len = y_len / 4; + if data.len() < y_len + uv_len * 2 { + return I420Sample { + valid_i420: false, + ..I420Sample::default() + }; + } + I420Sample { + valid_i420: true, + y: sample_plane(&data[..y_len]), + u: sample_plane(&data[y_len..y_len + uv_len]), + v: sample_plane(&data[y_len + uv_len..y_len + uv_len * 2]), + } +} + +fn sample_plane(data: &[u8]) -> PlaneSample { + if data.is_empty() { + return PlaneSample::default(); + } + let mut min = u8::MAX; + let mut max = u8::MIN; + let mut sum: u64 = 0; + for &b in data { + min = min.min(b); + max = max.max(b); + sum += b as u64; + } + PlaneSample { + min, + max, + mean: sum as f64 / data.len() as f64, + } +} + // --------------------------------------------------------------------------- // Capture writer (binary packet log for later replay) // --------------------------------------------------------------------------- @@ -580,6 +878,7 @@ async fn run_no_tui( total_packets: &mut u64, deadline: Option, mut capture_writer: Option<&mut CaptureWriter>, + mut video_probe: Option<&mut VideoProbe>, ) -> anyhow::Result<()> { let mut print_timer = Instant::now(); loop { @@ -594,6 +893,9 @@ async fn run_no_tui( let idx = find_or_create_participant(participants, pkt.header.seq, pkt.header.codec_id); participants[idx].ingest(&pkt, now); + if let Some(ref mut probe) = video_probe { + probe.ingest(&pkt, now); + } *total_packets += 1; if let Some(ref mut w) = capture_writer { w.write_packet(&pkt, now)?; @@ -608,6 +910,9 @@ async fn run_no_tui( } if print_timer.elapsed() >= Duration::from_secs(2) { print_stats(participants, *total_packets); + if let Some(ref probe) = video_probe { + probe.print(); + } print_timer = Instant::now(); } } @@ -616,7 +921,7 @@ async fn run_no_tui( fn print_stats(participants: &[ParticipantStats], total: u64) { eprintln!( - "--- {} participants | {} total packets ---", + "--- {} packet streams | {} total packets ---", participants.len(), total ); @@ -644,6 +949,7 @@ async fn run_tui( start_time: Instant, deadline: Option, mut capture_writer: Option<&mut CaptureWriter>, + mut video_probe: Option<&mut VideoProbe>, ) -> anyhow::Result<()> { crossterm::terminal::enable_raw_mode()?; let mut stdout = std::io::stdout(); @@ -684,6 +990,9 @@ async fn run_tui( pkt.header.codec_id, ); participants[idx].ingest(&pkt, now); + if let Some(ref mut probe) = video_probe { + probe.ingest(&pkt, now); + } *total_packets += 1; if let Some(ref mut w) = capture_writer { w.write_packet(&pkt, now)?; @@ -941,6 +1250,17 @@ async fn main() -> anyhow::Result<()> { let mut participants: Vec = Vec::new(); let mut total_packets: u64 = 0; let start_time = Instant::now(); + let mut video_probe = (args.video_probe || args.video_decode).then(|| { + eprintln!( + "Video probe enabled{}", + if args.video_decode { + " with decode" + } else { + "" + } + ); + VideoProbe::new(args.video_decode) + }); if args.no_tui { run_no_tui( @@ -949,6 +1269,7 @@ async fn main() -> anyhow::Result<()> { &mut total_packets, deadline, capture_writer.as_mut(), + video_probe.as_mut(), ) .await?; } else { @@ -959,12 +1280,17 @@ async fn main() -> anyhow::Result<()> { start_time, deadline, capture_writer.as_mut(), + video_probe.as_mut(), ) .await?; } // Print summary print_summary(&participants, total_packets, start_time.elapsed()); + if let Some(probe) = &video_probe { + eprintln!("\n=== Video Probe Summary ==="); + probe.print(); + } // Clean close transport.close().await?; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 9cf8baa..51fc87d 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -2028,7 +2028,7 @@ async fn main() -> anyhow::Result<()> { (None, None) }; - let media_handle = tokio::spawn(room::run_participant( + let mut media_handle = tokio::spawn(room::run_participant( room_mgr.clone(), room_name.clone(), participant_id, @@ -2041,15 +2041,38 @@ async fn main() -> anyhow::Result<()> { federation_room_hash, authenticated_fp.is_some(), )); - let signal_handle = tokio::spawn(room::run_participant_signals( + let mut signal_handle = tokio::spawn(room::run_participant_signals( room_mgr.clone(), room_name.clone(), participant_id, transport.clone(), )); tokio::select! { - _ = media_handle => {}, - _ = signal_handle => {}, + _ = &mut media_handle => { + signal_handle.abort(); + let _ = signal_handle.await; + }, + _ = &mut signal_handle => { + close_transport(&*transport, "signal-loop-ended").await; + match tokio::time::timeout(Duration::from_secs(2), &mut media_handle).await { + Ok(_) => {} + Err(_) => { + warn!( + %addr, + room = %room_name, + participant = participant_id, + "media loop did not exit after signal close; forcing room leave" + ); + media_handle.abort(); + let _ = media_handle.await; + if let Some((update, senders)) = + room_mgr.leave(&room_name, participant_id) + { + room::broadcast_signal(&senders, &update).await; + } + } + } + }, } // Participant disconnected — clean up presence + per-session metrics diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 010991d..cbfdf5e 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -354,6 +354,24 @@ fn next_id() -> ParticipantId { NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed) } +fn outbound_video_stream_id(participant_id: ParticipantId) -> u8 { + // Reserve stream 0 for the sender's local/simulcast layer id. Forwarded + // room video needs a sender-distinct stream id so receivers and analyzers + // do not merge independent H264 access-unit sequences. + ((participant_id.saturating_sub(1) % 250) + 1) as u8 +} + +fn with_outbound_video_stream_id( + pkt: &wzp_proto::MediaPacket, + participant_id: ParticipantId, +) -> wzp_proto::MediaPacket { + let mut out = pkt.clone(); + if out.header.media_type == wzp_proto::MediaType::Video { + out.header.stream_id = outbound_video_stream_id(participant_id); + } + out +} + /// Events emitted by RoomManager for federation to observe. #[derive(Clone, Debug)] pub enum RoomEvent { @@ -488,6 +506,25 @@ impl Room { ); } + fn remove_by_fingerprint(&mut self, fingerprint: &str) -> Vec { + let mut removed = Vec::new(); + self.participants.retain(|p| { + let matches = p.fingerprint.as_deref() == Some(fingerprint); + if matches { + removed.push(p.id); + } + !matches + }); + for id in &removed { + self.qualities.remove(id); + } + removed + } + + fn contains(&self, id: ParticipantId) -> bool { + self.participants.iter().any(|p| p.id == id) + } + fn others(&self, exclude_id: ParticipantId) -> Vec { self.participants .iter() @@ -682,6 +719,18 @@ impl RoomManager { .entry(room_name.to_string()) .or_insert_with(|| Arc::new(RwLock::new(Room::new()))); let mut room = arc.write().unwrap(); + if let Some(fp) = fingerprint { + let removed = room.remove_by_fingerprint(fp); + for old_id in removed { + warn!( + room = room_name, + participant = old_id, + fingerprint = fp, + "replacing existing participant with same fingerprint" + ); + self.clear_participant_state(room_name, old_id); + } + } let id = room.add( addr, sender, @@ -758,6 +807,7 @@ impl RoomManager { let mut room = arc.write().unwrap(); room.qualities.remove(&participant_id); room.remove(participant_id); + self.clear_participant_state(room_name, participant_id); if room.is_empty() { drop(room); // release room lock drop(arc); // release DashMap guard @@ -849,7 +899,14 @@ impl RoomManager { self.keyframe_cache .iter() .filter(|e| e.key().0 == room_name) - .map(|e| e.value().packets.clone()) + .map(|e| { + let sender_id = e.key().1; + e.value() + .packets + .iter() + .map(|pkt| with_outbound_video_stream_id(pkt, sender_id)) + .collect() + }) .collect() } @@ -859,6 +916,27 @@ impl RoomManager { self.keyframe_buffer.retain(|k, _| k.0 != room_name); self.pli_state.retain(|k, _| k.0 != room_name); self.stream_owner.retain(|k, _| k.0 != room_name); + self.receiver_states.retain(|k, _| k.0 != room_name); + } + + fn clear_participant_state(&self, room_name: &str, participant_id: ParticipantId) { + self.keyframe_cache + .retain(|k, _| !(k.0 == room_name && k.1 == participant_id)); + self.keyframe_buffer + .retain(|k, _| !(k.0 == room_name && k.1 == participant_id)); + self.pli_state + .retain(|k, _| !(k.0 == room_name && k.1 == participant_id)); + self.stream_owner + .retain(|k, owner| !(k.0 == room_name && *owner == participant_id)); + self.receiver_states + .retain(|k, _| !(k.0 == room_name && k.1 == participant_id)); + } + + pub fn contains_participant(&self, room_name: &str, participant_id: ParticipantId) -> bool { + self.rooms + .get(room_name) + .map(|arc| arc.read().unwrap().contains(participant_id)) + .unwrap_or(false) } /// PLI suppression window (PRD-video-v1 T4.7). @@ -1276,6 +1354,16 @@ async fn run_participant_plain( } }; + if !room_mgr.contains_participant(&room_name, participant_id) { + info!( + room = %room_name, + participant = participant_id, + forwarded = packets_forwarded, + "stale participant loop stopped" + ); + break; + } + // Cache keyframe packets for fast join-to-first-frame replay. room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt); // Register this participant as the owner of this stream for PLI routing. @@ -1283,6 +1371,12 @@ async fn run_participant_plain( room_mgr .stream_owner .insert((room_name.clone(), pkt.header.stream_id), participant_id); + if pkt.header.media_type == wzp_proto::MediaType::Video { + room_mgr.stream_owner.insert( + (room_name.clone(), outbound_video_stream_id(participant_id)), + participant_id, + ); + } } let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; @@ -1326,9 +1420,8 @@ async fn run_participant_plain( room = %room_name, participant = participant_id, seq = pkt.header.seq, - "VideoScorer: Abusive verdict — dropping packet" + "VideoScorer: Abusive verdict — observe-only" ); - continue; } } @@ -1419,7 +1512,12 @@ async fn run_participant_plain( } match other { ParticipantSender::Quic(t) => { - if let Err(e) = t.send_media(&pkt).await { + let outbound_pkt = if is_video { + with_outbound_video_stream_id(&pkt, participant_id) + } else { + pkt.clone() + }; + if let Err(e) = t.send_media(&outbound_pkt).await { send_errors += 1; if send_errors <= 5 || send_errors % 100 == 0 { warn!( @@ -1599,6 +1697,16 @@ async fn run_participant_trunked( } }; + if !room_mgr.contains_participant(&room_name, participant_id) { + info!( + room = %room_name, + participant = participant_id, + forwarded = packets_forwarded, + "stale participant loop stopped (trunked)" + ); + break; + } + // Cache keyframe packets for fast join-to-first-frame replay. room_mgr.update_keyframe_cache(&room_name, participant_id, &pkt); // Register this participant as the owner of this stream for PLI routing. @@ -1607,6 +1715,15 @@ async fn run_participant_trunked( (room_name.clone(), pkt.header.stream_id), participant_id, ); + if pkt.header.media_type == wzp_proto::MediaType::Video { + room_mgr.stream_owner.insert( + ( + room_name.clone(), + outbound_video_stream_id(participant_id), + ), + participant_id, + ); + } } let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; @@ -1649,9 +1766,8 @@ async fn run_participant_trunked( room = %room_name, participant = participant_id, seq = pkt.header.seq, - "VideoScorer: Abusive verdict — dropping packet (trunked)" + "VideoScorer: Abusive verdict — observe-only (trunked)" ); - continue; } } @@ -1735,7 +1851,12 @@ async fn run_participant_trunked( let fwd = forwarders .entry(peer_addr) .or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes)); - if let Err(e) = fwd.send(&pkt).await { + let outbound_pkt = if is_video { + with_outbound_video_stream_id(&pkt, participant_id) + } else { + pkt.clone() + }; + if let Err(e) = fwd.send(&outbound_pkt).await { send_errors += 1; if send_errors <= 5 || send_errors % 100 == 0 { warn!( @@ -1859,6 +1980,72 @@ mod tests { assert!(mgr.list().is_empty()); } + #[test] + fn join_replaces_existing_fingerprint_in_same_room() { + let mgr = RoomManager::new(); + let addr: std::net::SocketAddr = "127.0.0.1:10000".parse().unwrap(); + let (tx1, _rx1) = tokio::sync::mpsc::channel(1); + let (tx2, _rx2) = tokio::sync::mpsc::channel(1); + + let (first_id, _, _, _) = mgr + .join( + "room", + addr, + ParticipantSender::WebSocket(tx1), + Some("fp-a"), + Some("old"), + ) + .unwrap(); + let (second_id, update, _, _) = mgr + .join( + "room", + addr, + ParticipantSender::WebSocket(tx2), + Some("fp-a"), + Some("new"), + ) + .unwrap(); + + assert_ne!(first_id, second_id); + assert!(!mgr.contains_participant("room", first_id)); + assert!(mgr.contains_participant("room", second_id)); + assert_eq!(mgr.room_size("room"), 1); + if let wzp_proto::SignalMessage::RoomUpdate { + count, + participants, + .. + } = update + { + assert_eq!(count, 1); + assert_eq!(participants[0].fingerprint, "fp-a"); + assert_eq!(participants[0].alias.as_deref(), Some("new")); + } else { + panic!("expected RoomUpdate"); + } + } + + #[test] + fn outbound_video_stream_ids_are_sender_distinct_and_nonzero() { + assert_eq!(outbound_video_stream_id(1), 1); + assert_eq!(outbound_video_stream_id(2), 2); + assert_eq!(outbound_video_stream_id(250), 250); + assert_eq!(outbound_video_stream_id(251), 1); + } + + #[test] + fn rewrite_only_changes_video_stream_id() { + let mut video = make_test_packet(b"video"); + video.header.media_type = wzp_proto::MediaType::Video; + video.header.stream_id = 0; + let rewritten = with_outbound_video_stream_id(&video, 42); + assert_eq!(rewritten.header.stream_id, 42); + assert_eq!(video.header.stream_id, 0); + + let audio = make_test_packet(b"audio"); + let rewritten_audio = with_outbound_video_stream_id(&audio, 42); + assert_eq!(rewritten_audio.header.stream_id, audio.header.stream_id); + } + #[test] fn acl_open_mode_allows_all() { let mgr = RoomManager::new(); diff --git a/desktop/src/main.ts b/desktop/src/main.ts index 782ebf2..7e48623 100644 --- a/desktop/src/main.ts +++ b/desktop/src/main.ts @@ -607,6 +607,45 @@ const vdRemotePlaceholder = document.getElementById("vd-remote-placeholder")!; const vdRemoteCounter = document.getElementById("vd-remote-counter")!; let remoteFrameCount = 0; let remoteFrameSerial = 0; +let remoteDrawInFlight = false; +let remotePendingFrame: { serial: number; width: number; height: number; jpeg_b64: string } | null = null; + +async function drawRemoteFrame(frame: { serial: number; width: number; height: number; jpeg_b64: string }) { + const img = new Image(); + img.src = `data:image/jpeg;base64,${frame.jpeg_b64}`; + if ("decode" in img) { + await img.decode(); + } else { + await new Promise((resolve, reject) => { + img.onload = () => resolve(); + img.onerror = () => reject(new Error("remote video image decode failed")); + }); + } + + if (frame.serial !== remoteFrameSerial) return; + if (vdRemoteVideo.width !== frame.width) vdRemoteVideo.width = frame.width; + if (vdRemoteVideo.height !== frame.height) vdRemoteVideo.height = frame.height; + remoteCtx.drawImage(img, 0, 0, vdRemoteVideo.width, vdRemoteVideo.height); +} + +async function pumpRemoteVideoFrames() { + if (remoteDrawInFlight) return; + remoteDrawInFlight = true; + try { + while (remotePendingFrame) { + const frame = remotePendingFrame; + remotePendingFrame = null; + try { + await drawRemoteFrame(frame); + } catch (e) { + console.warn("remote video draw failed:", e); + } + } + } finally { + remoteDrawInFlight = false; + if (remotePendingFrame) void pumpRemoteVideoFrames(); + } +} listen("video:frame", (event: any) => { const { width, height, jpeg_b64 } = event.payload; @@ -616,17 +655,16 @@ listen("video:frame", (event: any) => { remoteVideoActive = true; vdVideoStrip.classList.remove("hidden"); vdRemotePlaceholder.classList.add("hidden"); - vdRemoteVideo.width = width ?? vdRemoteVideo.width; - vdRemoteVideo.height = height ?? vdRemoteVideo.height; remoteFrameCount++; if (remoteFrameCount === 1) console.log("first remote video frame:", width, "x", height); - const img = new Image(); - img.onload = () => { - if (frameSerial !== remoteFrameSerial) return; - remoteCtx.drawImage(img, 0, 0, vdRemoteVideo.width, vdRemoteVideo.height); + remotePendingFrame = { + serial: frameSerial, + width: width ?? vdRemoteVideo.width, + height: height ?? vdRemoteVideo.height, + jpeg_b64, }; - img.src = `data:image/jpeg;base64,${jpeg_b64}`; + void pumpRemoteVideoFrames(); }); // ── Poll status ───────────────────────────────────────────────────