diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 974ec4a..45bce5d 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -1209,6 +1209,15 @@ async fn run_call( stats.room_participant_count = count; stats.room_participants = members; } + Ok(Some(SignalMessage::QualityDirective { recommended_profile, reason })) => { + let idx = profile_to_index(&recommended_profile); + info!( + codec = ?recommended_profile.codec, + reason = reason.as_deref().unwrap_or(""), + "relay quality directive: switching profile" + ); + pending_profile_recv.store(idx, Ordering::Release); + } Ok(Some(msg)) => { info!("signal received: {:?}", std::mem::discriminant(&msg)); } diff --git a/crates/wzp-proto/src/quality.rs b/crates/wzp-proto/src/quality.rs index e5422c3..23aa7cf 100644 --- a/crates/wzp-proto/src/quality.rs +++ b/crates/wzp-proto/src/quality.rs @@ -6,19 +6,31 @@ use crate::traits::QualityController; use crate::QualityProfile; /// Network quality tier — drives codec and FEC selection. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// +/// 5-tier range from studio quality down to catastrophic: +/// Studio64k > Studio48k > Studio32k > Good > Degraded > Catastrophic +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum Tier { - /// loss < 10%, RTT < 400ms - Good, - /// loss 10-40% OR RTT 400-600ms - Degraded, - /// loss > 40% OR RTT > 600ms - Catastrophic, + /// loss >= 15% OR RTT >= 200ms — Codec2 1.2k + Catastrophic = 0, + /// loss < 15% AND RTT < 200ms — Opus 6k + Degraded = 1, + /// loss < 5% AND RTT < 100ms — Opus 24k + Good = 2, + /// loss < 2% AND RTT < 80ms — Opus 32k + Studio32k = 3, + /// loss < 1% AND RTT < 50ms — Opus 48k + Studio48k = 4, + /// loss < 1% AND RTT < 30ms — Opus 64k + Studio64k = 5, } impl Tier { pub fn profile(self) -> QualityProfile { match self { + Self::Studio64k => QualityProfile::STUDIO_64K, + Self::Studio48k => QualityProfile::STUDIO_48K, + Self::Studio32k => QualityProfile::STUDIO_32K, Self::Good => QualityProfile::GOOD, Self::Degraded => QualityProfile::DEGRADED, Self::Catastrophic => QualityProfile::CATASTROPHIC, @@ -39,7 +51,7 @@ impl Tier { NetworkContext::CellularLte | NetworkContext::Cellular5g | NetworkContext::Cellular3g => { - // Tighter thresholds for cellular networks + // Tighter thresholds for cellular — no studio tiers if loss > 25.0 || rtt > 500 { Self::Catastrophic } else if loss > 8.0 || rtt > 300 { @@ -49,13 +61,18 @@ impl Tier { } } NetworkContext::WiFi | NetworkContext::Unknown => { - // Original thresholds - if loss > 40.0 || rtt > 600 { + if loss >= 15.0 || rtt >= 200 { Self::Catastrophic - } else if loss > 10.0 || rtt > 400 { + } else if loss >= 5.0 || rtt >= 100 { Self::Degraded - } else { + } else if loss >= 2.0 || rtt >= 80 { Self::Good + } else if loss >= 1.0 || rtt >= 50 { + Self::Studio32k + } else if rtt >= 30 { + Self::Studio48k + } else { + Self::Studio64k } } } @@ -64,11 +81,19 @@ impl Tier { /// Return the next lower (worse) tier, or None if already at the worst. pub fn downgrade(self) -> Option { match self { + Self::Studio64k => Some(Self::Studio48k), + Self::Studio48k => Some(Self::Studio32k), + Self::Studio32k => Some(Self::Good), Self::Good => Some(Self::Degraded), Self::Degraded => Some(Self::Catastrophic), Self::Catastrophic => None, } } + + /// Whether this is a studio tier (above Good). + pub fn is_studio(self) -> bool { + matches!(self, Self::Studio64k | Self::Studio48k | Self::Studio32k) + } } /// Describes the network transport type for context-aware quality decisions. @@ -114,8 +139,10 @@ pub struct AdaptiveQualityController { const DOWNGRADE_THRESHOLD: u32 = 3; /// Threshold for downgrading on cellular networks (even faster). const CELLULAR_DOWNGRADE_THRESHOLD: u32 = 2; -/// Threshold for upgrading (slow, cautious improvement). -const UPGRADE_THRESHOLD: u32 = 10; +/// Threshold for upgrading from Catastrophic/Degraded to Good. +const UPGRADE_THRESHOLD: u32 = 5; +/// Threshold for upgrading into studio tiers (very conservative). +const STUDIO_UPGRADE_THRESHOLD: u32 = 10; /// Maximum history window size. const HISTORY_SIZE: usize = 20; /// Default FEC boost amount during handoff recovery. @@ -213,16 +240,13 @@ impl AdaptiveQualityController { return None; } - let is_worse = match (self.current_tier, observed_tier) { - (Tier::Good, Tier::Degraded | Tier::Catastrophic) => true, - (Tier::Degraded, Tier::Catastrophic) => true, - _ => false, - }; + let is_worse = observed_tier < self.current_tier; if is_worse { self.consecutive_up = 0; self.consecutive_down += 1; if self.consecutive_down >= self.downgrade_threshold() { + // Jump directly to the observed tier (don't step one-at-a-time on downgrade) self.current_tier = observed_tier; self.current_profile = observed_tier.profile(); self.consecutive_down = 0; @@ -232,22 +256,36 @@ impl AdaptiveQualityController { // Better conditions self.consecutive_down = 0; self.consecutive_up += 1; - if self.consecutive_up >= UPGRADE_THRESHOLD { + // Studio tiers require more consecutive good reports + let threshold = if self.current_tier >= Tier::Good { + STUDIO_UPGRADE_THRESHOLD + } else { + UPGRADE_THRESHOLD + }; + if self.consecutive_up >= threshold { // Only upgrade one step at a time - let next_tier = match self.current_tier { - Tier::Catastrophic => Tier::Degraded, - Tier::Degraded => Tier::Good, - Tier::Good => return None, - }; - self.current_tier = next_tier; - self.current_profile = next_tier.profile(); - self.consecutive_up = 0; - return Some(self.current_profile); + if let Some(next_tier) = self.upgrade_one_step() { + self.current_tier = next_tier; + self.current_profile = next_tier.profile(); + self.consecutive_up = 0; + return Some(self.current_profile); + } } } None } + + fn upgrade_one_step(&self) -> Option { + match self.current_tier { + Tier::Catastrophic => Some(Tier::Degraded), + Tier::Degraded => Some(Tier::Good), + Tier::Good => Some(Tier::Studio32k), + Tier::Studio32k => Some(Tier::Studio48k), + Tier::Studio48k => Some(Tier::Studio64k), + Tier::Studio64k => None, + } + } } impl Default for AdaptiveQualityController { @@ -331,25 +369,33 @@ mod tests { } assert_eq!(ctrl.tier(), Tier::Catastrophic); - // 9 good reports — not enough - let good = make_report(2.0, 100); - for _ in 0..9 { + // 4 good reports — not enough (threshold is 5) + let good = make_report(0.5, 20); // studio-quality report + for _ in 0..4 { assert!(ctrl.observe(&good).is_none()); } assert_eq!(ctrl.tier(), Tier::Catastrophic); - // 10th good report triggers upgrade (one step: Catastrophic → Degraded) + // 5th good report triggers upgrade (one step: Catastrophic → Degraded) let result = ctrl.observe(&good); assert!(result.is_some()); assert_eq!(ctrl.tier(), Tier::Degraded); - // Need another 10 to go from Degraded → Good - for _ in 0..9 { + // Another 5 to go from Degraded → Good + for _ in 0..4 { assert!(ctrl.observe(&good).is_none()); } let result = ctrl.observe(&good); assert!(result.is_some()); assert_eq!(ctrl.tier(), Tier::Good); + + // Studio upgrades need 10 consecutive — Good → Studio32k + for _ in 0..9 { + assert!(ctrl.observe(&good).is_none()); + } + let result = ctrl.observe(&good); + assert!(result.is_some()); + assert_eq!(ctrl.tier(), Tier::Studio32k); } #[test] @@ -366,11 +412,29 @@ mod tests { #[test] fn tier_classification() { - assert_eq!(Tier::classify(&make_report(5.0, 200)), Tier::Good); - assert_eq!(Tier::classify(&make_report(15.0, 200)), Tier::Degraded); - assert_eq!(Tier::classify(&make_report(5.0, 500)), Tier::Degraded); - assert_eq!(Tier::classify(&make_report(50.0, 200)), Tier::Catastrophic); - assert_eq!(Tier::classify(&make_report(5.0, 700)), Tier::Catastrophic); + // Studio tiers + assert_eq!(Tier::classify(&make_report(0.5, 20)), Tier::Studio64k); + assert_eq!(Tier::classify(&make_report(0.5, 40)), Tier::Studio48k); + assert_eq!(Tier::classify(&make_report(1.5, 60)), Tier::Studio32k); + // Good/Degraded/Catastrophic + assert_eq!(Tier::classify(&make_report(3.0, 90)), Tier::Good); + assert_eq!(Tier::classify(&make_report(6.0, 120)), Tier::Degraded); + assert_eq!(Tier::classify(&make_report(16.0, 120)), Tier::Catastrophic); + assert_eq!(Tier::classify(&make_report(5.0, 200)), Tier::Catastrophic); + } + + #[test] + fn studio_tier_boundaries() { + // loss < 1% AND RTT < 30ms → Studio64k + assert_eq!(Tier::classify(&make_report(0.9, 28)), Tier::Studio64k); + // loss < 1% AND RTT 30-49ms → Studio48k + assert_eq!(Tier::classify(&make_report(0.9, 32)), Tier::Studio48k); + // loss < 2% AND RTT < 80ms → Studio32k (but loss >= 1%) + assert_eq!(Tier::classify(&make_report(1.5, 40)), Tier::Studio32k); + // loss >= 2% → Good (use 2.5 to survive u8 quantization) + assert_eq!(Tier::classify(&make_report(2.5, 40)), Tier::Good); + // RTT 80ms → Good + assert_eq!(Tier::classify(&make_report(0.5, 80)), Tier::Good); } // --------------------------------------------------------------- @@ -379,8 +443,8 @@ mod tests { #[test] fn cellular_tighter_thresholds() { - // 12% loss: Good on WiFi, Degraded on cellular - let report = make_report(12.0, 200); + // 9% loss: Degraded on both WiFi (>=5%) and cellular (>=8%) + let report = make_report(9.0, 80); assert_eq!( Tier::classify_with_context(&report, NetworkContext::WiFi), Tier::Degraded @@ -390,22 +454,22 @@ mod tests { Tier::Degraded ); - // 9% loss: Good on WiFi, Degraded on cellular - let report = make_report(9.0, 200); + // 6% loss, low RTT: Degraded on WiFi (>=5%), Good on cellular (<8%) + let report = make_report(6.0, 80); assert_eq!( Tier::classify_with_context(&report, NetworkContext::WiFi), + Tier::Degraded + ); + assert_eq!( + Tier::classify_with_context(&report, NetworkContext::CellularLte), Tier::Good ); - assert_eq!( - Tier::classify_with_context(&report, NetworkContext::CellularLte), - Tier::Degraded - ); - // 30% loss: Degraded on WiFi, Catastrophic on cellular - let report = make_report(30.0, 200); + // 30% loss: Catastrophic on WiFi (>=15%), Catastrophic on cellular (>=25%) + let report = make_report(30.0, 80); assert_eq!( Tier::classify_with_context(&report, NetworkContext::WiFi), - Tier::Degraded + Tier::Catastrophic ); assert_eq!( Tier::classify_with_context(&report, NetworkContext::Cellular3g), @@ -415,15 +479,29 @@ mod tests { #[test] fn cellular_rtt_thresholds() { - // RTT 350ms: Good on WiFi, Degraded on cellular - let report = make_report(2.0, 348); // rtt_4ms rounds so use 348 + // RTT 150ms: Degraded on WiFi (>=100ms), Good on cellular (<300ms and loss<8%) + let report = make_report(2.0, 148); assert_eq!( Tier::classify_with_context(&report, NetworkContext::WiFi), - Tier::Good + Tier::Degraded ); assert_eq!( Tier::classify_with_context(&report, NetworkContext::CellularLte), - Tier::Degraded + Tier::Good + ); + } + + #[test] + fn cellular_no_studio_tiers() { + // Even with perfect network, cellular stays at Good (no studio) + let report = make_report(0.0, 10); + assert_eq!( + Tier::classify_with_context(&report, NetworkContext::CellularLte), + Tier::Good + ); + assert_eq!( + Tier::classify_with_context(&report, NetworkContext::WiFi), + Tier::Studio64k ); } @@ -469,6 +547,9 @@ mod tests { #[test] fn tier_downgrade() { + assert_eq!(Tier::Studio64k.downgrade(), Some(Tier::Studio48k)); + assert_eq!(Tier::Studio48k.downgrade(), Some(Tier::Studio32k)); + assert_eq!(Tier::Studio32k.downgrade(), Some(Tier::Good)); assert_eq!(Tier::Good.downgrade(), Some(Tier::Degraded)); assert_eq!(Tier::Degraded.downgrade(), Some(Tier::Catastrophic)); assert_eq!(Tier::Catastrophic.downgrade(), None); diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 8453345..41eee22 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -96,6 +96,62 @@ impl DebugTap { "TAP EVENT" ); } + + pub fn log_stats(&self, room: &str, stats: &TapStats) { + let codecs: Vec = stats.codecs_seen.iter().map(|c| format!("{c:?}")).collect(); + info!( + target: "debug_tap", + room = %room, + period = "5s", + in_pkts = stats.in_pkts, + out_pkts = stats.out_pkts, + fan_out_avg = format!("{:.1}", if stats.in_pkts > 0 { stats.out_pkts as f64 / stats.in_pkts as f64 } else { 0.0 }), + seq_gaps = stats.seq_gaps, + codecs_seen = ?codecs, + "TAP STATS" + ); + } +} + +/// Per-participant stats for the debug tap periodic summary. +pub struct TapStats { + pub in_pkts: u64, + pub out_pkts: u64, + pub seq_gaps: u64, + pub codecs_seen: std::collections::HashSet, + last_seq: Option, +} + +impl TapStats { + pub fn new() -> Self { + Self { + in_pkts: 0, + out_pkts: 0, + seq_gaps: 0, + codecs_seen: std::collections::HashSet::new(), + last_seq: None, + } + } + + pub fn record_in(&mut self, pkt: &wzp_proto::MediaPacket, fan_out: usize) { + self.in_pkts += 1; + self.out_pkts += fan_out as u64; + self.codecs_seen.insert(pkt.header.codec_id); + if let Some(prev) = self.last_seq { + let expected = prev.wrapping_add(1); + if pkt.header.seq != expected { + self.seq_gaps += 1; + } + } + self.last_seq = Some(pkt.header.seq); + } + + pub fn reset_period(&mut self) { + self.in_pkts = 0; + self.out_pkts = 0; + self.seq_gaps = 0; + // Keep codecs_seen and last_seq across periods + } } /// Tracks network quality for a single participant in a room. @@ -129,11 +185,7 @@ impl ParticipantQuality { fn weakest_tier<'a>(qualities: impl Iterator) -> Tier { qualities .map(|pq| pq.current_tier) - .min_by_key(|t| match t { - Tier::Good => 2, - Tier::Degraded => 1, - Tier::Catastrophic => 0, - }) + .min() .unwrap_or(Tier::Good) } @@ -638,6 +690,12 @@ async fn run_participant_plain( let mut send_errors = 0u64; let mut last_log_instant = std::time::Instant::now(); + let mut tap_stats = if debug_tap.as_ref().map_or(false, |t| t.matches(&room_name)) { + Some(TapStats::new()) + } else { + None + }; + info!( room = %room_name, participant = participant_id, @@ -717,12 +775,15 @@ async fn run_participant_plain( broadcast_signal(&all_senders, &directive).await; } - // Debug tap: log packet metadata + // Debug tap: log packet metadata + record stats if let Some(ref tap) = debug_tap { if tap.matches(&room_name) { tap.log_packet(&room_name, "in", &addr, &pkt, others.len()); } } + if let Some(ref mut ts) = tap_stats { + ts.record_in(&pkt, others.len()); + } // Forward to all others let fwd_start = std::time::Instant::now(); @@ -795,6 +856,10 @@ async fn run_participant_plain( send_errors, "participant stats" ); + if let (Some(tap), Some(ts)) = (&debug_tap, &mut tap_stats) { + tap.log_stats(&room_name, ts); + ts.reset_period(); + } max_recv_gap_ms = 0; max_forward_ms = 0; last_log_instant = std::time::Instant::now(); diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 59828fc..71ec87b 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -1114,10 +1114,11 @@ impl CallEngine { } }); - // Signal task (presence — same shape as desktop). + // Signal task (presence + quality directives). let sig_t = transport.clone(); let sig_r = running.clone(); let sig_p = participants.clone(); + let sig_pending_profile = pending_profile.clone(); let event_cb = Arc::new(event_cb); let sig_cb = event_cb.clone(); tokio::spawn(async move { @@ -1149,6 +1150,18 @@ impl CallEngine { *sig_p.lock().await = unique; sig_cb("room-update", &format!("{count} participants")); } + Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective { + recommended_profile, + reason, + }))) => { + let idx = profile_to_index(&recommended_profile); + info!( + codec = ?recommended_profile.codec, + reason = reason.as_deref().unwrap_or(""), + "relay quality directive: switching profile" + ); + sig_pending_profile.store(idx, Ordering::Release); + } Ok(Ok(Some(_))) => {} Ok(Ok(None)) => break, Ok(Err(_)) => break, @@ -1534,10 +1547,11 @@ impl CallEngine { } }); - // Signal task (presence) + // Signal task (presence + quality directives) let sig_t = transport.clone(); let sig_r = running.clone(); let sig_p = participants.clone(); + let sig_pending_profile = pending_profile.clone(); let event_cb = Arc::new(event_cb); let sig_cb = event_cb.clone(); tokio::spawn(async move { @@ -1569,6 +1583,18 @@ impl CallEngine { *sig_p.lock().await = unique; sig_cb("room-update", &format!("{count} participants")); } + Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective { + recommended_profile, + reason, + }))) => { + let idx = profile_to_index(&recommended_profile); + info!( + codec = ?recommended_profile.codec, + reason = reason.as_deref().unwrap_or(""), + "relay quality directive: switching profile" + ); + sig_pending_profile.store(idx, Ordering::Release); + } Ok(Ok(Some(_))) => {} Ok(Ok(None)) => break, Ok(Err(_)) => break,