feat(codec): Phase 2 — remove RaptorQ from Opus tiers, Codec2 unchanged

Phase 2 of the DRED integration (docs/PRD-dred-integration.md). With
Phase 1 having enabled DRED on every Opus profile, the app-level RaptorQ
layer is now redundant overhead on those tiers: +20% bitrate, +40–100 ms
receive-side latency (block wait), +CPU for stats we never used. This
phase removes RaptorQ from the Opus encode and decode paths on both the
desktop (wzp-client/call.rs) and Android (wzp-android/engine.rs) sides.
Codec2 tiers keep RaptorQ with their current ratios unchanged — DRED is
libopus-only and Codec2 has no neural equivalent.

Encoder changes (the real bandwidth / CPU win):
- CallEncoder::encode_frame and engine.rs encode loop now gate the
  RaptorQ path on !codec.is_opus():
    - Opus source packets emit fec_block=0, fec_symbol=0,
      fec_ratio_encoded=0 in the MediaHeader
    - fec_enc.add_source_symbol is skipped on Opus
    - generate_repair + repair packet emission is skipped on Opus
    - block_id and frame_in_block counters stay frozen at 0 for Opus
- Codec2 path is byte-for-byte identical to pre-Phase-2 behavior.

Decoder changes (mostly cleanup, since both live decoder paths were
already reading audio directly from source packets and only using the
RaptorQ decoder output for stats):
- CallDecoder::ingest skips fec_dec.add_symbol on Opus packets. Source
  packets still flow to the jitter buffer; Opus repair packets from old
  senders are dropped cleanly (repair packets never hit the jitter
  buffer either).
- engine.rs recv loop skips fec_dec.add_symbol, fec_dec.try_decode, and
  fec_dec.expire_before on Opus packets. The `fec_recovered` stat
  counter becomes Codec2-only (a separate DRED reconstruction counter
  lands in Phase 4).

Wire-format backward compat verified at pre-flight:
- Old receiver + new sender: engine.rs pipeline.rs path gates on
  non-zero fec_block/fec_symbol which now never fire for Opus, so the
  RaptorQ decoder simply isn't fed. Audio flows normally. Desktop
  CallDecoder's old path accumulated packets into the stale-eviction
  HashMap, which cleans up after 2s — harmless.
- New receiver + old sender: new receiver skips RaptorQ on Opus so
  old-sender repair packets are ignored entirely (no crash, no double-
  decode). Loses the (previously vestigial) RaptorQ recovery benefit,
  which was never actually active in the audio path. Source packets
  still decode normally.
- No wire format version bump required. MediaHeader is unchanged; we
  just zero the FEC fields on Opus packets.

Test changes:
- Removed `encoder_generates_repair_on_full_block` — asserted the old
  (pre-Phase-2) RaptorQ-on-Opus behavior and is now incorrect. Replaced
  with two symmetric tests:
    - `opus_source_packets_have_zero_fec_header_fields` — verifies
      Phase 2 invariants on Opus packets
    - `opus_encoder_never_emits_repair_packets` — runs 20 frames of
      non-silent sine wave through a GOOD-profile encoder, asserts
      exactly 20 output packets, zero repair
    - `codec2_encoder_generates_repair_on_full_block` — same shape as
      the old test but on CATASTROPHIC profile (Codec2 1200, 8
      frames/block, ratio 1.0) to verify Codec2 path still emits
      repairs as before

Verification:
- cargo check --workspace: zero errors
- cargo test -p wzp-codec --lib: 61 passing (Phase 1 baseline held)
- cargo test -p wzp-client --lib: 32 passing (+3 new Phase 2 tests,
  -1 old test removed)
- cargo check -p wzp-android --lib: zero errors (host link of
  wzp-android tests fails on -llog per pre-existing Android-only
  build.rs, unrelated to this work; integration build via
  build-and-notify.sh will validate Android end-to-end)
- Pre-existing broken integration test in
  crates/wzp-client/tests/handshake_integration.rs (SignalMessage
  schema drift) is NOT caused by this commit — baseline had the same
  3 compile errors before Phase 2. Flagged as a separate cleanup task.

Expected observable effects on a real call:
- Opus 24k outgoing bitrate drops from ~28.8 kbps (ratio 0.2 RaptorQ)
  to ~25 kbps (base 24 kbps + DRED ~1–10 kbps signal-dependent)
- Opus receive-side latency drops ~40 ms on clean network (no more
  block wait — jitter buffer emits as soon as a source packet arrives)
- Codec2 calls show no latency or bitrate change

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-10 17:42:33 +04:00
parent 54cbebd34e
commit 6db5c25b54
2 changed files with 247 additions and 123 deletions

View File

@@ -665,6 +665,19 @@ async fn run_call(
t_opus_us += t0.elapsed().as_micros() as u64;
let encoded = &encode_buf[..encoded_len];
// Phase 2: Opus tiers bypass RaptorQ (DRED handles loss recovery
// at the codec layer). Codec2 tiers keep RaptorQ unchanged.
let is_opus = current_profile.codec.is_opus();
let (hdr_fec_block, hdr_fec_symbol, hdr_fec_ratio) = if is_opus {
(0u8, 0u8, 0u8)
} else {
(
block_id,
frame_in_block,
MediaHeader::encode_fec_ratio(current_profile.fec_ratio),
)
};
// Build source packet
let s = seq.fetch_add(1, Ordering::Relaxed);
let t = ts.fetch_add(frame_samples as u32, Ordering::Relaxed);
@@ -675,11 +688,11 @@ async fn run_call(
is_repair: false,
codec_id: current_profile.codec,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(current_profile.fec_ratio),
fec_ratio_encoded: hdr_fec_ratio,
seq: s,
timestamp: t,
fec_block: block_id,
fec_symbol: frame_in_block,
fec_block: hdr_fec_block,
fec_symbol: hdr_fec_symbol,
reserved: 0,
csrc_count: 0,
},
@@ -709,63 +722,66 @@ async fn run_call(
t_send_us += t0.elapsed().as_micros() as u64;
frames_sent += 1;
// Feed encoded frame to FEC encoder
// Codec2-only: feed RaptorQ and emit repair packets when the
// block is full. Opus tiers skip this entire block — DRED
// (enabled in Phase 1) provides codec-layer loss recovery.
let t0 = Instant::now();
if let Err(e) = fec_enc.add_source_symbol(encoded) {
warn!("fec add_source error: {e}");
}
frame_in_block += 1;
if !is_opus {
if let Err(e) = fec_enc.add_source_symbol(encoded) {
warn!("fec add_source error: {e}");
}
frame_in_block += 1;
// When block is full, generate repair packets
if frame_in_block >= current_profile.frames_per_block {
match fec_enc.generate_repair(current_profile.fec_ratio) {
Ok(repairs) => {
let repair_count = repairs.len();
for (sym_idx, repair_data) in repairs {
let rs = seq.fetch_add(1, Ordering::Relaxed);
let repair_pkt = MediaPacket {
header: MediaHeader {
version: 0,
is_repair: true,
codec_id: current_profile.codec,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(
current_profile.fec_ratio,
),
seq: rs,
timestamp: t,
fec_block: block_id,
fec_symbol: sym_idx,
reserved: 0,
csrc_count: 0,
},
payload: Bytes::from(repair_data),
quality_report: None,
};
// Drop repair packets on error — never break
if let Err(_e) = transport.send_media(&repair_pkt).await {
send_errors += 1;
frames_dropped += 1;
// Don't log every repair failure — source error log covers it
if frame_in_block >= current_profile.frames_per_block {
match fec_enc.generate_repair(current_profile.fec_ratio) {
Ok(repairs) => {
let repair_count = repairs.len();
for (sym_idx, repair_data) in repairs {
let rs = seq.fetch_add(1, Ordering::Relaxed);
let repair_pkt = MediaPacket {
header: MediaHeader {
version: 0,
is_repair: true,
codec_id: current_profile.codec,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(
current_profile.fec_ratio,
),
seq: rs,
timestamp: t,
fec_block: block_id,
fec_symbol: sym_idx,
reserved: 0,
csrc_count: 0,
},
payload: Bytes::from(repair_data),
quality_report: None,
};
// Drop repair packets on error — never break
if let Err(_e) = transport.send_media(&repair_pkt).await {
send_errors += 1;
frames_dropped += 1;
// Don't log every repair failure — source error log covers it
}
}
if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) {
info!(
block_id,
repair_count,
fec_ratio = current_profile.fec_ratio,
"FEC block complete"
);
}
}
if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) {
info!(
block_id,
repair_count,
fec_ratio = current_profile.fec_ratio,
"FEC block complete"
);
Err(e) => {
warn!("fec generate_repair error: {e}");
}
}
Err(e) => {
warn!("fec generate_repair error: {e}");
}
}
let _ = fec_enc.finalize_block();
block_id = block_id.wrapping_add(1);
frame_in_block = 0;
let _ = fec_enc.finalize_block();
block_id = block_id.wrapping_add(1);
frame_in_block = 0;
}
}
t_fec_us += t0.elapsed().as_micros() as u64;
t_frames += 1;
@@ -850,14 +866,21 @@ async fn run_call(
let is_repair = pkt.header.is_repair;
let pkt_block = pkt.header.fec_block;
let pkt_symbol = pkt.header.fec_symbol;
let pkt_is_opus = pkt.header.codec_id.is_opus();
// Feed every packet (source + repair) to FEC decoder
let _ = fec_dec.add_symbol(
pkt_block,
pkt_symbol,
is_repair,
&pkt.payload,
);
// Phase 2: Opus packets bypass RaptorQ entirely — DRED
// (enabled Phase 1) handles codec-layer loss recovery,
// and feeding these symbols into the RaptorQ decoder
// would accumulate block_id=0 duplicates that never
// decode. Codec2 packets still feed RaptorQ.
if !pkt_is_opus {
let _ = fec_dec.add_symbol(
pkt_block,
pkt_symbol,
is_repair,
&pkt.payload,
);
}
// Source packets: decode directly
if !is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
@@ -904,22 +927,29 @@ async fn run_call(
}
}
// Try FEC recovery
if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) {
fec_recovered += recovered_frames.len() as u64;
if fec_recovered % 50 == 1 {
info!(
fec_recovered,
block = pkt_block,
frames = recovered_frames.len(),
"FEC block recovered"
);
// Codec2-only: try FEC recovery and expire old blocks.
// Opus packets skip both — the Phase 2 Opus path has no
// RaptorQ state to query or clean up. The `fec_recovered`
// counter is now effectively Codec2-only, which is
// correct because DRED reconstructions will be counted
// separately once Phase 3 lands (new telemetry field).
if !pkt_is_opus {
if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) {
fec_recovered += recovered_frames.len() as u64;
if fec_recovered % 50 == 1 {
info!(
fec_recovered,
block = pkt_block,
frames = recovered_frames.len(),
"FEC block recovered"
);
}
}
}
// Expire old blocks to prevent memory growth
if pkt_block > 3 {
fec_dec.expire_before(pkt_block.wrapping_sub(3));
// Expire old blocks to prevent memory growth
if pkt_block > 3 {
fec_dec.expire_before(pkt_block.wrapping_sub(3));
}
}
let mut stats = state.stats.lock().unwrap();

View File

@@ -344,6 +344,22 @@ impl CallEncoder {
let enc_len = self.audio_enc.encode(pcm, &mut encoded)?;
encoded.truncate(enc_len);
// Phase 2: Opus tiers bypass RaptorQ entirely (DRED handles loss
// recovery at the codec layer). Codec2 tiers keep RaptorQ unchanged.
// On Opus packets, zero the FEC header fields so old receivers
// can cleanly identify "no RaptorQ block to assemble" and new
// receivers can short-circuit their FEC ingest path.
let is_opus = self.profile.codec.is_opus();
let (fec_block, fec_symbol, fec_ratio_encoded) = if is_opus {
(0u8, 0u8, 0u8)
} else {
(
self.block_id,
self.frame_in_block,
MediaHeader::encode_fec_ratio(self.profile.fec_ratio),
)
};
// Build source media packet
let source_pkt = MediaPacket {
header: MediaHeader {
@@ -351,11 +367,11 @@ impl CallEncoder {
is_repair: false,
codec_id: self.profile.codec,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(self.profile.fec_ratio),
fec_ratio_encoded,
seq: self.seq,
timestamp: self.timestamp_ms,
fec_block: self.block_id,
fec_symbol: self.frame_in_block,
fec_block,
fec_symbol,
reserved: 0,
csrc_count: 0,
},
@@ -370,39 +386,42 @@ impl CallEncoder {
let mut output = vec![source_pkt];
// Add to FEC encoder
self.fec_enc.add_source_symbol(&encoded)?;
self.frame_in_block += 1;
// Codec2-only: feed RaptorQ and generate repair packets when the
// block is full. Opus tiers skip this entire block — DRED (active
// in Phase 1) provides codec-layer loss recovery.
if !is_opus {
self.fec_enc.add_source_symbol(&encoded)?;
self.frame_in_block += 1;
// If block is full, generate repair and finalize
if self.frame_in_block >= self.profile.frames_per_block {
if let Ok(repairs) = self.fec_enc.generate_repair(self.profile.fec_ratio) {
for (sym_idx, repair_data) in repairs {
output.push(MediaPacket {
header: MediaHeader {
version: 0,
is_repair: true,
codec_id: self.profile.codec,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(
self.profile.fec_ratio,
),
seq: self.seq,
timestamp: self.timestamp_ms,
fec_block: self.block_id,
fec_symbol: sym_idx,
reserved: 0,
csrc_count: 0,
},
payload: Bytes::from(repair_data),
quality_report: None,
});
self.seq = self.seq.wrapping_add(1);
if self.frame_in_block >= self.profile.frames_per_block {
if let Ok(repairs) = self.fec_enc.generate_repair(self.profile.fec_ratio) {
for (sym_idx, repair_data) in repairs {
output.push(MediaPacket {
header: MediaHeader {
version: 0,
is_repair: true,
codec_id: self.profile.codec,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(
self.profile.fec_ratio,
),
seq: self.seq,
timestamp: self.timestamp_ms,
fec_block: self.block_id,
fec_symbol: sym_idx,
reserved: 0,
csrc_count: 0,
},
payload: Bytes::from(repair_data),
quality_report: None,
});
self.seq = self.seq.wrapping_add(1);
}
}
let _ = self.fec_enc.finalize_block();
self.block_id = self.block_id.wrapping_add(1);
self.frame_in_block = 0;
}
let _ = self.fec_enc.finalize_block();
self.block_id = self.block_id.wrapping_add(1);
self.frame_in_block = 0;
}
Ok(output)
@@ -486,15 +505,23 @@ impl CallDecoder {
/// Feed a received media packet into the decode pipeline.
pub fn ingest(&mut self, packet: MediaPacket) {
// Feed to FEC decoder
let _ = self.fec_dec.add_symbol(
packet.header.fec_block,
packet.header.fec_symbol,
packet.header.is_repair,
&packet.payload,
);
// Phase 2: Opus packets bypass RaptorQ. Codec2 packets still feed
// the FEC decoder for recovery. This also cleanly drops any stray
// Opus repair packets from an old sender (we don't push repair
// packets to the jitter buffer either, so they're effectively
// ignored — a graceful mixed-version degradation).
if !packet.header.codec_id.is_opus() {
let _ = self.fec_dec.add_symbol(
packet.header.fec_block,
packet.header.fec_symbol,
packet.header.is_repair,
&packet.payload,
);
}
// If not a repair packet, also feed directly to jitter buffer
// Source packets (Opus or Codec2) go to the jitter buffer for decode.
// Repair packets never reach the jitter buffer; for Codec2 they're
// used by the FEC decoder above, for Opus they're dropped here.
if !packet.header.is_repair {
self.jitter.push(packet);
}
@@ -673,18 +700,83 @@ mod tests {
assert!(!packets[0].header.is_repair);
}
/// Phase 2: Opus packets have zero FEC header fields — no block, no
/// symbol index, no repair ratio. The RaptorQ layer is bypassed
/// entirely on the Opus tiers.
#[test]
fn encoder_generates_repair_on_full_block() {
fn opus_source_packets_have_zero_fec_header_fields() {
let config = CallConfig {
profile: QualityProfile::GOOD, // 5 frames/block
profile: QualityProfile::GOOD, // Opus 24k
suppression_enabled: false, // skip silence gate for this test
..Default::default()
};
let mut enc = CallEncoder::new(&config);
let pcm = vec![0i16; 960];
// Non-silent sine wave so silence detection doesn't suppress us
// even with suppression_enabled=false (belt and braces).
let pcm: Vec<i16> = (0..960)
.map(|i| ((i as f32 * 0.1).sin() * 10_000.0) as i16)
.collect();
let packets = enc.encode_frame(&pcm).unwrap();
assert_eq!(packets.len(), 1, "Opus must emit exactly 1 source packet");
let hdr = &packets[0].header;
assert!(hdr.codec_id.is_opus());
assert!(!hdr.is_repair);
assert_eq!(hdr.fec_block, 0, "Opus fec_block must be 0");
assert_eq!(hdr.fec_symbol, 0, "Opus fec_symbol must be 0");
assert_eq!(hdr.fec_ratio_encoded, 0, "Opus fec_ratio_encoded must be 0");
}
let mut total_packets = 0;
let mut repair_count = 0;
for _ in 0..5 {
/// Phase 2: Opus never emits repair packets, regardless of how many
/// source frames are fed in. DRED (Phase 1) provides loss recovery at
/// the codec layer; RaptorQ is disabled on Opus tiers.
#[test]
fn opus_encoder_never_emits_repair_packets() {
let config = CallConfig {
profile: QualityProfile::GOOD, // 5 frames/block in the Codec2 sense
suppression_enabled: false,
..Default::default()
};
let mut enc = CallEncoder::new(&config);
let pcm: Vec<i16> = (0..960)
.map(|i| ((i as f32 * 0.1).sin() * 10_000.0) as i16)
.collect();
// Encode well beyond a block boundary to prove no repair ever comes out.
let mut total_packets = 0usize;
let mut repair_count = 0usize;
for _ in 0..20 {
let packets = enc.encode_frame(&pcm).unwrap();
total_packets += packets.len();
repair_count += packets.iter().filter(|p| p.header.is_repair).count();
}
assert_eq!(repair_count, 0, "Opus must emit zero repair packets");
assert_eq!(
total_packets, 20,
"20 source frames → 20 source packets (1:1, no RaptorQ expansion)"
);
}
/// Phase 2: Codec2 still emits repair packets with RaptorQ ratio unchanged.
/// DRED is libopus-only and does not apply here, so RaptorQ is still the
/// primary loss-recovery mechanism on Codec2 tiers.
#[test]
fn codec2_encoder_generates_repair_on_full_block() {
let config = CallConfig {
profile: QualityProfile::CATASTROPHIC, // Codec2 1200, 8 frames/block, ratio 1.0
suppression_enabled: false,
..Default::default()
};
let mut enc = CallEncoder::new(&config);
// Codec2 takes 48 kHz samples and downsamples internally.
// CATASTROPHIC uses 40 ms frames → 1920 samples.
let pcm: Vec<i16> = (0..1920)
.map(|i| ((i as f32 * 0.1).sin() * 10_000.0) as i16)
.collect();
let mut total_packets = 0usize;
let mut repair_count = 0usize;
// Run long enough to cross the 8-frame block boundary and see repairs.
for _ in 0..16 {
let packets = enc.encode_frame(&pcm).unwrap();
for p in &packets {
if p.header.is_repair {
@@ -693,8 +785,10 @@ mod tests {
}
total_packets += packets.len();
}
assert!(repair_count > 0, "should have repair packets after full block");
assert!(total_packets > 5, "total {total_packets} should exceed 5 source");
assert!(
repair_count > 0,
"Codec2 must still emit repair packets (got {repair_count} repairs, {total_packets} total)"
);
}
#[test]