feat(android): wire video send + recv in Android engine; add video:* debug events
Some checks failed
Mirror to GitHub / mirror (push) Failing after 30s
Build Release Binaries / build-amd64 (push) Failing after 3m5s

Mirror the desktop video pipeline into the #[cfg(target_os="android")] start
function: capture _negotiated_video_codec from the handshake, spawn a video
send task that pulls VideoFrames from camera_tx, encodes/packetizes/sends.
Add video reassembly + decode + emit "video:frame" in the recv task before
the audio branch so Android can both send and receive video.

Instrumentation: emit video:first_send and video:first_recv on both desktop
and android paths so we can verify the pipeline end-to-end.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-05-25 18:19:42 +04:00
parent cbc3a8d37e
commit 25b3278d31

View File

@@ -575,7 +575,7 @@ impl CallEngine {
// through the signal channel (DirectCallOffer/Answer carry
// identity_pub + ephemeral_pub + signature).
let quinn_transport = transport.clone();
let transport: Arc<dyn wzp_proto::MediaTransport> = if !is_direct_p2p {
let (_negotiated_video_codec, transport): (Option<wzp_proto::CodecId>, Arc<dyn wzp_proto::MediaTransport>) = if !is_direct_p2p {
crate::emit_call_debug(
&app,
"connect:handshake_start",
@@ -619,14 +619,13 @@ impl CallEngine {
// do NOT wrap with EncryptingTransport. The pairwise client↔relay session
// key can't be used end-to-end without MLS or relay re-encryption.
drop(hs.session);
let _ = hs.video_codec;
transport
(hs.video_codec, transport)
} else {
info!(
t_ms = call_t0.elapsed().as_millis(),
"first-join diag: direct P2P — skipping relay handshake (QUIC TLS is the encryption layer)"
);
transport
(None, transport)
};
// Do not emit the legacy "connected" call-event here. The frontend
// ignores it and enters voice only after the command resolves; on
@@ -1130,6 +1129,11 @@ impl CallEngine {
let mut last_recv_fr_for_watchdog: u64 = 0;
let mut no_recv_ticks: u32 = 0;
let mut media_degraded_emitted = false;
// Video pipeline state — mirror of the desktop recv task.
let mut video_reassembler = wzp_video::transport::VideoReassembler::new();
let mut video_decoder: Option<Box<dyn wzp_video::decoder::VideoDecoder>> = None;
let mut video_decoder_codec: Option<wzp_proto::CodecId> = None;
let mut video_first_recv_logged = false;
loop {
if !recv_r.load(Ordering::Relaxed) {
@@ -1142,6 +1146,67 @@ impl CallEngine {
.await
{
Ok(Ok(Some(pkt))) => {
// Route Video packets through the reassembler/decoder and emit
// a JPEG-encoded frame to the WebView. Done before audio path so
// we don't drop into the audio decoder branches.
if pkt.header.media_type == wzp_proto::MediaType::Video {
if !video_first_recv_logged {
video_first_recv_logged = true;
crate::emit_call_debug(
&recv_app,
"video:first_recv",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", pkt.header.codec_id),
"payload_bytes": pkt.payload.len(),
}),
);
}
if let Some((codec_id, is_kf, frame)) =
video_reassembler.push(&pkt)
{
if video_decoder_codec != Some(codec_id) {
match wzp_video::factory::create_video_decoder(codec_id, 1280, 720) {
Ok(d) => {
info!(codec = ?codec_id, "video decoder created (android)");
video_decoder = Some(d);
video_decoder_codec = Some(codec_id);
}
Err(e) => {
error!("video decoder init failed: {e}");
}
}
}
if let Some(ref mut dec) = video_decoder {
match dec.decode(&frame) {
Ok(Some(yuv_frame)) => {
let jpeg_b64 = crate::i420_to_jpeg_b64(
&yuv_frame.data,
yuv_frame.width,
yuv_frame.height,
);
let _ = recv_app.emit(
"video:frame",
serde_json::json!({
"is_keyframe": is_kf,
"width": yuv_frame.width,
"height": yuv_frame.height,
"jpeg_b64": jpeg_b64,
"codec": format!("{:?}", codec_id),
}),
);
}
Ok(None) => {}
Err(e) => {
error!("video decode error: {e}");
}
}
}
video_reassembler.evict_stale(pkt.header.timestamp, 5_000);
}
continue; // handled — skip audio path
}
if !first_packet_logged {
info!(
t_ms = recv_t0.elapsed().as_millis(),
@@ -1487,6 +1552,92 @@ impl CallEngine {
event_cb.clone(),
));
// Video send task (Android) — mirror of the desktop branch. Only
// spawns when the relay handshake negotiated a video codec; on
// direct P2P video is currently disabled.
let camera_tx = if let Some(vid_codec) = _negotiated_video_codec {
let (tx, mut rx) = tokio::sync::mpsc::channel::<wzp_video::encoder::VideoFrame>(4);
let vid_transport = transport.clone();
let vid_running = running.clone();
let vid_t0 = call_t0;
let vid_app = app.clone();
tokio::spawn(async move {
let mut encoder = match wzp_video::factory::create_video_encoder(
vid_codec, 1280, 720, 1_500_000,
) {
Ok(e) => e,
Err(e) => {
error!("video encoder init failed (android): {e}");
return;
}
};
let mut seq: u32 = 0;
let mut frames_since_keyframe: u32 = 0;
let mut first_send_logged = false;
info!(codec = ?vid_codec, "video send task started (android)");
while vid_running.load(Ordering::Relaxed) {
let frame = match tokio::time::timeout(
std::time::Duration::from_millis(200),
rx.recv(),
)
.await
{
Ok(Some(f)) => f,
Ok(None) => break,
Err(_) => continue,
};
if frames_since_keyframe >= 150 {
encoder.request_keyframe();
frames_since_keyframe = 0;
}
let encoded = match encoder.encode(&frame) {
Ok(b) => b,
Err(e) => {
error!("video encode error (android): {e}");
continue;
}
};
let is_keyframe = encoder.is_keyframe(&encoded);
let ts_ms = vid_t0.elapsed().as_millis() as u32;
let pkts = wzp_video::transport::packetize_video_frame(
&encoded, vid_codec, is_keyframe, &mut seq, ts_ms,
);
if !first_send_logged && !pkts.is_empty() {
first_send_logged = true;
crate::emit_call_debug(
&vid_app,
"video:first_send",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"packets": pkts.len(),
"first_pkt_bytes": pkts[0].payload.len(),
"is_keyframe": is_keyframe,
}),
);
}
for pkt in &pkts {
if let Err(e) = vid_transport.send_media(pkt).await {
crate::emit_call_debug(
&vid_app,
"video:send_error",
serde_json::json!({"error": e.to_string()}),
);
break;
}
}
frames_since_keyframe += 1;
}
info!("video send task exited (android)");
});
Some(tx)
} else {
None
};
Ok(Self {
running,
mic_muted,
@@ -1504,7 +1655,7 @@ impl CallEngine {
// is a static dlopen'd library, the audio streams live inside
// the standalone cdylib's process-global singleton.
_audio_handle: SyncWrapper(Box::new(())),
camera_tx: None, // video not yet wired on Android
camera_tx,
})
}
@@ -1893,6 +2044,7 @@ impl CallEngine {
let mut video_reassembler = wzp_video::transport::VideoReassembler::new();
let mut video_decoder: Option<Box<dyn wzp_video::decoder::VideoDecoder>> = None;
let mut video_decoder_codec: Option<wzp_proto::CodecId> = None;
let mut video_first_recv_logged_desktop = false;
let mut decoded_frames: u64 = 0;
let mut decode_errs: u64 = 0;
let mut last_written: usize = 0;
@@ -1914,6 +2066,18 @@ impl CallEngine {
Ok(Ok(Some(pkt))) => {
// Route video packets to the reassembler before any audio processing.
if pkt.header.media_type == wzp_proto::MediaType::Video {
if !video_first_recv_logged_desktop {
video_first_recv_logged_desktop = true;
crate::emit_call_debug(
&recv_app,
"video:first_recv",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", pkt.header.codec_id),
"payload_bytes": pkt.payload.len(),
}),
);
}
if let Some((codec_id, is_kf, frame)) =
video_reassembler.push(&pkt)
{
@@ -2175,6 +2339,7 @@ impl CallEngine {
};
let mut seq: u32 = 0;
let mut frames_since_keyframe: u32 = 0;
let mut first_send_logged = false;
info!(codec = ?vid_codec, "video send task started");
while vid_running.load(Ordering::Relaxed) {
let frame = match tokio::time::timeout(
@@ -2206,6 +2371,20 @@ impl CallEngine {
let pkts = wzp_video::transport::packetize_video_frame(
&encoded, vid_codec, is_keyframe, &mut seq, ts_ms,
);
if !first_send_logged && !pkts.is_empty() {
first_send_logged = true;
crate::emit_call_debug(
&vid_app,
"video:first_send",
serde_json::json!({
"t_ms": vid_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", vid_codec),
"packets": pkts.len(),
"first_pkt_bytes": pkts[0].payload.len(),
"is_keyframe": is_keyframe,
}),
);
}
for pkt in &pkts {
if let Err(e) = vid_transport.send_media(pkt).await {
crate::emit_call_debug(