diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index af7b1eb..df75108 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -182,12 +182,14 @@ fn should_log_video_sample(frame_no: u64, is_keyframe: bool) -> bool { frame_no <= 5 || is_keyframe || frame_no % 30 == 0 } -const VIDEO_KEYFRAME_INTERVAL_FRAMES: u32 = 30; +const VIDEO_KEYFRAME_INTERVAL_FRAMES: u32 = 120; +const VIDEO_PLI_MIN_INTERVAL_MS: u128 = 250; #[derive(Default)] struct VideoContinuity { expected_seq: Option, wait_for_keyframe: bool, + last_pli_sent: Option, gaps: u64, dropped_frames: u64, } @@ -225,6 +227,18 @@ impl VideoContinuity { } true } + + fn should_send_pli(&mut self) -> bool { + let now = Instant::now(); + if self + .last_pli_sent + .is_none_or(|last| now.duration_since(last).as_millis() >= VIDEO_PLI_MIN_INTERVAL_MS) + { + self.last_pli_sent = Some(now); + return true; + } + false + } } fn is_startup_black_i420(data: &[u8], width: u32, height: u32) -> bool { @@ -312,6 +326,7 @@ async fn run_signal_task( app: tauri::AppHandle, transport: Arc, running: Arc, + force_video_keyframe: Arc, pending_profile: Arc, participants: Arc>>, event_cb: Arc, @@ -382,6 +397,17 @@ async fn run_signal_task( ); pending_profile.store(idx, Ordering::Release); } + Ok(Ok(Some(wzp_proto::SignalMessage::PictureLossIndication { + stream_id, + .. + }))) => { + force_video_keyframe.store(true, Ordering::Release); + crate::emit_call_debug( + &app, + "video:pli_recv", + serde_json::json!({ "stream_id": stream_id }), + ); + } Ok(Ok(Some(_))) => {} Ok(Ok(None)) => break, Ok(Err(_)) => break, @@ -995,6 +1021,7 @@ impl CallEngine { let audio_level = Arc::new(AtomicU32::new(0)); let tx_codec = Arc::new(Mutex::new(String::new())); let rx_codec = Arc::new(Mutex::new(String::new())); + let force_video_keyframe = Arc::new(AtomicBool::new(false)); // Adaptive quality: shared pending-profile bridge between recv → send. let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE)); @@ -1357,6 +1384,7 @@ impl CallEngine { // we don't drop into the audio decoder branches. if pkt.header.media_type == wzp_proto::MediaType::Video { if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) { + let stream_id = pkt.header.stream_id; crate::emit_call_debug( &recv_app, "video:seq_gap", @@ -1365,10 +1393,38 @@ impl CallEngine { "seq": pkt.header.seq, "gap": gap, "gaps": video_continuity.gaps, - "stream_id": pkt.header.stream_id, + "stream_id": stream_id, "platform": "android", }), ); + if video_continuity.should_send_pli() { + let pli = wzp_proto::SignalMessage::PictureLossIndication { + version: wzp_proto::default_signal_version(), + stream_id, + }; + match recv_t.send_signal(&pli).await { + Ok(()) => crate::emit_call_debug( + &recv_app, + "video:pli_sent", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "stream_id": stream_id, + "gap": gap, + "platform": "android", + }), + ), + Err(e) => crate::emit_call_debug( + &recv_app, + "video:pli_send_failed", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "stream_id": stream_id, + "error": e.to_string(), + "platform": "android", + }), + ), + } + } } if !video_first_recv_logged { video_first_recv_logged = true; @@ -1957,6 +2013,7 @@ impl CallEngine { app.clone(), transport.clone(), running.clone(), + force_video_keyframe.clone(), pending_profile.clone(), participants.clone(), event_cb.clone(), @@ -1970,6 +2027,7 @@ impl CallEngine { let (tx, mut rx) = tokio::sync::mpsc::channel::(4); let vid_transport = transport.clone(); let vid_running = running.clone(); + let vid_force_keyframe = force_video_keyframe.clone(); let vid_t0 = call_t0; let vid_app = app.clone(); crate::emit_call_debug( @@ -2133,7 +2191,15 @@ impl CallEngine { continue; } - if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { + let keyframe_reason = + if vid_force_keyframe.swap(false, Ordering::AcqRel) { + Some("pli") + } else if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { + Some("periodic") + } else { + None + }; + if let Some(reason) = keyframe_reason { encoder.request_keyframe(); crate::emit_call_debug( &vid_app, @@ -2141,7 +2207,7 @@ impl CallEngine { serde_json::json!({ "t_ms": vid_t0.elapsed().as_millis() as u64, "codec": format!("{:?}", vid_codec), - "reason": "periodic", + "reason": reason, "camera_frames": camera_frames, "platform": "android", }), @@ -2468,6 +2534,7 @@ impl CallEngine { let audio_level = Arc::new(AtomicU32::new(0)); let tx_codec = Arc::new(Mutex::new(String::new())); let rx_codec = Arc::new(Mutex::new(String::new())); + let force_video_keyframe = Arc::new(AtomicBool::new(false)); // Adaptive quality: shared pending-profile bridge between recv → send. let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE)); @@ -2737,6 +2804,7 @@ impl CallEngine { // Route video packets to the reassembler before any audio processing. if pkt.header.media_type == wzp_proto::MediaType::Video { if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) { + let stream_id = pkt.header.stream_id; crate::emit_call_debug( &recv_app, "video:seq_gap", @@ -2745,10 +2813,38 @@ impl CallEngine { "seq": pkt.header.seq, "gap": gap, "gaps": video_continuity.gaps, - "stream_id": pkt.header.stream_id, + "stream_id": stream_id, "platform": "desktop", }), ); + if video_continuity.should_send_pli() { + let pli = wzp_proto::SignalMessage::PictureLossIndication { + version: wzp_proto::default_signal_version(), + stream_id, + }; + match recv_t.send_signal(&pli).await { + Ok(()) => crate::emit_call_debug( + &recv_app, + "video:pli_sent", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "stream_id": stream_id, + "gap": gap, + "platform": "desktop", + }), + ), + Err(e) => crate::emit_call_debug( + &recv_app, + "video:pli_send_failed", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "stream_id": stream_id, + "error": e.to_string(), + "platform": "desktop", + }), + ), + } + } } if !video_first_recv_logged_desktop { video_first_recv_logged_desktop = true; @@ -3184,6 +3280,7 @@ impl CallEngine { _app.clone(), transport.clone(), running.clone(), + force_video_keyframe.clone(), pending_profile.clone(), participants.clone(), event_cb.clone(), @@ -3196,6 +3293,7 @@ impl CallEngine { let (tx, mut rx) = tokio::sync::mpsc::channel::(4); let vid_transport = transport.clone(); let vid_running = running.clone(); + let vid_force_keyframe = force_video_keyframe.clone(); let vid_t0 = call_t0; let vid_app = _app.clone(); crate::emit_call_debug( @@ -3359,7 +3457,15 @@ impl CallEngine { continue; } - if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { + let keyframe_reason = + if vid_force_keyframe.swap(false, Ordering::AcqRel) { + Some("pli") + } else if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { + Some("periodic") + } else { + None + }; + if let Some(reason) = keyframe_reason { encoder.request_keyframe(); crate::emit_call_debug( &vid_app, @@ -3367,7 +3473,7 @@ impl CallEngine { serde_json::json!({ "t_ms": vid_t0.elapsed().as_millis() as u64, "codec": format!("{:?}", vid_codec), - "reason": "periodic", + "reason": reason, "camera_frames": camera_frames, "platform": "desktop", }),