feat: adaptive quality in desktop, relay quality directive, Oboe state polling

- Wire AdaptiveQualityController into desktop engine send/recv tasks
  (mirrors Android pattern: AtomicU8 pending_profile, auto-mode check)
- Wire same into Android engine send task (was only in recv before)
- QualityDirective SignalMessage variant for relay-initiated codec switch
- ParticipantQuality tracking in relay RoomManager (per-participant
  AdaptiveQualityController, weakest-link tier computation)
- Relay broadcasts QualityDirective to all participants when room-wide
  tier degrades (coordinated codec switching)
- Oboe stream state polling: poll getState() for up to 2s after
  requestStart() to ensure both streams reach Started before proceeding
  (fixes intermittent silent calls on cold start, Nothing Phone A059)

Tasks: #7, #25, #26, #31, #35

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-12 19:54:04 +04:00
parent 766c9df442
commit 22045bc5e6
5 changed files with 266 additions and 10 deletions

View File

@@ -13,6 +13,8 @@ use tokio::sync::Mutex;
use tracing::{error, info, warn};
use wzp_proto::packet::TrunkFrame;
use wzp_proto::quality::{AdaptiveQualityController, Tier};
use wzp_proto::traits::QualityController;
use wzp_proto::MediaTransport;
use crate::metrics::RelayMetrics;
@@ -50,6 +52,45 @@ impl DebugTap {
}
}
/// Tracks network quality for a single participant in a room.
struct ParticipantQuality {
controller: AdaptiveQualityController,
current_tier: Tier,
}
impl ParticipantQuality {
fn new() -> Self {
Self {
controller: AdaptiveQualityController::new(),
current_tier: Tier::Good,
}
}
/// Feed a quality report and return the new tier if it changed.
fn observe(&mut self, report: &wzp_proto::packet::QualityReport) -> Option<Tier> {
let _ = self.controller.observe(report);
let new_tier = self.controller.tier();
if new_tier != self.current_tier {
self.current_tier = new_tier;
Some(new_tier)
} else {
None
}
}
}
/// Compute the weakest (worst) quality tier across all tracked participants.
fn weakest_tier<'a>(qualities: impl Iterator<Item = &'a ParticipantQuality>) -> Tier {
qualities
.map(|pq| pq.current_tier)
.min_by_key(|t| match t {
Tier::Good => 2,
Tier::Degraded => 1,
Tier::Catastrophic => 0,
})
.unwrap_or(Tier::Good)
}
/// Unique participant ID within a room.
pub type ParticipantId = u64;
@@ -208,6 +249,10 @@ pub struct RoomManager {
acl: Option<HashMap<String, HashSet<String>>>,
/// Channel for room lifecycle events (federation subscribes).
event_tx: tokio::sync::broadcast::Sender<RoomEvent>,
/// Per-participant quality tracking, keyed by (room_name, participant_id).
qualities: HashMap<(String, ParticipantId), ParticipantQuality>,
/// Current room-wide tier per room (to avoid repeated broadcasts).
room_tiers: HashMap<String, Tier>,
}
impl RoomManager {
@@ -217,6 +262,8 @@ impl RoomManager {
rooms: HashMap::new(),
acl: None,
event_tx,
qualities: HashMap::new(),
room_tiers: HashMap::new(),
}
}
@@ -227,6 +274,8 @@ impl RoomManager {
rooms: HashMap::new(),
acl: Some(HashMap::new()),
event_tx,
qualities: HashMap::new(),
room_tiers: HashMap::new(),
}
}
@@ -277,6 +326,7 @@ impl RoomManager {
|| self.rooms.get(room_name).map_or(true, |r| r.is_empty());
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()));
self.qualities.insert((room_name.to_string(), id), ParticipantQuality::new());
if was_empty {
let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() });
}
@@ -323,10 +373,12 @@ impl RoomManager {
/// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty.
pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
self.qualities.remove(&(room_name.to_string(), participant_id));
if let Some(room) = self.rooms.get_mut(room_name) {
room.remove(participant_id);
if room.is_empty() {
self.rooms.remove(room_name);
self.room_tiers.remove(room_name);
let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() });
info!(room = room_name, "room closed (empty)");
return None;
@@ -363,6 +415,58 @@ impl RoomManager {
pub fn list(&self) -> Vec<(String, usize)> {
self.rooms.iter().map(|(k, v)| (k.clone(), v.len())).collect()
}
/// Feed a quality report from a participant. If the room-wide weakest
/// tier changes, returns `(QualityDirective signal, all senders)` for
/// broadcasting.
pub fn observe_quality(
&mut self,
room_name: &str,
participant_id: ParticipantId,
report: &wzp_proto::packet::QualityReport,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
let key = (room_name.to_string(), participant_id);
let tier_changed = self.qualities
.get_mut(&key)
.and_then(|pq| pq.observe(report))
.is_some();
if !tier_changed {
return None;
}
// Compute the weakest tier across all participants in this room
let room_qualities = self.qualities.iter()
.filter(|((rn, _), _)| rn == room_name)
.map(|(_, pq)| pq);
let weakest = weakest_tier(room_qualities);
let current_room_tier = self.room_tiers.get(room_name).copied().unwrap_or(Tier::Good);
if weakest == current_room_tier {
return None;
}
// Room-wide tier changed — update and broadcast directive
self.room_tiers.insert(room_name.to_string(), weakest);
let profile = weakest.profile();
info!(
room = room_name,
old_tier = ?current_room_tier,
new_tier = ?weakest,
codec = ?profile.codec,
fec_ratio = profile.fec_ratio,
"room quality directive"
);
let directive = wzp_proto::SignalMessage::QualityDirective {
recommended_profile: profile,
reason: Some(format!("weakest link: {weakest:?}")),
};
let senders = self.rooms.get(room_name)
.map(|r| r.all_senders())
.unwrap_or_default();
Some((directive, senders))
}
}
// ---------------------------------------------------------------------------
@@ -535,11 +639,17 @@ async fn run_participant_plain(
metrics.update_session_quality(session_id, report);
}
// Get current list of other participants
// Get current list of other participants + check quality directive
let lock_start = std::time::Instant::now();
let others = {
let mgr = room_mgr.lock().await;
mgr.others(&room_name, participant_id)
let (others, quality_directive) = {
let mut mgr = room_mgr.lock().await;
let directive = if let Some(ref report) = pkt.quality_report {
mgr.observe_quality(&room_name, participant_id, report)
} else {
None
};
let o = mgr.others(&room_name, participant_id);
(o, directive)
};
let lock_ms = lock_start.elapsed().as_millis() as u64;
if lock_ms > 10 {
@@ -551,6 +661,11 @@ async fn run_participant_plain(
);
}
// Broadcast quality directive to all participants if tier changed
if let Some((directive, all_senders)) = quality_directive {
broadcast_signal(&all_senders, &directive).await;
}
// Debug tap: log packet metadata
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
@@ -719,9 +834,15 @@ async fn run_participant_trunked(
}
let lock_start = std::time::Instant::now();
let others = {
let mgr = room_mgr.lock().await;
mgr.others(&room_name, participant_id)
let (others, quality_directive) = {
let mut mgr = room_mgr.lock().await;
let directive = if let Some(ref report) = pkt.quality_report {
mgr.observe_quality(&room_name, participant_id, report)
} else {
None
};
let o = mgr.others(&room_name, participant_id);
(o, directive)
};
let lock_ms = lock_start.elapsed().as_millis() as u64;
if lock_ms > 10 {
@@ -733,6 +854,11 @@ async fn run_participant_trunked(
);
}
// Broadcast quality directive to all participants if tier changed
if let Some((directive, all_senders)) = quality_directive {
broadcast_signal(&all_senders, &directive).await;
}
let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {