fix(video): request keyframes after packet loss
Some checks failed
Mirror to GitHub / mirror (push) Failing after 31s
Build Release Binaries / build-amd64 (push) Failing after 3m14s

This commit is contained in:
Siavash Sameni
2026-05-26 09:23:08 +04:00
parent 079e21e174
commit 31b2caa54d

View File

@@ -182,12 +182,14 @@ fn should_log_video_sample(frame_no: u64, is_keyframe: bool) -> bool {
frame_no <= 5 || is_keyframe || frame_no % 30 == 0 frame_no <= 5 || is_keyframe || frame_no % 30 == 0
} }
const VIDEO_KEYFRAME_INTERVAL_FRAMES: u32 = 30; const VIDEO_KEYFRAME_INTERVAL_FRAMES: u32 = 120;
const VIDEO_PLI_MIN_INTERVAL_MS: u128 = 250;
#[derive(Default)] #[derive(Default)]
struct VideoContinuity { struct VideoContinuity {
expected_seq: Option<u32>, expected_seq: Option<u32>,
wait_for_keyframe: bool, wait_for_keyframe: bool,
last_pli_sent: Option<Instant>,
gaps: u64, gaps: u64,
dropped_frames: u64, dropped_frames: u64,
} }
@@ -225,6 +227,18 @@ impl VideoContinuity {
} }
true true
} }
fn should_send_pli(&mut self) -> bool {
let now = Instant::now();
if self
.last_pli_sent
.is_none_or(|last| now.duration_since(last).as_millis() >= VIDEO_PLI_MIN_INTERVAL_MS)
{
self.last_pli_sent = Some(now);
return true;
}
false
}
} }
fn is_startup_black_i420(data: &[u8], width: u32, height: u32) -> bool { fn is_startup_black_i420(data: &[u8], width: u32, height: u32) -> bool {
@@ -312,6 +326,7 @@ async fn run_signal_task(
app: tauri::AppHandle, app: tauri::AppHandle,
transport: Arc<dyn wzp_proto::MediaTransport>, transport: Arc<dyn wzp_proto::MediaTransport>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
force_video_keyframe: Arc<AtomicBool>,
pending_profile: Arc<AtomicU8>, pending_profile: Arc<AtomicU8>,
participants: Arc<Mutex<Vec<ParticipantInfo>>>, participants: Arc<Mutex<Vec<ParticipantInfo>>>,
event_cb: Arc<dyn Fn(&str, &str) + Send + Sync>, event_cb: Arc<dyn Fn(&str, &str) + Send + Sync>,
@@ -382,6 +397,17 @@ async fn run_signal_task(
); );
pending_profile.store(idx, Ordering::Release); pending_profile.store(idx, Ordering::Release);
} }
Ok(Ok(Some(wzp_proto::SignalMessage::PictureLossIndication {
stream_id,
..
}))) => {
force_video_keyframe.store(true, Ordering::Release);
crate::emit_call_debug(
&app,
"video:pli_recv",
serde_json::json!({ "stream_id": stream_id }),
);
}
Ok(Ok(Some(_))) => {} Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break, Ok(Ok(None)) => break,
Ok(Err(_)) => break, Ok(Err(_)) => break,
@@ -995,6 +1021,7 @@ impl CallEngine {
let audio_level = Arc::new(AtomicU32::new(0)); let audio_level = Arc::new(AtomicU32::new(0));
let tx_codec = Arc::new(Mutex::new(String::new())); let tx_codec = Arc::new(Mutex::new(String::new()));
let rx_codec = Arc::new(Mutex::new(String::new())); let rx_codec = Arc::new(Mutex::new(String::new()));
let force_video_keyframe = Arc::new(AtomicBool::new(false));
// Adaptive quality: shared pending-profile bridge between recv → send. // Adaptive quality: shared pending-profile bridge between recv → send.
let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE)); let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE));
@@ -1357,6 +1384,7 @@ impl CallEngine {
// we don't drop into the audio decoder branches. // we don't drop into the audio decoder branches.
if pkt.header.media_type == wzp_proto::MediaType::Video { if pkt.header.media_type == wzp_proto::MediaType::Video {
if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) { if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) {
let stream_id = pkt.header.stream_id;
crate::emit_call_debug( crate::emit_call_debug(
&recv_app, &recv_app,
"video:seq_gap", "video:seq_gap",
@@ -1365,10 +1393,38 @@ impl CallEngine {
"seq": pkt.header.seq, "seq": pkt.header.seq,
"gap": gap, "gap": gap,
"gaps": video_continuity.gaps, "gaps": video_continuity.gaps,
"stream_id": pkt.header.stream_id, "stream_id": stream_id,
"platform": "android", "platform": "android",
}), }),
); );
if video_continuity.should_send_pli() {
let pli = wzp_proto::SignalMessage::PictureLossIndication {
version: wzp_proto::default_signal_version(),
stream_id,
};
match recv_t.send_signal(&pli).await {
Ok(()) => crate::emit_call_debug(
&recv_app,
"video:pli_sent",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"stream_id": stream_id,
"gap": gap,
"platform": "android",
}),
),
Err(e) => crate::emit_call_debug(
&recv_app,
"video:pli_send_failed",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"stream_id": stream_id,
"error": e.to_string(),
"platform": "android",
}),
),
}
}
} }
if !video_first_recv_logged { if !video_first_recv_logged {
video_first_recv_logged = true; video_first_recv_logged = true;
@@ -1957,6 +2013,7 @@ impl CallEngine {
app.clone(), app.clone(),
transport.clone(), transport.clone(),
running.clone(), running.clone(),
force_video_keyframe.clone(),
pending_profile.clone(), pending_profile.clone(),
participants.clone(), participants.clone(),
event_cb.clone(), event_cb.clone(),
@@ -1970,6 +2027,7 @@ impl CallEngine {
let (tx, mut rx) = tokio::sync::mpsc::channel::<wzp_video::encoder::VideoFrame>(4); let (tx, mut rx) = tokio::sync::mpsc::channel::<wzp_video::encoder::VideoFrame>(4);
let vid_transport = transport.clone(); let vid_transport = transport.clone();
let vid_running = running.clone(); let vid_running = running.clone();
let vid_force_keyframe = force_video_keyframe.clone();
let vid_t0 = call_t0; let vid_t0 = call_t0;
let vid_app = app.clone(); let vid_app = app.clone();
crate::emit_call_debug( crate::emit_call_debug(
@@ -2133,7 +2191,15 @@ impl CallEngine {
continue; continue;
} }
if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { let keyframe_reason =
if vid_force_keyframe.swap(false, Ordering::AcqRel) {
Some("pli")
} else if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES {
Some("periodic")
} else {
None
};
if let Some(reason) = keyframe_reason {
encoder.request_keyframe(); encoder.request_keyframe();
crate::emit_call_debug( crate::emit_call_debug(
&vid_app, &vid_app,
@@ -2141,7 +2207,7 @@ impl CallEngine {
serde_json::json!({ serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64, "t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec), "codec": format!("{:?}", vid_codec),
"reason": "periodic", "reason": reason,
"camera_frames": camera_frames, "camera_frames": camera_frames,
"platform": "android", "platform": "android",
}), }),
@@ -2468,6 +2534,7 @@ impl CallEngine {
let audio_level = Arc::new(AtomicU32::new(0)); let audio_level = Arc::new(AtomicU32::new(0));
let tx_codec = Arc::new(Mutex::new(String::new())); let tx_codec = Arc::new(Mutex::new(String::new()));
let rx_codec = Arc::new(Mutex::new(String::new())); let rx_codec = Arc::new(Mutex::new(String::new()));
let force_video_keyframe = Arc::new(AtomicBool::new(false));
// Adaptive quality: shared pending-profile bridge between recv → send. // Adaptive quality: shared pending-profile bridge between recv → send.
let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE)); let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE));
@@ -2737,6 +2804,7 @@ impl CallEngine {
// Route video packets to the reassembler before any audio processing. // Route video packets to the reassembler before any audio processing.
if pkt.header.media_type == wzp_proto::MediaType::Video { if pkt.header.media_type == wzp_proto::MediaType::Video {
if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) { if let Some(gap) = video_continuity.observe_packet(pkt.header.seq) {
let stream_id = pkt.header.stream_id;
crate::emit_call_debug( crate::emit_call_debug(
&recv_app, &recv_app,
"video:seq_gap", "video:seq_gap",
@@ -2745,10 +2813,38 @@ impl CallEngine {
"seq": pkt.header.seq, "seq": pkt.header.seq,
"gap": gap, "gap": gap,
"gaps": video_continuity.gaps, "gaps": video_continuity.gaps,
"stream_id": pkt.header.stream_id, "stream_id": stream_id,
"platform": "desktop", "platform": "desktop",
}), }),
); );
if video_continuity.should_send_pli() {
let pli = wzp_proto::SignalMessage::PictureLossIndication {
version: wzp_proto::default_signal_version(),
stream_id,
};
match recv_t.send_signal(&pli).await {
Ok(()) => crate::emit_call_debug(
&recv_app,
"video:pli_sent",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"stream_id": stream_id,
"gap": gap,
"platform": "desktop",
}),
),
Err(e) => crate::emit_call_debug(
&recv_app,
"video:pli_send_failed",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"stream_id": stream_id,
"error": e.to_string(),
"platform": "desktop",
}),
),
}
}
} }
if !video_first_recv_logged_desktop { if !video_first_recv_logged_desktop {
video_first_recv_logged_desktop = true; video_first_recv_logged_desktop = true;
@@ -3184,6 +3280,7 @@ impl CallEngine {
_app.clone(), _app.clone(),
transport.clone(), transport.clone(),
running.clone(), running.clone(),
force_video_keyframe.clone(),
pending_profile.clone(), pending_profile.clone(),
participants.clone(), participants.clone(),
event_cb.clone(), event_cb.clone(),
@@ -3196,6 +3293,7 @@ impl CallEngine {
let (tx, mut rx) = tokio::sync::mpsc::channel::<wzp_video::encoder::VideoFrame>(4); let (tx, mut rx) = tokio::sync::mpsc::channel::<wzp_video::encoder::VideoFrame>(4);
let vid_transport = transport.clone(); let vid_transport = transport.clone();
let vid_running = running.clone(); let vid_running = running.clone();
let vid_force_keyframe = force_video_keyframe.clone();
let vid_t0 = call_t0; let vid_t0 = call_t0;
let vid_app = _app.clone(); let vid_app = _app.clone();
crate::emit_call_debug( crate::emit_call_debug(
@@ -3359,7 +3457,15 @@ impl CallEngine {
continue; continue;
} }
if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES { let keyframe_reason =
if vid_force_keyframe.swap(false, Ordering::AcqRel) {
Some("pli")
} else if frames_since_keyframe >= VIDEO_KEYFRAME_INTERVAL_FRAMES {
Some("periodic")
} else {
None
};
if let Some(reason) = keyframe_reason {
encoder.request_keyframe(); encoder.request_keyframe();
crate::emit_call_debug( crate::emit_call_debug(
&vid_app, &vid_app,
@@ -3367,7 +3473,7 @@ impl CallEngine {
serde_json::json!({ serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64, "t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec), "codec": format!("{:?}", vid_codec),
"reason": "periodic", "reason": reason,
"camera_frames": camera_frames, "camera_frames": camera_frames,
"platform": "desktop", "platform": "desktop",
}), }),