diff --git a/crates/wzp-client/src/call.rs b/crates/wzp-client/src/call.rs index 4a7ec53..3a937f5 100644 --- a/crates/wzp-client/src/call.rs +++ b/crates/wzp-client/src/call.rs @@ -12,6 +12,7 @@ use wzp_proto::quality::AdaptiveQualityController; use wzp_proto::traits::{ AudioDecoder, AudioEncoder, FecDecoder, FecEncoder, }; +use wzp_proto::packet::QualityReport; use wzp_proto::QualityProfile; /// Configuration for a call session. @@ -37,6 +38,129 @@ impl Default for CallConfig { } } +impl CallConfig { + /// Build a `CallConfig` tuned for the given quality profile. + pub fn from_profile(profile: QualityProfile) -> Self { + let (jitter_target, jitter_max, jitter_min) = if profile == QualityProfile::CATASTROPHIC { + // Catastrophic: larger jitter buffer to absorb spikes + (20, 500, 8) + } else if profile == QualityProfile::DEGRADED { + // Degraded: moderately deeper buffer + (15, 350, 5) + } else { + // Good: low-latency defaults + (10, 250, 3) + }; + Self { + profile, + jitter_target, + jitter_max, + jitter_min, + } + } +} + +/// Sliding-window quality adapter that reacts to relay `QualityReport`s. +/// +/// Thresholds (per-report): +/// - loss > 15% OR rtt > 200ms => CATASTROPHIC +/// - loss > 5% OR rtt > 100ms => DEGRADED +/// - otherwise => GOOD +/// +/// Hysteresis: a profile switch is only recommended after the new profile +/// has been the recommendation for 3 or more consecutive reports. +pub struct QualityAdapter { + /// Sliding window of the last N reports. + window: std::collections::VecDeque, + /// Maximum window size. + max_window: usize, + /// Number of consecutive reports recommending the same (non-current) profile. + consecutive_same: u32, + /// The profile that the last `consecutive_same` reports recommended. + pending_profile: Option, +} + +/// Number of consecutive reports required before accepting a switch. +const HYSTERESIS_COUNT: u32 = 3; +/// Default sliding window capacity. +const ADAPTER_WINDOW: usize = 10; + +impl QualityAdapter { + pub fn new() -> Self { + Self { + window: std::collections::VecDeque::with_capacity(ADAPTER_WINDOW), + max_window: ADAPTER_WINDOW, + consecutive_same: 0, + pending_profile: None, + } + } + + /// Record a new quality report from the relay. + pub fn ingest(&mut self, report: &QualityReport) { + if self.window.len() >= self.max_window { + self.window.pop_front(); + } + self.window.push_back(*report); + } + + /// Classify a single report into a recommended profile. + fn classify(report: &QualityReport) -> QualityProfile { + let loss = report.loss_percent(); + let rtt = report.rtt_ms(); + + if loss > 15.0 || rtt > 200 { + QualityProfile::CATASTROPHIC + } else if loss > 5.0 || rtt > 100 { + QualityProfile::DEGRADED + } else { + QualityProfile::GOOD + } + } + + /// Return the best profile based on the most recent report in the window. + pub fn recommended_profile(&self) -> QualityProfile { + match self.window.back() { + Some(report) => Self::classify(report), + None => QualityProfile::GOOD, + } + } + + /// Determine if a profile switch should happen, applying hysteresis. + /// + /// Returns `Some(new_profile)` only when the recommendation has differed + /// from `current` for at least `HYSTERESIS_COUNT` consecutive reports. + pub fn should_switch(&mut self, current: &QualityProfile) -> Option { + let recommended = self.recommended_profile(); + + if recommended == *current { + // Conditions match current profile — reset pending state. + self.consecutive_same = 0; + self.pending_profile = None; + return None; + } + + // Recommended differs from current. + match self.pending_profile { + Some(pending) if pending == recommended => { + self.consecutive_same += 1; + } + _ => { + // New or changed recommendation — restart counter. + self.pending_profile = Some(recommended); + self.consecutive_same = 1; + } + } + + if self.consecutive_same >= HYSTERESIS_COUNT { + self.consecutive_same = 0; + self.pending_profile = None; + Some(recommended) + } else { + None + } + } +} + /// Manages the encode/send side of a call. pub struct CallEncoder { /// Audio encoder (Opus or Codec2). @@ -301,4 +425,137 @@ mod tests { let mut pcm = vec![0i16; 960]; assert!(dec.decode_next(&mut pcm).is_none()); } + + // ---- QualityAdapter tests ---- + + /// Helper: build a QualityReport from human-readable loss% and RTT ms. + fn make_report(loss_pct_f: f32, rtt_ms: u16) -> QualityReport { + QualityReport { + loss_pct: (loss_pct_f / 100.0 * 255.0) as u8, + rtt_4ms: (rtt_ms / 4) as u8, + jitter_ms: 10, + bitrate_cap_kbps: 200, + } + } + + #[test] + fn good_conditions_stays_good() { + let mut adapter = QualityAdapter::new(); + let good = make_report(1.0, 40); + for _ in 0..10 { + adapter.ingest(&good); + } + assert_eq!(adapter.recommended_profile(), QualityProfile::GOOD); + + let current = QualityProfile::GOOD; + for _ in 0..10 { + adapter.ingest(&good); + assert!(adapter.should_switch(¤t).is_none()); + } + } + + #[test] + fn high_loss_degrades() { + let mut adapter = QualityAdapter::new(); + // 8% loss, low RTT => DEGRADED + let degraded = make_report(8.0, 40); + let mut current = QualityProfile::GOOD; + + // Feed 3 consecutive degraded reports to pass hysteresis + for _ in 0..3 { + adapter.ingest(°raded); + if let Some(new) = adapter.should_switch(¤t) { + current = new; + } + } + assert_eq!(current, QualityProfile::DEGRADED); + } + + #[test] + fn catastrophic_conditions() { + let mut adapter = QualityAdapter::new(); + // 20% loss => CATASTROPHIC + let terrible = make_report(20.0, 50); + let mut current = QualityProfile::GOOD; + + for _ in 0..3 { + adapter.ingest(&terrible); + if let Some(new) = adapter.should_switch(¤t) { + current = new; + } + } + assert_eq!(current, QualityProfile::CATASTROPHIC); + + // Also test via high RTT alone (250ms > 200ms threshold) + let mut adapter2 = QualityAdapter::new(); + let high_rtt = make_report(1.0, 252); // rtt_4ms rounds to 63 => 252ms + let mut current2 = QualityProfile::GOOD; + + for _ in 0..3 { + adapter2.ingest(&high_rtt); + if let Some(new) = adapter2.should_switch(¤t2) { + current2 = new; + } + } + assert_eq!(current2, QualityProfile::CATASTROPHIC); + } + + #[test] + fn hysteresis_prevents_flapping() { + let mut adapter = QualityAdapter::new(); + let good = make_report(1.0, 40); + let bad = make_report(8.0, 40); // DEGRADED + let current = QualityProfile::GOOD; + + // Alternate good/bad — should never trigger a switch because + // we never get 3 consecutive same-recommendation reports. + for _ in 0..20 { + adapter.ingest(&bad); + assert!(adapter.should_switch(¤t).is_none()); + adapter.ingest(&good); + assert!(adapter.should_switch(¤t).is_none()); + } + assert_eq!(current, QualityProfile::GOOD); + } + + #[test] + fn recovery_to_good() { + let mut adapter = QualityAdapter::new(); + let bad = make_report(20.0, 50); + let good = make_report(1.0, 40); + + // Drive to CATASTROPHIC first + let mut current = QualityProfile::GOOD; + for _ in 0..3 { + adapter.ingest(&bad); + if let Some(new) = adapter.should_switch(¤t) { + current = new; + } + } + assert_eq!(current, QualityProfile::CATASTROPHIC); + + // Now feed good reports — should recover to GOOD after 3 consecutive + for _ in 0..3 { + adapter.ingest(&good); + if let Some(new) = adapter.should_switch(¤t) { + current = new; + } + } + assert_eq!(current, QualityProfile::GOOD); + } + + #[test] + fn call_config_from_profile() { + let good = CallConfig::from_profile(QualityProfile::GOOD); + assert_eq!(good.profile, QualityProfile::GOOD); + assert_eq!(good.jitter_min, 3); + + let degraded = CallConfig::from_profile(QualityProfile::DEGRADED); + assert_eq!(degraded.profile, QualityProfile::DEGRADED); + assert!(degraded.jitter_target > good.jitter_target); + + let catastrophic = CallConfig::from_profile(QualityProfile::CATASTROPHIC); + assert_eq!(catastrophic.profile, QualityProfile::CATASTROPHIC); + assert!(catastrophic.jitter_max > degraded.jitter_max); + } } diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index 45112c7..fdbfd0a 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -17,4 +17,4 @@ pub mod session_mgr; pub use config::RelayConfig; pub use handshake::accept_handshake; pub use pipeline::{PipelineConfig, PipelineStats, RelayPipeline}; -pub use session_mgr::{RelaySession, SessionId, SessionManager}; +pub use session_mgr::{RelaySession, SessionId, SessionInfo, SessionManager, SessionState}; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 5026a85..9b65272 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -19,6 +19,7 @@ use wzp_proto::MediaTransport; use wzp_relay::config::RelayConfig; use wzp_relay::pipeline::{PipelineConfig, RelayPipeline}; use wzp_relay::room::{self, RoomManager}; +use wzp_relay::session_mgr::SessionManager; fn parse_args() -> RelayConfig { let mut config = RelayConfig::default(); @@ -163,6 +164,9 @@ async fn main() -> anyhow::Result<()> { // Room manager (room mode only) let room_mgr = Arc::new(Mutex::new(RoomManager::new())); + // Session manager — enforces max concurrent sessions + let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); + if let Some(ref url) = config.auth_url { info!(url, "auth enabled — clients must present featherChat token"); } else { @@ -179,6 +183,7 @@ async fn main() -> anyhow::Result<()> { let remote_transport = remote_transport.clone(); let room_mgr = room_mgr.clone(); + let session_mgr = session_mgr.clone(); let auth_url = config.auth_url.clone(); let relay_seed_bytes = relay_seed.0; @@ -284,13 +289,28 @@ async fn main() -> anyhow::Result<()> { stats_handle.abort(); transport.close().await.ok(); } else { - // Room mode — join room and forward to all others + // Room mode — enforce max sessions, then join room + let session_id = { + let mut smgr = session_mgr.lock().await; + match smgr.create_session(&room_name, authenticated_fp.clone()) { + Ok(id) => id, + Err(e) => { + error!(%addr, room = %room_name, "session rejected: {e}"); + transport.close().await.ok(); + return; + } + } + }; + let participant_id = { let mut mgr = room_mgr.lock().await; match mgr.join(&room_name, addr, transport.clone(), authenticated_fp.as_deref()) { Ok(id) => id, Err(e) => { error!(%addr, room = %room_name, "room join denied: {e}"); + // Clean up the session we just created + let mut smgr = session_mgr.lock().await; + smgr.remove_session(session_id); transport.close().await.ok(); return; } @@ -304,6 +324,12 @@ async fn main() -> anyhow::Result<()> { transport.clone(), ).await; + // Participant disconnected — clean up session + { + let mut smgr = session_mgr.lock().await; + smgr.remove_session(session_id); + } + transport.close().await.ok(); } }); diff --git a/crates/wzp-relay/src/session_mgr.rs b/crates/wzp-relay/src/session_mgr.rs index e5e059b..e9f07b9 100644 --- a/crates/wzp-relay/src/session_mgr.rs +++ b/crates/wzp-relay/src/session_mgr.rs @@ -1,6 +1,7 @@ //! Session manager — tracks active call sessions on the relay. use std::collections::HashMap; +use std::time::Instant; use wzp_proto::{QualityProfile, Session}; @@ -9,6 +10,26 @@ use crate::pipeline::{PipelineConfig, RelayPipeline}; /// Unique identifier for a relay session. pub type SessionId = [u8; 16]; +/// Lifecycle state of a concurrent session. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SessionState { + Active, + Closing, +} + +/// Lightweight metadata for a concurrent session (room-mode tracking). +#[derive(Debug, Clone)] +pub struct SessionInfo { + /// Which room this session belongs to. + pub room_name: String, + /// Client fingerprint (present when auth is enabled). + pub fingerprint: Option, + /// When the session was created. + pub connected_at: Instant, + /// Current lifecycle state. + pub state: SessionState, +} + /// A single active call session on the relay. pub struct RelaySession { /// Protocol session state machine. @@ -47,8 +68,14 @@ impl RelaySession { } /// Manages all active sessions on a relay. +/// +/// Combines two layers of tracking: +/// - `sessions`: heavy `RelaySession` objects (pipeline state machines, used in forward mode) +/// - `tracked`: lightweight `SessionInfo` entries (room + fingerprint, used in room mode to +/// enforce `max_sessions` and answer lifecycle queries) pub struct SessionManager { sessions: HashMap, + tracked: HashMap, max_sessions: usize, } @@ -56,17 +83,20 @@ impl SessionManager { pub fn new(max_sessions: usize) -> Self { Self { sessions: HashMap::new(), + tracked: HashMap::new(), max_sessions, } } - /// Create a new session. Returns None if at capacity. - pub fn create_session( + // ── Heavy session API (forward-mode pipelines) ────────────────────── + + /// Create a new pipeline session. Returns None if at capacity. + pub fn create_pipeline_session( &mut self, session_id: SessionId, config: PipelineConfig, ) -> Option<&mut RelaySession> { - if self.sessions.len() >= self.max_sessions { + if self.total_count() >= self.max_sessions { return None; } self.sessions @@ -75,53 +105,124 @@ impl SessionManager { self.sessions.get_mut(&session_id) } - /// Get a session by ID. + /// Get a pipeline session by ID. pub fn get_session(&mut self, id: &SessionId) -> Option<&mut RelaySession> { self.sessions.get_mut(id) } - /// Remove a session. - pub fn remove_session(&mut self, id: &SessionId) -> Option { + /// Remove a pipeline session. + pub fn remove_pipeline_session(&mut self, id: &SessionId) -> Option { self.sessions.remove(id) } - /// Number of active sessions. - pub fn active_count(&self) -> usize { + /// Number of active pipeline sessions. + pub fn pipeline_active_count(&self) -> usize { self.sessions.values().filter(|s| s.is_active()).count() } - /// Total sessions (including inactive/closing). - pub fn total_count(&self) -> usize { + /// Total pipeline sessions (including inactive/closing). + pub fn pipeline_total_count(&self) -> usize { self.sessions.len() } - /// Remove sessions idle for longer than `timeout_ms`. + /// Remove pipeline sessions idle for longer than `timeout_ms`. pub fn expire_idle(&mut self, now_ms: u64, timeout_ms: u64) -> usize { let before = self.sessions.len(); self.sessions .retain(|_, s| now_ms.saturating_sub(s.last_activity_ms) < timeout_ms); before - self.sessions.len() } + + // ── Lightweight concurrent-session API (room mode) ────────────────── + + /// Register a new concurrent session. + /// Returns the `SessionId` on success, or an error string if `max_sessions` is exceeded. + pub fn create_session( + &mut self, + room: &str, + fingerprint: Option, + ) -> Result { + if self.total_count() >= self.max_sessions { + return Err(format!( + "max sessions ({}) exceeded", + self.max_sessions + )); + } + let id = rand_session_id(); + self.tracked.insert(id, SessionInfo { + room_name: room.to_string(), + fingerprint, + connected_at: Instant::now(), + state: SessionState::Active, + }); + Ok(id) + } + + /// Remove a tracked session. + pub fn remove_session(&mut self, id: SessionId) { + self.tracked.remove(&id); + } + + /// Number of currently tracked (room-mode) sessions. + pub fn active_count(&self) -> usize { + self.tracked.values().filter(|s| s.state == SessionState::Active).count() + } + + /// Return all session IDs that belong to a given room. + pub fn sessions_in_room(&self, room: &str) -> Vec { + self.tracked + .iter() + .filter(|(_, info)| info.room_name == room) + .map(|(id, _)| *id) + .collect() + } + + /// Get metadata for a tracked session. + pub fn session_info(&self, id: SessionId) -> Option<&SessionInfo> { + self.tracked.get(&id) + } + + /// Total sessions across both tracking layers. + pub fn total_count(&self) -> usize { + self.sessions.len() + self.tracked.len() + } +} + +/// Generate a random 16-byte session identifier. +fn rand_session_id() -> SessionId { + let mut id = [0u8; 16]; + // Use a simple monotonic + random source to avoid pulling in `rand` crate. + // Hash the instant + a counter for uniqueness. + use std::sync::atomic::{AtomicU64, Ordering}; + static CTR: AtomicU64 = AtomicU64::new(1); + let ctr = CTR.fetch_add(1, Ordering::Relaxed); + let bytes = ctr.to_le_bytes(); + id[..8].copy_from_slice(&bytes); + // Mix in some time-based entropy for the upper half. + let t = Instant::now().elapsed().as_nanos() as u64; + id[8..16].copy_from_slice(&t.to_le_bytes()); + id } #[cfg(test)] mod tests { use super::*; + // ── Pipeline session tests (pre-existing, adapted to renamed API) ─── + #[test] - fn create_and_get_session() { + fn create_and_get_pipeline_session() { let mut mgr = SessionManager::new(10); let id = [1u8; 16]; - mgr.create_session(id, PipelineConfig::default()); - assert_eq!(mgr.total_count(), 1); + mgr.create_pipeline_session(id, PipelineConfig::default()); assert!(mgr.get_session(&id).is_some()); } #[test] - fn respects_max_sessions() { + fn respects_max_pipeline_sessions() { let mut mgr = SessionManager::new(1); - mgr.create_session([1u8; 16], PipelineConfig::default()); - let result = mgr.create_session([2u8; 16], PipelineConfig::default()); + mgr.create_pipeline_session([1u8; 16], PipelineConfig::default()); + let result = mgr.create_pipeline_session([2u8; 16], PipelineConfig::default()); assert!(result.is_none()); } @@ -129,10 +230,73 @@ mod tests { fn expire_idle_removes_old() { let mut mgr = SessionManager::new(10); let id = [1u8; 16]; - mgr.create_session(id, PipelineConfig::default()); - // Session has last_activity_ms = 0, current time = 60000, timeout = 30000 + mgr.create_pipeline_session(id, PipelineConfig::default()); let expired = mgr.expire_idle(60_000, 30_000); assert_eq!(expired, 1); - assert_eq!(mgr.total_count(), 0); + assert_eq!(mgr.pipeline_total_count(), 0); + } + + // ── Concurrent session (room-mode) tests ──────────────────────────── + + #[test] + fn create_and_remove() { + let mut mgr = SessionManager::new(10); + let id = mgr.create_session("room-a", Some("fp123".into())).unwrap(); + assert_eq!(mgr.active_count(), 1); + mgr.remove_session(id); + assert_eq!(mgr.active_count(), 0); + } + + #[test] + fn max_sessions_enforced() { + let mut mgr = SessionManager::new(2); + mgr.create_session("r1", None).unwrap(); + mgr.create_session("r2", None).unwrap(); + let err = mgr.create_session("r3", None); + assert!(err.is_err()); + assert!(err.unwrap_err().contains("max sessions")); + } + + #[test] + fn sessions_in_room_tracking() { + let mut mgr = SessionManager::new(10); + let a1 = mgr.create_session("alpha", None).unwrap(); + let _a2 = mgr.create_session("alpha", None).unwrap(); + let _b1 = mgr.create_session("beta", None).unwrap(); + + let alpha_ids = mgr.sessions_in_room("alpha"); + assert_eq!(alpha_ids.len(), 2); + assert!(alpha_ids.contains(&a1)); + + let beta_ids = mgr.sessions_in_room("beta"); + assert_eq!(beta_ids.len(), 1); + + let empty = mgr.sessions_in_room("gamma"); + assert!(empty.is_empty()); + } + + #[test] + fn session_info_returns_correct_data() { + let mut mgr = SessionManager::new(10); + let id = mgr.create_session("room-x", Some("alice-fp".into())).unwrap(); + + let info = mgr.session_info(id).expect("session should exist"); + assert_eq!(info.room_name, "room-x"); + assert_eq!(info.fingerprint.as_deref(), Some("alice-fp")); + assert_eq!(info.state, SessionState::Active); + + // Non-existent session returns None + assert!(mgr.session_info([0xFFu8; 16]).is_none()); + } + + #[test] + fn max_sessions_shared_across_both_layers() { + let mut mgr = SessionManager::new(2); + // One pipeline session + one tracked session = 2 = at capacity + mgr.create_pipeline_session([1u8; 16], PipelineConfig::default()); + mgr.create_session("room", None).unwrap(); + // Both layers should now reject + assert!(mgr.create_session("room", None).is_err()); + assert!(mgr.create_pipeline_session([2u8; 16], PipelineConfig::default()).is_none()); } } diff --git a/crates/wzp-web/static/audio-processor.js b/crates/wzp-web/static/audio-processor.js index b1fcaab..fab8b1e 100644 --- a/crates/wzp-web/static/audio-processor.js +++ b/crates/wzp-web/static/audio-processor.js @@ -1,34 +1,51 @@ -// AudioWorklet processor for capturing microphone audio. -// Accumulates samples and posts 960-sample (20ms @ 48kHz) frames to the main thread. +// WarzonePhone AudioWorklet processors. +// Both capture and playback handle 960-sample frames (20ms @ 48kHz). +// AudioWorklet calls process() with 128-sample blocks, so we buffer internally. -class CaptureProcessor extends AudioWorkletProcessor { +const FRAME_SIZE = 960; + +class WZPCaptureProcessor extends AudioWorkletProcessor { constructor() { super(); - this.buffer = new Float32Array(0); + // Pre-allocate ring buffer large enough for several frames + this._ring = new Float32Array(FRAME_SIZE * 4); + this._writePos = 0; } - process(inputs, outputs, parameters) { + process(inputs, _outputs, _parameters) { const input = inputs[0]; if (!input || !input[0]) return true; - const samples = input[0]; // Float32Array, typically 128 samples + const samples = input[0]; // Float32Array, 128 samples typically + const len = samples.length; - // Accumulate - const newBuf = new Float32Array(this.buffer.length + samples.length); - newBuf.set(this.buffer); - newBuf.set(samples, this.buffer.length); - this.buffer = newBuf; + // Write into ring buffer + if (this._writePos + len > this._ring.length) { + // Should not happen with FRAME_SIZE * 4 capacity and timely draining, + // but handle gracefully by resizing + const bigger = new Float32Array(this._ring.length * 2); + bigger.set(this._ring.subarray(0, this._writePos)); + this._ring = bigger; + } + this._ring.set(samples, this._writePos); + this._writePos += len; - // Send complete 960-sample frames - while (this.buffer.length >= 960) { - const frame = this.buffer.slice(0, 960); - this.buffer = this.buffer.slice(960); - - // Convert to Int16 - const pcm = new Int16Array(960); - for (let i = 0; i < 960; i++) { - pcm[i] = Math.max(-32768, Math.min(32767, Math.round(frame[i] * 32767))); + // Drain complete 960-sample frames + while (this._writePos >= FRAME_SIZE) { + // Convert Float32 -> Int16 PCM + const pcm = new Int16Array(FRAME_SIZE); + for (let i = 0; i < FRAME_SIZE; i++) { + const s = this._ring[i]; + pcm[i] = s < -1 ? -32768 : s > 1 ? 32767 : (s * 32767) | 0; } + + // Shift remaining data forward + this._writePos -= FRAME_SIZE; + if (this._writePos > 0) { + this._ring.copyWithin(0, FRAME_SIZE, FRAME_SIZE + this._writePos); + } + + // Send the Int16 PCM buffer (1920 bytes) to the main thread this.port.postMessage(pcm.buffer, [pcm.buffer]); } @@ -36,4 +53,90 @@ class CaptureProcessor extends AudioWorkletProcessor { } } -registerProcessor('capture-processor', CaptureProcessor); +class WZPPlaybackProcessor extends AudioWorkletProcessor { + constructor() { + super(); + // Ring buffer for decoded Float32 samples ready for output + this._ring = new Float32Array(FRAME_SIZE * 8); + this._readPos = 0; + this._writePos = 0; + this._maxBuffered = FRAME_SIZE * 6; // ~120ms max to prevent drift + + this.port.onmessage = (e) => { + // Receive Int16 PCM from main thread, convert to Float32 + const pcm = new Int16Array(e.data); + const len = pcm.length; + + // Check capacity + let available = this._writePos - this._readPos; + if (available < 0) available += this._ring.length; + if (available + len > this._maxBuffered) { + // Too much buffered; drop oldest samples to prevent drift + this._readPos = this._writePos; + } + + // Ensure ring buffer is big enough + if (this._ring.length < len + available + 128) { + const bigger = new Float32Array(this._ring.length * 2); + // Copy existing data contiguously + if (this._readPos <= this._writePos) { + bigger.set(this._ring.subarray(this._readPos, this._writePos)); + } else { + const firstPart = this._ring.subarray(this._readPos); + const secondPart = this._ring.subarray(0, this._writePos); + bigger.set(firstPart); + bigger.set(secondPart, firstPart.length); + } + this._ring = bigger; + const count = available; + this._readPos = 0; + this._writePos = count; + } + + // Write converted samples into ring buffer linearly (simpler: use linear buffer) + for (let i = 0; i < len; i++) { + this._ring[this._writePos] = pcm[i] / 32768.0; + this._writePos++; + if (this._writePos >= this._ring.length) this._writePos = 0; + } + }; + } + + process(_inputs, outputs, _parameters) { + const output = outputs[0]; + if (!output || !output[0]) return true; + + const out = output[0]; // 128 samples typically + const needed = out.length; + + let available; + if (this._writePos >= this._readPos) { + available = this._writePos - this._readPos; + } else { + available = this._ring.length - this._readPos + this._writePos; + } + + if (available >= needed) { + for (let i = 0; i < needed; i++) { + out[i] = this._ring[this._readPos]; + this._readPos++; + if (this._readPos >= this._ring.length) this._readPos = 0; + } + } else { + // Output what we have, zero-fill the rest (underrun) + for (let i = 0; i < available; i++) { + out[i] = this._ring[this._readPos]; + this._readPos++; + if (this._readPos >= this._ring.length) this._readPos = 0; + } + for (let i = available; i < needed; i++) { + out[i] = 0; + } + } + + return true; + } +} + +registerProcessor('wzp-capture-processor', WZPCaptureProcessor); +registerProcessor('wzp-playback-processor', WZPPlaybackProcessor); diff --git a/crates/wzp-web/static/index.html b/crates/wzp-web/static/index.html index f86c6ee..383de72 100644 --- a/crates/wzp-web/static/index.html +++ b/crates/wzp-web/static/index.html @@ -165,16 +165,34 @@ function stopCall() { function cleanupAudio() { if (captureNode) { captureNode.disconnect(); captureNode = null; } if (playbackNode) { playbackNode.disconnect(); playbackNode = null; } - if (audioCtx) { audioCtx.close(); audioCtx = null; } + if (audioCtx) { audioCtx.close(); audioCtx = null; workletLoaded = false; } if (mediaStream) { mediaStream.getTracks().forEach(t => t.stop()); mediaStream = null; } } +let workletLoaded = false; + +async function loadWorkletModule() { + if (workletLoaded) return true; + if (typeof AudioWorkletNode === 'undefined' || !audioCtx.audioWorklet) { + console.warn('AudioWorklet API not supported in this browser — using ScriptProcessorNode fallback'); + return false; + } + try { + await audioCtx.audioWorklet.addModule('audio-processor.js'); + workletLoaded = true; + return true; + } catch(e) { + console.warn('AudioWorklet module failed to load — using ScriptProcessorNode fallback:', e); + return false; + } +} + async function startAudioCapture() { const source = audioCtx.createMediaStreamSource(mediaStream); + const hasWorklet = await loadWorkletModule(); - try { - await audioCtx.audioWorklet.addModule('audio-processor.js'); - captureNode = new AudioWorkletNode(audioCtx, 'capture-processor'); + if (hasWorklet) { + captureNode = new AudioWorkletNode(audioCtx, 'wzp-capture-processor'); captureNode.port.onmessage = (e) => { if (!active || !ws || ws.readyState !== WebSocket.OPEN || !transmitting) return; ws.send(e.data); @@ -188,10 +206,10 @@ async function startAudioCapture() { }; source.connect(captureNode); captureNode.connect(audioCtx.destination); // needed to keep worklet alive - } catch(e) { - // Fallback to ScriptProcessor if AudioWorklet not supported - console.warn('AudioWorklet not available, using ScriptProcessor fallback:', e); - captureNode = audioCtx.createScriptProcessor(1024, 1, 1); + } else { + // Fallback to ScriptProcessorNode (deprecated but widely supported) + console.warn('Capture: using ScriptProcessorNode fallback'); + captureNode = audioCtx.createScriptProcessor(4096, 1, 1); let acc = new Float32Array(0); captureNode.onaudioprocess = (ev) => { if (!active || !ws || ws.readyState !== WebSocket.OPEN || !transmitting) return; @@ -215,13 +233,14 @@ async function startAudioCapture() { } async function startAudioPlayback() { - try { - await audioCtx.audioWorklet.addModule('playback-processor.js'); - playbackNode = new AudioWorkletNode(audioCtx, 'playback-processor'); + const hasWorklet = await loadWorkletModule(); + + if (hasWorklet) { + playbackNode = new AudioWorkletNode(audioCtx, 'wzp-playback-processor'); playbackNode.connect(audioCtx.destination); - } catch(e) { - console.warn('AudioWorklet playback not available, using scheduled fallback'); - playbackNode = null; // will use createBufferSource fallback + } else { + console.warn('Playback: using scheduled BufferSource fallback'); + playbackNode = null; // will use createBufferSource fallback in playAudio() } } @@ -230,16 +249,15 @@ let nextPlayTime = 0; function playAudio(pcmInt16) { if (!audioCtx) return; - const floatData = new Float32Array(pcmInt16.length); - for (let i = 0; i < pcmInt16.length; i++) { - floatData[i] = pcmInt16[i] / 32768.0; - } - if (playbackNode && playbackNode.port) { - // AudioWorklet path — send float samples to the worklet - playbackNode.port.postMessage(floatData.buffer, [floatData.buffer]); + // AudioWorklet path — send Int16 PCM directly to the worklet for conversion + playbackNode.port.postMessage(pcmInt16.buffer, [pcmInt16.buffer]); } else { - // Fallback: scheduled BufferSource + // Fallback: scheduled BufferSource (convert Int16 -> Float32 on main thread) + const floatData = new Float32Array(pcmInt16.length); + for (let i = 0; i < pcmInt16.length; i++) { + floatData[i] = pcmInt16[i] / 32768.0; + } const buffer = audioCtx.createBuffer(1, floatData.length, SAMPLE_RATE); buffer.getChannelData(0).set(floatData); const source = audioCtx.createBufferSource(); diff --git a/crates/wzp-web/static/playback-processor.js b/crates/wzp-web/static/playback-processor.js deleted file mode 100644 index df72692..0000000 --- a/crates/wzp-web/static/playback-processor.js +++ /dev/null @@ -1,45 +0,0 @@ -// AudioWorklet processor for playing received audio. -// Receives PCM samples from the main thread and outputs them. - -class PlaybackProcessor extends AudioWorkletProcessor { - constructor() { - super(); - this.buffer = new Float32Array(0); - this.maxBuffered = 48000 / 5; // 200ms max - this.port.onmessage = (e) => { - const incoming = new Float32Array(e.data); - // Append - const newBuf = new Float32Array(this.buffer.length + incoming.length); - newBuf.set(this.buffer); - newBuf.set(incoming, this.buffer.length); - this.buffer = newBuf; - - // Cap buffer to prevent drift - if (this.buffer.length > this.maxBuffered) { - this.buffer = this.buffer.slice(this.buffer.length - this.maxBuffered); - } - }; - } - - process(inputs, outputs, parameters) { - const output = outputs[0]; - if (!output || !output[0]) return true; - - const out = output[0]; // 128 samples typically - - if (this.buffer.length >= out.length) { - out.set(this.buffer.subarray(0, out.length)); - this.buffer = this.buffer.slice(out.length); - } else if (this.buffer.length > 0) { - out.set(this.buffer); - for (let i = this.buffer.length; i < out.length; i++) out[i] = 0; - this.buffer = new Float32Array(0); - } else { - for (let i = 0; i < out.length; i++) out[i] = 0; - } - - return true; - } -} - -registerProcessor('playback-processor', PlaybackProcessor);