feat(video+desktop): camera capture, video UI, E2E AEAD wiring, test fixes

Blockers 4 & 5: browser getUserMedia → JPEG IPC → Rust I420 pipeline;
remote video strip renders decoded frames via canvas; EncryptingTransport
wraps QuinnTransport so WZP AEAD is applied to all media (C2 fix).

Test fixes: HandshakeResult.session destructuring across relay/client/crypto
integration tests; video_codecs field added to all CallOffer/CallAnswer
structs; wzp-video pipeline_roundtrip integration tests added.

PRD docs: five Kimi-ready specs for E2E encryption, Android NDK 0.9 migration,
quality upgrade flow, wire-format hardening, and clippy debt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-05-25 15:30:26 +04:00
parent 01f55caa96
commit 06253fdeeb
44 changed files with 3221 additions and 163 deletions

View File

@@ -26,7 +26,7 @@ use wzp_client::audio_io::{AudioCapture, AudioPlayback};
use wzp_client::call::{CallConfig, CallEncoder};
use wzp_proto::traits::{AudioDecoder, QualityController};
use wzp_proto::{AdaptiveQualityController, CodecId, MediaTransport, QualityProfile};
use wzp_proto::{AdaptiveQualityController, CodecId, QualityProfile};
const FRAME_SAMPLES_40MS: usize = 1920;
const CAPTURE_POLL_MS: u64 = 5;
@@ -134,7 +134,7 @@ fn codec_to_profile(codec: CodecId) -> QualityProfile {
/// codec switch), and Hangup from the relay signal stream.
async fn run_signal_task(
app: tauri::AppHandle,
transport: Arc<wzp_transport::QuinnTransport>,
transport: Arc<dyn wzp_proto::MediaTransport>,
running: Arc<AtomicBool>,
pending_profile: Arc<AtomicU8>,
participants: Arc<Mutex<Vec<ParticipantInfo>>>,
@@ -250,12 +250,15 @@ pub struct CallEngine {
audio_level: Arc<AtomicU32>,
tx_codec: Arc<Mutex<String>>,
rx_codec: Arc<Mutex<String>>,
transport: Arc<wzp_transport::QuinnTransport>,
transport: Arc<dyn wzp_proto::MediaTransport>,
start_time: Instant,
fingerprint: String,
/// Keep audio handles alive for the duration of the call.
/// Wrapped in SyncWrapper because AudioUnit isn't Sync.
_audio_handle: SyncWrapper,
/// Push raw YUV frames here to be encoded and sent to peers.
/// `None` when video was not negotiated or the remote is audio-only.
pub camera_tx: Option<tokio::sync::mpsc::Sender<wzp_video::encoder::VideoFrame>>,
}
/// Phase 3b/3c DRED reconstruction state for a recv task.
@@ -479,6 +482,8 @@ impl CallEngine {
// debug log pane show first-send/first-recv/heartbeat
// events when the user has call debug logs enabled.
app: tauri::AppHandle,
active_quality: Arc<std::sync::Mutex<wzp_proto::QualityProfile>>,
peer_max_quality: Arc<std::sync::Mutex<Option<wzp_proto::QualityProfile>>>,
event_cb: F,
) -> Result<Self, anyhow::Error>
where
@@ -569,7 +574,8 @@ impl CallEngine {
// encryption, and both peers' identities were verified
// through the signal channel (DirectCallOffer/Answer carry
// identity_pub + ephemeral_pub + signature).
if !is_direct_p2p {
let quinn_transport = transport.clone();
let transport: Arc<dyn wzp_proto::MediaTransport> = if !is_direct_p2p {
crate::emit_call_debug(
&app,
"connect:handshake_start",
@@ -579,27 +585,24 @@ impl CallEngine {
"remote": transport.remote_address().to_string(),
}),
);
let _session = match wzp_client::handshake::perform_handshake(
&*transport,
&seed.0,
Some(&alias),
)
.await
{
Ok(session) => session,
Err(e) => {
error!("perform_handshake failed: {e}");
crate::emit_call_debug(
&app,
"connect:handshake_failed",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"error": e.to_string(),
}),
);
return Err(e.into());
}
};
let hs =
match wzp_client::handshake::perform_handshake(&*transport, &seed.0, Some(&alias))
.await
{
Ok(hs) => hs,
Err(e) => {
error!("perform_handshake failed: {e}");
crate::emit_call_debug(
&app,
"connect:handshake_failed",
serde_json::json!({
"t_ms": call_t0.elapsed().as_millis(),
"error": e.to_string(),
}),
);
return Err(e.into());
}
};
crate::emit_call_debug(
&app,
"connect:handshake_done",
@@ -609,14 +612,20 @@ impl CallEngine {
);
info!(
t_ms = call_t0.elapsed().as_millis(),
video_codec = ?hs.video_codec,
"first-join diag: connected to relay, handshake complete"
);
Arc::new(wzp_client::encrypted_transport::EncryptingTransport::new(
transport,
hs.session,
))
} else {
info!(
t_ms = call_t0.elapsed().as_millis(),
"first-join diag: direct P2P — skipping relay handshake (QUIC TLS is the encryption layer)"
);
}
transport
};
// Do not emit the legacy "connected" call-event here. The frontend
// ignores it and enters voice only after the command resolves; on
// Android this synchronous emit was the only operation between
@@ -802,6 +811,7 @@ impl CallEngine {
// Send task — drain Oboe capture ring, Opus-encode, push to transport.
let send_t = transport.clone();
let quinn_t = quinn_transport.clone();
let send_r = running.clone();
let send_mic = mic_muted.clone();
let send_fs = frames_sent.clone();
@@ -813,6 +823,8 @@ impl CallEngine {
let send_t0 = call_t0;
let send_app = app.clone();
let send_pending_profile = pending_profile.clone();
let send_active_quality = active_quality.clone();
let send_peer_max = peer_max_quality.clone();
tokio::spawn(async move {
let config = build_call_config(&send_quality);
let mut frame_samples = (config.profile.frame_duration_ms as usize) * 48;
@@ -832,7 +844,7 @@ impl CallEngine {
let mut frames_since_quality_report: u32 = 0;
let mut heartbeat = std::time::Instant::now();
let mut last_rms: u32 = 0;
let mut last_rms: u32;
let mut last_pkt_bytes: usize = 0;
let mut short_reads: u64 = 0;
// First-join diagnostic: latch the wall-clock offset of the
@@ -842,8 +854,28 @@ impl CallEngine {
// after returning a "started" status from audio_start.
let mut first_full_read_logged = false;
let mut first_nonzero_rms_logged = false;
let mut last_applied_profile: Option<QualityProfile> = None;
loop {
// Quality upgrade flow: apply active_quality / peer_max_quality.
let effective_profile = {
let active = send_active_quality.lock().unwrap().clone();
let peer_cap = send_peer_max.lock().unwrap().clone();
match peer_cap {
Some(cap) if cap.codec.bitrate_bps() < active.codec.bitrate_bps() => cap,
_ => active,
}
};
if Some(&effective_profile) != last_applied_profile.as_ref() {
let new_fs = (effective_profile.frame_duration_ms as usize) * 48;
info!(to = ?effective_profile.codec, frame_samples = new_fs, "quality: switching encoder profile (android)");
if encoder.set_profile(effective_profile).is_ok() {
frame_samples = new_fs;
dred_tuner.set_codec(effective_profile.codec);
*send_tx_codec.lock().await = format!("{:?}", effective_profile.codec);
last_applied_profile = Some(effective_profile);
}
}
if !send_r.load(Ordering::Relaxed) {
break;
}
@@ -948,7 +980,7 @@ impl CallEngine {
frames_since_dred_poll += 1;
if frames_since_dred_poll >= DRED_POLL_INTERVAL {
frames_since_dred_poll = 0;
let snap = send_t.quinn_path_stats();
let snap = quinn_t.quinn_path_stats();
let pq = send_t.path_quality();
if let Some(tuning) =
dred_tuner.update(snap.loss_pct, snap.rtt_ms, pq.jitter_ms)
@@ -974,7 +1006,7 @@ impl CallEngine {
frames_since_quality_report += 1;
if frames_since_quality_report >= QUALITY_REPORT_INTERVAL {
frames_since_quality_report = 0;
let snap = send_t.quinn_path_stats();
let snap = quinn_t.quinn_path_stats();
let pq = send_t.path_quality();
let report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
@@ -1023,6 +1055,7 @@ impl CallEngine {
// Recv task — decode incoming packets, push PCM into Oboe playout.
let recv_t = transport.clone();
let quinn_t = quinn_transport.clone();
let recv_r = running.clone();
let recv_spk = spk_muted.clone();
let recv_fr = frames_received.clone();
@@ -1198,7 +1231,7 @@ impl CallEngine {
recv_quality_counter += 1;
if recv_quality_counter >= QUALITY_REPORT_INTERVAL {
recv_quality_counter = 0;
let snap = recv_t.quinn_path_stats();
let snap = quinn_t.quinn_path_stats();
let pq = recv_t.path_quality();
let local_report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
@@ -1469,6 +1502,7 @@ impl CallEngine {
// is a static dlopen'd library, the audio streams live inside
// the standalone cdylib's process-global singleton.
_audio_handle: SyncWrapper(Box::new(())),
camera_tx: None, // video not yet wired on Android
})
}
@@ -1486,6 +1520,8 @@ impl CallEngine {
// Phase 6: explicit is_direct_p2p flag (see android branch).
is_direct_p2p: bool,
_app: tauri::AppHandle,
active_quality: Arc<std::sync::Mutex<wzp_proto::QualityProfile>>,
peer_max_quality: Arc<std::sync::Mutex<Option<wzp_proto::QualityProfile>>>,
event_cb: F,
) -> Result<Self, anyhow::Error>
where
@@ -1498,6 +1534,7 @@ impl CallEngine {
is_direct_p2p,
"CallEngine::start (desktop) invoked"
);
let call_t0 = Instant::now();
let _ = rustls::crypto::ring::default_provider().install_default();
let relay_addr: SocketAddr = relay.parse()?;
@@ -1546,23 +1583,35 @@ impl CallEngine {
// this because the peer is a phone, not a relay with an
// accept_handshake handler. See the android branch's
// comment for the full rationale.
if !is_direct_p2p {
let _session =
wzp_client::handshake::perform_handshake(&*transport, &seed.0, Some(&alias))
.await
.map_err(|e| {
error!("perform_handshake failed: {e}");
e
})?;
} else {
info!("direct P2P — skipping relay handshake (QUIC TLS is the encryption layer)");
}
let quinn_transport = transport.clone();
let (_negotiated_video_codec, transport): (_, Arc<dyn wzp_proto::MediaTransport>) =
if !is_direct_p2p {
let hs =
wzp_client::handshake::perform_handshake(&*transport, &seed.0, Some(&alias))
.await
.map_err(|e| {
error!("perform_handshake failed: {e}");
e
})?;
info!(video_codec = ?hs.video_codec, "handshake complete");
let enc = Arc::new(
wzp_client::encrypted_transport::EncryptingTransport::new(
transport,
hs.session,
),
);
(hs.video_codec, enc)
} else {
info!("direct P2P — skipping relay handshake (QUIC TLS is the encryption layer)");
(None, transport)
};
info!("connected to relay, handshake complete");
event_cb("connected", &format!("joined room {room}"));
// Audio I/O — VPIO (OS AEC) on macOS, plain CPAL otherwise.
// The audio handle must be stored in CallEngine to keep streams alive.
let mut vpio_stats_for_debug = None;
let (capture_ring, playout_ring, audio_handle): (_, _, Box<dyn std::any::Any + Send>) =
if _os_aec {
#[cfg(target_os = "macos")]
@@ -1571,6 +1620,7 @@ impl CallEngine {
Ok(v) => {
let cr = v.capture_ring().clone();
let pr = v.playout_ring().clone();
vpio_stats_for_debug = Some(v.stats());
info!("using VoiceProcessingIO (OS AEC)");
(cr, pr, Box::new(v))
}
@@ -1615,8 +1665,38 @@ impl CallEngine {
let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE));
let auto_profile = resolve_quality(&quality).is_none();
if let Some(vpio_stats) = vpio_stats_for_debug {
let app = _app.clone();
let running = running.clone();
tokio::spawn(async move {
while running.load(Ordering::Relaxed) {
tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS))
.await;
let s = vpio_stats.snapshot();
crate::emit_call_debug(
&app,
"vpio:render_heartbeat",
serde_json::json!({
"capture_callbacks": s.capture_callbacks,
"capture_samples": s.capture_samples,
"render_callbacks": s.render_callbacks,
"render_requested_samples": s.render_requested_samples,
"render_read_samples": s.render_read_samples,
"render_underrun_callbacks": s.render_underrun_callbacks,
"render_nonzero_callbacks": s.render_nonzero_callbacks,
"render_last_requested": s.render_last_requested,
"render_last_read": s.render_last_read,
"render_last_rms": s.render_last_rms,
"render_last_ring_available": s.render_last_ring_available,
}),
);
}
});
}
// Send task
let send_t = transport.clone();
let quinn_t = quinn_transport.clone();
let send_r = running.clone();
let send_mic = mic_muted.clone();
let send_fs = frames_sent.clone();
@@ -1625,6 +1705,10 @@ impl CallEngine {
let send_quality = quality.clone();
let send_tx_codec = tx_codec.clone();
let send_pending_profile = pending_profile.clone();
let send_app = _app.clone();
let send_t0 = call_t0;
let send_active_quality = active_quality.clone();
let send_peer_max = peer_max_quality.clone();
tokio::spawn(async move {
let config = build_call_config(&send_quality);
let mut frame_samples = (config.profile.frame_duration_ms as usize) * 48;
@@ -1638,12 +1722,37 @@ impl CallEngine {
let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec);
let mut frames_since_dred_poll: u32 = 0;
let mut frames_since_quality_report: u32 = 0;
let mut heartbeat = std::time::Instant::now();
let mut last_rms: u32;
let mut last_pkt_bytes: usize = 0;
let mut short_reads: u64 = 0;
let mut last_applied_profile: Option<QualityProfile> = None;
loop {
// Quality upgrade flow: apply active_quality / peer_max_quality.
let effective_profile = {
let active = send_active_quality.lock().unwrap().clone();
let peer_cap = send_peer_max.lock().unwrap().clone();
match peer_cap {
Some(cap) if cap.codec.bitrate_bps() < active.codec.bitrate_bps() => cap,
_ => active,
}
};
if Some(&effective_profile) != last_applied_profile.as_ref() {
let new_fs = (effective_profile.frame_duration_ms as usize) * 48;
info!(to = ?effective_profile.codec, frame_samples = new_fs, "quality: switching encoder profile (desktop)");
if encoder.set_profile(effective_profile).is_ok() {
frame_samples = new_fs;
dred_tuner.set_codec(effective_profile.codec);
*send_tx_codec.lock().await = format!("{:?}", effective_profile.codec);
last_applied_profile = Some(effective_profile);
}
}
if !send_r.load(Ordering::Relaxed) {
break;
}
if capture_ring.available() < frame_samples {
short_reads += 1;
tokio::time::sleep(std::time::Duration::from_millis(CAPTURE_POLL_MS)).await;
continue;
}
@@ -1655,6 +1764,7 @@ impl CallEngine {
let sum_sq: f64 = pcm.iter().map(|&s| (s as f64) * (s as f64)).sum();
let rms = (sum_sq / pcm.len() as f64).sqrt() as u32;
send_level.store(rms, Ordering::Relaxed);
last_rms = rms;
}
if send_mic.load(Ordering::Relaxed) {
@@ -1663,6 +1773,7 @@ impl CallEngine {
match encoder.encode_frame(&buf[..frame_samples]) {
Ok(pkts) => {
for pkt in &pkts {
last_pkt_bytes = pkt.payload.len();
if let Err(e) = send_t.send_media(pkt).await {
// Transient congestion (Blocked) — drop packet, keep going
send_drops.fetch_add(1, Ordering::Relaxed);
@@ -1671,7 +1782,17 @@ impl CallEngine {
}
}
}
send_fs.fetch_add(1, Ordering::Relaxed);
let before = send_fs.fetch_add(1, Ordering::Relaxed);
if before == 0 {
crate::emit_call_debug(
&send_app,
"media:first_send",
serde_json::json!({
"t_ms": send_t0.elapsed().as_millis() as u64,
"pkt_bytes": last_pkt_bytes,
}),
);
}
}
Err(e) => error!("encode: {e}"),
}
@@ -1696,7 +1817,7 @@ impl CallEngine {
frames_since_dred_poll += 1;
if frames_since_dred_poll >= DRED_POLL_INTERVAL {
frames_since_dred_poll = 0;
let snap = send_t.quinn_path_stats();
let snap = quinn_t.quinn_path_stats();
let pq = send_t.path_quality();
if let Some(tuning) =
dred_tuner.update(snap.loss_pct, snap.rtt_ms, pq.jitter_ms)
@@ -1710,7 +1831,7 @@ impl CallEngine {
frames_since_quality_report += 1;
if frames_since_quality_report >= QUALITY_REPORT_INTERVAL {
frames_since_quality_report = 0;
let snap = send_t.quinn_path_stats();
let snap = quinn_t.quinn_path_stats();
let pq = send_t.path_quality();
let report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
@@ -1719,16 +1840,37 @@ impl CallEngine {
);
encoder.set_pending_quality_report(report);
}
if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) {
let fs = send_fs.load(Ordering::Relaxed);
let drops = send_drops.load(Ordering::Relaxed);
crate::emit_call_debug(
&send_app,
"media:send_heartbeat",
serde_json::json!({
"frames_sent": fs,
"last_rms": last_rms,
"last_pkt_bytes": last_pkt_bytes,
"short_reads": short_reads,
"drops": drops,
"last_send_err": serde_json::Value::Null,
}),
);
heartbeat = std::time::Instant::now();
}
}
});
// Recv task (direct playout with auto codec switch)
let recv_t = transport.clone();
let quinn_t = quinn_transport.clone();
let recv_r = running.clone();
let recv_spk = spk_muted.clone();
let recv_fr = frames_received.clone();
let recv_rx_codec = rx_codec.clone();
let pending_profile_recv = pending_profile.clone();
let recv_app = _app.clone();
let recv_t0 = call_t0;
tokio::spawn(async move {
let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD);
// Phase 3b/3c: concrete AdaptiveDecoder (not Box<dyn>) so we
@@ -1743,6 +1885,18 @@ impl CallEngine {
let mut dred_recv = DredRecvState::new();
let mut quality_ctrl = AdaptiveQualityController::new();
let mut recv_quality_counter: u32 = 0;
let mut heartbeat = std::time::Instant::now();
let mut first_packet_logged = false;
let mut video_reassembler = wzp_video::transport::VideoReassembler::new();
let mut video_decoder: Option<Box<dyn wzp_video::decoder::VideoDecoder>> = None;
let mut video_decoder_codec: Option<wzp_proto::CodecId> = None;
let mut decoded_frames: u64 = 0;
let mut decode_errs: u64 = 0;
let mut last_written: usize = 0;
let mut written_samples: u64 = 0;
let mut last_recv_fr_for_watchdog: u64 = 0;
let mut no_recv_ticks: u32 = 0;
let mut media_degraded_emitted = false;
loop {
if !recv_r.load(Ordering::Relaxed) {
@@ -1755,6 +1909,74 @@ impl CallEngine {
.await
{
Ok(Ok(Some(pkt))) => {
// Route video packets to the reassembler before any audio processing.
if pkt.header.media_type == wzp_proto::MediaType::Video {
if let Some((codec_id, is_kf, frame)) =
video_reassembler.push(&pkt)
{
// Lazy-init or switch decoder on codec change.
if video_decoder_codec != Some(codec_id) {
match wzp_video::factory::create_video_decoder(codec_id, 1280, 720) {
Ok(d) => {
info!(codec = ?codec_id, "video decoder created");
video_decoder = Some(d);
video_decoder_codec = Some(codec_id);
}
Err(e) => {
error!("video decoder init failed: {e}");
}
}
}
if let Some(ref mut dec) = video_decoder {
match dec.decode(&frame) {
Ok(Some(yuv_frame)) => {
recv_fr.fetch_add(1, Ordering::Relaxed);
// Emit video frame to WebView for rendering.
// Always-on (not gated on debug flag) so the UI can show video.
let jpeg_b64 = crate::i420_to_jpeg_b64(
&yuv_frame.data,
yuv_frame.width,
yuv_frame.height,
);
let _ = recv_app.emit(
"video:frame",
serde_json::json!({
"is_keyframe": is_kf,
"width": yuv_frame.width,
"height": yuv_frame.height,
"jpeg_b64": jpeg_b64,
"codec": format!("{:?}", codec_id),
}),
);
}
Ok(None) => {} // decoder buffering — no output yet
Err(e) => {
error!("video decode error: {e}");
}
}
}
// Evict stale partial frames every ~10 frames received.
video_reassembler.evict_stale(
pkt.header.timestamp,
5_000,
);
}
continue; // video packet handled — skip audio path
}
if !first_packet_logged {
first_packet_logged = true;
crate::emit_call_debug(
&recv_app,
"media:first_recv",
serde_json::json!({
"t_ms": recv_t0.elapsed().as_millis() as u64,
"codec": format!("{:?}", pkt.header.codec_id),
"payload_bytes": pkt.payload.len(),
"is_repair": pkt.header.is_repair(),
}),
);
}
if !pkt.header.is_repair() && pkt.header.codec_id != CodecId::ComfortNoise {
// Track RX codec
{
@@ -1812,7 +2034,7 @@ impl CallEngine {
recv_quality_counter += 1;
if recv_quality_counter >= QUALITY_REPORT_INTERVAL {
recv_quality_counter = 0;
let snap = recv_t.quinn_path_stats();
let snap = quinn_t.quinn_path_stats();
let pq = recv_t.path_quality();
let local_report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
@@ -1828,10 +2050,21 @@ impl CallEngine {
}
}
if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) {
agc.process_frame(&mut pcm[..n]);
if !recv_spk.load(Ordering::Relaxed) {
playout_ring.write(&pcm[..n]);
match decoder.decode(&pkt.payload, &mut pcm) {
Ok(n) => {
decoded_frames += 1;
agc.process_frame(&mut pcm[..n]);
if !recv_spk.load(Ordering::Relaxed) {
playout_ring.write(&pcm[..n]);
last_written = n;
written_samples = written_samples.saturating_add(n as u64);
}
}
Err(e) => {
decode_errs += 1;
if decode_errs <= 3 {
tracing::warn!("decode error: {e}");
}
}
}
}
@@ -1847,6 +2080,63 @@ impl CallEngine {
}
Err(_) => {}
}
if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) {
let fr = recv_fr.load(Ordering::Relaxed);
crate::emit_call_debug(
&recv_app,
"media:recv_heartbeat",
serde_json::json!({
"recv_fr": fr,
"decoded_frames": decoded_frames,
"last_written": last_written,
"written_samples": written_samples,
"decode_errs": decode_errs,
"codec": format!("{:?}", current_codec),
}),
);
if fr == last_recv_fr_for_watchdog {
no_recv_ticks += 1;
} else {
no_recv_ticks = 0;
if media_degraded_emitted {
media_degraded_emitted = false;
let _ = recv_app.emit(
"call-event",
serde_json::json!({
"kind": "media-recovered",
}),
);
crate::emit_call_debug(
&recv_app,
"media:recovered",
serde_json::json!({}),
);
}
}
last_recv_fr_for_watchdog = fr;
if no_recv_ticks >= 3 && !media_degraded_emitted {
media_degraded_emitted = true;
let _ = recv_app.emit(
"call-event",
serde_json::json!({
"kind": "media-degraded",
}),
);
crate::emit_call_debug(
&recv_app,
"media:no_recv_timeout",
serde_json::json!({
"recv_fr": fr,
"no_recv_ticks": no_recv_ticks,
}),
);
}
heartbeat = std::time::Instant::now();
}
}
});
@@ -1861,6 +2151,77 @@ impl CallEngine {
event_cb.clone(),
));
// Video send task — active only when the handshake negotiated a video codec.
// Camera frames arrive via camera_tx; the task encodes and packetizes them.
// Blocker 4 (camera capture) will push frames into this channel.
let camera_tx = if let Some(vid_codec) = _negotiated_video_codec {
let (tx, mut rx) = tokio::sync::mpsc::channel::<wzp_video::encoder::VideoFrame>(4);
let vid_transport = transport.clone();
let vid_running = running.clone();
let vid_t0 = call_t0;
let vid_app = _app.clone();
tokio::spawn(async move {
let mut encoder = match wzp_video::factory::create_video_encoder(
vid_codec, 1280, 720, 1_500_000,
) {
Ok(e) => e,
Err(e) => {
error!("video encoder init failed: {e}");
return;
}
};
let mut seq: u32 = 0;
let mut frames_since_keyframe: u32 = 0;
info!(codec = ?vid_codec, "video send task started");
while vid_running.load(Ordering::Relaxed) {
let frame = match tokio::time::timeout(
std::time::Duration::from_millis(200),
rx.recv(),
)
.await
{
Ok(Some(f)) => f,
Ok(None) => break, // sender dropped
Err(_) => continue, // no frame yet — keep looping
};
if frames_since_keyframe >= 150 {
encoder.request_keyframe();
frames_since_keyframe = 0;
}
let encoded = match encoder.encode(&frame) {
Ok(b) => b,
Err(e) => {
error!("video encode error: {e}");
continue;
}
};
let is_keyframe = encoder.is_keyframe(&encoded);
let ts_ms = vid_t0.elapsed().as_millis() as u32;
let pkts = wzp_video::transport::packetize_video_frame(
&encoded, vid_codec, is_keyframe, &mut seq, ts_ms,
);
for pkt in &pkts {
if let Err(e) = vid_transport.send_media(pkt).await {
crate::emit_call_debug(
&vid_app,
"video:send_error",
serde_json::json!({"error": e.to_string()}),
);
break;
}
}
frames_since_keyframe += 1;
}
info!("video send task exited");
});
Some(tx)
} else {
None
};
Ok(Self {
running,
mic_muted,
@@ -1875,6 +2236,7 @@ impl CallEngine {
tx_codec,
rx_codec,
_audio_handle: SyncWrapper(audio_handle),
camera_tx,
})
}
@@ -1949,3 +2311,101 @@ impl Drop for CallEngine {
self.running.store(false, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex as StdMutex};
use async_trait::async_trait;
use bytes::Bytes;
use wzp_client::encrypted_transport::EncryptingTransport;
use wzp_crypto::ChaChaSession;
use wzp_proto::{
CodecId, CryptoSession, MediaHeader, MediaPacket, MediaTransport, MediaType, PathQuality,
SignalMessage, TransportError,
};
struct LoopbackTransport {
sent: StdMutex<Vec<MediaPacket>>,
}
impl LoopbackTransport {
fn new() -> Arc<Self> {
Arc::new(Self {
sent: StdMutex::new(Vec::new()),
})
}
fn take_sent(&self) -> Vec<MediaPacket> {
self.sent.lock().unwrap().drain(..).collect()
}
}
#[async_trait]
impl MediaTransport for LoopbackTransport {
async fn send_media(&self, packet: &MediaPacket) -> Result<(), TransportError> {
self.sent.lock().unwrap().push(packet.clone());
Ok(())
}
async fn recv_media(&self) -> Result<Option<MediaPacket>, TransportError> {
Ok(None)
}
async fn send_signal(&self, _msg: &SignalMessage) -> Result<(), TransportError> {
Ok(())
}
async fn recv_signal(&self) -> Result<Option<SignalMessage>, TransportError> {
Ok(None)
}
fn path_quality(&self) -> PathQuality {
PathQuality::default()
}
async fn close(&self) -> Result<(), TransportError> {
Ok(())
}
}
fn make_header(seq: u32) -> MediaHeader {
MediaHeader {
version: 2,
flags: 0,
media_type: MediaType::Audio,
codec_id: CodecId::Opus24k,
stream_id: 0,
fec_ratio: 0,
seq,
timestamp: seq * 20,
fec_block: 0,
}
}
#[tokio::test]
async fn relay_path_encrypts_media_payload() {
// Simulate the exact wrapping pattern used in engine.rs for the relay path.
let key = [0x42u8; 32];
let session: Box<dyn CryptoSession> = Box::new(ChaChaSession::new(key));
let inner = LoopbackTransport::new();
let transport: Arc<dyn MediaTransport> =
Arc::new(EncryptingTransport::new(inner.clone(), session));
let header = make_header(1);
let plaintext = b"secret audio frame";
let pkt = MediaPacket {
header,
payload: Bytes::from_static(plaintext),
quality_report: None,
};
transport.send_media(&pkt).await.unwrap();
let sent = inner.take_sent();
assert_eq!(sent.len(), 1);
assert_eq!(sent[0].header, header, "header must be preserved");
assert_ne!(
sent[0].payload.as_ref(),
plaintext.as_ref(),
"plaintext must not appear on wire"
);
// Ciphertext is longer by exactly the AEAD tag (16 bytes)
assert_eq!(sent[0].payload.len(), plaintext.len() + 16);
}
}