feat: 6-tier quality classification, QualityDirective handling, debug tap stats
Some checks failed
Mirror to GitHub / mirror (push) Failing after 31s
Build Release Binaries / build-amd64 (push) Failing after 3m49s

- Extend Tier enum from 3 to 6 levels: Studio64k/48k/32k + Good +
  Degraded + Catastrophic with asymmetric hysteresis (down:3, up:5,
  studio:10)
- Handle QualityDirective signals in both desktop and Android engines
  — relay-coordinated codec switching now works end-to-end
- Add periodic TAP STATS to debug tap: packets in/out, fan-out avg,
  seq gaps, codecs seen (every 5s)
- Mark task #2 done (ParticipantInfo in federation signals already
  implemented)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-13 10:23:48 +04:00
parent ea5fc17c34
commit d424515542
4 changed files with 244 additions and 63 deletions

View File

@@ -1209,6 +1209,15 @@ async fn run_call(
stats.room_participant_count = count; stats.room_participant_count = count;
stats.room_participants = members; stats.room_participants = members;
} }
Ok(Some(SignalMessage::QualityDirective { recommended_profile, reason })) => {
let idx = profile_to_index(&recommended_profile);
info!(
codec = ?recommended_profile.codec,
reason = reason.as_deref().unwrap_or(""),
"relay quality directive: switching profile"
);
pending_profile_recv.store(idx, Ordering::Release);
}
Ok(Some(msg)) => { Ok(Some(msg)) => {
info!("signal received: {:?}", std::mem::discriminant(&msg)); info!("signal received: {:?}", std::mem::discriminant(&msg));
} }

View File

@@ -6,19 +6,31 @@ use crate::traits::QualityController;
use crate::QualityProfile; use crate::QualityProfile;
/// Network quality tier — drives codec and FEC selection. /// Network quality tier — drives codec and FEC selection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)] ///
/// 6-tier range from studio quality down to catastrophic:
/// Studio64k > Studio48k > Studio32k > Good > Degraded > Catastrophic
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier { pub enum Tier {
/// loss < 10%, RTT < 400ms /// loss >= 15% OR RTT >= 200ms — Codec2 1.2k
Good, Catastrophic = 0,
/// loss 10-40% OR RTT 400-600ms /// loss < 15% AND RTT < 200ms — Opus 6k
Degraded, Degraded = 1,
/// loss > 40% OR RTT > 600ms /// loss < 5% AND RTT < 100ms — Opus 24k
Catastrophic, Good = 2,
/// loss < 2% AND RTT < 80ms — Opus 32k
Studio32k = 3,
/// loss < 1% AND RTT < 50ms — Opus 48k
Studio48k = 4,
/// loss < 1% AND RTT < 30ms — Opus 64k
Studio64k = 5,
} }
impl Tier { impl Tier {
pub fn profile(self) -> QualityProfile { pub fn profile(self) -> QualityProfile {
match self { match self {
Self::Studio64k => QualityProfile::STUDIO_64K,
Self::Studio48k => QualityProfile::STUDIO_48K,
Self::Studio32k => QualityProfile::STUDIO_32K,
Self::Good => QualityProfile::GOOD, Self::Good => QualityProfile::GOOD,
Self::Degraded => QualityProfile::DEGRADED, Self::Degraded => QualityProfile::DEGRADED,
Self::Catastrophic => QualityProfile::CATASTROPHIC, Self::Catastrophic => QualityProfile::CATASTROPHIC,
@@ -39,7 +51,7 @@ impl Tier {
NetworkContext::CellularLte NetworkContext::CellularLte
| NetworkContext::Cellular5g | NetworkContext::Cellular5g
| NetworkContext::Cellular3g => { | NetworkContext::Cellular3g => {
// Tighter thresholds for cellular networks // Tighter thresholds for cellular — no studio tiers
if loss > 25.0 || rtt > 500 { if loss > 25.0 || rtt > 500 {
Self::Catastrophic Self::Catastrophic
} else if loss > 8.0 || rtt > 300 { } else if loss > 8.0 || rtt > 300 {
@@ -49,13 +61,18 @@ impl Tier {
} }
} }
NetworkContext::WiFi | NetworkContext::Unknown => { NetworkContext::WiFi | NetworkContext::Unknown => {
// Original thresholds if loss >= 15.0 || rtt >= 200 {
if loss > 40.0 || rtt > 600 {
Self::Catastrophic Self::Catastrophic
} else if loss > 10.0 || rtt > 400 { } else if loss >= 5.0 || rtt >= 100 {
Self::Degraded Self::Degraded
} else { } else if loss >= 2.0 || rtt >= 80 {
Self::Good Self::Good
} else if loss >= 1.0 || rtt >= 50 {
Self::Studio32k
} else if rtt >= 30 {
Self::Studio48k
} else {
Self::Studio64k
} }
} }
} }
@@ -64,11 +81,19 @@ impl Tier {
/// Return the next lower (worse) tier, or None if already at the worst. /// Return the next lower (worse) tier, or None if already at the worst.
pub fn downgrade(self) -> Option<Tier> { pub fn downgrade(self) -> Option<Tier> {
match self { match self {
Self::Studio64k => Some(Self::Studio48k),
Self::Studio48k => Some(Self::Studio32k),
Self::Studio32k => Some(Self::Good),
Self::Good => Some(Self::Degraded), Self::Good => Some(Self::Degraded),
Self::Degraded => Some(Self::Catastrophic), Self::Degraded => Some(Self::Catastrophic),
Self::Catastrophic => None, Self::Catastrophic => None,
} }
} }
/// Reports whether this tier is one of the studio tiers.
///
/// The studio tiers are exactly the variants ordered above `Good`
/// (`Studio32k`, `Studio48k`, `Studio64k`), so the check is a single
/// comparison against `Good` using the enum's derived ordering.
pub fn is_studio(self) -> bool {
    self > Self::Good
}
} }
/// Describes the network transport type for context-aware quality decisions. /// Describes the network transport type for context-aware quality decisions.
@@ -114,8 +139,10 @@ pub struct AdaptiveQualityController {
const DOWNGRADE_THRESHOLD: u32 = 3; const DOWNGRADE_THRESHOLD: u32 = 3;
/// Threshold for downgrading on cellular networks (even faster). /// Threshold for downgrading on cellular networks (even faster).
const CELLULAR_DOWNGRADE_THRESHOLD: u32 = 2; const CELLULAR_DOWNGRADE_THRESHOLD: u32 = 2;
/// Threshold for upgrading (slow, cautious improvement). /// Threshold for upgrading from Catastrophic/Degraded to Good.
const UPGRADE_THRESHOLD: u32 = 10; const UPGRADE_THRESHOLD: u32 = 5;
/// Threshold for upgrading into studio tiers (very conservative).
const STUDIO_UPGRADE_THRESHOLD: u32 = 10;
/// Maximum history window size. /// Maximum history window size.
const HISTORY_SIZE: usize = 20; const HISTORY_SIZE: usize = 20;
/// Default FEC boost amount during handoff recovery. /// Default FEC boost amount during handoff recovery.
@@ -213,16 +240,13 @@ impl AdaptiveQualityController {
return None; return None;
} }
let is_worse = match (self.current_tier, observed_tier) { let is_worse = observed_tier < self.current_tier;
(Tier::Good, Tier::Degraded | Tier::Catastrophic) => true,
(Tier::Degraded, Tier::Catastrophic) => true,
_ => false,
};
if is_worse { if is_worse {
self.consecutive_up = 0; self.consecutive_up = 0;
self.consecutive_down += 1; self.consecutive_down += 1;
if self.consecutive_down >= self.downgrade_threshold() { if self.consecutive_down >= self.downgrade_threshold() {
// Jump directly to the observed tier (don't step one-at-a-time on downgrade)
self.current_tier = observed_tier; self.current_tier = observed_tier;
self.current_profile = observed_tier.profile(); self.current_profile = observed_tier.profile();
self.consecutive_down = 0; self.consecutive_down = 0;
@@ -232,22 +256,36 @@ impl AdaptiveQualityController {
// Better conditions // Better conditions
self.consecutive_down = 0; self.consecutive_down = 0;
self.consecutive_up += 1; self.consecutive_up += 1;
if self.consecutive_up >= UPGRADE_THRESHOLD { // Studio tiers require more consecutive good reports
let threshold = if self.current_tier >= Tier::Good {
STUDIO_UPGRADE_THRESHOLD
} else {
UPGRADE_THRESHOLD
};
if self.consecutive_up >= threshold {
// Only upgrade one step at a time // Only upgrade one step at a time
let next_tier = match self.current_tier { if let Some(next_tier) = self.upgrade_one_step() {
Tier::Catastrophic => Tier::Degraded, self.current_tier = next_tier;
Tier::Degraded => Tier::Good, self.current_profile = next_tier.profile();
Tier::Good => return None, self.consecutive_up = 0;
}; return Some(self.current_profile);
self.current_tier = next_tier; }
self.current_profile = next_tier.profile();
self.consecutive_up = 0;
return Some(self.current_profile);
} }
} }
None None
} }
/// Returns the tier exactly one step better than the current one.
///
/// Yields `None` when the controller is already at `Studio64k`, the
/// best tier, so there is nothing left to upgrade to.
fn upgrade_one_step(&self) -> Option<Tier> {
    let next = match self.current_tier {
        // Already at the top of the ladder.
        Tier::Studio64k => return None,
        Tier::Studio48k => Tier::Studio64k,
        Tier::Studio32k => Tier::Studio48k,
        Tier::Good => Tier::Studio32k,
        Tier::Degraded => Tier::Good,
        Tier::Catastrophic => Tier::Degraded,
    };
    Some(next)
}
} }
impl Default for AdaptiveQualityController { impl Default for AdaptiveQualityController {
@@ -331,25 +369,33 @@ mod tests {
} }
assert_eq!(ctrl.tier(), Tier::Catastrophic); assert_eq!(ctrl.tier(), Tier::Catastrophic);
// 9 good reports — not enough // 4 good reports — not enough (threshold is 5)
let good = make_report(2.0, 100); let good = make_report(0.5, 20); // studio-quality report
for _ in 0..9 { for _ in 0..4 {
assert!(ctrl.observe(&good).is_none()); assert!(ctrl.observe(&good).is_none());
} }
assert_eq!(ctrl.tier(), Tier::Catastrophic); assert_eq!(ctrl.tier(), Tier::Catastrophic);
// 10th good report triggers upgrade (one step: Catastrophic → Degraded) // 5th good report triggers upgrade (one step: Catastrophic → Degraded)
let result = ctrl.observe(&good); let result = ctrl.observe(&good);
assert!(result.is_some()); assert!(result.is_some());
assert_eq!(ctrl.tier(), Tier::Degraded); assert_eq!(ctrl.tier(), Tier::Degraded);
// Need another 10 to go from Degraded → Good // Another 5 to go from Degraded → Good
for _ in 0..9 { for _ in 0..4 {
assert!(ctrl.observe(&good).is_none()); assert!(ctrl.observe(&good).is_none());
} }
let result = ctrl.observe(&good); let result = ctrl.observe(&good);
assert!(result.is_some()); assert!(result.is_some());
assert_eq!(ctrl.tier(), Tier::Good); assert_eq!(ctrl.tier(), Tier::Good);
// Studio upgrades need 10 consecutive — Good → Studio32k
for _ in 0..9 {
assert!(ctrl.observe(&good).is_none());
}
let result = ctrl.observe(&good);
assert!(result.is_some());
assert_eq!(ctrl.tier(), Tier::Studio32k);
} }
#[test] #[test]
@@ -366,11 +412,29 @@ mod tests {
#[test] #[test]
fn tier_classification() { fn tier_classification() {
assert_eq!(Tier::classify(&make_report(5.0, 200)), Tier::Good); // Studio tiers
assert_eq!(Tier::classify(&make_report(15.0, 200)), Tier::Degraded); assert_eq!(Tier::classify(&make_report(0.5, 20)), Tier::Studio64k);
assert_eq!(Tier::classify(&make_report(5.0, 500)), Tier::Degraded); assert_eq!(Tier::classify(&make_report(0.5, 40)), Tier::Studio48k);
assert_eq!(Tier::classify(&make_report(50.0, 200)), Tier::Catastrophic); assert_eq!(Tier::classify(&make_report(1.5, 60)), Tier::Studio32k);
assert_eq!(Tier::classify(&make_report(5.0, 700)), Tier::Catastrophic); // Good/Degraded/Catastrophic
assert_eq!(Tier::classify(&make_report(3.0, 90)), Tier::Good);
assert_eq!(Tier::classify(&make_report(6.0, 120)), Tier::Degraded);
assert_eq!(Tier::classify(&make_report(16.0, 120)), Tier::Catastrophic);
assert_eq!(Tier::classify(&make_report(5.0, 200)), Tier::Catastrophic);
}
#[test]
fn studio_tier_boundaries() {
// loss < 1% AND RTT < 30ms → Studio64k
assert_eq!(Tier::classify(&make_report(0.9, 28)), Tier::Studio64k);
// loss < 1% AND RTT 30-49ms → Studio48k
assert_eq!(Tier::classify(&make_report(0.9, 32)), Tier::Studio48k);
// loss < 2% AND RTT < 80ms → Studio32k (but loss >= 1%)
assert_eq!(Tier::classify(&make_report(1.5, 40)), Tier::Studio32k);
// loss >= 2% → Good (use 2.5 to survive u8 quantization)
assert_eq!(Tier::classify(&make_report(2.5, 40)), Tier::Good);
// RTT 80ms → Good
assert_eq!(Tier::classify(&make_report(0.5, 80)), Tier::Good);
} }
// --------------------------------------------------------------- // ---------------------------------------------------------------
@@ -379,8 +443,8 @@ mod tests {
#[test] #[test]
fn cellular_tighter_thresholds() { fn cellular_tighter_thresholds() {
// 12% loss: Good on WiFi, Degraded on cellular // 9% loss: Degraded on both WiFi (>=5%) and cellular (>=8%)
let report = make_report(12.0, 200); let report = make_report(9.0, 80);
assert_eq!( assert_eq!(
Tier::classify_with_context(&report, NetworkContext::WiFi), Tier::classify_with_context(&report, NetworkContext::WiFi),
Tier::Degraded Tier::Degraded
@@ -390,22 +454,22 @@ mod tests {
Tier::Degraded Tier::Degraded
); );
// 9% loss: Good on WiFi, Degraded on cellular // 6% loss, low RTT: Degraded on WiFi (>=5%), Good on cellular (<8%)
let report = make_report(9.0, 200); let report = make_report(6.0, 80);
assert_eq!( assert_eq!(
Tier::classify_with_context(&report, NetworkContext::WiFi), Tier::classify_with_context(&report, NetworkContext::WiFi),
Tier::Degraded
);
assert_eq!(
Tier::classify_with_context(&report, NetworkContext::CellularLte),
Tier::Good Tier::Good
); );
assert_eq!(
Tier::classify_with_context(&report, NetworkContext::CellularLte),
Tier::Degraded
);
// 30% loss: Degraded on WiFi, Catastrophic on cellular // 30% loss: Catastrophic on WiFi (>=15%), Catastrophic on cellular (>=25%)
let report = make_report(30.0, 200); let report = make_report(30.0, 80);
assert_eq!( assert_eq!(
Tier::classify_with_context(&report, NetworkContext::WiFi), Tier::classify_with_context(&report, NetworkContext::WiFi),
Tier::Degraded Tier::Catastrophic
); );
assert_eq!( assert_eq!(
Tier::classify_with_context(&report, NetworkContext::Cellular3g), Tier::classify_with_context(&report, NetworkContext::Cellular3g),
@@ -415,15 +479,29 @@ mod tests {
#[test] #[test]
fn cellular_rtt_thresholds() { fn cellular_rtt_thresholds() {
// RTT 350ms: Good on WiFi, Degraded on cellular // RTT 150ms: Degraded on WiFi (>=100ms), Good on cellular (<300ms and loss<8%)
let report = make_report(2.0, 348); // rtt_4ms rounds so use 348 let report = make_report(2.0, 148);
assert_eq!( assert_eq!(
Tier::classify_with_context(&report, NetworkContext::WiFi), Tier::classify_with_context(&report, NetworkContext::WiFi),
Tier::Good Tier::Degraded
); );
assert_eq!( assert_eq!(
Tier::classify_with_context(&report, NetworkContext::CellularLte), Tier::classify_with_context(&report, NetworkContext::CellularLte),
Tier::Degraded Tier::Good
);
}
#[test]
fn cellular_no_studio_tiers() {
// Even with perfect network, cellular stays at Good (no studio)
let report = make_report(0.0, 10);
assert_eq!(
Tier::classify_with_context(&report, NetworkContext::CellularLte),
Tier::Good
);
assert_eq!(
Tier::classify_with_context(&report, NetworkContext::WiFi),
Tier::Studio64k
); );
} }
@@ -469,6 +547,9 @@ mod tests {
#[test] #[test]
fn tier_downgrade() { fn tier_downgrade() {
assert_eq!(Tier::Studio64k.downgrade(), Some(Tier::Studio48k));
assert_eq!(Tier::Studio48k.downgrade(), Some(Tier::Studio32k));
assert_eq!(Tier::Studio32k.downgrade(), Some(Tier::Good));
assert_eq!(Tier::Good.downgrade(), Some(Tier::Degraded)); assert_eq!(Tier::Good.downgrade(), Some(Tier::Degraded));
assert_eq!(Tier::Degraded.downgrade(), Some(Tier::Catastrophic)); assert_eq!(Tier::Degraded.downgrade(), Some(Tier::Catastrophic));
assert_eq!(Tier::Catastrophic.downgrade(), None); assert_eq!(Tier::Catastrophic.downgrade(), None);

View File

@@ -96,6 +96,62 @@ impl DebugTap {
"TAP EVENT" "TAP EVENT"
); );
} }
/// Emits a "TAP STATS" summary line for `room` on the `debug_tap` log target.
///
/// The line carries the packet counters and sequence-gap count from
/// `stats`, the average fan-out (`out_pkts / in_pkts`, reported as `0.0`
/// when no packets were received) formatted to one decimal place, and the
/// `Debug` names of every codec recorded in `stats.codecs_seen`.
///
/// The `period` field is a fixed `"5s"` label; the real reporting interval
/// is whatever cadence the caller invokes this method at.
pub fn log_stats(&self, room: &str, stats: &TapStats) {
    // `HashSet` iteration order is arbitrary, so sort the names to keep
    // the logged list stable from one summary line to the next.
    let mut codecs: Vec<String> = stats.codecs_seen.iter().map(|c| format!("{c:?}")).collect();
    codecs.sort_unstable();

    // Guard the division: an idle period has zero inbound packets.
    let fan_out_avg = if stats.in_pkts > 0 {
        stats.out_pkts as f64 / stats.in_pkts as f64
    } else {
        0.0
    };

    info!(
        target: "debug_tap",
        room = %room,
        period = "5s",
        in_pkts = stats.in_pkts,
        out_pkts = stats.out_pkts,
        fan_out_avg = format!("{:.1}", fan_out_avg),
        seq_gaps = stats.seq_gaps,
        codecs_seen = ?codecs,
        "TAP STATS"
    );
}
}
/// Per-participant stats for the debug tap periodic summary.
///
/// `in_pkts`, `out_pkts` and `seq_gaps` are per-period counters that
/// `reset_period` zeroes; `codecs_seen` and the private sequence cursor
/// are kept across periods.
///
/// `Default` yields the same all-zero, empty state as `TapStats::new()`.
#[derive(Debug, Default)]
pub struct TapStats {
    /// Number of packets recorded via `record_in` in the current period.
    pub in_pkts: u64,
    /// Sum of the fan-out values passed to `record_in` in the current period.
    pub out_pkts: u64,
    /// Number of packets whose sequence number was not the previous one plus one
    /// (with `u16` wrap-around) in the current period.
    pub seq_gaps: u64,
    /// Every codec id observed so far; not cleared between periods.
    pub codecs_seen: std::collections::HashSet<wzp_proto::CodecId>,
    /// Sequence number of the most recently recorded packet, if any.
    last_seq: Option<u16>,
}
impl TapStats {
/// Creates an empty stats tracker: all counters at zero, no codecs
/// recorded, and no previous sequence number remembered.
pub fn new() -> Self {
    let codecs_seen = std::collections::HashSet::new();
    Self {
        last_seq: None,
        codecs_seen,
        seq_gaps: 0,
        out_pkts: 0,
        in_pkts: 0,
    }
}
/// Records one inbound packet and the number of peers it is forwarded to.
///
/// Bumps the inbound counter by one, adds `fan_out` to the outbound
/// counter, and remembers the packet's codec. If a previous packet was
/// recorded and this packet's sequence number is anything other than the
/// previous one plus one (wrapping at `u16::MAX`), one gap is counted —
/// this includes reordered and duplicate packets, not only losses.
pub fn record_in(&mut self, pkt: &wzp_proto::MediaPacket, fan_out: usize) {
    let seq = pkt.header.seq;

    self.in_pkts += 1;
    self.out_pkts += fan_out as u64;
    self.codecs_seen.insert(pkt.header.codec_id);

    // The very first packet has no predecessor, so it can never be a gap.
    let contiguous = self
        .last_seq
        .map_or(true, |prev| prev.wrapping_add(1) == seq);
    if !contiguous {
        self.seq_gaps += 1;
    }

    self.last_seq = Some(seq);
}
/// Starts a new reporting period by zeroing the per-period counters.
///
/// `codecs_seen` and `last_seq` are intentionally left untouched, so the
/// codec set keeps accumulating and gap detection continues seamlessly
/// across the period boundary.
pub fn reset_period(&mut self) {
    self.seq_gaps = 0;
    self.out_pkts = 0;
    self.in_pkts = 0;
}
} }
/// Tracks network quality for a single participant in a room. /// Tracks network quality for a single participant in a room.
@@ -129,11 +185,7 @@ impl ParticipantQuality {
fn weakest_tier<'a>(qualities: impl Iterator<Item = &'a ParticipantQuality>) -> Tier { fn weakest_tier<'a>(qualities: impl Iterator<Item = &'a ParticipantQuality>) -> Tier {
qualities qualities
.map(|pq| pq.current_tier) .map(|pq| pq.current_tier)
.min_by_key(|t| match t { .min()
Tier::Good => 2,
Tier::Degraded => 1,
Tier::Catastrophic => 0,
})
.unwrap_or(Tier::Good) .unwrap_or(Tier::Good)
} }
@@ -638,6 +690,12 @@ async fn run_participant_plain(
let mut send_errors = 0u64; let mut send_errors = 0u64;
let mut last_log_instant = std::time::Instant::now(); let mut last_log_instant = std::time::Instant::now();
let mut tap_stats = if debug_tap.as_ref().map_or(false, |t| t.matches(&room_name)) {
Some(TapStats::new())
} else {
None
};
info!( info!(
room = %room_name, room = %room_name,
participant = participant_id, participant = participant_id,
@@ -717,12 +775,15 @@ async fn run_participant_plain(
broadcast_signal(&all_senders, &directive).await; broadcast_signal(&all_senders, &directive).await;
} }
// Debug tap: log packet metadata // Debug tap: log packet metadata + record stats
if let Some(ref tap) = debug_tap { if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) { if tap.matches(&room_name) {
tap.log_packet(&room_name, "in", &addr, &pkt, others.len()); tap.log_packet(&room_name, "in", &addr, &pkt, others.len());
} }
} }
if let Some(ref mut ts) = tap_stats {
ts.record_in(&pkt, others.len());
}
// Forward to all others // Forward to all others
let fwd_start = std::time::Instant::now(); let fwd_start = std::time::Instant::now();
@@ -795,6 +856,10 @@ async fn run_participant_plain(
send_errors, send_errors,
"participant stats" "participant stats"
); );
if let (Some(tap), Some(ts)) = (&debug_tap, &mut tap_stats) {
tap.log_stats(&room_name, ts);
ts.reset_period();
}
max_recv_gap_ms = 0; max_recv_gap_ms = 0;
max_forward_ms = 0; max_forward_ms = 0;
last_log_instant = std::time::Instant::now(); last_log_instant = std::time::Instant::now();

View File

@@ -1114,10 +1114,11 @@ impl CallEngine {
} }
}); });
// Signal task (presence — same shape as desktop). // Signal task (presence + quality directives).
let sig_t = transport.clone(); let sig_t = transport.clone();
let sig_r = running.clone(); let sig_r = running.clone();
let sig_p = participants.clone(); let sig_p = participants.clone();
let sig_pending_profile = pending_profile.clone();
let event_cb = Arc::new(event_cb); let event_cb = Arc::new(event_cb);
let sig_cb = event_cb.clone(); let sig_cb = event_cb.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -1149,6 +1150,18 @@ impl CallEngine {
*sig_p.lock().await = unique; *sig_p.lock().await = unique;
sig_cb("room-update", &format!("{count} participants")); sig_cb("room-update", &format!("{count} participants"));
} }
Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
}))) => {
let idx = profile_to_index(&recommended_profile);
info!(
codec = ?recommended_profile.codec,
reason = reason.as_deref().unwrap_or(""),
"relay quality directive: switching profile"
);
sig_pending_profile.store(idx, Ordering::Release);
}
Ok(Ok(Some(_))) => {} Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break, Ok(Ok(None)) => break,
Ok(Err(_)) => break, Ok(Err(_)) => break,
@@ -1534,10 +1547,11 @@ impl CallEngine {
} }
}); });
// Signal task (presence) // Signal task (presence + quality directives)
let sig_t = transport.clone(); let sig_t = transport.clone();
let sig_r = running.clone(); let sig_r = running.clone();
let sig_p = participants.clone(); let sig_p = participants.clone();
let sig_pending_profile = pending_profile.clone();
let event_cb = Arc::new(event_cb); let event_cb = Arc::new(event_cb);
let sig_cb = event_cb.clone(); let sig_cb = event_cb.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -1569,6 +1583,18 @@ impl CallEngine {
*sig_p.lock().await = unique; *sig_p.lock().await = unique;
sig_cb("room-update", &format!("{count} participants")); sig_cb("room-update", &format!("{count} participants"));
} }
Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
}))) => {
let idx = profile_to_index(&recommended_profile);
info!(
codec = ?recommended_profile.codec,
reason = reason.as_deref().unwrap_or(""),
"relay quality directive: switching profile"
);
sig_pending_profile.store(idx, Ordering::Release);
}
Ok(Ok(Some(_))) => {} Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break, Ok(Ok(None)) => break,
Ok(Err(_)) => break, Ok(Err(_)) => break,