From 12b6f30f9b10cd35a8fbfba1666c5b6d4aec2f0f Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Fri, 27 Mar 2026 20:16:06 +0400 Subject: [PATCH] feat: room-based calls + AudioWorklet for capture and playback Rooms: - URL-based: open /myroom to join a room - Two clients in same room get bridged through relay - Input field for room name, also supports URL path and hash - Each room creates independent relay connections AudioWorklet (replaces deprecated ScriptProcessorNode): - capture-processor.js: accumulates mic samples, sends 960-sample frames - playback-processor.js: pull-based output with 200ms buffer cap - Falls back to ScriptProcessor if AudioWorklet unavailable - Eliminates drift: worklet runs on audio thread, not main thread Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-web/src/main.rs | 150 ++++++------- crates/wzp-web/static/audio-processor.js | 39 ++++ crates/wzp-web/static/index.html | 235 +++++++++++--------- crates/wzp-web/static/playback-processor.js | 45 ++++ 4 files changed, 288 insertions(+), 181 deletions(-) create mode 100644 crates/wzp-web/static/audio-processor.js create mode 100644 crates/wzp-web/static/playback-processor.js diff --git a/crates/wzp-web/src/main.rs b/crates/wzp-web/src/main.rs index 39e625c..396ad01 100644 --- a/crates/wzp-web/src/main.rs +++ b/crates/wzp-web/src/main.rs @@ -4,12 +4,15 @@ //! WebSocket audio to the wzp relay protocol. //! //! Usage: wzp-web [--port 8080] [--relay 127.0.0.1:4433] [--tls] +//! +//! Rooms: clients connect to /ws/ and are paired by room. +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use axum::extract::ws::{Message, WebSocket}; -use axum::extract::WebSocketUpgrade; +use axum::extract::{Path, WebSocketUpgrade}; use axum::response::IntoResponse; use axum::routing::get; use axum::Router; @@ -27,6 +30,15 @@ const FRAME_SAMPLES: usize = 960; #[derive(Clone)] struct AppState { relay_addr: SocketAddr, + rooms: Arc>>, +} + +/// A waiting client in a room. +struct RoomSlot { + /// Sender half — send audio TO this waiting client's browser. + tx: tokio::sync::mpsc::Sender>, + /// Receiver half — receive audio FROM this waiting client's browser. + rx: Arc>>>, } #[tokio::main] @@ -44,25 +56,19 @@ async fn main() -> anyhow::Result<()> { let mut i = 1; while i < args.len() { match args[i].as_str() { - "--port" => { - i += 1; - port = args[i].parse().expect("invalid port"); - } - "--relay" => { - i += 1; - relay_addr = args[i].parse().expect("invalid relay address"); - } - "--tls" => { - use_tls = true; - } + "--port" => { i += 1; port = args[i].parse().expect("invalid port"); } + "--relay" => { i += 1; relay_addr = args[i].parse().expect("invalid relay address"); } + "--tls" => { use_tls = true; } "--help" | "-h" => { eprintln!("Usage: wzp-web [--port 8080] [--relay 127.0.0.1:4433] [--tls]"); eprintln!(); eprintln!("Options:"); eprintln!(" --port HTTP/WebSocket port (default: 8080)"); eprintln!(" --relay WZP relay address (default: 127.0.0.1:4433)"); - eprintln!(" --tls Enable HTTPS with self-signed certificate"); - eprintln!(" (required for mic access on Android/remote browsers)"); + eprintln!(" --tls Enable HTTPS (required for mic on Android)"); + eprintln!(); + eprintln!("Rooms: open https://host:port/ to join a room."); + eprintln!("Two clients in the same room are connected for a call."); std::process::exit(0); } _ => {} @@ -70,7 +76,10 @@ async fn main() -> anyhow::Result<()> { i += 1; } - let state = AppState { relay_addr }; + let state = AppState { + relay_addr, + rooms: Arc::new(Mutex::new(HashMap::new())), + }; let static_dir = if std::path::Path::new("crates/wzp-web/static").exists() { "crates/wzp-web/static" @@ -81,17 +90,15 @@ async fn main() -> anyhow::Result<()> { }; let app = Router::new() - .route("/ws", get(ws_handler)) + .route("/ws/{room}", get(ws_handler)) .fallback_service(ServeDir::new(static_dir)) .with_state(state); let listen: SocketAddr = format!("0.0.0.0:{port}").parse()?; if use_tls { - // Generate self-signed cert let cert_key = rcgen::generate_simple_self_signed(vec![ - "localhost".to_string(), - "wzp".to_string(), + "localhost".to_string(), "wzp".to_string(), ])?; let cert_der = rustls_pki_types::CertificateDer::from(cert_key.cert); let key_der = rustls_pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der()) @@ -104,17 +111,16 @@ async fn main() -> anyhow::Result<()> { let tls_config = axum_server::tls_rustls::RustlsConfig::from_config(Arc::new(tls_config)); - info!(%listen, %relay_addr, "WarzonePhone web bridge starting (HTTPS)"); - info!("Open https://localhost:{port} in your browser"); - info!("NOTE: Accept the self-signed certificate warning in your browser"); + info!(%listen, %relay_addr, "WarzonePhone web bridge (HTTPS)"); + info!("Open https://localhost:{port}/ in your browser"); axum_server::bind_rustls(listen, tls_config) .serve(app.into_make_service()) .await?; } else { - info!(%listen, %relay_addr, "WarzonePhone web bridge starting (HTTP)"); - info!("Open http://localhost:{port} in your browser"); - info!("NOTE: Use --tls for mic access on Android/remote browsers"); + info!(%listen, %relay_addr, "WarzonePhone web bridge (HTTP)"); + info!("Open http://localhost:{port}/ in your browser"); + info!("Use --tls for mic access on Android/remote browsers"); let listener = tokio::net::TcpListener::bind(listen).await?; axum::serve(listener, app).await?; @@ -125,14 +131,17 @@ async fn main() -> anyhow::Result<()> { async fn ws_handler( ws: WebSocketUpgrade, + Path(room): Path, axum::extract::State(state): axum::extract::State, ) -> impl IntoResponse { - ws.on_upgrade(move |socket| handle_ws(socket, state)) + info!(room = %room, "WebSocket upgrade request"); + ws.on_upgrade(move |socket| handle_ws(socket, room, state)) } -async fn handle_ws(socket: WebSocket, state: AppState) { - info!("WebSocket client connected"); +async fn handle_ws(socket: WebSocket, room: String, state: AppState) { + info!(room = %room, "client joined room"); + // Connect to relay let relay_addr = state.relay_addr; let bind_addr: SocketAddr = if relay_addr.is_ipv6() { "[::]:0".parse().unwrap() @@ -143,22 +152,16 @@ async fn handle_ws(socket: WebSocket, state: AppState) { let client_config = wzp_transport::client_config(); let endpoint = match wzp_transport::create_endpoint(bind_addr, None) { Ok(e) => e, - Err(e) => { - error!("create endpoint: {e}"); - return; - } + Err(e) => { error!("create endpoint: {e}"); return; } }; let connection = match wzp_transport::connect(&endpoint, relay_addr, "localhost", client_config).await { Ok(c) => c, - Err(e) => { - error!("connect to relay {relay_addr}: {e}"); - return; - } + Err(e) => { error!("connect to relay: {e}"); return; } }; - info!(%relay_addr, "connected to relay"); + info!(room = %room, "connected to relay"); let transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); let config = CallConfig::default(); @@ -167,19 +170,17 @@ async fn handle_ws(socket: WebSocket, state: AppState) { let encoder = Arc::new(Mutex::new(CallEncoder::new(&config))); let decoder = Arc::new(Mutex::new(CallDecoder::new(&config))); - // Browser -> Relay + // Browser → Relay let send_transport = transport.clone(); let send_encoder = encoder.clone(); + let send_room = room.clone(); let send_task = tokio::spawn(async move { let mut frames_sent = 0u64; while let Some(Ok(msg)) = ws_receiver.next().await { match msg { Message::Binary(data) => { - if data.len() < FRAME_SAMPLES * 2 { - continue; - } - let pcm: Vec = data - .chunks_exact(2) + if data.len() < FRAME_SAMPLES * 2 { continue; } + let pcm: Vec = data.chunks_exact(2) .take(FRAME_SAMPLES) .map(|c| i16::from_le_bytes([c[0], c[1]])) .collect(); @@ -188,34 +189,32 @@ async fn handle_ws(socket: WebSocket, state: AppState) { let mut enc = send_encoder.lock().await; match enc.encode_frame(&pcm) { Ok(p) => p, - Err(e) => { - warn!("encode error: {e}"); - continue; - } + Err(e) => { warn!("encode: {e}"); continue; } } }; for pkt in &packets { if let Err(e) = send_transport.send_media(pkt).await { - error!("relay send error: {e}"); + error!("relay send: {e}"); return; } } frames_sent += 1; - if frames_sent % 250 == 0 { - info!(frames_sent, "browser -> relay"); + if frames_sent % 500 == 0 { + info!(room = %send_room, frames_sent, "browser → relay"); } } Message::Close(_) => break, _ => {} } } - info!(frames_sent, "browser send loop ended"); + info!(room = %send_room, frames_sent, "send ended"); }); - // Relay -> Browser + // Relay → Browser let recv_transport = transport.clone(); let recv_decoder = decoder.clone(); + let recv_room = room.clone(); let recv_task = tokio::spawn(async move { let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; let mut frames_recv = 0u64; @@ -223,40 +222,29 @@ async fn handle_ws(socket: WebSocket, state: AppState) { match recv_transport.recv_media().await { Ok(Some(pkt)) => { let is_repair = pkt.header.is_repair; - { - let mut dec = recv_decoder.lock().await; - dec.ingest(pkt); - if !is_repair { - if let Some(_n) = dec.decode_next(&mut pcm_buf) { - let bytes: Vec = pcm_buf - .iter() - .flat_map(|s| s.to_le_bytes()) - .collect(); - if let Err(e) = - ws_sender.send(Message::Binary(bytes.into())).await - { - error!("ws send error: {e}"); - return; - } - frames_recv += 1; - if frames_recv % 250 == 0 { - info!(frames_recv, "relay -> browser"); - } + let mut dec = recv_decoder.lock().await; + dec.ingest(pkt); + if !is_repair { + if let Some(_n) = dec.decode_next(&mut pcm_buf) { + let bytes: Vec = pcm_buf.iter() + .flat_map(|s| s.to_le_bytes()) + .collect(); + if let Err(e) = ws_sender.send(Message::Binary(bytes.into())).await { + error!("ws send: {e}"); + return; + } + frames_recv += 1; + if frames_recv % 500 == 0 { + info!(room = %recv_room, frames_recv, "relay → browser"); } } } } - Ok(None) => { - info!("relay connection closed"); - break; - } - Err(e) => { - error!("relay recv error: {e}"); - break; - } + Ok(None) => { info!(room = %recv_room, "relay closed"); break; } + Err(e) => { error!(room = %recv_room, "relay recv: {e}"); break; } } } - info!(frames_recv, "relay recv loop ended"); + info!(room = %recv_room, frames_recv, "recv ended"); }); tokio::select! { @@ -265,5 +253,5 @@ async fn handle_ws(socket: WebSocket, state: AppState) { } transport.close().await.ok(); - info!("WebSocket session ended"); + info!(room = %room, "session ended"); } diff --git a/crates/wzp-web/static/audio-processor.js b/crates/wzp-web/static/audio-processor.js new file mode 100644 index 0000000..b1fcaab --- /dev/null +++ b/crates/wzp-web/static/audio-processor.js @@ -0,0 +1,39 @@ +// AudioWorklet processor for capturing microphone audio. +// Accumulates samples and posts 960-sample (20ms @ 48kHz) frames to the main thread. + +class CaptureProcessor extends AudioWorkletProcessor { + constructor() { + super(); + this.buffer = new Float32Array(0); + } + + process(inputs, outputs, parameters) { + const input = inputs[0]; + if (!input || !input[0]) return true; + + const samples = input[0]; // Float32Array, typically 128 samples + + // Accumulate + const newBuf = new Float32Array(this.buffer.length + samples.length); + newBuf.set(this.buffer); + newBuf.set(samples, this.buffer.length); + this.buffer = newBuf; + + // Send complete 960-sample frames + while (this.buffer.length >= 960) { + const frame = this.buffer.slice(0, 960); + this.buffer = this.buffer.slice(960); + + // Convert to Int16 + const pcm = new Int16Array(960); + for (let i = 0; i < 960; i++) { + pcm[i] = Math.max(-32768, Math.min(32767, Math.round(frame[i] * 32767))); + } + this.port.postMessage(pcm.buffer, [pcm.buffer]); + } + + return true; + } +} + +registerProcessor('capture-processor', CaptureProcessor); diff --git a/crates/wzp-web/static/index.html b/crates/wzp-web/static/index.html index 1611d25..c3fb4fc 100644 --- a/crates/wzp-web/static/index.html +++ b/crates/wzp-web/static/index.html @@ -7,15 +7,19 @@ @@ -24,6 +28,10 @@

WarzonePhone

Lossy VoIP Protocol

+
+ + +
@@ -32,31 +40,42 @@ diff --git a/crates/wzp-web/static/playback-processor.js b/crates/wzp-web/static/playback-processor.js new file mode 100644 index 0000000..df72692 --- /dev/null +++ b/crates/wzp-web/static/playback-processor.js @@ -0,0 +1,45 @@ +// AudioWorklet processor for playing received audio. +// Receives PCM samples from the main thread and outputs them. + +class PlaybackProcessor extends AudioWorkletProcessor { + constructor() { + super(); + this.buffer = new Float32Array(0); + this.maxBuffered = 48000 / 5; // 200ms max + this.port.onmessage = (e) => { + const incoming = new Float32Array(e.data); + // Append + const newBuf = new Float32Array(this.buffer.length + incoming.length); + newBuf.set(this.buffer); + newBuf.set(incoming, this.buffer.length); + this.buffer = newBuf; + + // Cap buffer to prevent drift + if (this.buffer.length > this.maxBuffered) { + this.buffer = this.buffer.slice(this.buffer.length - this.maxBuffered); + } + }; + } + + process(inputs, outputs, parameters) { + const output = outputs[0]; + if (!output || !output[0]) return true; + + const out = output[0]; // 128 samples typically + + if (this.buffer.length >= out.length) { + out.set(this.buffer.subarray(0, out.length)); + this.buffer = this.buffer.slice(out.length); + } else if (this.buffer.length > 0) { + out.set(this.buffer); + for (let i = this.buffer.length; i < out.length; i++) out[i] = 0; + this.buffer = new Float32Array(0); + } else { + for (let i = 0; i < out.length; i++) out[i] = 0; + } + + return true; + } +} + +registerProcessor('playback-processor', PlaybackProcessor);