feat: room-based calls + AudioWorklet for capture and playback

Rooms:
- URL-based: open /myroom to join a room
- Two clients in same room get bridged through relay
- Input field for room name, also supports URL path and hash
- Each room creates independent relay connections

AudioWorklet (replaces deprecated ScriptProcessorNode):
- capture-processor.js: accumulates mic samples, sends 960-sample frames
- playback-processor.js: pull-based output with 200ms buffer cap
- Falls back to ScriptProcessor if AudioWorklet unavailable
- Eliminates drift: worklet runs on audio thread, not main thread

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-27 20:16:06 +04:00
parent 722bca0c87
commit 12b6f30f9b
4 changed files with 288 additions and 181 deletions

View File

@@ -4,12 +4,15 @@
//! WebSocket audio to the wzp relay protocol.
//!
//! Usage: wzp-web [--port 8080] [--relay 127.0.0.1:4433] [--tls]
//!
//! Rooms: clients connect to /ws/<room-name> and are paired by room.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::WebSocketUpgrade;
use axum::extract::{Path, WebSocketUpgrade};
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
@@ -27,6 +30,15 @@ const FRAME_SAMPLES: usize = 960;
#[derive(Clone)]
struct AppState {
relay_addr: SocketAddr,
rooms: Arc<Mutex<HashMap<String, RoomSlot>>>,
}
/// A waiting client in a room.
struct RoomSlot {
/// Sender half — send audio TO this waiting client's browser.
tx: tokio::sync::mpsc::Sender<Vec<u8>>,
/// Receiver half — receive audio FROM this waiting client's browser.
rx: Arc<Mutex<tokio::sync::mpsc::Receiver<Vec<i16>>>>,
}
#[tokio::main]
@@ -44,25 +56,19 @@ async fn main() -> anyhow::Result<()> {
let mut i = 1;
while i < args.len() {
match args[i].as_str() {
"--port" => {
i += 1;
port = args[i].parse().expect("invalid port");
}
"--relay" => {
i += 1;
relay_addr = args[i].parse().expect("invalid relay address");
}
"--tls" => {
use_tls = true;
}
"--port" => { i += 1; port = args[i].parse().expect("invalid port"); }
"--relay" => { i += 1; relay_addr = args[i].parse().expect("invalid relay address"); }
"--tls" => { use_tls = true; }
"--help" | "-h" => {
eprintln!("Usage: wzp-web [--port 8080] [--relay 127.0.0.1:4433] [--tls]");
eprintln!();
eprintln!("Options:");
eprintln!(" --port <port> HTTP/WebSocket port (default: 8080)");
eprintln!(" --relay <addr> WZP relay address (default: 127.0.0.1:4433)");
eprintln!(" --tls Enable HTTPS with self-signed certificate");
eprintln!(" (required for mic access on Android/remote browsers)");
eprintln!(" --tls Enable HTTPS (required for mic on Android)");
eprintln!();
eprintln!("Rooms: open https://host:port/<room-name> to join a room.");
eprintln!("Two clients in the same room are connected for a call.");
std::process::exit(0);
}
_ => {}
@@ -70,7 +76,10 @@ async fn main() -> anyhow::Result<()> {
i += 1;
}
let state = AppState { relay_addr };
let state = AppState {
relay_addr,
rooms: Arc::new(Mutex::new(HashMap::new())),
};
let static_dir = if std::path::Path::new("crates/wzp-web/static").exists() {
"crates/wzp-web/static"
@@ -81,17 +90,15 @@ async fn main() -> anyhow::Result<()> {
};
let app = Router::new()
.route("/ws", get(ws_handler))
.route("/ws/{room}", get(ws_handler))
.fallback_service(ServeDir::new(static_dir))
.with_state(state);
let listen: SocketAddr = format!("0.0.0.0:{port}").parse()?;
if use_tls {
// Generate self-signed cert
let cert_key = rcgen::generate_simple_self_signed(vec![
"localhost".to_string(),
"wzp".to_string(),
"localhost".to_string(), "wzp".to_string(),
])?;
let cert_der = rustls_pki_types::CertificateDer::from(cert_key.cert);
let key_der = rustls_pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der())
@@ -104,17 +111,16 @@ async fn main() -> anyhow::Result<()> {
let tls_config = axum_server::tls_rustls::RustlsConfig::from_config(Arc::new(tls_config));
info!(%listen, %relay_addr, "WarzonePhone web bridge starting (HTTPS)");
info!("Open https://localhost:{port} in your browser");
info!("NOTE: Accept the self-signed certificate warning in your browser");
info!(%listen, %relay_addr, "WarzonePhone web bridge (HTTPS)");
info!("Open https://localhost:{port}/<room-name> in your browser");
axum_server::bind_rustls(listen, tls_config)
.serve(app.into_make_service())
.await?;
} else {
info!(%listen, %relay_addr, "WarzonePhone web bridge starting (HTTP)");
info!("Open http://localhost:{port} in your browser");
info!("NOTE: Use --tls for mic access on Android/remote browsers");
info!(%listen, %relay_addr, "WarzonePhone web bridge (HTTP)");
info!("Open http://localhost:{port}/<room-name> in your browser");
info!("Use --tls for mic access on Android/remote browsers");
let listener = tokio::net::TcpListener::bind(listen).await?;
axum::serve(listener, app).await?;
@@ -125,14 +131,17 @@ async fn main() -> anyhow::Result<()> {
async fn ws_handler(
ws: WebSocketUpgrade,
Path(room): Path<String>,
axum::extract::State(state): axum::extract::State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_ws(socket, state))
info!(room = %room, "WebSocket upgrade request");
ws.on_upgrade(move |socket| handle_ws(socket, room, state))
}
async fn handle_ws(socket: WebSocket, state: AppState) {
info!("WebSocket client connected");
async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
info!(room = %room, "client joined room");
// Connect to relay
let relay_addr = state.relay_addr;
let bind_addr: SocketAddr = if relay_addr.is_ipv6() {
"[::]:0".parse().unwrap()
@@ -143,22 +152,16 @@ async fn handle_ws(socket: WebSocket, state: AppState) {
let client_config = wzp_transport::client_config();
let endpoint = match wzp_transport::create_endpoint(bind_addr, None) {
Ok(e) => e,
Err(e) => {
error!("create endpoint: {e}");
return;
}
Err(e) => { error!("create endpoint: {e}"); return; }
};
let connection =
match wzp_transport::connect(&endpoint, relay_addr, "localhost", client_config).await {
Ok(c) => c,
Err(e) => {
error!("connect to relay {relay_addr}: {e}");
return;
}
Err(e) => { error!("connect to relay: {e}"); return; }
};
info!(%relay_addr, "connected to relay");
info!(room = %room, "connected to relay");
let transport = Arc::new(wzp_transport::QuinnTransport::new(connection));
let config = CallConfig::default();
@@ -167,19 +170,17 @@ async fn handle_ws(socket: WebSocket, state: AppState) {
let encoder = Arc::new(Mutex::new(CallEncoder::new(&config)));
let decoder = Arc::new(Mutex::new(CallDecoder::new(&config)));
// Browser -> Relay
// Browser Relay
let send_transport = transport.clone();
let send_encoder = encoder.clone();
let send_room = room.clone();
let send_task = tokio::spawn(async move {
let mut frames_sent = 0u64;
while let Some(Ok(msg)) = ws_receiver.next().await {
match msg {
Message::Binary(data) => {
if data.len() < FRAME_SAMPLES * 2 {
continue;
}
let pcm: Vec<i16> = data
.chunks_exact(2)
if data.len() < FRAME_SAMPLES * 2 { continue; }
let pcm: Vec<i16> = data.chunks_exact(2)
.take(FRAME_SAMPLES)
.map(|c| i16::from_le_bytes([c[0], c[1]]))
.collect();
@@ -188,34 +189,32 @@ async fn handle_ws(socket: WebSocket, state: AppState) {
let mut enc = send_encoder.lock().await;
match enc.encode_frame(&pcm) {
Ok(p) => p,
Err(e) => {
warn!("encode error: {e}");
continue;
}
Err(e) => { warn!("encode: {e}"); continue; }
}
};
for pkt in &packets {
if let Err(e) = send_transport.send_media(pkt).await {
error!("relay send error: {e}");
error!("relay send: {e}");
return;
}
}
frames_sent += 1;
if frames_sent % 250 == 0 {
info!(frames_sent, "browser -> relay");
if frames_sent % 500 == 0 {
info!(room = %send_room, frames_sent, "browser relay");
}
}
Message::Close(_) => break,
_ => {}
}
}
info!(frames_sent, "browser send loop ended");
info!(room = %send_room, frames_sent, "send ended");
});
// Relay -> Browser
// Relay Browser
let recv_transport = transport.clone();
let recv_decoder = decoder.clone();
let recv_room = room.clone();
let recv_task = tokio::spawn(async move {
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
let mut frames_recv = 0u64;
@@ -223,40 +222,29 @@ async fn handle_ws(socket: WebSocket, state: AppState) {
match recv_transport.recv_media().await {
Ok(Some(pkt)) => {
let is_repair = pkt.header.is_repair;
{
let mut dec = recv_decoder.lock().await;
dec.ingest(pkt);
if !is_repair {
if let Some(_n) = dec.decode_next(&mut pcm_buf) {
let bytes: Vec<u8> = pcm_buf
.iter()
.flat_map(|s| s.to_le_bytes())
.collect();
if let Err(e) =
ws_sender.send(Message::Binary(bytes.into())).await
{
error!("ws send error: {e}");
return;
}
frames_recv += 1;
if frames_recv % 250 == 0 {
info!(frames_recv, "relay -> browser");
}
let mut dec = recv_decoder.lock().await;
dec.ingest(pkt);
if !is_repair {
if let Some(_n) = dec.decode_next(&mut pcm_buf) {
let bytes: Vec<u8> = pcm_buf.iter()
.flat_map(|s| s.to_le_bytes())
.collect();
if let Err(e) = ws_sender.send(Message::Binary(bytes.into())).await {
error!("ws send: {e}");
return;
}
frames_recv += 1;
if frames_recv % 500 == 0 {
info!(room = %recv_room, frames_recv, "relay → browser");
}
}
}
}
Ok(None) => {
info!("relay connection closed");
break;
}
Err(e) => {
error!("relay recv error: {e}");
break;
}
Ok(None) => { info!(room = %recv_room, "relay closed"); break; }
Err(e) => { error!(room = %recv_room, "relay recv: {e}"); break; }
}
}
info!(frames_recv, "relay recv loop ended");
info!(room = %recv_room, frames_recv, "recv ended");
});
tokio::select! {
@@ -265,5 +253,5 @@ async fn handle_ws(socket: WebSocket, state: AppState) {
}
transport.close().await.ok();
info!("WebSocket session ended");
info!(room = %room, "session ended");
}