feat(p2p): adaptive quality on direct calls (#23)
Some checks failed
Mirror to GitHub / mirror (push) Failing after 27s
Build Release Binaries / build-amd64 (push) Failing after 3m37s

P2P calls now adapt codec quality based on observed network conditions,
matching what relay calls already had.

Three-layer implementation:
- QualityReport::from_path_stats(): construct reports from local quinn
  stats (loss%, RTT, jitter) without needing relay-generated reports
- CallEncoder.pending_quality_report: one-shot attachment to next
  source packet (consumed on encode, not repeated)
- Engine send tasks: generate quality report every 50 frames (~1s)
  from quinn_path_stats() and attach via set_pending_quality_report()
- Engine recv tasks: self-observe from own QUIC path stats every 50
  packets, feed to AdaptiveQualityController for P2P adaptation
  (works even if peer isn't sending quality reports yet)

Both relay and P2P calls now have adaptive quality. On relay calls,
both peer-sent reports AND local observations feed the controller.
Hysteresis (3 consecutive bad reports to downgrade) prevents thrashing.

372 tests passing (+4 new: from_path_stats encoding, clamping, zero
values, encoder quality report attachment).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-13 16:14:06 +04:00
parent 81b5522942
commit 1e82811cc1
3 changed files with 157 additions and 2 deletions

View File

@@ -38,6 +38,8 @@ const CONNECT_TIMEOUT_SECS: u64 = 10;
#[cfg_attr(not(target_os = "android"), allow(dead_code))]
const HEARTBEAT_INTERVAL_SECS: u64 = 2;
const DRED_POLL_INTERVAL: u32 = 25;
/// Generate and attach a QualityReport every N frames (~1s at 20ms/frame).
const QUALITY_REPORT_INTERVAL: u32 = 50;
/// Profile index mapping for the AtomicU8 adaptive-quality bridge.
const PROFILE_NO_CHANGE: u8 = 0xFF;
@@ -643,6 +645,7 @@ impl CallEngine {
// expected-loss hint based on real-time network conditions.
let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec);
let mut frames_since_dred_poll: u32 = 0;
let mut frames_since_quality_report: u32 = 0;
let mut heartbeat = std::time::Instant::now();
let mut last_rms: u32 = 0;
@@ -782,6 +785,21 @@ impl CallEngine {
}
}
// Quality report: generate from quinn stats and attach to next packet.
// The peer's recv task (or relay) uses this for adaptive quality.
frames_since_quality_report += 1;
if frames_since_quality_report >= QUALITY_REPORT_INTERVAL {
frames_since_quality_report = 0;
let snap = send_t.quinn_path_stats();
let pq = send_t.path_quality();
let report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
snap.rtt_ms,
pq.jitter_ms,
);
encoder.set_pending_quality_report(report);
}
// Heartbeat every 2s with capture+encode+send state
if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) {
let fs = send_fs.load(Ordering::Relaxed);
@@ -843,6 +861,7 @@ impl CallEngine {
// above for the full flow.
let mut dred_recv = DredRecvState::new();
let mut quality_ctrl = AdaptiveQualityController::new();
let mut recv_quality_counter: u32 = 0;
info!(codec = ?current_codec, t_ms = recv_t0.elapsed().as_millis(), "first-join diag: recv task spawned (android/oboe)");
// First-join diagnostic latches — see send task above for the
// sibling capture milestones.
@@ -989,6 +1008,29 @@ impl CallEngine {
}
}
// P2P self-observation: if no quality reports from peer,
// generate local observations from our own QUIC path stats.
// This ensures adaptive quality works even on P2P calls
// where the peer hasn't been updated to send reports yet.
recv_quality_counter += 1;
if recv_quality_counter >= QUALITY_REPORT_INTERVAL {
recv_quality_counter = 0;
let snap = recv_t.quinn_path_stats();
let pq = recv_t.path_quality();
let local_report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
snap.rtt_ms,
pq.jitter_ms,
);
if auto_profile {
if let Some(new_profile) = quality_ctrl.observe(&local_report) {
let idx = profile_to_index(&new_profile);
info!(to = ?new_profile.codec, "auto: local quality observation recommends switch");
pending_profile_recv.store(idx, Ordering::Release);
}
}
}
match decoder.decode(&pkt.payload, &mut pcm) {
Ok(n) => {
last_decode_n = n;
@@ -1392,6 +1434,7 @@ impl CallEngine {
// Continuous DRED tuning (same as Android send task).
let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec);
let mut frames_since_dred_poll: u32 = 0;
let mut frames_since_quality_report: u32 = 0;
loop {
if !send_r.load(Ordering::Relaxed) {
@@ -1460,6 +1503,21 @@ impl CallEngine {
encoder.apply_dred_tuning(tuning);
}
}
// Quality report: generate from quinn stats and attach to next packet.
// The peer's recv task (or relay) uses this for adaptive quality.
frames_since_quality_report += 1;
if frames_since_quality_report >= QUALITY_REPORT_INTERVAL {
frames_since_quality_report = 0;
let snap = send_t.quinn_path_stats();
let pq = send_t.path_quality();
let report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
snap.rtt_ms,
pq.jitter_ms,
);
encoder.set_pending_quality_report(report);
}
}
});
@@ -1483,6 +1541,7 @@ impl CallEngine {
let mut pcm = vec![0i16; FRAME_SAMPLES_40MS]; // big enough for any codec
let mut dred_recv = DredRecvState::new();
let mut quality_ctrl = AdaptiveQualityController::new();
let mut recv_quality_counter: u32 = 0;
loop {
if !recv_r.load(Ordering::Relaxed) {
@@ -1544,6 +1603,29 @@ impl CallEngine {
}
}
// P2P self-observation: if no quality reports from peer,
// generate local observations from our own QUIC path stats.
// This ensures adaptive quality works even on P2P calls
// where the peer hasn't been updated to send reports yet.
recv_quality_counter += 1;
if recv_quality_counter >= QUALITY_REPORT_INTERVAL {
recv_quality_counter = 0;
let snap = recv_t.quinn_path_stats();
let pq = recv_t.path_quality();
let local_report = wzp_proto::QualityReport::from_path_stats(
snap.loss_pct,
snap.rtt_ms,
pq.jitter_ms,
);
if auto_profile {
if let Some(new_profile) = quality_ctrl.observe(&local_report) {
let idx = profile_to_index(&new_profile);
info!(to = ?new_profile.codec, "auto: local quality observation recommends switch");
pending_profile_recv.store(idx, Ordering::Release);
}
}
}
if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) {
agc.process_frame(&mut pcm[..n]);
if !recv_spk.load(Ordering::Relaxed) {