From 25b3278d31d2be7bc60b3827a6f10b222c00b10a Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 25 May 2026 18:19:42 +0400 Subject: [PATCH] feat(android): wire video send + recv in Android engine; add video:* debug events Mirror the desktop video pipeline into the #[cfg(target_os="android")] start function: capture _negotiated_video_codec from the handshake, spawn a video send task that pulls VideoFrames from camera_tx, encodes/packetizes/sends. Add video reassembly + decode + emit "video:frame" in the recv task before the audio branch so Android can both send and receive video. Instrumentation: emit video:first_send and video:first_recv on both desktop and android paths so we can verify the pipeline end-to-end. Co-Authored-By: Claude Sonnet 4.6 --- desktop/src-tauri/src/engine.rs | 189 +++++++++++++++++++++++++++++++- 1 file changed, 184 insertions(+), 5 deletions(-) diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index d7cbd7e..e065a6b 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -575,7 +575,7 @@ impl CallEngine { // through the signal channel (DirectCallOffer/Answer carry // identity_pub + ephemeral_pub + signature). let quinn_transport = transport.clone(); - let transport: Arc = if !is_direct_p2p { + let (_negotiated_video_codec, transport): (Option, Arc) = if !is_direct_p2p { crate::emit_call_debug( &app, "connect:handshake_start", @@ -619,14 +619,13 @@ impl CallEngine { // do NOT wrap with EncryptingTransport. The pairwise client↔relay session // key can't be used end-to-end without MLS or relay re-encryption. drop(hs.session); - let _ = hs.video_codec; - transport + (hs.video_codec, transport) } else { info!( t_ms = call_t0.elapsed().as_millis(), "first-join diag: direct P2P — skipping relay handshake (QUIC TLS is the encryption layer)" ); - transport + (None, transport) }; // Do not emit the legacy "connected" call-event here. The frontend // ignores it and enters voice only after the command resolves; on @@ -1130,6 +1129,11 @@ impl CallEngine { let mut last_recv_fr_for_watchdog: u64 = 0; let mut no_recv_ticks: u32 = 0; let mut media_degraded_emitted = false; + // Video pipeline state — mirror of the desktop recv task. + let mut video_reassembler = wzp_video::transport::VideoReassembler::new(); + let mut video_decoder: Option> = None; + let mut video_decoder_codec: Option = None; + let mut video_first_recv_logged = false; loop { if !recv_r.load(Ordering::Relaxed) { @@ -1142,6 +1146,67 @@ impl CallEngine { .await { Ok(Ok(Some(pkt))) => { + // Route Video packets through the reassembler/decoder and emit + // a JPEG-encoded frame to the WebView. Done before audio path so + // we don't drop into the audio decoder branches. + if pkt.header.media_type == wzp_proto::MediaType::Video { + if !video_first_recv_logged { + video_first_recv_logged = true; + crate::emit_call_debug( + &recv_app, + "video:first_recv", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "codec": format!("{:?}", pkt.header.codec_id), + "payload_bytes": pkt.payload.len(), + }), + ); + } + if let Some((codec_id, is_kf, frame)) = + video_reassembler.push(&pkt) + { + if video_decoder_codec != Some(codec_id) { + match wzp_video::factory::create_video_decoder(codec_id, 1280, 720) { + Ok(d) => { + info!(codec = ?codec_id, "video decoder created (android)"); + video_decoder = Some(d); + video_decoder_codec = Some(codec_id); + } + Err(e) => { + error!("video decoder init failed: {e}"); + } + } + } + if let Some(ref mut dec) = video_decoder { + match dec.decode(&frame) { + Ok(Some(yuv_frame)) => { + let jpeg_b64 = crate::i420_to_jpeg_b64( + &yuv_frame.data, + yuv_frame.width, + yuv_frame.height, + ); + let _ = recv_app.emit( + "video:frame", + serde_json::json!({ + "is_keyframe": is_kf, + "width": yuv_frame.width, + "height": yuv_frame.height, + "jpeg_b64": jpeg_b64, + "codec": format!("{:?}", codec_id), + }), + ); + } + Ok(None) => {} + Err(e) => { + error!("video decode error: {e}"); + } + } + } + video_reassembler.evict_stale(pkt.header.timestamp, 5_000); + } + continue; // handled — skip audio path + } + if !first_packet_logged { info!( t_ms = recv_t0.elapsed().as_millis(), @@ -1487,6 +1552,92 @@ impl CallEngine { event_cb.clone(), )); + // Video send task (Android) — mirror of the desktop branch. Only + // spawns when the relay handshake negotiated a video codec; on + // direct P2P video is currently disabled. + let camera_tx = if let Some(vid_codec) = _negotiated_video_codec { + let (tx, mut rx) = tokio::sync::mpsc::channel::(4); + let vid_transport = transport.clone(); + let vid_running = running.clone(); + let vid_t0 = call_t0; + let vid_app = app.clone(); + tokio::spawn(async move { + let mut encoder = match wzp_video::factory::create_video_encoder( + vid_codec, 1280, 720, 1_500_000, + ) { + Ok(e) => e, + Err(e) => { + error!("video encoder init failed (android): {e}"); + return; + } + }; + let mut seq: u32 = 0; + let mut frames_since_keyframe: u32 = 0; + let mut first_send_logged = false; + info!(codec = ?vid_codec, "video send task started (android)"); + while vid_running.load(Ordering::Relaxed) { + let frame = match tokio::time::timeout( + std::time::Duration::from_millis(200), + rx.recv(), + ) + .await + { + Ok(Some(f)) => f, + Ok(None) => break, + Err(_) => continue, + }; + + if frames_since_keyframe >= 150 { + encoder.request_keyframe(); + frames_since_keyframe = 0; + } + + let encoded = match encoder.encode(&frame) { + Ok(b) => b, + Err(e) => { + error!("video encode error (android): {e}"); + continue; + } + }; + + let is_keyframe = encoder.is_keyframe(&encoded); + let ts_ms = vid_t0.elapsed().as_millis() as u32; + let pkts = wzp_video::transport::packetize_video_frame( + &encoded, vid_codec, is_keyframe, &mut seq, ts_ms, + ); + if !first_send_logged && !pkts.is_empty() { + first_send_logged = true; + crate::emit_call_debug( + &vid_app, + "video:first_send", + serde_json::json!({ + "t_ms": vid_t0.elapsed().as_millis() as u64, + "codec": format!("{:?}", vid_codec), + "packets": pkts.len(), + "first_pkt_bytes": pkts[0].payload.len(), + "is_keyframe": is_keyframe, + }), + ); + } + for pkt in &pkts { + if let Err(e) = vid_transport.send_media(pkt).await { + crate::emit_call_debug( + &vid_app, + "video:send_error", + serde_json::json!({"error": e.to_string()}), + ); + break; + } + } + frames_since_keyframe += 1; + } + info!("video send task exited (android)"); + }); + Some(tx) + } else { + None + }; + Ok(Self { running, mic_muted, @@ -1504,7 +1655,7 @@ impl CallEngine { // is a static dlopen'd library, the audio streams live inside // the standalone cdylib's process-global singleton. _audio_handle: SyncWrapper(Box::new(())), - camera_tx: None, // video not yet wired on Android + camera_tx, }) } @@ -1893,6 +2044,7 @@ impl CallEngine { let mut video_reassembler = wzp_video::transport::VideoReassembler::new(); let mut video_decoder: Option> = None; let mut video_decoder_codec: Option = None; + let mut video_first_recv_logged_desktop = false; let mut decoded_frames: u64 = 0; let mut decode_errs: u64 = 0; let mut last_written: usize = 0; @@ -1914,6 +2066,18 @@ impl CallEngine { Ok(Ok(Some(pkt))) => { // Route video packets to the reassembler before any audio processing. if pkt.header.media_type == wzp_proto::MediaType::Video { + if !video_first_recv_logged_desktop { + video_first_recv_logged_desktop = true; + crate::emit_call_debug( + &recv_app, + "video:first_recv", + serde_json::json!({ + "t_ms": recv_t0.elapsed().as_millis() as u64, + "codec": format!("{:?}", pkt.header.codec_id), + "payload_bytes": pkt.payload.len(), + }), + ); + } if let Some((codec_id, is_kf, frame)) = video_reassembler.push(&pkt) { @@ -2175,6 +2339,7 @@ impl CallEngine { }; let mut seq: u32 = 0; let mut frames_since_keyframe: u32 = 0; + let mut first_send_logged = false; info!(codec = ?vid_codec, "video send task started"); while vid_running.load(Ordering::Relaxed) { let frame = match tokio::time::timeout( @@ -2206,6 +2371,20 @@ impl CallEngine { let pkts = wzp_video::transport::packetize_video_frame( &encoded, vid_codec, is_keyframe, &mut seq, ts_ms, ); + if !first_send_logged && !pkts.is_empty() { + first_send_logged = true; + crate::emit_call_debug( + &vid_app, + "video:first_send", + serde_json::json!({ + "t_ms": vid_t0.elapsed().as_millis() as u64, + "codec": format!("{:?}", vid_codec), + "packets": pkts.len(), + "first_pkt_bytes": pkts[0].payload.len(), + "is_keyframe": is_keyframe, + }), + ); + } for pkt in &pkts { if let Err(e) = vid_transport.send_media(pkt).await { crate::emit_call_debug(