diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 134c019..8aa3d70 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -665,6 +665,19 @@ async fn run_call( t_opus_us += t0.elapsed().as_micros() as u64; let encoded = &encode_buf[..encoded_len]; + // Phase 2: Opus tiers bypass RaptorQ (DRED handles loss recovery + // at the codec layer). Codec2 tiers keep RaptorQ unchanged. + let is_opus = current_profile.codec.is_opus(); + let (hdr_fec_block, hdr_fec_symbol, hdr_fec_ratio) = if is_opus { + (0u8, 0u8, 0u8) + } else { + ( + block_id, + frame_in_block, + MediaHeader::encode_fec_ratio(current_profile.fec_ratio), + ) + }; + // Build source packet let s = seq.fetch_add(1, Ordering::Relaxed); let t = ts.fetch_add(frame_samples as u32, Ordering::Relaxed); @@ -675,11 +688,11 @@ async fn run_call( is_repair: false, codec_id: current_profile.codec, has_quality_report: false, - fec_ratio_encoded: MediaHeader::encode_fec_ratio(current_profile.fec_ratio), + fec_ratio_encoded: hdr_fec_ratio, seq: s, timestamp: t, - fec_block: block_id, - fec_symbol: frame_in_block, + fec_block: hdr_fec_block, + fec_symbol: hdr_fec_symbol, reserved: 0, csrc_count: 0, }, @@ -709,63 +722,66 @@ async fn run_call( t_send_us += t0.elapsed().as_micros() as u64; frames_sent += 1; - // Feed encoded frame to FEC encoder + // Codec2-only: feed RaptorQ and emit repair packets when the + // block is full. Opus tiers skip this entire block — DRED + // (enabled in Phase 1) provides codec-layer loss recovery. let t0 = Instant::now(); - if let Err(e) = fec_enc.add_source_symbol(encoded) { - warn!("fec add_source error: {e}"); - } - frame_in_block += 1; + if !is_opus { + if let Err(e) = fec_enc.add_source_symbol(encoded) { + warn!("fec add_source error: {e}"); + } + frame_in_block += 1; - // When block is full, generate repair packets - if frame_in_block >= current_profile.frames_per_block { - match fec_enc.generate_repair(current_profile.fec_ratio) { - Ok(repairs) => { - let repair_count = repairs.len(); - for (sym_idx, repair_data) in repairs { - let rs = seq.fetch_add(1, Ordering::Relaxed); - let repair_pkt = MediaPacket { - header: MediaHeader { - version: 0, - is_repair: true, - codec_id: current_profile.codec, - has_quality_report: false, - fec_ratio_encoded: MediaHeader::encode_fec_ratio( - current_profile.fec_ratio, - ), - seq: rs, - timestamp: t, - fec_block: block_id, - fec_symbol: sym_idx, - reserved: 0, - csrc_count: 0, - }, - payload: Bytes::from(repair_data), - quality_report: None, - }; - // Drop repair packets on error — never break - if let Err(_e) = transport.send_media(&repair_pkt).await { - send_errors += 1; - frames_dropped += 1; - // Don't log every repair failure — source error log covers it + if frame_in_block >= current_profile.frames_per_block { + match fec_enc.generate_repair(current_profile.fec_ratio) { + Ok(repairs) => { + let repair_count = repairs.len(); + for (sym_idx, repair_data) in repairs { + let rs = seq.fetch_add(1, Ordering::Relaxed); + let repair_pkt = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: true, + codec_id: current_profile.codec, + has_quality_report: false, + fec_ratio_encoded: MediaHeader::encode_fec_ratio( + current_profile.fec_ratio, + ), + seq: rs, + timestamp: t, + fec_block: block_id, + fec_symbol: sym_idx, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(repair_data), + quality_report: None, + }; + // Drop repair packets on error — never break + if let Err(_e) = transport.send_media(&repair_pkt).await { + send_errors += 1; + frames_dropped += 1; + // Don't log every repair failure — source error log covers it + } + } + if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) { + info!( + block_id, + repair_count, + fec_ratio = current_profile.fec_ratio, + "FEC block complete" + ); } } - if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) { - info!( - block_id, - repair_count, - fec_ratio = current_profile.fec_ratio, - "FEC block complete" - ); + Err(e) => { + warn!("fec generate_repair error: {e}"); } } - Err(e) => { - warn!("fec generate_repair error: {e}"); - } - } - let _ = fec_enc.finalize_block(); - block_id = block_id.wrapping_add(1); - frame_in_block = 0; + let _ = fec_enc.finalize_block(); + block_id = block_id.wrapping_add(1); + frame_in_block = 0; + } } t_fec_us += t0.elapsed().as_micros() as u64; t_frames += 1; @@ -850,14 +866,21 @@ async fn run_call( let is_repair = pkt.header.is_repair; let pkt_block = pkt.header.fec_block; let pkt_symbol = pkt.header.fec_symbol; + let pkt_is_opus = pkt.header.codec_id.is_opus(); - // Feed every packet (source + repair) to FEC decoder - let _ = fec_dec.add_symbol( - pkt_block, - pkt_symbol, - is_repair, - &pkt.payload, - ); + // Phase 2: Opus packets bypass RaptorQ entirely — DRED + // (enabled Phase 1) handles codec-layer loss recovery, + // and feeding these symbols into the RaptorQ decoder + // would accumulate block_id=0 duplicates that never + // decode. Codec2 packets still feed RaptorQ. + if !pkt_is_opus { + let _ = fec_dec.add_symbol( + pkt_block, + pkt_symbol, + is_repair, + &pkt.payload, + ); + } // Source packets: decode directly if !is_repair && pkt.header.codec_id != CodecId::ComfortNoise { @@ -904,22 +927,29 @@ async fn run_call( } } - // Try FEC recovery - if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) { - fec_recovered += recovered_frames.len() as u64; - if fec_recovered % 50 == 1 { - info!( - fec_recovered, - block = pkt_block, - frames = recovered_frames.len(), - "FEC block recovered" - ); + // Codec2-only: try FEC recovery and expire old blocks. + // Opus packets skip both — the Phase 2 Opus path has no + // RaptorQ state to query or clean up. The `fec_recovered` + // counter is now effectively Codec2-only, which is + // correct because DRED reconstructions will be counted + // separately once Phase 3 lands (new telemetry field). + if !pkt_is_opus { + if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) { + fec_recovered += recovered_frames.len() as u64; + if fec_recovered % 50 == 1 { + info!( + fec_recovered, + block = pkt_block, + frames = recovered_frames.len(), + "FEC block recovered" + ); + } } - } - // Expire old blocks to prevent memory growth - if pkt_block > 3 { - fec_dec.expire_before(pkt_block.wrapping_sub(3)); + // Expire old blocks to prevent memory growth + if pkt_block > 3 { + fec_dec.expire_before(pkt_block.wrapping_sub(3)); + } } let mut stats = state.stats.lock().unwrap(); diff --git a/crates/wzp-client/src/call.rs b/crates/wzp-client/src/call.rs index 9605230..de5d140 100644 --- a/crates/wzp-client/src/call.rs +++ b/crates/wzp-client/src/call.rs @@ -344,6 +344,22 @@ impl CallEncoder { let enc_len = self.audio_enc.encode(pcm, &mut encoded)?; encoded.truncate(enc_len); + // Phase 2: Opus tiers bypass RaptorQ entirely (DRED handles loss + // recovery at the codec layer). Codec2 tiers keep RaptorQ unchanged. + // On Opus packets, zero the FEC header fields so old receivers + // can cleanly identify "no RaptorQ block to assemble" and new + // receivers can short-circuit their FEC ingest path. + let is_opus = self.profile.codec.is_opus(); + let (fec_block, fec_symbol, fec_ratio_encoded) = if is_opus { + (0u8, 0u8, 0u8) + } else { + ( + self.block_id, + self.frame_in_block, + MediaHeader::encode_fec_ratio(self.profile.fec_ratio), + ) + }; + // Build source media packet let source_pkt = MediaPacket { header: MediaHeader { @@ -351,11 +367,11 @@ impl CallEncoder { is_repair: false, codec_id: self.profile.codec, has_quality_report: false, - fec_ratio_encoded: MediaHeader::encode_fec_ratio(self.profile.fec_ratio), + fec_ratio_encoded, seq: self.seq, timestamp: self.timestamp_ms, - fec_block: self.block_id, - fec_symbol: self.frame_in_block, + fec_block, + fec_symbol, reserved: 0, csrc_count: 0, }, @@ -370,39 +386,42 @@ impl CallEncoder { let mut output = vec![source_pkt]; - // Add to FEC encoder - self.fec_enc.add_source_symbol(&encoded)?; - self.frame_in_block += 1; + // Codec2-only: feed RaptorQ and generate repair packets when the + // block is full. Opus tiers skip this entire block — DRED (active + // in Phase 1) provides codec-layer loss recovery. + if !is_opus { + self.fec_enc.add_source_symbol(&encoded)?; + self.frame_in_block += 1; - // If block is full, generate repair and finalize - if self.frame_in_block >= self.profile.frames_per_block { - if let Ok(repairs) = self.fec_enc.generate_repair(self.profile.fec_ratio) { - for (sym_idx, repair_data) in repairs { - output.push(MediaPacket { - header: MediaHeader { - version: 0, - is_repair: true, - codec_id: self.profile.codec, - has_quality_report: false, - fec_ratio_encoded: MediaHeader::encode_fec_ratio( - self.profile.fec_ratio, - ), - seq: self.seq, - timestamp: self.timestamp_ms, - fec_block: self.block_id, - fec_symbol: sym_idx, - reserved: 0, - csrc_count: 0, - }, - payload: Bytes::from(repair_data), - quality_report: None, - }); - self.seq = self.seq.wrapping_add(1); + if self.frame_in_block >= self.profile.frames_per_block { + if let Ok(repairs) = self.fec_enc.generate_repair(self.profile.fec_ratio) { + for (sym_idx, repair_data) in repairs { + output.push(MediaPacket { + header: MediaHeader { + version: 0, + is_repair: true, + codec_id: self.profile.codec, + has_quality_report: false, + fec_ratio_encoded: MediaHeader::encode_fec_ratio( + self.profile.fec_ratio, + ), + seq: self.seq, + timestamp: self.timestamp_ms, + fec_block: self.block_id, + fec_symbol: sym_idx, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(repair_data), + quality_report: None, + }); + self.seq = self.seq.wrapping_add(1); + } } + let _ = self.fec_enc.finalize_block(); + self.block_id = self.block_id.wrapping_add(1); + self.frame_in_block = 0; } - let _ = self.fec_enc.finalize_block(); - self.block_id = self.block_id.wrapping_add(1); - self.frame_in_block = 0; } Ok(output) @@ -486,15 +505,23 @@ impl CallDecoder { /// Feed a received media packet into the decode pipeline. pub fn ingest(&mut self, packet: MediaPacket) { - // Feed to FEC decoder - let _ = self.fec_dec.add_symbol( - packet.header.fec_block, - packet.header.fec_symbol, - packet.header.is_repair, - &packet.payload, - ); + // Phase 2: Opus packets bypass RaptorQ. Codec2 packets still feed + // the FEC decoder for recovery. This also cleanly drops any stray + // Opus repair packets from an old sender (we don't push repair + // packets to the jitter buffer either, so they're effectively + // ignored — a graceful mixed-version degradation). + if !packet.header.codec_id.is_opus() { + let _ = self.fec_dec.add_symbol( + packet.header.fec_block, + packet.header.fec_symbol, + packet.header.is_repair, + &packet.payload, + ); + } - // If not a repair packet, also feed directly to jitter buffer + // Source packets (Opus or Codec2) go to the jitter buffer for decode. + // Repair packets never reach the jitter buffer; for Codec2 they're + // used by the FEC decoder above, for Opus they're dropped here. if !packet.header.is_repair { self.jitter.push(packet); } @@ -673,18 +700,83 @@ mod tests { assert!(!packets[0].header.is_repair); } + /// Phase 2: Opus packets have zero FEC header fields — no block, no + /// symbol index, no repair ratio. The RaptorQ layer is bypassed + /// entirely on the Opus tiers. #[test] - fn encoder_generates_repair_on_full_block() { + fn opus_source_packets_have_zero_fec_header_fields() { let config = CallConfig { - profile: QualityProfile::GOOD, // 5 frames/block + profile: QualityProfile::GOOD, // Opus 24k + suppression_enabled: false, // skip silence gate for this test ..Default::default() }; let mut enc = CallEncoder::new(&config); - let pcm = vec![0i16; 960]; + // Non-silent sine wave so silence detection doesn't suppress us + // even with suppression_enabled=false (belt and braces). + let pcm: Vec = (0..960) + .map(|i| ((i as f32 * 0.1).sin() * 10_000.0) as i16) + .collect(); + let packets = enc.encode_frame(&pcm).unwrap(); + assert_eq!(packets.len(), 1, "Opus must emit exactly 1 source packet"); + let hdr = &packets[0].header; + assert!(hdr.codec_id.is_opus()); + assert!(!hdr.is_repair); + assert_eq!(hdr.fec_block, 0, "Opus fec_block must be 0"); + assert_eq!(hdr.fec_symbol, 0, "Opus fec_symbol must be 0"); + assert_eq!(hdr.fec_ratio_encoded, 0, "Opus fec_ratio_encoded must be 0"); + } - let mut total_packets = 0; - let mut repair_count = 0; - for _ in 0..5 { + /// Phase 2: Opus never emits repair packets, regardless of how many + /// source frames are fed in. DRED (Phase 1) provides loss recovery at + /// the codec layer; RaptorQ is disabled on Opus tiers. + #[test] + fn opus_encoder_never_emits_repair_packets() { + let config = CallConfig { + profile: QualityProfile::GOOD, // 5 frames/block in the Codec2 sense + suppression_enabled: false, + ..Default::default() + }; + let mut enc = CallEncoder::new(&config); + let pcm: Vec = (0..960) + .map(|i| ((i as f32 * 0.1).sin() * 10_000.0) as i16) + .collect(); + + // Encode well beyond a block boundary to prove no repair ever comes out. + let mut total_packets = 0usize; + let mut repair_count = 0usize; + for _ in 0..20 { + let packets = enc.encode_frame(&pcm).unwrap(); + total_packets += packets.len(); + repair_count += packets.iter().filter(|p| p.header.is_repair).count(); + } + assert_eq!(repair_count, 0, "Opus must emit zero repair packets"); + assert_eq!( + total_packets, 20, + "20 source frames → 20 source packets (1:1, no RaptorQ expansion)" + ); + } + + /// Phase 2: Codec2 still emits repair packets with RaptorQ ratio unchanged. + /// DRED is libopus-only and does not apply here, so RaptorQ is still the + /// primary loss-recovery mechanism on Codec2 tiers. + #[test] + fn codec2_encoder_generates_repair_on_full_block() { + let config = CallConfig { + profile: QualityProfile::CATASTROPHIC, // Codec2 1200, 8 frames/block, ratio 1.0 + suppression_enabled: false, + ..Default::default() + }; + let mut enc = CallEncoder::new(&config); + // Codec2 takes 48 kHz samples and downsamples internally. + // CATASTROPHIC uses 40 ms frames → 1920 samples. + let pcm: Vec = (0..1920) + .map(|i| ((i as f32 * 0.1).sin() * 10_000.0) as i16) + .collect(); + + let mut total_packets = 0usize; + let mut repair_count = 0usize; + // Run long enough to cross the 8-frame block boundary and see repairs. + for _ in 0..16 { let packets = enc.encode_frame(&pcm).unwrap(); for p in &packets { if p.header.is_repair { @@ -693,8 +785,10 @@ mod tests { } total_packets += packets.len(); } - assert!(repair_count > 0, "should have repair packets after full block"); - assert!(total_packets > 5, "total {total_packets} should exceed 5 source"); + assert!( + repair_count > 0, + "Codec2 must still emit repair packets (got {repair_count} repairs, {total_packets} total)" + ); } #[test]