From 079e21e1741305e4a7999a5525698247c5b1130d Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Tue, 26 May 2026 09:16:02 +0400 Subject: [PATCH] fix(video): resync decoder after packet gaps --- desktop/src-tauri/src/engine.rs | 117 +++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 2 deletions(-) diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index a3b9a72..af7b1eb 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -182,6 +182,51 @@ fn should_log_video_sample(frame_no: u64, is_keyframe: bool) -> bool { frame_no <= 5 || is_keyframe || frame_no % 30 == 0 } +const VIDEO_KEYFRAME_INTERVAL_FRAMES: u32 = 30; + +#[derive(Default)] +struct VideoContinuity { + expected_seq: Option, + wait_for_keyframe: bool, + gaps: u64, + dropped_frames: u64, +} + +impl VideoContinuity { + fn observe_packet(&mut self, seq: u32) -> Option { + let Some(expected) = self.expected_seq else { + self.expected_seq = Some(seq.wrapping_add(1)); + return None; + }; + + let delta = seq.wrapping_sub(expected); + if delta >= 10_000 { + // Late/reordered packet from before the current expectation. + return None; + } + + self.expected_seq = Some(seq.wrapping_add(1)); + let gap = (delta > 0).then_some(delta); + if gap.is_some() { + self.gaps += 1; + self.wait_for_keyframe = true; + } + gap + } + + fn should_decode(&mut self, is_keyframe: bool) -> bool { + if is_keyframe { + self.wait_for_keyframe = false; + return true; + } + if self.wait_for_keyframe { + self.dropped_frames += 1; + return false; + } + true + } +} + fn is_startup_black_i420(data: &[u8], width: u32, height: u32) -> bool { let y_size = width as usize * height as usize; let uv_size = y_size / 4; @@ -1294,6 +1339,7 @@ impl CallEngine { let mut video_first_decoded_logged = false; let mut video_decoded_samples: u64 = 0; let mut video_decoder_buffering_count: u64 = 0; + let mut video_continuity = VideoContinuity::default(); loop { if !recv_r.load(Ordering::Relaxed) { @@ -1310,6 +1356,20 @@ impl CallEngine { // a JPEG-encoded frame to the WebView. Done before audio path so // we don't drop into the audio decoder branches. if pkt.header.media_type == wzp_proto::MediaType::Video { + if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) { + crate::emit_call_debug( + &recv_app, + "video:seq_gap", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "seq": pkt.header.seq, + "gap": gap, + "gaps": video_continuity.gaps, + "stream_id": pkt.header.stream_id, + "platform": "android", + }), + ); + } if !video_first_recv_logged { video_first_recv_logged = true; crate::emit_call_debug( @@ -1356,6 +1416,25 @@ impl CallEngine { }), ); } + if !video_continuity.should_decode(is_kf) { + if video_continuity.dropped_frames <= 5 + || video_continuity.dropped_frames % 30 == 0 + { + crate::emit_call_debug( + &recv_app, + "video:drop_until_keyframe", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "codec": format!("{:?}", codec_id), + "frame_no": video_reassembled_samples, + "dropped_frames": video_continuity.dropped_frames, + "platform": "android", + }), + ); + } + video_reassembler.evict_stale(pkt.header.timestamp, 5_000); + continue; + } if video_decoder_codec != Some(codec_id) { crate::emit_call_debug( &recv_app, @@ -2054,7 +2133,7 @@ impl CallEngine { continue; } - if frames_since_keyframe >= 150 { + if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { encoder.request_keyframe(); crate::emit_call_debug( &vid_app, @@ -2635,6 +2714,7 @@ impl CallEngine { let mut video_first_decoded_logged = false; let mut video_decoded_samples: u64 = 0; let mut video_decoder_buffering_count: u64 = 0; + let mut video_continuity = VideoContinuity::default(); let mut decoded_frames: u64 = 0; let mut decode_errs: u64 = 0; let mut last_written: usize = 0; @@ -2656,6 +2736,20 @@ impl CallEngine { Ok(Ok(Some(pkt))) => { // Route video packets to the reassembler before any audio processing. if pkt.header.media_type == wzp_proto::MediaType::Video { + if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) { + crate::emit_call_debug( + &recv_app, + "video:seq_gap", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "seq": pkt.header.seq, + "gap": gap, + "gaps": video_continuity.gaps, + "stream_id": pkt.header.stream_id, + "platform": "desktop", + }), + ); + } if !video_first_recv_logged_desktop { video_first_recv_logged_desktop = true; crate::emit_call_debug( @@ -2702,6 +2796,25 @@ impl CallEngine { }), ); } + if !video_continuity.should_decode(is_kf) { + if video_continuity.dropped_frames <= 5 + || video_continuity.dropped_frames % 30 == 0 + { + crate::emit_call_debug( + &recv_app, + "video:drop_until_keyframe", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "codec": format!("{:?}", codec_id), + "frame_no": video_reassembled_samples, + "dropped_frames": video_continuity.dropped_frames, + "platform": "desktop", + }), + ); + } + video_reassembler.evict_stale(pkt.header.timestamp, 5_000); + continue; + } // Lazy-init or switch decoder on codec change. if video_decoder_codec != Some(codec_id) { crate::emit_call_debug( @@ -3246,7 +3359,7 @@ impl CallEngine { continue; } - if frames_since_keyframe >= 150 { + if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { encoder.request_keyframe(); crate::emit_call_debug( &vid_app,