diff --git a/crates/wzp-client/src/call.rs b/crates/wzp-client/src/call.rs index e8ce0ad..7ac57f1 100644 --- a/crates/wzp-client/src/call.rs +++ b/crates/wzp-client/src/call.rs @@ -234,6 +234,8 @@ pub struct CallEncoder { mini_frames_enabled: bool, /// Frames encoded since the last full header was emitted. frames_since_full: u32, + /// Pending quality report to attach to the next source packet. + pending_quality_report: Option, } impl CallEncoder { @@ -264,6 +266,7 @@ impl CallEncoder { mini_context: MiniFrameContext::default(), mini_frames_enabled: config.mini_frames_enabled, frames_since_full: 0, + pending_quality_report: None, } } @@ -367,7 +370,7 @@ impl CallEncoder { version: 0, is_repair: false, codec_id: self.profile.codec, - has_quality_report: false, + has_quality_report: self.pending_quality_report.is_some(), fec_ratio_encoded, seq: self.seq, timestamp: self.timestamp_ms, @@ -377,7 +380,7 @@ impl CallEncoder { csrc_count: 0, }, payload: Bytes::from(encoded.clone()), - quality_report: None, + quality_report: self.pending_quality_report.take(), }; self.seq = self.seq.wrapping_add(1); @@ -454,6 +457,13 @@ impl CallEncoder { self.audio_enc.set_expected_loss(tuning.expected_loss_pct); } + /// Queue a quality report for attachment to the next source packet. + /// Used by the send task to embed locally-observed path quality so + /// the peer can drive adaptive quality switching. + pub fn set_pending_quality_report(&mut self, report: QualityReport) { + self.pending_quality_report = Some(report); + } + /// Enable or disable acoustic echo cancellation. pub fn set_aec_enabled(&mut self, enabled: bool) { self.aec.set_enabled(enabled); @@ -1578,4 +1588,28 @@ mod tests { let packets = enc.encode_frame(&pcm).unwrap(); assert!(!packets.is_empty()); } + + #[test] + fn encoder_attaches_quality_report() { + let mut enc = CallEncoder::new(&CallConfig { + profile: QualityProfile::GOOD, + suppression_enabled: false, + ..Default::default() + }); + + // Set a quality report + enc.set_pending_quality_report(QualityReport::from_path_stats(5.0, 80, 10)); + + // Encode a frame — should have quality_report attached + let pcm = voice_frame_20ms(0); + let packets = enc.encode_frame(&pcm).unwrap(); + assert!(!packets.is_empty()); + assert!(packets[0].header.has_quality_report, "first packet should have quality report"); + assert!(packets[0].quality_report.is_some()); + + // Next frame should NOT have quality_report (it was consumed) + let packets2 = enc.encode_frame(&voice_frame_20ms(960)).unwrap(); + assert!(!packets2[0].header.has_quality_report, "second packet should not have quality report"); + assert!(packets2[0].quality_report.is_none()); + } } diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index da36cc1..19775e8 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -180,6 +180,19 @@ impl QualityReport { self.rtt_4ms as u16 * 4 } + /// Construct a QualityReport from locally-observed path statistics. + /// + /// Used by the send task to embed quality data in outgoing packets so + /// the peer's recv task (or relay) can drive adaptive quality switching. + pub fn from_path_stats(loss_pct: f32, rtt_ms: u32, jitter_ms: u32) -> Self { + Self { + loss_pct: (loss_pct / 100.0 * 255.0).clamp(0.0, 255.0) as u8, + rtt_4ms: (rtt_ms / 4).min(255) as u8, + jitter_ms: jitter_ms.min(255) as u8, + bitrate_cap_kbps: 200, + } + } + pub fn write_to(&self, buf: &mut impl BufMut) { buf.put_u8(self.loss_pct); buf.put_u8(self.rtt_4ms); @@ -966,6 +979,32 @@ pub enum HangupReason { mod tests { use super::*; + #[test] + fn quality_report_from_path_stats_basic() { + let qr = QualityReport::from_path_stats(10.0, 100, 20); + // 10.0 / 100.0 * 255.0 = 25.5 → truncated to 25 + assert_eq!(qr.loss_pct, 25); + assert_eq!(qr.rtt_4ms, 25); // 100 / 4 = 25 + assert_eq!(qr.jitter_ms, 20); + assert_eq!(qr.bitrate_cap_kbps, 200); + } + + #[test] + fn quality_report_from_path_stats_zero() { + let qr = QualityReport::from_path_stats(0.0, 0, 0); + assert_eq!(qr.loss_pct, 0); + assert_eq!(qr.rtt_4ms, 0); + assert_eq!(qr.jitter_ms, 0); + } + + #[test] + fn quality_report_from_path_stats_clamps_high() { + let qr = QualityReport::from_path_stats(100.0, 2000, 300); + assert_eq!(qr.loss_pct, 255); + assert_eq!(qr.rtt_4ms, 255); // 2000/4=500, clamped to 255 + assert_eq!(qr.jitter_ms, 255); + } + #[test] fn header_roundtrip() { let header = MediaHeader { diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 6c079ed..b48069b 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -38,6 +38,8 @@ const CONNECT_TIMEOUT_SECS: u64 = 10; #[cfg_attr(not(target_os = "android"), allow(dead_code))] const HEARTBEAT_INTERVAL_SECS: u64 = 2; const DRED_POLL_INTERVAL: u32 = 25; +/// Generate and attach a QualityReport every N frames (~1s at 20ms/frame). +const QUALITY_REPORT_INTERVAL: u32 = 50; /// Profile index mapping for the AtomicU8 adaptive-quality bridge. const PROFILE_NO_CHANGE: u8 = 0xFF; @@ -643,6 +645,7 @@ impl CallEngine { // expected-loss hint based on real-time network conditions. let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec); let mut frames_since_dred_poll: u32 = 0; + let mut frames_since_quality_report: u32 = 0; let mut heartbeat = std::time::Instant::now(); let mut last_rms: u32 = 0; @@ -782,6 +785,21 @@ impl CallEngine { } } + // Quality report: generate from quinn stats and attach to next packet. + // The peer's recv task (or relay) uses this for adaptive quality. + frames_since_quality_report += 1; + if frames_since_quality_report >= QUALITY_REPORT_INTERVAL { + frames_since_quality_report = 0; + let snap = send_t.quinn_path_stats(); + let pq = send_t.path_quality(); + let report = wzp_proto::QualityReport::from_path_stats( + snap.loss_pct, + snap.rtt_ms, + pq.jitter_ms, + ); + encoder.set_pending_quality_report(report); + } + // Heartbeat every 2s with capture+encode+send state if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) { let fs = send_fs.load(Ordering::Relaxed); @@ -843,6 +861,7 @@ impl CallEngine { // above for the full flow. let mut dred_recv = DredRecvState::new(); let mut quality_ctrl = AdaptiveQualityController::new(); + let mut recv_quality_counter: u32 = 0; info!(codec = ?current_codec, t_ms = recv_t0.elapsed().as_millis(), "first-join diag: recv task spawned (android/oboe)"); // First-join diagnostic latches — see send task above for the // sibling capture milestones. @@ -989,6 +1008,29 @@ impl CallEngine { } } + // P2P self-observation: if no quality reports from peer, + // generate local observations from our own QUIC path stats. + // This ensures adaptive quality works even on P2P calls + // where the peer hasn't been updated to send reports yet. + recv_quality_counter += 1; + if recv_quality_counter >= QUALITY_REPORT_INTERVAL { + recv_quality_counter = 0; + let snap = recv_t.quinn_path_stats(); + let pq = recv_t.path_quality(); + let local_report = wzp_proto::QualityReport::from_path_stats( + snap.loss_pct, + snap.rtt_ms, + pq.jitter_ms, + ); + if auto_profile { + if let Some(new_profile) = quality_ctrl.observe(&local_report) { + let idx = profile_to_index(&new_profile); + info!(to = ?new_profile.codec, "auto: local quality observation recommends switch"); + pending_profile_recv.store(idx, Ordering::Release); + } + } + } + match decoder.decode(&pkt.payload, &mut pcm) { Ok(n) => { last_decode_n = n; @@ -1392,6 +1434,7 @@ impl CallEngine { // Continuous DRED tuning (same as Android send task). let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec); let mut frames_since_dred_poll: u32 = 0; + let mut frames_since_quality_report: u32 = 0; loop { if !send_r.load(Ordering::Relaxed) { @@ -1460,6 +1503,21 @@ impl CallEngine { encoder.apply_dred_tuning(tuning); } } + + // Quality report: generate from quinn stats and attach to next packet. + // The peer's recv task (or relay) uses this for adaptive quality. + frames_since_quality_report += 1; + if frames_since_quality_report >= QUALITY_REPORT_INTERVAL { + frames_since_quality_report = 0; + let snap = send_t.quinn_path_stats(); + let pq = send_t.path_quality(); + let report = wzp_proto::QualityReport::from_path_stats( + snap.loss_pct, + snap.rtt_ms, + pq.jitter_ms, + ); + encoder.set_pending_quality_report(report); + } } }); @@ -1483,6 +1541,7 @@ impl CallEngine { let mut pcm = vec![0i16; FRAME_SAMPLES_40MS]; // big enough for any codec let mut dred_recv = DredRecvState::new(); let mut quality_ctrl = AdaptiveQualityController::new(); + let mut recv_quality_counter: u32 = 0; loop { if !recv_r.load(Ordering::Relaxed) { @@ -1544,6 +1603,29 @@ impl CallEngine { } } + // P2P self-observation: if no quality reports from peer, + // generate local observations from our own QUIC path stats. + // This ensures adaptive quality works even on P2P calls + // where the peer hasn't been updated to send reports yet. + recv_quality_counter += 1; + if recv_quality_counter >= QUALITY_REPORT_INTERVAL { + recv_quality_counter = 0; + let snap = recv_t.quinn_path_stats(); + let pq = recv_t.path_quality(); + let local_report = wzp_proto::QualityReport::from_path_stats( + snap.loss_pct, + snap.rtt_ms, + pq.jitter_ms, + ); + if auto_profile { + if let Some(new_profile) = quality_ctrl.observe(&local_report) { + let idx = profile_to_index(&new_profile); + info!(to = ?new_profile.codec, "auto: local quality observation recommends switch"); + pending_profile_recv.store(idx, Ordering::Release); + } + } + } + if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) { agc.process_frame(&mut pcm[..n]); if !recv_spk.load(Ordering::Relaxed) {