fix(video): force h264 and trace frame pipeline
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m32s
Mirror to GitHub / mirror (push) Failing after 28s

This commit is contained in:
Siavash Sameni
2026-05-25 20:03:11 +04:00
parent 7eca79846f
commit d57ebe3d2c
9 changed files with 761 additions and 43 deletions

View File

@@ -634,6 +634,7 @@ impl CallEngine {
"connect:handshake_done",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"video_codec": hs.video_codec.map(|c| format!("{:?}", c)),
}),
);
info!(
@@ -653,6 +654,16 @@ impl CallEngine {
);
(None, transport)
};
crate::emit_call_debug(
&app,
"video:negotiated",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"codec": _negotiated_video_codec.map(|c| format!("{:?}", c)),
"enabled": _negotiated_video_codec.is_some(),
"direct_p2p": is_direct_p2p,
}),
);
// Do not emit the legacy "connected" call-event here. The frontend
// ignores it and enters voice only after the command resolves; on
// Android this synchronous emit was the only operation between
@@ -1172,6 +1183,9 @@ impl CallEngine {
let mut video_decoder: Option<Box<dyn wzp_video::decoder::VideoDecoder>> = None;
let mut video_decoder_codec: Option<wzp_proto::CodecId> = None;
let mut video_first_recv_logged = false;
let mut video_first_reassembled_logged = false;
let mut video_first_decoded_logged = false;
let mut video_decoder_buffering_count: u64 = 0;
loop {
if !recv_r.load(Ordering::Relaxed) {
@@ -1203,15 +1217,59 @@ impl CallEngine {
if let Some((codec_id, is_kf, frame)) =
video_reassembler.push(&pkt)
{
if !video_first_reassembled_logged {
video_first_reassembled_logged = true;
crate::emit_call_debug(
&recv_app,
"video:first_reassembled",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"is_keyframe": is_kf,
"frame_bytes": frame.len(),
"platform": "android",
}),
);
}
if video_decoder_codec != Some(codec_id) {
crate::emit_call_debug(
&recv_app,
"video:decoder_init_start",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"width": 1280,
"height": 720,
"platform": "android",
}),
);
match wzp_video::factory::create_video_decoder(codec_id, 1280, 720) {
Ok(d) => {
info!(codec = ?codec_id, "video decoder created (android)");
crate::emit_call_debug(
&recv_app,
"video:decoder_started",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"platform": "android",
}),
);
video_decoder = Some(d);
video_decoder_codec = Some(codec_id);
}
Err(e) => {
error!("video decoder init failed: {e}");
crate::emit_call_debug(
&recv_app,
"video:decoder_init_failed",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"error": e.to_string(),
"platform": "android",
}),
);
}
}
}
@@ -1223,6 +1281,37 @@ impl CallEngine {
yuv_frame.width,
yuv_frame.height,
);
let jpeg_ok = jpeg_b64.is_some();
if !video_first_decoded_logged {
video_first_decoded_logged = true;
crate::emit_call_debug(
&recv_app,
"video:first_decoded_frame",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"width": yuv_frame.width,
"height": yuv_frame.height,
"yuv_bytes": yuv_frame.data.len(),
"jpeg_ok": jpeg_ok,
"platform": "android",
}),
);
}
if !jpeg_ok {
crate::emit_call_debug(
&recv_app,
"video:jpeg_encode_failed",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"width": yuv_frame.width,
"height": yuv_frame.height,
"yuv_bytes": yuv_frame.data.len(),
"platform": "android",
}),
);
}
let _ = recv_app.emit(
"video:frame",
serde_json::json!({
@@ -1234,9 +1323,35 @@ impl CallEngine {
}),
);
}
Ok(None) => {}
Ok(None) => {
video_decoder_buffering_count += 1;
if video_decoder_buffering_count == 1
|| video_decoder_buffering_count % 30 == 0
{
crate::emit_call_debug(
&recv_app,
"video:decoder_buffering",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"buffering": video_decoder_buffering_count,
"platform": "android",
}),
);
}
}
Err(e) => {
error!("video decode error: {e}");
crate::emit_call_debug(
&recv_app,
"video:decode_error",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"error": e.to_string(),
"platform": "android",
}),
);
}
}
}
@@ -1604,19 +1719,77 @@ impl CallEngine {
let vid_running = running.clone();
let vid_t0 = call_t0;
let vid_app = app.clone();
crate::emit_call_debug(
&app,
"video:sender_channel_ready",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"codec": format!("{:?}", vid_codec),
"queue_depth": 4,
"platform": "android",
}),
);
tokio::spawn(async move {
crate::emit_call_debug(
&vid_app,
"video:encoder_init_start",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"width": 1280,
"height": 720,
"bitrate_bps": 1_500_000,
"platform": "android",
}),
);
let mut encoder = match wzp_video::factory::create_video_encoder(
vid_codec, 1280, 720, 1_500_000,
) {
Ok(e) => e,
Ok(e) => {
crate::emit_call_debug(
&vid_app,
"video:encoder_started",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"platform": "android",
}),
);
e
}
Err(e) => {
error!("video encoder init failed (android): {e}");
crate::emit_call_debug(
&vid_app,
"video:encoder_init_failed",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"platform": "android",
"error": e.to_string(),
}),
);
return;
}
};
let mut seq: u32 = 0;
let mut frames_since_keyframe: u32 = 0;
let mut first_send_logged = false;
let mut first_camera_frame_logged = false;
let mut camera_frames: u64 = 0;
let mut empty_encodes: u64 = 0;
let mut wait_ticks: u64 = 0;
encoder.request_keyframe();
crate::emit_call_debug(
&vid_app,
"video:keyframe_requested",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"reason": "initial",
"platform": "android",
}),
);
info!(codec = ?vid_codec, "video send task started (android)");
while vid_running.load(Ordering::Relaxed) {
let frame = match tokio::time::timeout(
@@ -1625,13 +1798,58 @@ impl CallEngine {
)
.await
{
Ok(Some(f)) => f,
Ok(Some(f)) => {
wait_ticks = 0;
camera_frames += 1;
if !first_camera_frame_logged {
first_camera_frame_logged = true;
crate::emit_call_debug(
&vid_app,
"video:first_camera_frame",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"width": f.width,
"height": f.height,
"data_bytes": f.data.len(),
"platform": "android",
}),
);
}
f
}
Ok(None) => break,
Err(_) => continue,
Err(_) => {
wait_ticks += 1;
if wait_ticks == 10 || wait_ticks % 50 == 0 {
crate::emit_call_debug(
&vid_app,
"video:waiting_for_camera_frames",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"wait_ms": wait_ticks * 200,
"codec": format!("{:?}", vid_codec),
"platform": "android",
}),
);
}
continue;
}
};
if frames_since_keyframe >= 150 {
encoder.request_keyframe();
crate::emit_call_debug(
&vid_app,
"video:keyframe_requested",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"reason": "periodic",
"camera_frames": camera_frames,
"platform": "android",
}),
);
frames_since_keyframe = 0;
}
@@ -1639,9 +1857,37 @@ impl CallEngine {
Ok(b) => b,
Err(e) => {
error!("video encode error (android): {e}");
crate::emit_call_debug(
&vid_app,
"video:encode_error",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"camera_frames": camera_frames,
"error": e.to_string(),
"platform": "android",
}),
);
continue;
}
};
if encoded.is_empty() {
empty_encodes += 1;
if empty_encodes == 1 || empty_encodes % 30 == 0 {
crate::emit_call_debug(
&vid_app,
"video:encode_empty",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"camera_frames": camera_frames,
"empty_encodes": empty_encodes,
"platform": "android",
}),
);
}
continue;
}
let is_keyframe = encoder.is_keyframe(&encoded);
let ts_ms = vid_t0.elapsed().as_millis() as u32;
@@ -1674,10 +1920,34 @@ impl CallEngine {
}
frames_since_keyframe += 1;
}
crate::emit_call_debug(
&vid_app,
"video:sender_exit",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"camera_frames": camera_frames,
"empty_encodes": empty_encodes,
"platform": "android",
}),
);
info!("video send task exited (android)");
});
Some(tx)
} else {
crate::emit_call_debug(
&app,
"video:send_disabled",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"reason": if is_direct_p2p {
"direct_p2p_skips_relay_handshake"
} else {
"no_video_codec_negotiated"
},
"platform": "android",
}),
);
None
};
@@ -1795,6 +2065,14 @@ impl CallEngine {
error!("perform_handshake failed: {e}");
e
})?;
crate::emit_call_debug(
&_app,
"connect:handshake_done",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"video_codec": hs.video_codec.map(|c| format!("{:?}", c)),
}),
);
info!(video_codec = ?hs.video_codec, "handshake complete");
drop(hs.session);
(hs.video_codec, transport)
@@ -1802,6 +2080,16 @@ impl CallEngine {
info!("direct P2P — skipping relay handshake (QUIC TLS is the encryption layer)");
(None, transport)
};
crate::emit_call_debug(
&_app,
"video:negotiated",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"codec": _negotiated_video_codec.map(|c| format!("{:?}", c)),
"enabled": _negotiated_video_codec.is_some(),
"direct_p2p": is_direct_p2p,
}),
);
info!("connected to relay, handshake complete");
event_cb("connected", &format!("joined room {room}"));
@@ -2100,6 +2388,9 @@ impl CallEngine {
let mut video_decoder: Option<Box<dyn wzp_video::decoder::VideoDecoder>> = None;
let mut video_decoder_codec: Option<wzp_proto::CodecId> = None;
let mut video_first_recv_logged_desktop = false;
let mut video_first_reassembled_logged = false;
let mut video_first_decoded_logged = false;
let mut video_decoder_buffering_count: u64 = 0;
let mut decoded_frames: u64 = 0;
let mut decode_errs: u64 = 0;
let mut last_written: usize = 0;
@@ -2136,16 +2427,60 @@ impl CallEngine {
if let Some((codec_id, is_kf, frame)) =
video_reassembler.push(&pkt)
{
if !video_first_reassembled_logged {
video_first_reassembled_logged = true;
crate::emit_call_debug(
&recv_app,
"video:first_reassembled",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"is_keyframe": is_kf,
"frame_bytes": frame.len(),
"platform": "desktop",
}),
);
}
// Lazy-init or switch decoder on codec change.
if video_decoder_codec != Some(codec_id) {
crate::emit_call_debug(
&recv_app,
"video:decoder_init_start",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"width": 1280,
"height": 720,
"platform": "desktop",
}),
);
match wzp_video::factory::create_video_decoder(codec_id, 1280, 720) {
Ok(d) => {
info!(codec = ?codec_id, "video decoder created");
crate::emit_call_debug(
&recv_app,
"video:decoder_started",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"platform": "desktop",
}),
);
video_decoder = Some(d);
video_decoder_codec = Some(codec_id);
}
Err(e) => {
error!("video decoder init failed: {e}");
crate::emit_call_debug(
&recv_app,
"video:decoder_init_failed",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"error": e.to_string(),
"platform": "desktop",
}),
);
}
}
}
@@ -2160,6 +2495,37 @@ impl CallEngine {
yuv_frame.width,
yuv_frame.height,
);
let jpeg_ok = jpeg_b64.is_some();
if !video_first_decoded_logged {
video_first_decoded_logged = true;
crate::emit_call_debug(
&recv_app,
"video:first_decoded_frame",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"width": yuv_frame.width,
"height": yuv_frame.height,
"yuv_bytes": yuv_frame.data.len(),
"jpeg_ok": jpeg_ok,
"platform": "desktop",
}),
);
}
if !jpeg_ok {
crate::emit_call_debug(
&recv_app,
"video:jpeg_encode_failed",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"width": yuv_frame.width,
"height": yuv_frame.height,
"yuv_bytes": yuv_frame.data.len(),
"platform": "desktop",
}),
);
}
let _ = recv_app.emit(
"video:frame",
serde_json::json!({
@@ -2171,9 +2537,35 @@ impl CallEngine {
}),
);
}
Ok(None) => {} // decoder buffering — no output yet
Ok(None) => {
video_decoder_buffering_count += 1;
if video_decoder_buffering_count == 1
|| video_decoder_buffering_count % 30 == 0
{
crate::emit_call_debug(
&recv_app,
"video:decoder_buffering",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"buffering": video_decoder_buffering_count,
"platform": "desktop",
}),
);
}
}
Err(e) => {
error!("video decode error: {e}");
crate::emit_call_debug(
&recv_app,
"video:decode_error",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", codec_id),
"error": e.to_string(),
"platform": "desktop",
}),
);
}
}
}
@@ -2387,19 +2779,77 @@ impl CallEngine {
let vid_running = running.clone();
let vid_t0 = call_t0;
let vid_app = _app.clone();
crate::emit_call_debug(
&_app,
"video:sender_channel_ready",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"codec": format!("{:?}", vid_codec),
"queue_depth": 4,
"platform": "desktop",
}),
);
tokio::spawn(async move {
crate::emit_call_debug(
&vid_app,
"video:encoder_init_start",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"width": 1280,
"height": 720,
"bitrate_bps": 1_500_000,
"platform": "desktop",
}),
);
let mut encoder = match wzp_video::factory::create_video_encoder(
vid_codec, 1280, 720, 1_500_000,
) {
Ok(e) => e,
Ok(e) => {
crate::emit_call_debug(
&vid_app,
"video:encoder_started",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"platform": "desktop",
}),
);
e
}
Err(e) => {
error!("video encoder init failed: {e}");
crate::emit_call_debug(
&vid_app,
"video:encoder_init_failed",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"platform": "desktop",
"error": e.to_string(),
}),
);
return;
}
};
let mut seq: u32 = 0;
let mut frames_since_keyframe: u32 = 0;
let mut first_send_logged = false;
let mut first_camera_frame_logged = false;
let mut camera_frames: u64 = 0;
let mut empty_encodes: u64 = 0;
let mut wait_ticks: u64 = 0;
encoder.request_keyframe();
crate::emit_call_debug(
&vid_app,
"video:keyframe_requested",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"reason": "initial",
"platform": "desktop",
}),
);
info!(codec = ?vid_codec, "video send task started");
while vid_running.load(Ordering::Relaxed) {
let frame = match tokio::time::timeout(
@@ -2408,13 +2858,58 @@ impl CallEngine {
)
.await
{
Ok(Some(f)) => f,
Ok(Some(f)) => {
wait_ticks = 0;
camera_frames += 1;
if !first_camera_frame_logged {
first_camera_frame_logged = true;
crate::emit_call_debug(
&vid_app,
"video:first_camera_frame",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"width": f.width,
"height": f.height,
"data_bytes": f.data.len(),
"platform": "desktop",
}),
);
}
f
}
Ok(None) => break, // sender dropped
Err(_) => continue, // no frame yet — keep looping
Err(_) => {
wait_ticks += 1;
if wait_ticks == 10 || wait_ticks % 50 == 0 {
crate::emit_call_debug(
&vid_app,
"video:waiting_for_camera_frames",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"wait_ms": wait_ticks * 200,
"codec": format!("{:?}", vid_codec),
"platform": "desktop",
}),
);
}
continue;
}
};
if frames_since_keyframe >= 150 {
encoder.request_keyframe();
crate::emit_call_debug(
&vid_app,
"video:keyframe_requested",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"reason": "periodic",
"camera_frames": camera_frames,
"platform": "desktop",
}),
);
frames_since_keyframe = 0;
}
@@ -2422,9 +2917,37 @@ impl CallEngine {
Ok(b) => b,
Err(e) => {
error!("video encode error: {e}");
crate::emit_call_debug(
&vid_app,
"video:encode_error",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"camera_frames": camera_frames,
"error": e.to_string(),
"platform": "desktop",
}),
);
continue;
}
};
if encoded.is_empty() {
empty_encodes += 1;
if empty_encodes == 1 || empty_encodes % 30 == 0 {
crate::emit_call_debug(
&vid_app,
"video:encode_empty",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"camera_frames": camera_frames,
"empty_encodes": empty_encodes,
"platform": "desktop",
}),
);
}
continue;
}
let is_keyframe = encoder.is_keyframe(&encoded);
let ts_ms = vid_t0.elapsed().as_millis() as u32;
@@ -2457,10 +2980,34 @@ impl CallEngine {
}
frames_since_keyframe += 1;
}
crate::emit_call_debug(
&vid_app,
"video:sender_exit",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"camera_frames": camera_frames,
"empty_encodes": empty_encodes,
"platform": "desktop",
}),
);
info!("video send task exited");
});
Some(tx)
} else {
crate::emit_call_debug(
&_app,
"video:send_disabled",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"reason": if is_direct_p2p {
"direct_p2p_skips_relay_handshake"
} else {
"no_video_codec_negotiated"
},
"platform": "desktop",
}),
);
None
};