diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index 344eaf4..3871c55 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -131,6 +131,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { // bridge. Catch-all mapping for completeness. SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer, SignalMessage::MediaPathReport { .. } => CallSignalType::Offer, // control-plane + SignalMessage::QualityDirective { .. } => CallSignalType::Offer, // relay-initiated } } diff --git a/crates/wzp-native/cpp/oboe_bridge.cpp b/crates/wzp-native/cpp/oboe_bridge.cpp index 82ba79c..1e36d93 100644 --- a/crates/wzp-native/cpp/oboe_bridge.cpp +++ b/crates/wzp-native/cpp/oboe_bridge.cpp @@ -8,6 +8,8 @@ #include #include #include +#include +#include #define LOG_TAG "wzp-oboe" #define LOGI(...) __android_log_print(ANDROID_LOG_INFO, LOG_TAG, __VA_ARGS__) @@ -388,6 +390,38 @@ int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) { return -5; } + // Log initial stream states right after requestStart() returns. + // On well-behaved HALs both will already be Started; on others + // (Nothing A059) they may still be in Starting state. + LOGI("requestStart returned: capture_state=%d playout_state=%d", + (int)g_capture_stream->getState(), + (int)g_playout_stream->getState()); + + // Poll until both streams report Started state, up to 2s timeout. + // Some Android HALs (Nothing A059) delay transitioning from Starting + // to Started; proceeding before the transition completes causes the + // first capture/playout callbacks to be dropped silently. + { + auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(2000); + int poll_count = 0; + while (std::chrono::steady_clock::now() < deadline) { + auto cap_state = g_capture_stream->getState(); + auto play_state = g_playout_stream->getState(); + if (cap_state == oboe::StreamState::Started && + play_state == oboe::StreamState::Started) { + LOGI("both streams Started after %d polls", poll_count); + break; + } + poll_count++; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + // Log final state even on timeout (helps diagnose HAL quirks) + LOGI("stream states after poll: capture=%d playout=%d (polls=%d)", + (int)g_capture_stream->getState(), + (int)g_playout_stream->getState(), + poll_count); + } + LOGI("Oboe started: sr=%d burst=%d ch=%d", config->sample_rate, config->frames_per_burst, config->channel_count); return 0; diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index 30c5755..04960f1 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -917,6 +917,14 @@ pub enum SignalMessage { /// federation link via `send_signal_to_peer`. origin_relay_fp: String, }, + + /// Relay-initiated quality directive: all participants should switch + /// to the recommended profile to match the weakest link. + QualityDirective { + recommended_profile: crate::QualityProfile, + #[serde(default, skip_serializing_if = "Option::is_none")] + reason: Option, + }, } /// How the callee responds to a direct call. diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 9dcce62..50961f5 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -13,6 +13,8 @@ use tokio::sync::Mutex; use tracing::{error, info, warn}; use wzp_proto::packet::TrunkFrame; +use wzp_proto::quality::{AdaptiveQualityController, Tier}; +use wzp_proto::traits::QualityController; use wzp_proto::MediaTransport; use crate::metrics::RelayMetrics; @@ -50,6 +52,45 @@ impl DebugTap { } } +/// Tracks network quality for a single participant in a room. +struct ParticipantQuality { + controller: AdaptiveQualityController, + current_tier: Tier, +} + +impl ParticipantQuality { + fn new() -> Self { + Self { + controller: AdaptiveQualityController::new(), + current_tier: Tier::Good, + } + } + + /// Feed a quality report and return the new tier if it changed. + fn observe(&mut self, report: &wzp_proto::packet::QualityReport) -> Option { + let _ = self.controller.observe(report); + let new_tier = self.controller.tier(); + if new_tier != self.current_tier { + self.current_tier = new_tier; + Some(new_tier) + } else { + None + } + } +} + +/// Compute the weakest (worst) quality tier across all tracked participants. +fn weakest_tier<'a>(qualities: impl Iterator) -> Tier { + qualities + .map(|pq| pq.current_tier) + .min_by_key(|t| match t { + Tier::Good => 2, + Tier::Degraded => 1, + Tier::Catastrophic => 0, + }) + .unwrap_or(Tier::Good) +} + /// Unique participant ID within a room. pub type ParticipantId = u64; @@ -208,6 +249,10 @@ pub struct RoomManager { acl: Option>>, /// Channel for room lifecycle events (federation subscribes). event_tx: tokio::sync::broadcast::Sender, + /// Per-participant quality tracking, keyed by (room_name, participant_id). + qualities: HashMap<(String, ParticipantId), ParticipantQuality>, + /// Current room-wide tier per room (to avoid repeated broadcasts). + room_tiers: HashMap, } impl RoomManager { @@ -217,6 +262,8 @@ impl RoomManager { rooms: HashMap::new(), acl: None, event_tx, + qualities: HashMap::new(), + room_tiers: HashMap::new(), } } @@ -227,6 +274,8 @@ impl RoomManager { rooms: HashMap::new(), acl: Some(HashMap::new()), event_tx, + qualities: HashMap::new(), + room_tiers: HashMap::new(), } } @@ -277,6 +326,7 @@ impl RoomManager { || self.rooms.get(room_name).map_or(true, |r| r.is_empty()); let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string())); + self.qualities.insert((room_name.to_string(), id), ParticipantQuality::new()); if was_empty { let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() }); } @@ -323,10 +373,12 @@ impl RoomManager { /// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty. pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec)> { + self.qualities.remove(&(room_name.to_string(), participant_id)); if let Some(room) = self.rooms.get_mut(room_name) { room.remove(participant_id); if room.is_empty() { self.rooms.remove(room_name); + self.room_tiers.remove(room_name); let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() }); info!(room = room_name, "room closed (empty)"); return None; @@ -363,6 +415,58 @@ impl RoomManager { pub fn list(&self) -> Vec<(String, usize)> { self.rooms.iter().map(|(k, v)| (k.clone(), v.len())).collect() } + + /// Feed a quality report from a participant. If the room-wide weakest + /// tier changes, returns `(QualityDirective signal, all senders)` for + /// broadcasting. + pub fn observe_quality( + &mut self, + room_name: &str, + participant_id: ParticipantId, + report: &wzp_proto::packet::QualityReport, + ) -> Option<(wzp_proto::SignalMessage, Vec)> { + let key = (room_name.to_string(), participant_id); + let tier_changed = self.qualities + .get_mut(&key) + .and_then(|pq| pq.observe(report)) + .is_some(); + + if !tier_changed { + return None; + } + + // Compute the weakest tier across all participants in this room + let room_qualities = self.qualities.iter() + .filter(|((rn, _), _)| rn == room_name) + .map(|(_, pq)| pq); + let weakest = weakest_tier(room_qualities); + + let current_room_tier = self.room_tiers.get(room_name).copied().unwrap_or(Tier::Good); + if weakest == current_room_tier { + return None; + } + + // Room-wide tier changed — update and broadcast directive + self.room_tiers.insert(room_name.to_string(), weakest); + let profile = weakest.profile(); + info!( + room = room_name, + old_tier = ?current_room_tier, + new_tier = ?weakest, + codec = ?profile.codec, + fec_ratio = profile.fec_ratio, + "room quality directive" + ); + + let directive = wzp_proto::SignalMessage::QualityDirective { + recommended_profile: profile, + reason: Some(format!("weakest link: {weakest:?}")), + }; + let senders = self.rooms.get(room_name) + .map(|r| r.all_senders()) + .unwrap_or_default(); + Some((directive, senders)) + } } // --------------------------------------------------------------------------- @@ -535,11 +639,17 @@ async fn run_participant_plain( metrics.update_session_quality(session_id, report); } - // Get current list of other participants + // Get current list of other participants + check quality directive let lock_start = std::time::Instant::now(); - let others = { - let mgr = room_mgr.lock().await; - mgr.others(&room_name, participant_id) + let (others, quality_directive) = { + let mut mgr = room_mgr.lock().await; + let directive = if let Some(ref report) = pkt.quality_report { + mgr.observe_quality(&room_name, participant_id, report) + } else { + None + }; + let o = mgr.others(&room_name, participant_id); + (o, directive) }; let lock_ms = lock_start.elapsed().as_millis() as u64; if lock_ms > 10 { @@ -551,6 +661,11 @@ async fn run_participant_plain( ); } + // Broadcast quality directive to all participants if tier changed + if let Some((directive, all_senders)) = quality_directive { + broadcast_signal(&all_senders, &directive).await; + } + // Debug tap: log packet metadata if let Some(ref tap) = debug_tap { if tap.matches(&room_name) { @@ -719,9 +834,15 @@ async fn run_participant_trunked( } let lock_start = std::time::Instant::now(); - let others = { - let mgr = room_mgr.lock().await; - mgr.others(&room_name, participant_id) + let (others, quality_directive) = { + let mut mgr = room_mgr.lock().await; + let directive = if let Some(ref report) = pkt.quality_report { + mgr.observe_quality(&room_name, participant_id, report) + } else { + None + }; + let o = mgr.others(&room_name, participant_id); + (o, directive) }; let lock_ms = lock_start.elapsed().as_millis() as u64; if lock_ms > 10 { @@ -733,6 +854,11 @@ async fn run_participant_trunked( ); } + // Broadcast quality directive to all participants if tier changed + if let Some((directive, all_senders)) = quality_directive { + broadcast_signal(&all_senders, &directive).await; + } + let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; for other in &others { diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 3880711..59828fc 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -9,7 +9,7 @@ //! still fails cleanly but the rest of the engine code links in. use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; use tauri::Emitter; @@ -26,11 +26,38 @@ use wzp_client::audio_io::{AudioCapture, AudioPlayback}; // Android (where wzp-client is pulled in with default-features=false). use wzp_client::call::{CallConfig, CallEncoder}; -use wzp_proto::traits::AudioDecoder; -use wzp_proto::{CodecId, MediaTransport, QualityProfile}; +use wzp_proto::traits::{AudioDecoder, QualityController}; +use wzp_proto::{AdaptiveQualityController, CodecId, MediaTransport, QualityProfile}; const FRAME_SAMPLES_40MS: usize = 1920; +/// Profile index mapping for the AtomicU8 adaptive-quality bridge. +const PROFILE_NO_CHANGE: u8 = 0xFF; + +fn profile_to_index(p: &QualityProfile) -> u8 { + match p.codec { + CodecId::Opus64k => 0, + CodecId::Opus48k => 1, + CodecId::Opus32k => 2, + CodecId::Opus24k => 3, + CodecId::Opus6k => 4, + CodecId::Codec2_1200 => 5, + _ => 3, // default to GOOD + } +} + +fn index_to_profile(idx: u8) -> Option { + match idx { + 0 => Some(QualityProfile::STUDIO_64K), + 1 => Some(QualityProfile::STUDIO_48K), + 2 => Some(QualityProfile::STUDIO_32K), + 3 => Some(QualityProfile::GOOD), + 4 => Some(QualityProfile::DEGRADED), + 5 => Some(QualityProfile::CATASTROPHIC), + _ => None, + } +} + /// Resolve a quality string from the UI to a QualityProfile. /// Returns None for "auto" (use default adaptive behavior). fn resolve_quality(quality: &str) -> Option { @@ -480,6 +507,10 @@ impl CallEngine { let tx_codec = Arc::new(Mutex::new(String::new())); let rx_codec = Arc::new(Mutex::new(String::new())); + // Adaptive quality: shared pending-profile bridge between recv → send. + let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE)); + let auto_profile = resolve_quality(&quality).is_none(); + // Send task — drain Oboe capture ring, Opus-encode, push to transport. let send_t = transport.clone(); let send_r = running.clone(); @@ -492,6 +523,7 @@ impl CallEngine { let send_tx_codec = tx_codec.clone(); let send_t0 = call_t0; let send_app = app.clone(); + let send_pending_profile = pending_profile.clone(); tokio::spawn(async move { let profile = resolve_quality(&send_quality); let config = match profile { @@ -609,6 +641,20 @@ impl CallEngine { Err(e) => error!("encode: {e}"), } + // Adaptive quality: check if recv task recommended a profile switch. + if auto_profile { + let p = send_pending_profile.swap(PROFILE_NO_CHANGE, Ordering::Acquire); + if p != PROFILE_NO_CHANGE { + if let Some(new_profile) = index_to_profile(p) { + info!(to = ?new_profile.codec, "auto: switching encoder profile"); + if encoder.set_profile(new_profile).is_ok() { + dred_tuner.set_codec(new_profile.codec); + *send_tx_codec.lock().await = format!("{:?}", new_profile.codec); + } + } + } + } + // DRED tuner: poll quinn path stats periodically and // adjust encoder DRED duration + expected-loss hint. frames_since_dred_poll += 1; @@ -682,6 +728,7 @@ impl CallEngine { let recv_rx_codec = rx_codec.clone(); let recv_t0 = call_t0; let recv_app = app.clone(); + let pending_profile_recv = pending_profile.clone(); tokio::spawn(async move { let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD); // Phase 3b/3c: use concrete AdaptiveDecoder (not Box { last_decode_n = n; @@ -1255,6 +1312,10 @@ impl CallEngine { let tx_codec = Arc::new(Mutex::new(String::new())); let rx_codec = Arc::new(Mutex::new(String::new())); + // Adaptive quality: shared pending-profile bridge between recv → send. + let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE)); + let auto_profile = resolve_quality(&quality).is_none(); + // Send task let send_t = transport.clone(); let send_r = running.clone(); @@ -1264,6 +1325,7 @@ impl CallEngine { let send_drops = Arc::new(AtomicU64::new(0)); let send_quality = quality.clone(); let send_tx_codec = tx_codec.clone(); + let send_pending_profile = pending_profile.clone(); tokio::spawn(async move { let profile = resolve_quality(&send_quality); let config = match profile { @@ -1326,6 +1388,20 @@ impl CallEngine { Err(e) => error!("encode: {e}"), } + // Adaptive quality: check if recv task recommended a profile switch. + if auto_profile { + let p = send_pending_profile.swap(PROFILE_NO_CHANGE, Ordering::Acquire); + if p != PROFILE_NO_CHANGE { + if let Some(new_profile) = index_to_profile(p) { + info!(to = ?new_profile.codec, "auto: switching encoder profile"); + if encoder.set_profile(new_profile).is_ok() { + dred_tuner.set_codec(new_profile.codec); + *send_tx_codec.lock().await = format!("{:?}", new_profile.codec); + } + } + } + } + // DRED tuner: poll quinn path stats periodically. frames_since_dred_poll += 1; if frames_since_dred_poll >= DRED_POLL_INTERVAL { @@ -1349,6 +1425,7 @@ impl CallEngine { let recv_spk = spk_muted.clone(); let recv_fr = frames_received.clone(); let recv_rx_codec = rx_codec.clone(); + let pending_profile_recv = pending_profile.clone(); tokio::spawn(async move { let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD); // Phase 3b/3c: concrete AdaptiveDecoder (not Box) so we @@ -1361,6 +1438,7 @@ impl CallEngine { let mut agc = wzp_codec::AutoGainControl::new(); let mut pcm = vec![0i16; FRAME_SAMPLES_40MS]; // big enough for any codec let mut dred_recv = DredRecvState::new(); + let mut quality_ctrl = AdaptiveQualityController::new(); loop { if !recv_r.load(Ordering::Relaxed) { @@ -1425,6 +1503,15 @@ impl CallEngine { ); } + // Adaptive quality: ingest quality reports from peer + if let Some(ref qr) = pkt.quality_report { + if let Some(new_profile) = quality_ctrl.observe(qr) { + let idx = profile_to_index(&new_profile); + info!(to = ?new_profile.codec, "auto: quality adapter recommends switch"); + pending_profile_recv.store(idx, Ordering::Release); + } + } + if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) { agc.process_frame(&mut pcm[..n]); if !recv_spk.load(Ordering::Relaxed) {