diff --git a/crates/wzp-proto/src/quality.rs b/crates/wzp-proto/src/quality.rs index 882216e..2859672 100644 --- a/crates/wzp-proto/src/quality.rs +++ b/crates/wzp-proto/src/quality.rs @@ -135,6 +135,10 @@ pub struct AdaptiveQualityController { fec_boost_until: Option, /// FEC boost amount to add during handoff recovery window. fec_boost_amount: f32, + /// Probing state: when Some, we're actively testing a higher tier. + probe: Option, + /// Time spent stable at the current tier (for probe trigger). + stable_since: Option, } /// Threshold for downgrading (fast reaction to degradation). @@ -151,6 +155,28 @@ const HISTORY_SIZE: usize = 20; const DEFAULT_FEC_BOOST: f32 = 0.2; /// Duration of FEC boost after a network handoff. const FEC_BOOST_DURATION_SECS: u64 = 10; +/// Minimum time stable at current tier before probing upward (30 seconds). +const PROBE_STABLE_SECS: u64 = 30; +/// Duration of a probe window (5 seconds — ~25 quality reports at 1/s). +const PROBE_DURATION_SECS: u64 = 5; +/// Maximum bad reports during probe before aborting (1 out of ~5 = 20%). +const PROBE_MAX_BAD: u32 = 1; +/// Cooldown after a failed probe before trying again (60 seconds). +const PROBE_COOLDOWN_SECS: u64 = 60; + +/// Active bandwidth probe state. +struct ProbeState { + /// The tier we're probing (one step above current). + target_tier: Tier, + /// Profile to apply during probe. + target_profile: QualityProfile, + /// When the probe started. + started: Instant, + /// Reports observed during probe. + probe_reports: u32, + /// Bad reports during probe (loss/RTT exceeded target tier thresholds). + bad_reports: u32, +} impl AdaptiveQualityController { pub fn new() -> Self { @@ -164,6 +190,8 @@ impl AdaptiveQualityController { network_context: NetworkContext::default(), fec_boost_until: None, fec_boost_amount: DEFAULT_FEC_BOOST, + probe: None, + stable_since: None, } } @@ -203,6 +231,10 @@ impl AdaptiveQualityController { self.forced = false; } + // Cancel any active probe + self.probe = None; + self.stable_since = None; + // Activate FEC boost for any network change self.fec_boost_until = Some(Instant::now() + Duration::from_secs(FEC_BOOST_DURATION_SECS)); } @@ -223,6 +255,8 @@ impl AdaptiveQualityController { pub fn reset_counters(&mut self) { self.consecutive_up = 0; self.consecutive_down = 0; + self.probe = None; + self.stable_since = None; } /// Get the effective downgrade threshold based on network context. @@ -278,6 +312,85 @@ impl AdaptiveQualityController { None } + /// Check whether to start, continue, or conclude a bandwidth probe. + /// + /// Called from `observe()` when no hysteresis transition fired. + fn check_probe(&mut self, observed_tier: Tier) -> Option { + // Don't probe if forced, or if already at highest tier, or on cellular + if self.forced || self.current_tier == Tier::Studio64k { + return None; + } + if matches!( + self.network_context, + NetworkContext::CellularLte | NetworkContext::Cellular5g | NetworkContext::Cellular3g + ) { + return None; + } + + // If we have an active probe, evaluate it + if let Some(ref mut probe) = self.probe { + probe.probe_reports += 1; + + // Check if the observed tier meets the probe target + if observed_tier < probe.target_tier { + probe.bad_reports += 1; + } + + // Probe failed: too many bad reports + if probe.bad_reports > PROBE_MAX_BAD { + let _failed_probe = self.probe.take(); + // Reset stable_since to trigger cooldown + self.stable_since = + Some(Instant::now() + Duration::from_secs(PROBE_COOLDOWN_SECS)); + return None; // stay at current tier + } + + // Probe succeeded: enough good reports within the window + if probe.started.elapsed() >= Duration::from_secs(PROBE_DURATION_SECS) { + let target = probe.target_tier; + let profile = probe.target_profile; + self.probe.take(); + self.current_tier = target; + self.current_profile = profile; + self.consecutive_up = 0; + self.stable_since = Some(Instant::now()); + return Some(profile); + } + + return None; // probe still running + } + + // No active probe — check if we should start one + if observed_tier >= self.current_tier { + // Track stability + if self.stable_since.is_none() { + self.stable_since = Some(Instant::now()); + } + + if let Some(stable_since) = self.stable_since { + if stable_since.elapsed() >= Duration::from_secs(PROBE_STABLE_SECS) { + // Stable long enough — start probing + if let Some(next) = self.upgrade_one_step() { + self.probe = Some(ProbeState { + target_tier: next, + target_profile: next.profile(), + started: Instant::now(), + probe_reports: 0, + bad_reports: 0, + }); + // Return the probe profile so the encoder switches + return Some(next.profile()); + } + } + } + } else { + // Conditions degraded — reset stability timer + self.stable_since = None; + } + + None + } + fn upgrade_one_step(&self) -> Option { match self.current_tier { Tier::Catastrophic => Some(Tier::Degraded), @@ -309,7 +422,17 @@ impl QualityController for AdaptiveQualityController { } let observed = Tier::classify_with_context(report, self.network_context); - self.try_transition(observed) + + // First check for downgrades/upgrades via hysteresis + if let Some(profile) = self.try_transition(observed) { + // Cancel any active probe on tier change + self.probe.take(); + self.stable_since = None; + return Some(profile); + } + + // Then check probing + self.check_probe(observed) } fn force_profile(&mut self, profile: QualityProfile) { @@ -561,4 +684,97 @@ mod tests { fn network_context_default() { assert_eq!(NetworkContext::default(), NetworkContext::Unknown); } + + // --------------------------------------------------------------- + // Bandwidth probing tests + // --------------------------------------------------------------- + + #[test] + fn probe_triggers_after_stable_period() { + let mut ctrl = AdaptiveQualityController::new(); + let excellent = make_report(0.3, 20); // would classify as Studio64k + + // Starts at Good. Fast-forward stability by setting stable_since directly. + ctrl.stable_since = Some(Instant::now() - Duration::from_secs(31)); + + // One excellent report should trigger a probe (Good → Studio32k) + let result = ctrl.observe(&excellent); + assert!(result.is_some(), "should start probe after 30s stable"); + assert!(ctrl.probe.is_some(), "probe should be active"); + assert_eq!(ctrl.probe.as_ref().unwrap().target_tier, Tier::Studio32k); + } + + #[test] + fn probe_succeeds_after_window() { + let mut ctrl = AdaptiveQualityController::new(); + ctrl.stable_since = Some(Instant::now() - Duration::from_secs(31)); + + let excellent = make_report(0.3, 20); + + // Trigger probe start + let result = ctrl.observe(&excellent); + assert!(result.is_some()); + + // Simulate probe window elapsed by backdating started + ctrl.probe.as_mut().unwrap().started = + Instant::now() - Duration::from_secs(PROBE_DURATION_SECS); + + // Next good report should finalize the probe + let result = ctrl.observe(&excellent); + assert!(result.is_some(), "probe should succeed"); + assert_eq!(ctrl.current_tier, Tier::Studio32k); + assert!(ctrl.probe.is_none(), "probe should be cleared"); + } + + #[test] + fn probe_fails_on_bad_reports() { + let mut ctrl = AdaptiveQualityController::new(); + // Put controller at Studio32k, pretend we've been stable + ctrl.current_tier = Tier::Studio32k; + ctrl.current_profile = Tier::Studio32k.profile(); + ctrl.stable_since = Some(Instant::now() - Duration::from_secs(31)); + + // Start a probe to Studio48k + let excellent = make_report(0.3, 20); + let result = ctrl.observe(&excellent); + assert!(result.is_some()); // probe started + assert_eq!(ctrl.probe.as_ref().unwrap().target_tier, Tier::Studio48k); + + // Feed bad reports (loss too high for Studio48k) + let degraded = make_report(3.0, 100); + ctrl.observe(°raded); // first bad + ctrl.observe(°raded); // second bad — exceeds PROBE_MAX_BAD (1) + + // Probe should be cancelled + assert!(ctrl.probe.is_none(), "probe should be cancelled after bad reports"); + // Should still be at Studio32k (not upgraded) + assert_eq!(ctrl.current_tier, Tier::Studio32k); + } + + #[test] + fn no_probe_on_cellular() { + let mut ctrl = AdaptiveQualityController::new(); + ctrl.signal_network_change(NetworkContext::CellularLte); + ctrl.current_tier = Tier::Good; + ctrl.current_profile = Tier::Good.profile(); + ctrl.stable_since = Some(Instant::now() - Duration::from_secs(60)); + + let good = make_report(0.5, 40); + let result = ctrl.observe(&good); + // Should NOT probe on cellular + assert!(ctrl.probe.is_none(), "should not probe on cellular"); + assert!(result.is_none() || ctrl.current_tier == Tier::Good); + } + + #[test] + fn no_probe_at_highest_tier() { + let mut ctrl = AdaptiveQualityController::new(); + ctrl.current_tier = Tier::Studio64k; + ctrl.current_profile = Tier::Studio64k.profile(); + ctrl.stable_since = Some(Instant::now() - Duration::from_secs(60)); + + let excellent = make_report(0.1, 10); + let result = ctrl.observe(&excellent); + assert!(result.is_none(), "should not probe when already at Studio64k"); + } }