feat: multi-party rooms (SFU) + push-to-talk radio mode
Room-based SFU relay: - Clients join named rooms (room name from QUIC SNI) - Each participant's packets forwarded to all others (no mixing) - Multiple rooms run concurrently on one relay - Web bridge passes room name from URL path to relay Push-to-talk (radio mode): - Toggle "Radio mode" checkbox after connecting - Hold PTT button or spacebar to transmit - Release to mute mic (receive-only) - Works on desktop (spacebar) and mobile (touch) URL routing: - /myroom → joins room "myroom" - Room name input field as fallback Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,11 @@
|
||||
//! WarzonePhone relay daemon entry point.
|
||||
//!
|
||||
//! Accepts client QUIC connections and bridges pairs of clients together.
|
||||
//! When a --remote relay is configured, forwards traffic to it instead.
|
||||
//! Supports two modes:
|
||||
//! - **Room mode** (default): clients join named rooms, packets forwarded to all others (SFU)
|
||||
//! - **Forward mode** (--remote): all traffic forwarded to a remote relay
|
||||
//!
|
||||
//! Room names are passed via the QUIC SNI (server_name) field.
|
||||
//! The web bridge connects with room name as SNI.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
@@ -14,7 +18,7 @@ use tracing::{error, info};
|
||||
use wzp_proto::MediaTransport;
|
||||
use wzp_relay::config::RelayConfig;
|
||||
use wzp_relay::pipeline::{PipelineConfig, RelayPipeline};
|
||||
use wzp_relay::session_mgr::SessionManager;
|
||||
use wzp_relay::room::{self, RoomManager};
|
||||
|
||||
fn parse_args() -> RelayConfig {
|
||||
let mut config = RelayConfig::default();
|
||||
@@ -39,7 +43,12 @@ fn parse_args() -> RelayConfig {
|
||||
eprintln!();
|
||||
eprintln!("Options:");
|
||||
eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433)");
|
||||
eprintln!(" --remote <addr> Remote relay address for forwarding");
|
||||
eprintln!(" --remote <addr> Remote relay for forwarding (disables room mode)");
|
||||
eprintln!();
|
||||
eprintln!("Room mode (default):");
|
||||
eprintln!(" Clients join rooms by name. Packets are forwarded to all");
|
||||
eprintln!(" other participants in the same room (SFU model).");
|
||||
eprintln!(" Room name comes from QUIC SNI or defaults to 'default'.");
|
||||
std::process::exit(0);
|
||||
}
|
||||
other => {
|
||||
@@ -57,78 +66,6 @@ struct RelayStats {
|
||||
downstream_packets: AtomicU64,
|
||||
}
|
||||
|
||||
/// Bridge two transports: A's packets go to B, B's go to A.
|
||||
async fn run_bridge(
|
||||
a: Arc<wzp_transport::QuinnTransport>,
|
||||
b: Arc<wzp_transport::QuinnTransport>,
|
||||
a_addr: SocketAddr,
|
||||
b_addr: SocketAddr,
|
||||
) {
|
||||
info!(%a_addr, %b_addr, "bridging two clients");
|
||||
|
||||
let stats = Arc::new(RelayStats {
|
||||
upstream_packets: AtomicU64::new(0),
|
||||
downstream_packets: AtomicU64::new(0),
|
||||
});
|
||||
|
||||
let stats_log = stats.clone();
|
||||
let stats_handle = tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let ab = stats_log.upstream_packets.load(Ordering::Relaxed);
|
||||
let ba = stats_log.downstream_packets.load(Ordering::Relaxed);
|
||||
info!(a_to_b = ab, b_to_a = ba, "bridge stats");
|
||||
}
|
||||
});
|
||||
|
||||
let a1 = a.clone();
|
||||
let b1 = b.clone();
|
||||
let s1 = stats.clone();
|
||||
let a_to_b = tokio::spawn(async move {
|
||||
loop {
|
||||
match a1.recv_media().await {
|
||||
Ok(Some(pkt)) => {
|
||||
if let Err(e) = b1.send_media(&pkt).await {
|
||||
error!("A→B send error: {e}");
|
||||
break;
|
||||
}
|
||||
s1.upstream_packets.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Ok(None) => { info!(%a_addr, "client A disconnected"); break; }
|
||||
Err(e) => { error!(%a_addr, "A recv error: {e}"); break; }
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let a2 = a.clone();
|
||||
let b2 = b.clone();
|
||||
let s2 = stats.clone();
|
||||
let b_to_a = tokio::spawn(async move {
|
||||
loop {
|
||||
match b2.recv_media().await {
|
||||
Ok(Some(pkt)) => {
|
||||
if let Err(e) = a2.send_media(&pkt).await {
|
||||
error!("B→A send error: {e}");
|
||||
break;
|
||||
}
|
||||
s2.downstream_packets.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Ok(None) => { info!(%b_addr, "client B disconnected"); break; }
|
||||
Err(e) => { error!(%b_addr, "B recv error: {e}"); break; }
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
_ = a_to_b => {}
|
||||
_ = b_to_a => {}
|
||||
}
|
||||
stats_handle.abort();
|
||||
info!(%a_addr, %b_addr, "bridge ended");
|
||||
}
|
||||
|
||||
/// Run upstream forwarding: client → pipeline → remote.
|
||||
async fn run_upstream(
|
||||
client: Arc<wzp_transport::QuinnTransport>,
|
||||
remote: Arc<wzp_transport::QuinnTransport>,
|
||||
@@ -159,7 +96,6 @@ async fn run_upstream(
|
||||
}
|
||||
}
|
||||
|
||||
/// Run downstream forwarding: remote → pipeline → client.
|
||||
async fn run_downstream(
|
||||
client: Arc<wzp_transport::QuinnTransport>,
|
||||
remote: Arc<wzp_transport::QuinnTransport>,
|
||||
@@ -190,12 +126,6 @@ async fn run_downstream(
|
||||
}
|
||||
}
|
||||
|
||||
/// Waiting client: address + transport.
|
||||
struct WaitingClient {
|
||||
addr: SocketAddr,
|
||||
transport: Arc<wzp_transport::QuinnTransport>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let config = parse_args();
|
||||
@@ -205,29 +135,24 @@ async fn main() -> anyhow::Result<()> {
|
||||
.expect("failed to install rustls crypto provider");
|
||||
|
||||
info!(addr = %config.listen_addr, "WarzonePhone relay starting");
|
||||
if let Some(remote) = config.remote_relay {
|
||||
info!(%remote, "forwarding mode → remote relay");
|
||||
} else {
|
||||
info!("bridge mode — pairs clients together (echo when alone)");
|
||||
}
|
||||
|
||||
let (server_config, _cert) = wzp_transport::server_config();
|
||||
let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?;
|
||||
|
||||
let _sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions)));
|
||||
|
||||
// Remote relay transport (forwarding mode only)
|
||||
// Forward mode
|
||||
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> =
|
||||
if let Some(remote_addr) = config.remote_relay {
|
||||
info!(%remote_addr, "forward mode → remote relay");
|
||||
let client_cfg = wzp_transport::client_config();
|
||||
let conn = wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?;
|
||||
Some(Arc::new(wzp_transport::QuinnTransport::new(conn)))
|
||||
} else {
|
||||
info!("room mode — clients join named rooms (SFU)");
|
||||
None
|
||||
};
|
||||
|
||||
// Bridge mode: slot for waiting client
|
||||
let waiting: Arc<Mutex<Option<WaitingClient>>> = Arc::new(Mutex::new(None));
|
||||
// Room manager (room mode only)
|
||||
let room_mgr = Arc::new(Mutex::new(RoomManager::new()));
|
||||
|
||||
info!("Listening for connections...");
|
||||
|
||||
@@ -238,15 +163,27 @@ async fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let remote_transport = remote_transport.clone();
|
||||
let waiting = waiting.clone();
|
||||
let room_mgr = room_mgr.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let addr = connection.remote_address();
|
||||
|
||||
// Extract room name from QUIC handshake data (SNI).
|
||||
// The web bridge connects with the room name as server_name.
|
||||
let room_name = connection
|
||||
.handshake_data()
|
||||
.and_then(|hd| {
|
||||
hd.downcast::<quinn::crypto::rustls::HandshakeData>().ok()
|
||||
})
|
||||
.and_then(|hd| hd.server_name.clone())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
|
||||
let transport = Arc::new(wzp_transport::QuinnTransport::new(connection));
|
||||
info!(%addr, "new client");
|
||||
|
||||
info!(%addr, room = %room_name, "new client");
|
||||
|
||||
if let Some(remote) = remote_transport {
|
||||
// Forwarding mode
|
||||
// Forward mode — same as before
|
||||
let stats = Arc::new(RelayStats {
|
||||
upstream_packets: AtomicU64::new(0),
|
||||
downstream_packets: AtomicU64::new(0),
|
||||
@@ -273,71 +210,21 @@ async fn main() -> anyhow::Result<()> {
|
||||
tokio::select! { _ = up => {} _ = dn => {} }
|
||||
stats_handle.abort();
|
||||
transport.close().await.ok();
|
||||
info!(%addr, "forwarding session ended");
|
||||
} else {
|
||||
// Bridge mode — try to pair with a waiting client
|
||||
let peer = {
|
||||
let mut slot = waiting.lock().await;
|
||||
slot.take()
|
||||
// Room mode — join room and forward to all others
|
||||
let participant_id = {
|
||||
let mut mgr = room_mgr.lock().await;
|
||||
mgr.join(&room_name, addr, transport.clone())
|
||||
};
|
||||
|
||||
if let Some(peer_client) = peer {
|
||||
// Second client — bridge immediately
|
||||
run_bridge(peer_client.transport.clone(), transport.clone(), peer_client.addr, addr).await;
|
||||
peer_client.transport.close().await.ok();
|
||||
transport.close().await.ok();
|
||||
room::run_participant(
|
||||
room_mgr.clone(),
|
||||
room_name,
|
||||
participant_id,
|
||||
transport.clone(),
|
||||
).await;
|
||||
|
||||
// After bridge ends, clean up so next pair can form
|
||||
info!("bridge complete, ready for next pair");
|
||||
} else {
|
||||
// First client — register and wait
|
||||
{
|
||||
let mut slot = waiting.lock().await;
|
||||
*slot = Some(WaitingClient { addr, transport: transport.clone() });
|
||||
}
|
||||
info!(%addr, "waiting for peer (echo in meantime)");
|
||||
|
||||
// Echo loop — but check periodically if we've been claimed by a bridge
|
||||
loop {
|
||||
// Check if we've been taken from the waiting slot
|
||||
// (meaning a second client connected and started the bridge)
|
||||
{
|
||||
let slot = waiting.lock().await;
|
||||
if slot.is_none() {
|
||||
// We were taken — a bridge is running with our transport.
|
||||
// Just exit this task; the bridge task handles everything.
|
||||
info!(%addr, "peer connected, exiting echo loop");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Echo with a short timeout so we can check the slot again
|
||||
match tokio::time::timeout(
|
||||
Duration::from_millis(100),
|
||||
transport.recv_media()
|
||||
).await {
|
||||
Ok(Ok(Some(pkt))) => {
|
||||
let _ = transport.send_media(&pkt).await;
|
||||
}
|
||||
Ok(Ok(None)) => {
|
||||
info!(%addr, "disconnected while waiting");
|
||||
// Clean up our slot
|
||||
let mut slot = waiting.lock().await;
|
||||
*slot = None;
|
||||
return;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!(%addr, "echo error: {e}");
|
||||
let mut slot = waiting.lock().await;
|
||||
*slot = None;
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout — loop back and check if we got paired
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
transport.close().await.ok();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user