From d8330525efe41e4b1761f30553b04a88dfed22b3 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Fri, 27 Mar 2026 20:36:19 +0400 Subject: [PATCH] feat: multi-party rooms (SFU) + push-to-talk radio mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Room-based SFU relay: - Clients join named rooms (room name from QUIC SNI) - Each participant's packets forwarded to all others (no mixing) - Multiple rooms run concurrently on one relay - Web bridge passes room name from URL path to relay Push-to-talk (radio mode): - Toggle "Radio mode" checkbox after connecting - Hold PTT button or spacebar to transmit - Release to mute mic (receive-only) - Works on desktop (spacebar) and mobile (touch) URL routing: - /myroom → joins room "myroom" - Room name input field as fallback Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-relay/Cargo.toml | 1 + crates/wzp-relay/src/lib.rs | 1 + crates/wzp-relay/src/main.rs | 201 +++++++------------------------ crates/wzp-relay/src/room.rs | 200 ++++++++++++++++++++++++++++++ crates/wzp-web/src/main.rs | 4 +- crates/wzp-web/static/index.html | 67 ++++++++++- 6 files changed, 313 insertions(+), 161 deletions(-) create mode 100644 crates/wzp-relay/src/room.rs diff --git a/crates/wzp-relay/Cargo.toml b/crates/wzp-relay/Cargo.toml index 6dad640..0b42699 100644 --- a/crates/wzp-relay/Cargo.toml +++ b/crates/wzp-relay/Cargo.toml @@ -21,6 +21,7 @@ serde = { workspace = true } toml = "0.8" anyhow = "1" rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } +quinn = { workspace = true } [[bin]] name = "wzp-relay" diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index cd5cfc9..e6180b6 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -10,6 +10,7 @@ pub mod config; pub mod handshake; pub mod pipeline; +pub mod room; pub mod session_mgr; pub use config::RelayConfig; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 7634572..0eb769e 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -1,7 +1,11 @@ //! WarzonePhone relay daemon entry point. //! -//! Accepts client QUIC connections and bridges pairs of clients together. -//! When a --remote relay is configured, forwards traffic to it instead. +//! Supports two modes: +//! - **Room mode** (default): clients join named rooms, packets forwarded to all others (SFU) +//! - **Forward mode** (--remote): all traffic forwarded to a remote relay +//! +//! Room names are passed via the QUIC SNI (server_name) field. +//! The web bridge connects with room name as SNI. use std::net::SocketAddr; use std::sync::atomic::{AtomicU64, Ordering}; @@ -14,7 +18,7 @@ use tracing::{error, info}; use wzp_proto::MediaTransport; use wzp_relay::config::RelayConfig; use wzp_relay::pipeline::{PipelineConfig, RelayPipeline}; -use wzp_relay::session_mgr::SessionManager; +use wzp_relay::room::{self, RoomManager}; fn parse_args() -> RelayConfig { let mut config = RelayConfig::default(); @@ -39,7 +43,12 @@ fn parse_args() -> RelayConfig { eprintln!(); eprintln!("Options:"); eprintln!(" --listen Listen address (default: 0.0.0.0:4433)"); - eprintln!(" --remote Remote relay address for forwarding"); + eprintln!(" --remote Remote relay for forwarding (disables room mode)"); + eprintln!(); + eprintln!("Room mode (default):"); + eprintln!(" Clients join rooms by name. Packets are forwarded to all"); + eprintln!(" other participants in the same room (SFU model)."); + eprintln!(" Room name comes from QUIC SNI or defaults to 'default'."); std::process::exit(0); } other => { @@ -57,78 +66,6 @@ struct RelayStats { downstream_packets: AtomicU64, } -/// Bridge two transports: A's packets go to B, B's go to A. -async fn run_bridge( - a: Arc, - b: Arc, - a_addr: SocketAddr, - b_addr: SocketAddr, -) { - info!(%a_addr, %b_addr, "bridging two clients"); - - let stats = Arc::new(RelayStats { - upstream_packets: AtomicU64::new(0), - downstream_packets: AtomicU64::new(0), - }); - - let stats_log = stats.clone(); - let stats_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - interval.tick().await; - let ab = stats_log.upstream_packets.load(Ordering::Relaxed); - let ba = stats_log.downstream_packets.load(Ordering::Relaxed); - info!(a_to_b = ab, b_to_a = ba, "bridge stats"); - } - }); - - let a1 = a.clone(); - let b1 = b.clone(); - let s1 = stats.clone(); - let a_to_b = tokio::spawn(async move { - loop { - match a1.recv_media().await { - Ok(Some(pkt)) => { - if let Err(e) = b1.send_media(&pkt).await { - error!("A→B send error: {e}"); - break; - } - s1.upstream_packets.fetch_add(1, Ordering::Relaxed); - } - Ok(None) => { info!(%a_addr, "client A disconnected"); break; } - Err(e) => { error!(%a_addr, "A recv error: {e}"); break; } - } - } - }); - - let a2 = a.clone(); - let b2 = b.clone(); - let s2 = stats.clone(); - let b_to_a = tokio::spawn(async move { - loop { - match b2.recv_media().await { - Ok(Some(pkt)) => { - if let Err(e) = a2.send_media(&pkt).await { - error!("B→A send error: {e}"); - break; - } - s2.downstream_packets.fetch_add(1, Ordering::Relaxed); - } - Ok(None) => { info!(%b_addr, "client B disconnected"); break; } - Err(e) => { error!(%b_addr, "B recv error: {e}"); break; } - } - } - }); - - tokio::select! { - _ = a_to_b => {} - _ = b_to_a => {} - } - stats_handle.abort(); - info!(%a_addr, %b_addr, "bridge ended"); -} - -/// Run upstream forwarding: client → pipeline → remote. async fn run_upstream( client: Arc, remote: Arc, @@ -159,7 +96,6 @@ async fn run_upstream( } } -/// Run downstream forwarding: remote → pipeline → client. async fn run_downstream( client: Arc, remote: Arc, @@ -190,12 +126,6 @@ async fn run_downstream( } } -/// Waiting client: address + transport. -struct WaitingClient { - addr: SocketAddr, - transport: Arc, -} - #[tokio::main] async fn main() -> anyhow::Result<()> { let config = parse_args(); @@ -205,29 +135,24 @@ async fn main() -> anyhow::Result<()> { .expect("failed to install rustls crypto provider"); info!(addr = %config.listen_addr, "WarzonePhone relay starting"); - if let Some(remote) = config.remote_relay { - info!(%remote, "forwarding mode → remote relay"); - } else { - info!("bridge mode — pairs clients together (echo when alone)"); - } let (server_config, _cert) = wzp_transport::server_config(); let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?; - let _sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); - - // Remote relay transport (forwarding mode only) + // Forward mode let remote_transport: Option> = if let Some(remote_addr) = config.remote_relay { + info!(%remote_addr, "forward mode → remote relay"); let client_cfg = wzp_transport::client_config(); let conn = wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?; Some(Arc::new(wzp_transport::QuinnTransport::new(conn))) } else { + info!("room mode — clients join named rooms (SFU)"); None }; - // Bridge mode: slot for waiting client - let waiting: Arc>> = Arc::new(Mutex::new(None)); + // Room manager (room mode only) + let room_mgr = Arc::new(Mutex::new(RoomManager::new())); info!("Listening for connections..."); @@ -238,15 +163,27 @@ async fn main() -> anyhow::Result<()> { }; let remote_transport = remote_transport.clone(); - let waiting = waiting.clone(); + let room_mgr = room_mgr.clone(); tokio::spawn(async move { let addr = connection.remote_address(); + + // Extract room name from QUIC handshake data (SNI). + // The web bridge connects with the room name as server_name. + let room_name = connection + .handshake_data() + .and_then(|hd| { + hd.downcast::().ok() + }) + .and_then(|hd| hd.server_name.clone()) + .unwrap_or_else(|| "default".to_string()); + let transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); - info!(%addr, "new client"); + + info!(%addr, room = %room_name, "new client"); if let Some(remote) = remote_transport { - // Forwarding mode + // Forward mode — same as before let stats = Arc::new(RelayStats { upstream_packets: AtomicU64::new(0), downstream_packets: AtomicU64::new(0), @@ -273,71 +210,21 @@ async fn main() -> anyhow::Result<()> { tokio::select! { _ = up => {} _ = dn => {} } stats_handle.abort(); transport.close().await.ok(); - info!(%addr, "forwarding session ended"); } else { - // Bridge mode — try to pair with a waiting client - let peer = { - let mut slot = waiting.lock().await; - slot.take() + // Room mode — join room and forward to all others + let participant_id = { + let mut mgr = room_mgr.lock().await; + mgr.join(&room_name, addr, transport.clone()) }; - if let Some(peer_client) = peer { - // Second client — bridge immediately - run_bridge(peer_client.transport.clone(), transport.clone(), peer_client.addr, addr).await; - peer_client.transport.close().await.ok(); - transport.close().await.ok(); + room::run_participant( + room_mgr.clone(), + room_name, + participant_id, + transport.clone(), + ).await; - // After bridge ends, clean up so next pair can form - info!("bridge complete, ready for next pair"); - } else { - // First client — register and wait - { - let mut slot = waiting.lock().await; - *slot = Some(WaitingClient { addr, transport: transport.clone() }); - } - info!(%addr, "waiting for peer (echo in meantime)"); - - // Echo loop — but check periodically if we've been claimed by a bridge - loop { - // Check if we've been taken from the waiting slot - // (meaning a second client connected and started the bridge) - { - let slot = waiting.lock().await; - if slot.is_none() { - // We were taken — a bridge is running with our transport. - // Just exit this task; the bridge task handles everything. - info!(%addr, "peer connected, exiting echo loop"); - return; - } - } - - // Echo with a short timeout so we can check the slot again - match tokio::time::timeout( - Duration::from_millis(100), - transport.recv_media() - ).await { - Ok(Ok(Some(pkt))) => { - let _ = transport.send_media(&pkt).await; - } - Ok(Ok(None)) => { - info!(%addr, "disconnected while waiting"); - // Clean up our slot - let mut slot = waiting.lock().await; - *slot = None; - return; - } - Ok(Err(e)) => { - error!(%addr, "echo error: {e}"); - let mut slot = waiting.lock().await; - *slot = None; - return; - } - Err(_) => { - // Timeout — loop back and check if we got paired - } - } - } - } + transport.close().await.ok(); } }); } diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs new file mode 100644 index 0000000..88c767f --- /dev/null +++ b/crates/wzp-relay/src/room.rs @@ -0,0 +1,200 @@ +//! Room management for multi-party calls. +//! +//! Each room holds N participants. When one participant sends a media packet, +//! the relay forwards it to all other participants in the room (SFU model). + +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use tokio::sync::Mutex; +use tracing::{error, info}; + +use wzp_proto::MediaTransport; + +/// Unique participant ID within a room. +pub type ParticipantId = u64; + +static NEXT_PARTICIPANT_ID: AtomicU64 = AtomicU64::new(1); + +fn next_id() -> ParticipantId { + NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed) +} + +/// A participant in a room. +struct Participant { + id: ParticipantId, + addr: std::net::SocketAddr, + transport: Arc, +} + +/// A room holding multiple participants. +struct Room { + participants: Vec, +} + +impl Room { + fn new() -> Self { + Self { + participants: Vec::new(), + } + } + + fn add(&mut self, addr: std::net::SocketAddr, transport: Arc) -> ParticipantId { + let id = next_id(); + info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room"); + self.participants.push(Participant { id, addr, transport }); + id + } + + fn remove(&mut self, id: ParticipantId) { + self.participants.retain(|p| p.id != id); + info!(room_size = self.participants.len(), participant = id, "left room"); + } + + fn others(&self, exclude_id: ParticipantId) -> Vec> { + self.participants + .iter() + .filter(|p| p.id != exclude_id) + .map(|p| p.transport.clone()) + .collect() + } + + fn is_empty(&self) -> bool { + self.participants.is_empty() + } + + fn len(&self) -> usize { + self.participants.len() + } +} + +/// Manages all rooms on the relay. +pub struct RoomManager { + rooms: HashMap, +} + +impl RoomManager { + pub fn new() -> Self { + Self { + rooms: HashMap::new(), + } + } + + /// Join a room. Returns the participant ID. + pub fn join( + &mut self, + room_name: &str, + addr: std::net::SocketAddr, + transport: Arc, + ) -> ParticipantId { + let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); + room.add(addr, transport) + } + + /// Leave a room. Removes the room if empty. + pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) { + if let Some(room) = self.rooms.get_mut(room_name) { + room.remove(participant_id); + if room.is_empty() { + self.rooms.remove(room_name); + info!(room = room_name, "room closed (empty)"); + } + } + } + + /// Get transports for all OTHER participants in a room. + pub fn others( + &self, + room_name: &str, + participant_id: ParticipantId, + ) -> Vec> { + self.rooms + .get(room_name) + .map(|r| r.others(participant_id)) + .unwrap_or_default() + } + + /// Get room size. + pub fn room_size(&self, room_name: &str) -> usize { + self.rooms.get(room_name).map(|r| r.len()).unwrap_or(0) + } + + /// List all rooms with their sizes. + pub fn list(&self) -> Vec<(String, usize)> { + self.rooms.iter().map(|(k, v)| (k.clone(), v.len())).collect() + } +} + +/// Run the receive loop for one participant in a room. +/// Forwards all received packets to every other participant. +pub async fn run_participant( + room_mgr: Arc>, + room_name: String, + participant_id: ParticipantId, + transport: Arc, +) { + let addr = transport.connection().remote_address(); + let mut packets_forwarded = 0u64; + + loop { + let pkt = match transport.recv_media().await { + Ok(Some(pkt)) => pkt, + Ok(None) => { + info!(%addr, participant = participant_id, "disconnected"); + break; + } + Err(e) => { + error!(%addr, participant = participant_id, "recv error: {e}"); + break; + } + }; + + // Get current list of other participants + let others = { + let mgr = room_mgr.lock().await; + mgr.others(&room_name, participant_id) + }; + + // Forward to all others + for other in &others { + // Best-effort: if one send fails, continue to others + if let Err(e) = other.send_media(&pkt).await { + // Don't log every failure — they'll be cleaned up when their recv loop breaks + let _ = e; + } + } + + packets_forwarded += 1; + if packets_forwarded % 500 == 0 { + let room_size = { + let mgr = room_mgr.lock().await; + mgr.room_size(&room_name) + }; + info!( + room = %room_name, + participant = participant_id, + forwarded = packets_forwarded, + room_size, + "participant stats" + ); + } + } + + // Clean up + let mut mgr = room_mgr.lock().await; + mgr.leave(&room_name, participant_id); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn room_join_leave() { + let mut mgr = RoomManager::new(); + // Can't test with real transports, but test the room logic + assert_eq!(mgr.room_size("test"), 0); + assert!(mgr.list().is_empty()); + } +} diff --git a/crates/wzp-web/src/main.rs b/crates/wzp-web/src/main.rs index 396ad01..d7e07d5 100644 --- a/crates/wzp-web/src/main.rs +++ b/crates/wzp-web/src/main.rs @@ -155,8 +155,10 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { Err(e) => { error!("create endpoint: {e}"); return; } }; + // Pass room name as QUIC SNI so the relay knows which room to join + let sni = if room.is_empty() { "default" } else { &room }; let connection = - match wzp_transport::connect(&endpoint, relay_addr, "localhost", client_config).await { + match wzp_transport::connect(&endpoint, relay_addr, sni, client_config).await { Ok(c) => c, Err(e) => { error!("connect to relay: {e}"); return; } }; diff --git a/crates/wzp-web/static/index.html b/crates/wzp-web/static/index.html index c3fb4fc..f86c6ee 100644 --- a/crates/wzp-web/static/index.html +++ b/crates/wzp-web/static/index.html @@ -22,6 +22,11 @@ .stats { margin-top: 0.5rem; font-size: 0.75rem; color: #555; font-family: monospace; } .level { margin-top: 1rem; height: 6px; background: #333; border-radius: 3px; overflow: hidden; } .level-bar { height: 100%; background: #00d4ff; width: 0%; transition: width 50ms; } + .controls { margin-top: 1rem; display: flex; gap: 0.5rem; justify-content: center; flex-wrap: wrap; } + .controls label { font-size: 0.8rem; color: #888; cursor: pointer; display: flex; align-items: center; gap: 0.3rem; } + .controls input[type="checkbox"] { accent-color: #00d4ff; } + #pttBtn { display: none; background: #444; color: #e0e0e0; border: 2px solid #666; padding: 0.8rem 2rem; font-size: 1rem; border-radius: 12px; cursor: pointer; user-select: none; -webkit-user-select: none; touch-action: none; } + #pttBtn.transmitting { background: #ff4444; border-color: #ff6666; color: white; } @@ -33,6 +38,10 @@ + +
@@ -48,6 +57,8 @@ let mediaStream = null; let captureNode = null; let playbackNode = null; let active = false; +let transmitting = true; // in open-mic mode, always transmitting +let pttMode = false; let framesSent = 0; let framesRecv = 0; let startTime = 0; @@ -108,6 +119,7 @@ async function startCall() { framesSent = 0; framesRecv = 0; startTime = Date.now(); + showControls(true); await startAudioCapture(); await startAudioPlayback(); startStatsUpdate(); @@ -142,6 +154,7 @@ function stopCall() { btn.textContent = 'Connect'; btn.classList.remove('active'); btn.disabled = false; + showControls(false); cleanupAudio(); if (ws) { ws.close(); ws = null; } if (statsInterval) { clearInterval(statsInterval); statsInterval = null; } @@ -163,8 +176,7 @@ async function startAudioCapture() { await audioCtx.audioWorklet.addModule('audio-processor.js'); captureNode = new AudioWorkletNode(audioCtx, 'capture-processor'); captureNode.port.onmessage = (e) => { - if (!active || !ws || ws.readyState !== WebSocket.OPEN) return; - // e.data is an ArrayBuffer of Int16 PCM + if (!active || !ws || ws.readyState !== WebSocket.OPEN || !transmitting) return; ws.send(e.data); framesSent++; @@ -182,7 +194,7 @@ async function startAudioCapture() { captureNode = audioCtx.createScriptProcessor(1024, 1, 1); let acc = new Float32Array(0); captureNode.onaudioprocess = (ev) => { - if (!active || !ws || ws.readyState !== WebSocket.OPEN) return; + if (!active || !ws || ws.readyState !== WebSocket.OPEN || !transmitting) return; const input = ev.inputBuffer.getChannelData(0); const n = new Float32Array(acc.length + input.length); n.set(acc); n.set(input, acc.length); acc = n; @@ -250,6 +262,55 @@ function startStatsUpdate() { }, 1000); } +// --- Push-to-talk --- + +function togglePTT() { + pttMode = document.getElementById('pttMode').checked; + const btn = document.getElementById('pttBtn'); + if (pttMode) { + transmitting = false; + btn.style.display = 'block'; + } else { + transmitting = true; + btn.style.display = 'none'; + } +} + +// PTT button — hold to talk (mouse + touch) +document.getElementById('pttBtn').addEventListener('mousedown', () => { startTransmit(); }); +document.getElementById('pttBtn').addEventListener('mouseup', () => { stopTransmit(); }); +document.getElementById('pttBtn').addEventListener('mouseleave', () => { stopTransmit(); }); +document.getElementById('pttBtn').addEventListener('touchstart', (e) => { e.preventDefault(); startTransmit(); }); +document.getElementById('pttBtn').addEventListener('touchend', (e) => { e.preventDefault(); stopTransmit(); }); + +// Spacebar PTT +document.addEventListener('keydown', (e) => { if (pttMode && active && e.code === 'Space' && !e.repeat) { e.preventDefault(); startTransmit(); } }); +document.addEventListener('keyup', (e) => { if (pttMode && active && e.code === 'Space') { e.preventDefault(); stopTransmit(); } }); + +function startTransmit() { + if (!pttMode || !active) return; + transmitting = true; + document.getElementById('pttBtn').classList.add('transmitting'); + document.getElementById('pttBtn').textContent = 'Transmitting...'; +} + +function stopTransmit() { + if (!pttMode) return; + transmitting = false; + document.getElementById('pttBtn').classList.remove('transmitting'); + document.getElementById('pttBtn').textContent = 'Hold to Talk'; +} + +// Show controls when connected +function showControls(show) { + document.getElementById('controls').style.display = show ? 'flex' : 'none'; + if (!show) { + document.getElementById('pttBtn').style.display = 'none'; + pttMode = false; + transmitting = true; + } +} + // Set room from URL on load window.addEventListener('load', () => { const room = getRoom();