feat: jitter buffer instrumentation — drift test, telemetry, parameter sweep
WZP-P2-T1-S1: Automated drift measurement - New drift_test.rs: DriftTestConfig, DriftResult, run_drift_test() - CLI --drift-test <secs>: sends tone, measures actual vs expected duration - Interpretation tiers: EXCELLENT (<50ms) / GOOD / FAIR / POOR - 2 unit tests: drift math verification, config defaults WZP-P2-T1-S2: Jitter buffer telemetry - JitterStats gains: total_decoded, underruns, overruns, max_depth_seen - JitterBuffer: record_underrun(), record_decode(), reset_stats() - CallDecoder: stats() getter, reset_stats() - JitterTelemetry: periodic tracing::info! logger with configurable interval - 4 unit tests: ingestion tracking, underrun tracking, reset, interval WZP-P2-T1-S3: Parameter sweep - New sweep.rs: SweepConfig, SweepResult, run_local_sweep() - Tests 20 jitter buffer configs (5 target × 4 max depths) locally - CLI --sweep: runs sweep, prints ASCII comparison table - No network needed — pure encoder→decoder pipeline test - 3 unit tests: config defaults, local sweep runs, table formatting 216 tests passing across all crates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,8 +2,10 @@
|
||||
//!
|
||||
//! Pipeline: mic → encode → FEC → encrypt → send / recv → decrypt → FEC → decode → speaker
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder};
|
||||
use wzp_proto::jitter::{JitterBuffer, PlayoutResult};
|
||||
@@ -323,25 +325,37 @@ impl CallDecoder {
|
||||
pub fn decode_next(&mut self, pcm: &mut [i16]) -> Option<usize> {
|
||||
match self.jitter.pop() {
|
||||
PlayoutResult::Packet(pkt) => {
|
||||
match self.audio_dec.decode(&pkt.payload, pcm) {
|
||||
let result = match self.audio_dec.decode(&pkt.payload, pcm) {
|
||||
Ok(n) => Some(n),
|
||||
Err(e) => {
|
||||
warn!("decode error: {e}, using PLC");
|
||||
self.audio_dec.decode_lost(pcm).ok()
|
||||
}
|
||||
};
|
||||
if result.is_some() {
|
||||
self.jitter.record_decode();
|
||||
}
|
||||
result
|
||||
}
|
||||
PlayoutResult::Missing { seq } => {
|
||||
// Only generate PLC if there are still packets buffered ahead.
|
||||
// Otherwise we've drained everything — return None to stop.
|
||||
if self.jitter.depth() > 0 {
|
||||
debug!(seq, "packet loss, generating PLC");
|
||||
self.audio_dec.decode_lost(pcm).ok()
|
||||
let result = self.audio_dec.decode_lost(pcm).ok();
|
||||
if result.is_some() {
|
||||
self.jitter.record_decode();
|
||||
}
|
||||
result
|
||||
} else {
|
||||
self.jitter.record_underrun();
|
||||
None
|
||||
}
|
||||
}
|
||||
PlayoutResult::NotReady => None,
|
||||
PlayoutResult::NotReady => {
|
||||
self.jitter.record_underrun();
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,8 +365,54 @@ impl CallDecoder {
|
||||
}
|
||||
|
||||
/// Get jitter buffer statistics.
|
||||
pub fn jitter_stats(&self) -> wzp_proto::jitter::JitterStats {
|
||||
self.jitter.stats().clone()
|
||||
pub fn stats(&self) -> &wzp_proto::jitter::JitterStats {
|
||||
self.jitter.stats()
|
||||
}
|
||||
|
||||
/// Reset jitter buffer statistics counters.
|
||||
pub fn reset_stats(&mut self) {
|
||||
self.jitter.reset_stats();
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodic telemetry logger for jitter buffer statistics.
|
||||
///
|
||||
/// Call `maybe_log` on each decode tick; it will emit a `tracing::info!` event
|
||||
/// no more frequently than the configured interval.
|
||||
pub struct JitterTelemetry {
|
||||
interval: Duration,
|
||||
last_report: Instant,
|
||||
}
|
||||
|
||||
impl JitterTelemetry {
|
||||
/// Create a new telemetry logger that reports at most once per `interval_secs`.
|
||||
pub fn new(interval_secs: u64) -> Self {
|
||||
Self {
|
||||
interval: Duration::from_secs(interval_secs),
|
||||
last_report: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Log jitter statistics if the interval has elapsed. Returns `true` when a
|
||||
/// log line was emitted.
|
||||
pub fn maybe_log(&mut self, stats: &wzp_proto::jitter::JitterStats) -> bool {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(self.last_report) >= self.interval {
|
||||
info!(
|
||||
buffer_depth = stats.current_depth,
|
||||
underruns = stats.underruns,
|
||||
overruns = stats.overruns,
|
||||
late_packets = stats.packets_late,
|
||||
total_received = stats.packets_received,
|
||||
total_decoded = stats.total_decoded,
|
||||
max_depth_seen = stats.max_depth_seen,
|
||||
"jitter buffer telemetry"
|
||||
);
|
||||
self.last_report = now;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -558,4 +618,101 @@ mod tests {
|
||||
assert_eq!(catastrophic.profile, QualityProfile::CATASTROPHIC);
|
||||
assert!(catastrophic.jitter_max > degraded.jitter_max);
|
||||
}
|
||||
|
||||
// ---- JitterStats telemetry tests ----
|
||||
|
||||
fn make_test_packet(seq: u16) -> MediaPacket {
|
||||
MediaPacket {
|
||||
header: MediaHeader {
|
||||
version: 0,
|
||||
is_repair: false,
|
||||
codec_id: CodecId::Opus24k,
|
||||
has_quality_report: false,
|
||||
fec_ratio_encoded: 0,
|
||||
seq,
|
||||
timestamp: seq as u32 * 20,
|
||||
fec_block: 0,
|
||||
fec_symbol: seq as u8,
|
||||
reserved: 0,
|
||||
csrc_count: 0,
|
||||
},
|
||||
payload: Bytes::from(vec![0u8; 60]),
|
||||
quality_report: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stats_track_ingestion() {
|
||||
let config = CallConfig::default();
|
||||
let mut dec = CallDecoder::new(&config);
|
||||
|
||||
for i in 0..5u16 {
|
||||
dec.ingest(make_test_packet(i));
|
||||
}
|
||||
|
||||
let stats = dec.stats();
|
||||
assert_eq!(stats.packets_received, 5);
|
||||
assert_eq!(stats.current_depth, 5);
|
||||
assert_eq!(stats.max_depth_seen, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stats_track_underruns() {
|
||||
let config = CallConfig::default();
|
||||
let mut dec = CallDecoder::new(&config);
|
||||
|
||||
// Empty buffer — decode_next should record underruns
|
||||
let mut pcm = vec![0i16; 960];
|
||||
dec.decode_next(&mut pcm);
|
||||
dec.decode_next(&mut pcm);
|
||||
dec.decode_next(&mut pcm);
|
||||
|
||||
assert_eq!(dec.stats().underruns, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stats_reset() {
|
||||
let config = CallConfig::default();
|
||||
let mut dec = CallDecoder::new(&config);
|
||||
|
||||
// Generate some stats: ingest packets and trigger underruns on empty buffer
|
||||
for i in 0..3u16 {
|
||||
dec.ingest(make_test_packet(i));
|
||||
}
|
||||
// Also call decode on empty decoder to get underruns
|
||||
let config2 = CallConfig::default();
|
||||
let mut dec2 = CallDecoder::new(&config2);
|
||||
let mut pcm = vec![0i16; 960];
|
||||
dec2.decode_next(&mut pcm); // underrun — nothing in buffer
|
||||
|
||||
assert!(dec.stats().packets_received > 0);
|
||||
assert!(dec2.stats().underruns > 0);
|
||||
|
||||
// Test reset on the decoder with ingested packets
|
||||
dec.reset_stats();
|
||||
let stats = dec.stats();
|
||||
assert_eq!(stats.packets_received, 0);
|
||||
assert_eq!(stats.underruns, 0);
|
||||
assert_eq!(stats.overruns, 0);
|
||||
assert_eq!(stats.total_decoded, 0);
|
||||
assert_eq!(stats.packets_late, 0);
|
||||
assert_eq!(stats.max_depth_seen, 0);
|
||||
|
||||
// Test reset on the decoder with underruns
|
||||
dec2.reset_stats();
|
||||
assert_eq!(dec2.stats().underruns, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telemetry_respects_interval() {
|
||||
use wzp_proto::jitter::JitterStats;
|
||||
|
||||
let mut telemetry = JitterTelemetry::new(60); // 60-second interval
|
||||
let stats = JitterStats::default();
|
||||
|
||||
// First call right after creation — should not log because no time has passed
|
||||
// (the interval hasn't elapsed since construction)
|
||||
let logged = telemetry.maybe_log(&stats);
|
||||
assert!(!logged, "should not log before interval elapses");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user