feat: complete WZP Phase 2 (T2/T3/T4) — adaptive quality, AudioWorklet, sessions
WZP-P2-T2: Adaptive quality switching - QualityAdapter with sliding window of QualityReports - Hysteresis: 3 consecutive reports before switching profiles - Thresholds: loss>15%/rtt>200ms→CATASTROPHIC, loss>5%/rtt>100ms→DEGRADED - CallConfig::from_profile() constructor - 5 unit tests: good/degraded/catastrophic conditions, hysteresis, recovery WZP-P2-T3: AudioWorklet migration (web bridge) - audio-processor.js: WZPCaptureProcessor + WZPPlaybackProcessor - Capture: buffers 128-sample AudioWorklet blocks → 960-sample frames - Playback: ring buffer, Int16→Float32 conversion in worklet - ScriptProcessorNode fallback if AudioWorklet unavailable - Existing UI preserved (connect, room, PTT) WZP-P2-T4: Concurrent session management (relay) - SessionManager tracks active sessions with HashMap - Enforces max_sessions limit from RelayConfig - create_session/remove_session lifecycle - Wired into relay main: session created after auth+handshake, cleaned up after run_participant returns - 7 unit tests: create/remove, max enforced, room tracking, info, expiry 207 tests passing across all crates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -12,6 +12,7 @@ use wzp_proto::quality::AdaptiveQualityController;
|
||||
use wzp_proto::traits::{
|
||||
AudioDecoder, AudioEncoder, FecDecoder, FecEncoder,
|
||||
};
|
||||
use wzp_proto::packet::QualityReport;
|
||||
use wzp_proto::QualityProfile;
|
||||
|
||||
/// Configuration for a call session.
|
||||
@@ -37,6 +38,129 @@ impl Default for CallConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl CallConfig {
|
||||
/// Build a `CallConfig` tuned for the given quality profile.
|
||||
pub fn from_profile(profile: QualityProfile) -> Self {
|
||||
let (jitter_target, jitter_max, jitter_min) = if profile == QualityProfile::CATASTROPHIC {
|
||||
// Catastrophic: larger jitter buffer to absorb spikes
|
||||
(20, 500, 8)
|
||||
} else if profile == QualityProfile::DEGRADED {
|
||||
// Degraded: moderately deeper buffer
|
||||
(15, 350, 5)
|
||||
} else {
|
||||
// Good: low-latency defaults
|
||||
(10, 250, 3)
|
||||
};
|
||||
Self {
|
||||
profile,
|
||||
jitter_target,
|
||||
jitter_max,
|
||||
jitter_min,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sliding-window quality adapter that reacts to relay `QualityReport`s.
|
||||
///
|
||||
/// Thresholds (per-report):
|
||||
/// - loss > 15% OR rtt > 200ms => CATASTROPHIC
|
||||
/// - loss > 5% OR rtt > 100ms => DEGRADED
|
||||
/// - otherwise => GOOD
|
||||
///
|
||||
/// Hysteresis: a profile switch is only recommended after the new profile
|
||||
/// has been the recommendation for 3 or more consecutive reports.
|
||||
pub struct QualityAdapter {
|
||||
/// Sliding window of the last N reports.
|
||||
window: std::collections::VecDeque<QualityReport>,
|
||||
/// Maximum window size.
|
||||
max_window: usize,
|
||||
/// Number of consecutive reports recommending the same (non-current) profile.
|
||||
consecutive_same: u32,
|
||||
/// The profile that the last `consecutive_same` reports recommended.
|
||||
pending_profile: Option<QualityProfile>,
|
||||
}
|
||||
|
||||
/// Number of consecutive reports required before accepting a switch.
|
||||
const HYSTERESIS_COUNT: u32 = 3;
|
||||
/// Default sliding window capacity.
|
||||
const ADAPTER_WINDOW: usize = 10;
|
||||
|
||||
impl QualityAdapter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
window: std::collections::VecDeque::with_capacity(ADAPTER_WINDOW),
|
||||
max_window: ADAPTER_WINDOW,
|
||||
consecutive_same: 0,
|
||||
pending_profile: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a new quality report from the relay.
|
||||
pub fn ingest(&mut self, report: &QualityReport) {
|
||||
if self.window.len() >= self.max_window {
|
||||
self.window.pop_front();
|
||||
}
|
||||
self.window.push_back(*report);
|
||||
}
|
||||
|
||||
/// Classify a single report into a recommended profile.
|
||||
fn classify(report: &QualityReport) -> QualityProfile {
|
||||
let loss = report.loss_percent();
|
||||
let rtt = report.rtt_ms();
|
||||
|
||||
if loss > 15.0 || rtt > 200 {
|
||||
QualityProfile::CATASTROPHIC
|
||||
} else if loss > 5.0 || rtt > 100 {
|
||||
QualityProfile::DEGRADED
|
||||
} else {
|
||||
QualityProfile::GOOD
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the best profile based on the most recent report in the window.
|
||||
pub fn recommended_profile(&self) -> QualityProfile {
|
||||
match self.window.back() {
|
||||
Some(report) => Self::classify(report),
|
||||
None => QualityProfile::GOOD,
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine if a profile switch should happen, applying hysteresis.
|
||||
///
|
||||
/// Returns `Some(new_profile)` only when the recommendation has differed
|
||||
/// from `current` for at least `HYSTERESIS_COUNT` consecutive reports.
|
||||
pub fn should_switch(&mut self, current: &QualityProfile) -> Option<QualityProfile> {
|
||||
let recommended = self.recommended_profile();
|
||||
|
||||
if recommended == *current {
|
||||
// Conditions match current profile — reset pending state.
|
||||
self.consecutive_same = 0;
|
||||
self.pending_profile = None;
|
||||
return None;
|
||||
}
|
||||
|
||||
// Recommended differs from current.
|
||||
match self.pending_profile {
|
||||
Some(pending) if pending == recommended => {
|
||||
self.consecutive_same += 1;
|
||||
}
|
||||
_ => {
|
||||
// New or changed recommendation — restart counter.
|
||||
self.pending_profile = Some(recommended);
|
||||
self.consecutive_same = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if self.consecutive_same >= HYSTERESIS_COUNT {
|
||||
self.consecutive_same = 0;
|
||||
self.pending_profile = None;
|
||||
Some(recommended)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages the encode/send side of a call.
|
||||
pub struct CallEncoder {
|
||||
/// Audio encoder (Opus or Codec2).
|
||||
@@ -301,4 +425,137 @@ mod tests {
|
||||
let mut pcm = vec![0i16; 960];
|
||||
assert!(dec.decode_next(&mut pcm).is_none());
|
||||
}
|
||||
|
||||
// ---- QualityAdapter tests ----
|
||||
|
||||
/// Helper: build a QualityReport from human-readable loss% and RTT ms.
|
||||
fn make_report(loss_pct_f: f32, rtt_ms: u16) -> QualityReport {
|
||||
QualityReport {
|
||||
loss_pct: (loss_pct_f / 100.0 * 255.0) as u8,
|
||||
rtt_4ms: (rtt_ms / 4) as u8,
|
||||
jitter_ms: 10,
|
||||
bitrate_cap_kbps: 200,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn good_conditions_stays_good() {
|
||||
let mut adapter = QualityAdapter::new();
|
||||
let good = make_report(1.0, 40);
|
||||
for _ in 0..10 {
|
||||
adapter.ingest(&good);
|
||||
}
|
||||
assert_eq!(adapter.recommended_profile(), QualityProfile::GOOD);
|
||||
|
||||
let current = QualityProfile::GOOD;
|
||||
for _ in 0..10 {
|
||||
adapter.ingest(&good);
|
||||
assert!(adapter.should_switch(¤t).is_none());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn high_loss_degrades() {
|
||||
let mut adapter = QualityAdapter::new();
|
||||
// 8% loss, low RTT => DEGRADED
|
||||
let degraded = make_report(8.0, 40);
|
||||
let mut current = QualityProfile::GOOD;
|
||||
|
||||
// Feed 3 consecutive degraded reports to pass hysteresis
|
||||
for _ in 0..3 {
|
||||
adapter.ingest(°raded);
|
||||
if let Some(new) = adapter.should_switch(¤t) {
|
||||
current = new;
|
||||
}
|
||||
}
|
||||
assert_eq!(current, QualityProfile::DEGRADED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn catastrophic_conditions() {
|
||||
let mut adapter = QualityAdapter::new();
|
||||
// 20% loss => CATASTROPHIC
|
||||
let terrible = make_report(20.0, 50);
|
||||
let mut current = QualityProfile::GOOD;
|
||||
|
||||
for _ in 0..3 {
|
||||
adapter.ingest(&terrible);
|
||||
if let Some(new) = adapter.should_switch(¤t) {
|
||||
current = new;
|
||||
}
|
||||
}
|
||||
assert_eq!(current, QualityProfile::CATASTROPHIC);
|
||||
|
||||
// Also test via high RTT alone (250ms > 200ms threshold)
|
||||
let mut adapter2 = QualityAdapter::new();
|
||||
let high_rtt = make_report(1.0, 252); // rtt_4ms rounds to 63 => 252ms
|
||||
let mut current2 = QualityProfile::GOOD;
|
||||
|
||||
for _ in 0..3 {
|
||||
adapter2.ingest(&high_rtt);
|
||||
if let Some(new) = adapter2.should_switch(¤t2) {
|
||||
current2 = new;
|
||||
}
|
||||
}
|
||||
assert_eq!(current2, QualityProfile::CATASTROPHIC);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hysteresis_prevents_flapping() {
|
||||
let mut adapter = QualityAdapter::new();
|
||||
let good = make_report(1.0, 40);
|
||||
let bad = make_report(8.0, 40); // DEGRADED
|
||||
let current = QualityProfile::GOOD;
|
||||
|
||||
// Alternate good/bad — should never trigger a switch because
|
||||
// we never get 3 consecutive same-recommendation reports.
|
||||
for _ in 0..20 {
|
||||
adapter.ingest(&bad);
|
||||
assert!(adapter.should_switch(¤t).is_none());
|
||||
adapter.ingest(&good);
|
||||
assert!(adapter.should_switch(¤t).is_none());
|
||||
}
|
||||
assert_eq!(current, QualityProfile::GOOD);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recovery_to_good() {
|
||||
let mut adapter = QualityAdapter::new();
|
||||
let bad = make_report(20.0, 50);
|
||||
let good = make_report(1.0, 40);
|
||||
|
||||
// Drive to CATASTROPHIC first
|
||||
let mut current = QualityProfile::GOOD;
|
||||
for _ in 0..3 {
|
||||
adapter.ingest(&bad);
|
||||
if let Some(new) = adapter.should_switch(¤t) {
|
||||
current = new;
|
||||
}
|
||||
}
|
||||
assert_eq!(current, QualityProfile::CATASTROPHIC);
|
||||
|
||||
// Now feed good reports — should recover to GOOD after 3 consecutive
|
||||
for _ in 0..3 {
|
||||
adapter.ingest(&good);
|
||||
if let Some(new) = adapter.should_switch(¤t) {
|
||||
current = new;
|
||||
}
|
||||
}
|
||||
assert_eq!(current, QualityProfile::GOOD);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn call_config_from_profile() {
|
||||
let good = CallConfig::from_profile(QualityProfile::GOOD);
|
||||
assert_eq!(good.profile, QualityProfile::GOOD);
|
||||
assert_eq!(good.jitter_min, 3);
|
||||
|
||||
let degraded = CallConfig::from_profile(QualityProfile::DEGRADED);
|
||||
assert_eq!(degraded.profile, QualityProfile::DEGRADED);
|
||||
assert!(degraded.jitter_target > good.jitter_target);
|
||||
|
||||
let catastrophic = CallConfig::from_profile(QualityProfile::CATASTROPHIC);
|
||||
assert_eq!(catastrophic.profile, QualityProfile::CATASTROPHIC);
|
||||
assert!(catastrophic.jitter_max > degraded.jitter_max);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,4 +17,4 @@ pub mod session_mgr;
|
||||
pub use config::RelayConfig;
|
||||
pub use handshake::accept_handshake;
|
||||
pub use pipeline::{PipelineConfig, PipelineStats, RelayPipeline};
|
||||
pub use session_mgr::{RelaySession, SessionId, SessionManager};
|
||||
pub use session_mgr::{RelaySession, SessionId, SessionInfo, SessionManager, SessionState};
|
||||
|
||||
@@ -19,6 +19,7 @@ use wzp_proto::MediaTransport;
|
||||
use wzp_relay::config::RelayConfig;
|
||||
use wzp_relay::pipeline::{PipelineConfig, RelayPipeline};
|
||||
use wzp_relay::room::{self, RoomManager};
|
||||
use wzp_relay::session_mgr::SessionManager;
|
||||
|
||||
fn parse_args() -> RelayConfig {
|
||||
let mut config = RelayConfig::default();
|
||||
@@ -163,6 +164,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Room manager (room mode only)
|
||||
let room_mgr = Arc::new(Mutex::new(RoomManager::new()));
|
||||
|
||||
// Session manager — enforces max concurrent sessions
|
||||
let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions)));
|
||||
|
||||
if let Some(ref url) = config.auth_url {
|
||||
info!(url, "auth enabled — clients must present featherChat token");
|
||||
} else {
|
||||
@@ -179,6 +183,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let remote_transport = remote_transport.clone();
|
||||
let room_mgr = room_mgr.clone();
|
||||
let session_mgr = session_mgr.clone();
|
||||
let auth_url = config.auth_url.clone();
|
||||
let relay_seed_bytes = relay_seed.0;
|
||||
|
||||
@@ -284,13 +289,28 @@ async fn main() -> anyhow::Result<()> {
|
||||
stats_handle.abort();
|
||||
transport.close().await.ok();
|
||||
} else {
|
||||
// Room mode — join room and forward to all others
|
||||
// Room mode — enforce max sessions, then join room
|
||||
let session_id = {
|
||||
let mut smgr = session_mgr.lock().await;
|
||||
match smgr.create_session(&room_name, authenticated_fp.clone()) {
|
||||
Ok(id) => id,
|
||||
Err(e) => {
|
||||
error!(%addr, room = %room_name, "session rejected: {e}");
|
||||
transport.close().await.ok();
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let participant_id = {
|
||||
let mut mgr = room_mgr.lock().await;
|
||||
match mgr.join(&room_name, addr, transport.clone(), authenticated_fp.as_deref()) {
|
||||
Ok(id) => id,
|
||||
Err(e) => {
|
||||
error!(%addr, room = %room_name, "room join denied: {e}");
|
||||
// Clean up the session we just created
|
||||
let mut smgr = session_mgr.lock().await;
|
||||
smgr.remove_session(session_id);
|
||||
transport.close().await.ok();
|
||||
return;
|
||||
}
|
||||
@@ -304,6 +324,12 @@ async fn main() -> anyhow::Result<()> {
|
||||
transport.clone(),
|
||||
).await;
|
||||
|
||||
// Participant disconnected — clean up session
|
||||
{
|
||||
let mut smgr = session_mgr.lock().await;
|
||||
smgr.remove_session(session_id);
|
||||
}
|
||||
|
||||
transport.close().await.ok();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Session manager — tracks active call sessions on the relay.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::Instant;
|
||||
|
||||
use wzp_proto::{QualityProfile, Session};
|
||||
|
||||
@@ -9,6 +10,26 @@ use crate::pipeline::{PipelineConfig, RelayPipeline};
|
||||
/// Unique identifier for a relay session.
|
||||
pub type SessionId = [u8; 16];
|
||||
|
||||
/// Lifecycle state of a concurrent session.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum SessionState {
|
||||
Active,
|
||||
Closing,
|
||||
}
|
||||
|
||||
/// Lightweight metadata for a concurrent session (room-mode tracking).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SessionInfo {
|
||||
/// Which room this session belongs to.
|
||||
pub room_name: String,
|
||||
/// Client fingerprint (present when auth is enabled).
|
||||
pub fingerprint: Option<String>,
|
||||
/// When the session was created.
|
||||
pub connected_at: Instant,
|
||||
/// Current lifecycle state.
|
||||
pub state: SessionState,
|
||||
}
|
||||
|
||||
/// A single active call session on the relay.
|
||||
pub struct RelaySession {
|
||||
/// Protocol session state machine.
|
||||
@@ -47,8 +68,14 @@ impl RelaySession {
|
||||
}
|
||||
|
||||
/// Manages all active sessions on a relay.
|
||||
///
|
||||
/// Combines two layers of tracking:
|
||||
/// - `sessions`: heavy `RelaySession` objects (pipeline state machines, used in forward mode)
|
||||
/// - `tracked`: lightweight `SessionInfo` entries (room + fingerprint, used in room mode to
|
||||
/// enforce `max_sessions` and answer lifecycle queries)
|
||||
pub struct SessionManager {
|
||||
sessions: HashMap<SessionId, RelaySession>,
|
||||
tracked: HashMap<SessionId, SessionInfo>,
|
||||
max_sessions: usize,
|
||||
}
|
||||
|
||||
@@ -56,17 +83,20 @@ impl SessionManager {
|
||||
pub fn new(max_sessions: usize) -> Self {
|
||||
Self {
|
||||
sessions: HashMap::new(),
|
||||
tracked: HashMap::new(),
|
||||
max_sessions,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new session. Returns None if at capacity.
|
||||
pub fn create_session(
|
||||
// ── Heavy session API (forward-mode pipelines) ──────────────────────
|
||||
|
||||
/// Create a new pipeline session. Returns None if at capacity.
|
||||
pub fn create_pipeline_session(
|
||||
&mut self,
|
||||
session_id: SessionId,
|
||||
config: PipelineConfig,
|
||||
) -> Option<&mut RelaySession> {
|
||||
if self.sessions.len() >= self.max_sessions {
|
||||
if self.total_count() >= self.max_sessions {
|
||||
return None;
|
||||
}
|
||||
self.sessions
|
||||
@@ -75,53 +105,124 @@ impl SessionManager {
|
||||
self.sessions.get_mut(&session_id)
|
||||
}
|
||||
|
||||
/// Get a session by ID.
|
||||
/// Get a pipeline session by ID.
|
||||
pub fn get_session(&mut self, id: &SessionId) -> Option<&mut RelaySession> {
|
||||
self.sessions.get_mut(id)
|
||||
}
|
||||
|
||||
/// Remove a session.
|
||||
pub fn remove_session(&mut self, id: &SessionId) -> Option<RelaySession> {
|
||||
/// Remove a pipeline session.
|
||||
pub fn remove_pipeline_session(&mut self, id: &SessionId) -> Option<RelaySession> {
|
||||
self.sessions.remove(id)
|
||||
}
|
||||
|
||||
/// Number of active sessions.
|
||||
pub fn active_count(&self) -> usize {
|
||||
/// Number of active pipeline sessions.
|
||||
pub fn pipeline_active_count(&self) -> usize {
|
||||
self.sessions.values().filter(|s| s.is_active()).count()
|
||||
}
|
||||
|
||||
/// Total sessions (including inactive/closing).
|
||||
pub fn total_count(&self) -> usize {
|
||||
/// Total pipeline sessions (including inactive/closing).
|
||||
pub fn pipeline_total_count(&self) -> usize {
|
||||
self.sessions.len()
|
||||
}
|
||||
|
||||
/// Remove sessions idle for longer than `timeout_ms`.
|
||||
/// Remove pipeline sessions idle for longer than `timeout_ms`.
|
||||
pub fn expire_idle(&mut self, now_ms: u64, timeout_ms: u64) -> usize {
|
||||
let before = self.sessions.len();
|
||||
self.sessions
|
||||
.retain(|_, s| now_ms.saturating_sub(s.last_activity_ms) < timeout_ms);
|
||||
before - self.sessions.len()
|
||||
}
|
||||
|
||||
// ── Lightweight concurrent-session API (room mode) ──────────────────
|
||||
|
||||
/// Register a new concurrent session.
|
||||
/// Returns the `SessionId` on success, or an error string if `max_sessions` is exceeded.
|
||||
pub fn create_session(
|
||||
&mut self,
|
||||
room: &str,
|
||||
fingerprint: Option<String>,
|
||||
) -> Result<SessionId, String> {
|
||||
if self.total_count() >= self.max_sessions {
|
||||
return Err(format!(
|
||||
"max sessions ({}) exceeded",
|
||||
self.max_sessions
|
||||
));
|
||||
}
|
||||
let id = rand_session_id();
|
||||
self.tracked.insert(id, SessionInfo {
|
||||
room_name: room.to_string(),
|
||||
fingerprint,
|
||||
connected_at: Instant::now(),
|
||||
state: SessionState::Active,
|
||||
});
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Remove a tracked session.
|
||||
pub fn remove_session(&mut self, id: SessionId) {
|
||||
self.tracked.remove(&id);
|
||||
}
|
||||
|
||||
/// Number of currently tracked (room-mode) sessions.
|
||||
pub fn active_count(&self) -> usize {
|
||||
self.tracked.values().filter(|s| s.state == SessionState::Active).count()
|
||||
}
|
||||
|
||||
/// Return all session IDs that belong to a given room.
|
||||
pub fn sessions_in_room(&self, room: &str) -> Vec<SessionId> {
|
||||
self.tracked
|
||||
.iter()
|
||||
.filter(|(_, info)| info.room_name == room)
|
||||
.map(|(id, _)| *id)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get metadata for a tracked session.
|
||||
pub fn session_info(&self, id: SessionId) -> Option<&SessionInfo> {
|
||||
self.tracked.get(&id)
|
||||
}
|
||||
|
||||
/// Total sessions across both tracking layers.
|
||||
pub fn total_count(&self) -> usize {
|
||||
self.sessions.len() + self.tracked.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a random 16-byte session identifier.
|
||||
fn rand_session_id() -> SessionId {
|
||||
let mut id = [0u8; 16];
|
||||
// Use a simple monotonic + random source to avoid pulling in `rand` crate.
|
||||
// Hash the instant + a counter for uniqueness.
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
static CTR: AtomicU64 = AtomicU64::new(1);
|
||||
let ctr = CTR.fetch_add(1, Ordering::Relaxed);
|
||||
let bytes = ctr.to_le_bytes();
|
||||
id[..8].copy_from_slice(&bytes);
|
||||
// Mix in some time-based entropy for the upper half.
|
||||
let t = Instant::now().elapsed().as_nanos() as u64;
|
||||
id[8..16].copy_from_slice(&t.to_le_bytes());
|
||||
id
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// ── Pipeline session tests (pre-existing, adapted to renamed API) ───
|
||||
|
||||
#[test]
|
||||
fn create_and_get_session() {
|
||||
fn create_and_get_pipeline_session() {
|
||||
let mut mgr = SessionManager::new(10);
|
||||
let id = [1u8; 16];
|
||||
mgr.create_session(id, PipelineConfig::default());
|
||||
assert_eq!(mgr.total_count(), 1);
|
||||
mgr.create_pipeline_session(id, PipelineConfig::default());
|
||||
assert!(mgr.get_session(&id).is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn respects_max_sessions() {
|
||||
fn respects_max_pipeline_sessions() {
|
||||
let mut mgr = SessionManager::new(1);
|
||||
mgr.create_session([1u8; 16], PipelineConfig::default());
|
||||
let result = mgr.create_session([2u8; 16], PipelineConfig::default());
|
||||
mgr.create_pipeline_session([1u8; 16], PipelineConfig::default());
|
||||
let result = mgr.create_pipeline_session([2u8; 16], PipelineConfig::default());
|
||||
assert!(result.is_none());
|
||||
}
|
||||
|
||||
@@ -129,10 +230,73 @@ mod tests {
|
||||
fn expire_idle_removes_old() {
|
||||
let mut mgr = SessionManager::new(10);
|
||||
let id = [1u8; 16];
|
||||
mgr.create_session(id, PipelineConfig::default());
|
||||
// Session has last_activity_ms = 0, current time = 60000, timeout = 30000
|
||||
mgr.create_pipeline_session(id, PipelineConfig::default());
|
||||
let expired = mgr.expire_idle(60_000, 30_000);
|
||||
assert_eq!(expired, 1);
|
||||
assert_eq!(mgr.total_count(), 0);
|
||||
assert_eq!(mgr.pipeline_total_count(), 0);
|
||||
}
|
||||
|
||||
// ── Concurrent session (room-mode) tests ────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn create_and_remove() {
|
||||
let mut mgr = SessionManager::new(10);
|
||||
let id = mgr.create_session("room-a", Some("fp123".into())).unwrap();
|
||||
assert_eq!(mgr.active_count(), 1);
|
||||
mgr.remove_session(id);
|
||||
assert_eq!(mgr.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn max_sessions_enforced() {
|
||||
let mut mgr = SessionManager::new(2);
|
||||
mgr.create_session("r1", None).unwrap();
|
||||
mgr.create_session("r2", None).unwrap();
|
||||
let err = mgr.create_session("r3", None);
|
||||
assert!(err.is_err());
|
||||
assert!(err.unwrap_err().contains("max sessions"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sessions_in_room_tracking() {
|
||||
let mut mgr = SessionManager::new(10);
|
||||
let a1 = mgr.create_session("alpha", None).unwrap();
|
||||
let _a2 = mgr.create_session("alpha", None).unwrap();
|
||||
let _b1 = mgr.create_session("beta", None).unwrap();
|
||||
|
||||
let alpha_ids = mgr.sessions_in_room("alpha");
|
||||
assert_eq!(alpha_ids.len(), 2);
|
||||
assert!(alpha_ids.contains(&a1));
|
||||
|
||||
let beta_ids = mgr.sessions_in_room("beta");
|
||||
assert_eq!(beta_ids.len(), 1);
|
||||
|
||||
let empty = mgr.sessions_in_room("gamma");
|
||||
assert!(empty.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_info_returns_correct_data() {
|
||||
let mut mgr = SessionManager::new(10);
|
||||
let id = mgr.create_session("room-x", Some("alice-fp".into())).unwrap();
|
||||
|
||||
let info = mgr.session_info(id).expect("session should exist");
|
||||
assert_eq!(info.room_name, "room-x");
|
||||
assert_eq!(info.fingerprint.as_deref(), Some("alice-fp"));
|
||||
assert_eq!(info.state, SessionState::Active);
|
||||
|
||||
// Non-existent session returns None
|
||||
assert!(mgr.session_info([0xFFu8; 16]).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn max_sessions_shared_across_both_layers() {
|
||||
let mut mgr = SessionManager::new(2);
|
||||
// One pipeline session + one tracked session = 2 = at capacity
|
||||
mgr.create_pipeline_session([1u8; 16], PipelineConfig::default());
|
||||
mgr.create_session("room", None).unwrap();
|
||||
// Both layers should now reject
|
||||
assert!(mgr.create_session("room", None).is_err());
|
||||
assert!(mgr.create_pipeline_session([2u8; 16], PipelineConfig::default()).is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,34 +1,51 @@
|
||||
// AudioWorklet processor for capturing microphone audio.
|
||||
// Accumulates samples and posts 960-sample (20ms @ 48kHz) frames to the main thread.
|
||||
// WarzonePhone AudioWorklet processors.
|
||||
// Both capture and playback handle 960-sample frames (20ms @ 48kHz).
|
||||
// AudioWorklet calls process() with 128-sample blocks, so we buffer internally.
|
||||
|
||||
class CaptureProcessor extends AudioWorkletProcessor {
|
||||
const FRAME_SIZE = 960;
|
||||
|
||||
class WZPCaptureProcessor extends AudioWorkletProcessor {
|
||||
constructor() {
|
||||
super();
|
||||
this.buffer = new Float32Array(0);
|
||||
// Pre-allocate ring buffer large enough for several frames
|
||||
this._ring = new Float32Array(FRAME_SIZE * 4);
|
||||
this._writePos = 0;
|
||||
}
|
||||
|
||||
process(inputs, outputs, parameters) {
|
||||
process(inputs, _outputs, _parameters) {
|
||||
const input = inputs[0];
|
||||
if (!input || !input[0]) return true;
|
||||
|
||||
const samples = input[0]; // Float32Array, typically 128 samples
|
||||
const samples = input[0]; // Float32Array, 128 samples typically
|
||||
const len = samples.length;
|
||||
|
||||
// Accumulate
|
||||
const newBuf = new Float32Array(this.buffer.length + samples.length);
|
||||
newBuf.set(this.buffer);
|
||||
newBuf.set(samples, this.buffer.length);
|
||||
this.buffer = newBuf;
|
||||
|
||||
// Send complete 960-sample frames
|
||||
while (this.buffer.length >= 960) {
|
||||
const frame = this.buffer.slice(0, 960);
|
||||
this.buffer = this.buffer.slice(960);
|
||||
|
||||
// Convert to Int16
|
||||
const pcm = new Int16Array(960);
|
||||
for (let i = 0; i < 960; i++) {
|
||||
pcm[i] = Math.max(-32768, Math.min(32767, Math.round(frame[i] * 32767)));
|
||||
// Write into ring buffer
|
||||
if (this._writePos + len > this._ring.length) {
|
||||
// Should not happen with FRAME_SIZE * 4 capacity and timely draining,
|
||||
// but handle gracefully by resizing
|
||||
const bigger = new Float32Array(this._ring.length * 2);
|
||||
bigger.set(this._ring.subarray(0, this._writePos));
|
||||
this._ring = bigger;
|
||||
}
|
||||
this._ring.set(samples, this._writePos);
|
||||
this._writePos += len;
|
||||
|
||||
// Drain complete 960-sample frames
|
||||
while (this._writePos >= FRAME_SIZE) {
|
||||
// Convert Float32 -> Int16 PCM
|
||||
const pcm = new Int16Array(FRAME_SIZE);
|
||||
for (let i = 0; i < FRAME_SIZE; i++) {
|
||||
const s = this._ring[i];
|
||||
pcm[i] = s < -1 ? -32768 : s > 1 ? 32767 : (s * 32767) | 0;
|
||||
}
|
||||
|
||||
// Shift remaining data forward
|
||||
this._writePos -= FRAME_SIZE;
|
||||
if (this._writePos > 0) {
|
||||
this._ring.copyWithin(0, FRAME_SIZE, FRAME_SIZE + this._writePos);
|
||||
}
|
||||
|
||||
// Send the Int16 PCM buffer (1920 bytes) to the main thread
|
||||
this.port.postMessage(pcm.buffer, [pcm.buffer]);
|
||||
}
|
||||
|
||||
@@ -36,4 +53,90 @@ class CaptureProcessor extends AudioWorkletProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
registerProcessor('capture-processor', CaptureProcessor);
|
||||
class WZPPlaybackProcessor extends AudioWorkletProcessor {
|
||||
constructor() {
|
||||
super();
|
||||
// Ring buffer for decoded Float32 samples ready for output
|
||||
this._ring = new Float32Array(FRAME_SIZE * 8);
|
||||
this._readPos = 0;
|
||||
this._writePos = 0;
|
||||
this._maxBuffered = FRAME_SIZE * 6; // ~120ms max to prevent drift
|
||||
|
||||
this.port.onmessage = (e) => {
|
||||
// Receive Int16 PCM from main thread, convert to Float32
|
||||
const pcm = new Int16Array(e.data);
|
||||
const len = pcm.length;
|
||||
|
||||
// Check capacity
|
||||
let available = this._writePos - this._readPos;
|
||||
if (available < 0) available += this._ring.length;
|
||||
if (available + len > this._maxBuffered) {
|
||||
// Too much buffered; drop oldest samples to prevent drift
|
||||
this._readPos = this._writePos;
|
||||
}
|
||||
|
||||
// Ensure ring buffer is big enough
|
||||
if (this._ring.length < len + available + 128) {
|
||||
const bigger = new Float32Array(this._ring.length * 2);
|
||||
// Copy existing data contiguously
|
||||
if (this._readPos <= this._writePos) {
|
||||
bigger.set(this._ring.subarray(this._readPos, this._writePos));
|
||||
} else {
|
||||
const firstPart = this._ring.subarray(this._readPos);
|
||||
const secondPart = this._ring.subarray(0, this._writePos);
|
||||
bigger.set(firstPart);
|
||||
bigger.set(secondPart, firstPart.length);
|
||||
}
|
||||
this._ring = bigger;
|
||||
const count = available;
|
||||
this._readPos = 0;
|
||||
this._writePos = count;
|
||||
}
|
||||
|
||||
// Write converted samples into ring buffer linearly (simpler: use linear buffer)
|
||||
for (let i = 0; i < len; i++) {
|
||||
this._ring[this._writePos] = pcm[i] / 32768.0;
|
||||
this._writePos++;
|
||||
if (this._writePos >= this._ring.length) this._writePos = 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
process(_inputs, outputs, _parameters) {
|
||||
const output = outputs[0];
|
||||
if (!output || !output[0]) return true;
|
||||
|
||||
const out = output[0]; // 128 samples typically
|
||||
const needed = out.length;
|
||||
|
||||
let available;
|
||||
if (this._writePos >= this._readPos) {
|
||||
available = this._writePos - this._readPos;
|
||||
} else {
|
||||
available = this._ring.length - this._readPos + this._writePos;
|
||||
}
|
||||
|
||||
if (available >= needed) {
|
||||
for (let i = 0; i < needed; i++) {
|
||||
out[i] = this._ring[this._readPos];
|
||||
this._readPos++;
|
||||
if (this._readPos >= this._ring.length) this._readPos = 0;
|
||||
}
|
||||
} else {
|
||||
// Output what we have, zero-fill the rest (underrun)
|
||||
for (let i = 0; i < available; i++) {
|
||||
out[i] = this._ring[this._readPos];
|
||||
this._readPos++;
|
||||
if (this._readPos >= this._ring.length) this._readPos = 0;
|
||||
}
|
||||
for (let i = available; i < needed; i++) {
|
||||
out[i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
registerProcessor('wzp-capture-processor', WZPCaptureProcessor);
|
||||
registerProcessor('wzp-playback-processor', WZPPlaybackProcessor);
|
||||
|
||||
@@ -165,16 +165,34 @@ function stopCall() {
|
||||
function cleanupAudio() {
|
||||
if (captureNode) { captureNode.disconnect(); captureNode = null; }
|
||||
if (playbackNode) { playbackNode.disconnect(); playbackNode = null; }
|
||||
if (audioCtx) { audioCtx.close(); audioCtx = null; }
|
||||
if (audioCtx) { audioCtx.close(); audioCtx = null; workletLoaded = false; }
|
||||
if (mediaStream) { mediaStream.getTracks().forEach(t => t.stop()); mediaStream = null; }
|
||||
}
|
||||
|
||||
let workletLoaded = false;
|
||||
|
||||
async function loadWorkletModule() {
|
||||
if (workletLoaded) return true;
|
||||
if (typeof AudioWorkletNode === 'undefined' || !audioCtx.audioWorklet) {
|
||||
console.warn('AudioWorklet API not supported in this browser — using ScriptProcessorNode fallback');
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
await audioCtx.audioWorklet.addModule('audio-processor.js');
|
||||
workletLoaded = true;
|
||||
return true;
|
||||
} catch(e) {
|
||||
console.warn('AudioWorklet module failed to load — using ScriptProcessorNode fallback:', e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function startAudioCapture() {
|
||||
const source = audioCtx.createMediaStreamSource(mediaStream);
|
||||
const hasWorklet = await loadWorkletModule();
|
||||
|
||||
try {
|
||||
await audioCtx.audioWorklet.addModule('audio-processor.js');
|
||||
captureNode = new AudioWorkletNode(audioCtx, 'capture-processor');
|
||||
if (hasWorklet) {
|
||||
captureNode = new AudioWorkletNode(audioCtx, 'wzp-capture-processor');
|
||||
captureNode.port.onmessage = (e) => {
|
||||
if (!active || !ws || ws.readyState !== WebSocket.OPEN || !transmitting) return;
|
||||
ws.send(e.data);
|
||||
@@ -188,10 +206,10 @@ async function startAudioCapture() {
|
||||
};
|
||||
source.connect(captureNode);
|
||||
captureNode.connect(audioCtx.destination); // needed to keep worklet alive
|
||||
} catch(e) {
|
||||
// Fallback to ScriptProcessor if AudioWorklet not supported
|
||||
console.warn('AudioWorklet not available, using ScriptProcessor fallback:', e);
|
||||
captureNode = audioCtx.createScriptProcessor(1024, 1, 1);
|
||||
} else {
|
||||
// Fallback to ScriptProcessorNode (deprecated but widely supported)
|
||||
console.warn('Capture: using ScriptProcessorNode fallback');
|
||||
captureNode = audioCtx.createScriptProcessor(4096, 1, 1);
|
||||
let acc = new Float32Array(0);
|
||||
captureNode.onaudioprocess = (ev) => {
|
||||
if (!active || !ws || ws.readyState !== WebSocket.OPEN || !transmitting) return;
|
||||
@@ -215,13 +233,14 @@ async function startAudioCapture() {
|
||||
}
|
||||
|
||||
async function startAudioPlayback() {
|
||||
try {
|
||||
await audioCtx.audioWorklet.addModule('playback-processor.js');
|
||||
playbackNode = new AudioWorkletNode(audioCtx, 'playback-processor');
|
||||
const hasWorklet = await loadWorkletModule();
|
||||
|
||||
if (hasWorklet) {
|
||||
playbackNode = new AudioWorkletNode(audioCtx, 'wzp-playback-processor');
|
||||
playbackNode.connect(audioCtx.destination);
|
||||
} catch(e) {
|
||||
console.warn('AudioWorklet playback not available, using scheduled fallback');
|
||||
playbackNode = null; // will use createBufferSource fallback
|
||||
} else {
|
||||
console.warn('Playback: using scheduled BufferSource fallback');
|
||||
playbackNode = null; // will use createBufferSource fallback in playAudio()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,16 +249,15 @@ let nextPlayTime = 0;
|
||||
function playAudio(pcmInt16) {
|
||||
if (!audioCtx) return;
|
||||
|
||||
if (playbackNode && playbackNode.port) {
|
||||
// AudioWorklet path — send Int16 PCM directly to the worklet for conversion
|
||||
playbackNode.port.postMessage(pcmInt16.buffer, [pcmInt16.buffer]);
|
||||
} else {
|
||||
// Fallback: scheduled BufferSource (convert Int16 -> Float32 on main thread)
|
||||
const floatData = new Float32Array(pcmInt16.length);
|
||||
for (let i = 0; i < pcmInt16.length; i++) {
|
||||
floatData[i] = pcmInt16[i] / 32768.0;
|
||||
}
|
||||
|
||||
if (playbackNode && playbackNode.port) {
|
||||
// AudioWorklet path — send float samples to the worklet
|
||||
playbackNode.port.postMessage(floatData.buffer, [floatData.buffer]);
|
||||
} else {
|
||||
// Fallback: scheduled BufferSource
|
||||
const buffer = audioCtx.createBuffer(1, floatData.length, SAMPLE_RATE);
|
||||
buffer.getChannelData(0).set(floatData);
|
||||
const source = audioCtx.createBufferSource();
|
||||
|
||||
@@ -1,45 +0,0 @@
|
||||
// AudioWorklet processor for playing received audio.
|
||||
// Receives PCM samples from the main thread and outputs them.
|
||||
|
||||
class PlaybackProcessor extends AudioWorkletProcessor {
|
||||
constructor() {
|
||||
super();
|
||||
this.buffer = new Float32Array(0);
|
||||
this.maxBuffered = 48000 / 5; // 200ms max
|
||||
this.port.onmessage = (e) => {
|
||||
const incoming = new Float32Array(e.data);
|
||||
// Append
|
||||
const newBuf = new Float32Array(this.buffer.length + incoming.length);
|
||||
newBuf.set(this.buffer);
|
||||
newBuf.set(incoming, this.buffer.length);
|
||||
this.buffer = newBuf;
|
||||
|
||||
// Cap buffer to prevent drift
|
||||
if (this.buffer.length > this.maxBuffered) {
|
||||
this.buffer = this.buffer.slice(this.buffer.length - this.maxBuffered);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
process(inputs, outputs, parameters) {
|
||||
const output = outputs[0];
|
||||
if (!output || !output[0]) return true;
|
||||
|
||||
const out = output[0]; // 128 samples typically
|
||||
|
||||
if (this.buffer.length >= out.length) {
|
||||
out.set(this.buffer.subarray(0, out.length));
|
||||
this.buffer = this.buffer.slice(out.length);
|
||||
} else if (this.buffer.length > 0) {
|
||||
out.set(this.buffer);
|
||||
for (let i = this.buffer.length; i < out.length; i++) out[i] = 0;
|
||||
this.buffer = new Float32Array(0);
|
||||
} else {
|
||||
for (let i = 0; i < out.length; i++) out[i] = 0;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
registerProcessor('playback-processor', PlaybackProcessor);
|
||||
Reference in New Issue
Block a user