diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 8aa3d70..ff556fd 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -14,8 +14,10 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use bytes::Bytes; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; +use wzp_codec::AdaptiveDecoder; use wzp_codec::agc::AutoGainControl; +use wzp_codec::dred_ffi::{DredDecoderHandle, DredState}; use wzp_crypto::{KeyExchange, WarzoneKeyExchange}; use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder}; use wzp_proto::{ @@ -530,9 +532,12 @@ async fn run_call( stats.state = CallState::Active; } - // Initialize codec (Opus or Codec2 based on profile) + // Initialize codec (Opus or Codec2 based on profile). + // Phase 3c: decoder is a concrete AdaptiveDecoder (not Box) so the recv task can call reconstruct_from_dred on + // gaps detected via sequence tracking. let mut encoder = wzp_codec::create_encoder(profile); - let mut decoder = wzp_codec::create_decoder(profile); + let mut decoder = AdaptiveDecoder::new(profile).expect("failed to create adaptive decoder"); // Initialize FEC encoder/decoder let mut fec_enc = wzp_fec::create_encoder(&profile); @@ -824,7 +829,27 @@ async fn run_call( let mut last_stats_log = Instant::now(); let mut quality_ctrl = AdaptiveQualityController::new(); let mut last_peer_codec: Option = None; - info!("recv task started (Opus + RaptorQ FEC)"); + + // Phase 3c: DRED reconstruction state. Unlike the desktop + // CallDecoder (which sits behind a jitter buffer that emits + // Missing signals), engine.rs reads packets directly from the + // transport and decodes straight into the playout ring. Gap + // detection is therefore done via sequence-number tracking: + // when a packet arrives with seq > expected_seq, the frames in + // between are missing and we attempt to reconstruct them via + // DRED before decoding the newly-arrived packet. + let mut dred_decoder = + DredDecoderHandle::new().expect("opus_dred_decoder_create failed"); + let mut dred_parse_scratch = + DredState::new().expect("opus_dred_alloc failed (scratch)"); + let mut last_good_dred = + DredState::new().expect("opus_dred_alloc failed (good state)"); + let mut last_good_dred_seq: Option = None; + let mut expected_seq: Option = None; + let mut dred_reconstructions: u64 = 0; + let mut classical_plc_invocations: u64 = 0; + + info!("recv task started (Opus + DRED + Codec2/RaptorQ)"); loop { if !state.running.load(Ordering::Relaxed) { break; @@ -903,6 +928,13 @@ async fn run_call( }; info!(from = ?decoder.codec_id(), to = ?pkt.header.codec_id, "recv: switching decoder"); let _ = decoder.set_profile(switch_profile); + // Profile switch invalidates the cached DRED + // state because samples_available is measured + // in the old profile's sample rate. Reset the + // tracking so we don't try to reconstruct with + // stale offsets. + last_good_dred_seq = None; + expected_seq = None; } // Track peer codec for UI display if last_peer_codec != Some(pkt.header.codec_id) { @@ -911,6 +943,109 @@ async fn run_call( stats.peer_codec = format!("{:?}", pkt.header.codec_id); } } + + // Phase 3c: Opus path — parse DRED state out of + // the current packet FIRST so last_good_dred + // reflects the freshest available reconstruction + // source, then attempt gap recovery against it + // BEFORE decoding this packet's audio. Ordering + // matters because the playout ring is FIFO — gap + // samples must be written before this packet's + // samples, which come next. + if pkt_is_opus { + // Update DRED state from the current packet. + match dred_decoder.parse_into(&mut dred_parse_scratch, &pkt.payload) { + Ok(available) if available > 0 => { + std::mem::swap( + &mut dred_parse_scratch, + &mut last_good_dred, + ); + last_good_dred_seq = Some(pkt.header.seq); + } + Ok(_) => { + // Packet carried no DRED — keep cached state. + } + Err(e) => { + debug!("DRED parse error (ignored): {e}"); + } + } + + // Detect and fill gap from last-expected to this packet. + const MAX_GAP_FRAMES: u16 = 16; + if let Some(expected) = expected_seq { + let gap = pkt.header.seq.wrapping_sub(expected); + if gap > 0 && gap <= MAX_GAP_FRAMES { + let current_profile_frame_samples = + (48_000 * profile.frame_duration_ms as i32) / 1000; + let available = last_good_dred.samples_available(); + let pcm_slice_len = + current_profile_frame_samples as usize; + + for gap_idx in 0..gap { + let missing_seq = expected.wrapping_add(gap_idx); + // Offset from the DRED anchor (last_good_dred_seq) + // back to the missing seq, in samples. Skip if + // the anchor is not ahead of missing (defensive). + let offset_samples = match last_good_dred_seq { + Some(anchor) => { + let delta = anchor.wrapping_sub(missing_seq); + if delta == 0 || delta > MAX_GAP_FRAMES { + -1 // skip DRED, use PLC + } else { + delta as i32 * current_profile_frame_samples + } + } + None => -1, + }; + + let reconstructed = if offset_samples > 0 + && offset_samples <= available + { + decoder + .reconstruct_from_dred( + &last_good_dred, + offset_samples, + &mut decode_buf[..pcm_slice_len], + ) + .ok() + } else { + None + }; + + match reconstructed { + Some(samples) => { + playout_agc.process_frame( + &mut decode_buf[..samples], + ); + state + .playout_ring + .write(&decode_buf[..samples]); + dred_reconstructions += 1; + frames_decoded += 1; + } + None => { + // Fall through to classical PLC. + if let Ok(samples) = + decoder.decode_lost(&mut decode_buf) + { + playout_agc + .process_frame(&mut decode_buf[..samples]); + state + .playout_ring + .write(&decode_buf[..samples]); + classical_plc_invocations += 1; + frames_decoded += 1; + } + } + } + } + } + } + + // Advance the expected-seq tracker for the next arrival. + expected_seq = Some(pkt.header.seq.wrapping_add(1)); + } + match decoder.decode(&pkt.payload, &mut decode_buf) { Ok(samples) => { playout_agc.process_frame(&mut decode_buf[..samples]); @@ -922,6 +1057,9 @@ async fn run_call( if let Ok(samples) = decoder.decode_lost(&mut decode_buf) { playout_agc.process_frame(&mut decode_buf[..samples]); state.playout_ring.write(&decode_buf[..samples]); + // This is a decode-error fallback (not a + // detected gap), so count it as PLC. + classical_plc_invocations += 1; } } } @@ -955,6 +1093,8 @@ async fn run_call( let mut stats = state.stats.lock().unwrap(); stats.frames_decoded = frames_decoded; stats.fec_recovered = fec_recovered; + stats.dred_reconstructions = dred_reconstructions; + stats.classical_plc_invocations = classical_plc_invocations; drop(stats); // Periodic stats every 5 seconds @@ -962,6 +1102,8 @@ async fn run_call( info!( frames_decoded, fec_recovered, + dred_reconstructions, + classical_plc_invocations, recv_errors, max_recv_gap_ms, playout_avail = state.playout_ring.available(), diff --git a/crates/wzp-android/src/stats.rs b/crates/wzp-android/src/stats.rs index 7c162af..4fdf3a2 100644 --- a/crates/wzp-android/src/stats.rs +++ b/crates/wzp-android/src/stats.rs @@ -58,8 +58,16 @@ pub struct CallStats { pub frames_decoded: u64, /// Number of playout underruns (buffer empty when audio needed). pub underruns: u64, - /// Frames recovered by FEC. + /// Frames recovered by RaptorQ FEC (Codec2 tiers only; Opus bypasses + /// RaptorQ per Phase 2). pub fec_recovered: u64, + /// Phase 3c: Opus frames reconstructed via DRED side-channel data. + /// Only increments on the Opus tiers; always zero for Codec2. + pub dred_reconstructions: u64, + /// Phase 3c: Opus frames filled via classical Opus PLC because no DRED + /// state covered the gap, plus any decode-error fallbacks. Codec2 loss + /// also increments this counter via the Codec2 PLC path. + pub classical_plc_invocations: u64, /// Playout ring overflow count (reader was lapped by writer). pub playout_overflows: u64, /// Playout ring underrun count (reader found empty buffer).