From ce6aacb25f453bd988c1150a333286e8c013734e Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Fri, 27 Mar 2026 19:49:27 +0400 Subject: [PATCH] fix: bridge pairing + auto-reconnect + test stability Bridge mode rewrite: - First client echoes while waiting, checks every 100ms if paired - Second client triggers bridge immediately, first exits echo loop - After bridge ends, slot is cleared for the next pair - No more two tasks competing for the same transport recv Web client auto-reconnect: - On WebSocket close/error, automatically reconnects after 1s - Keeps retrying as long as the user hasn't clicked Disconnect Test fix: - Install rustls crypto provider in transport config tests (fixes race condition when running full workspace tests) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-relay/src/main.rs | 532 ++++++++++++----------------- crates/wzp-transport/src/config.rs | 2 + crates/wzp-web/static/index.html | 17 +- 3 files changed, 241 insertions(+), 310 deletions(-) diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 51b2f78..f50db81 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -1,8 +1,7 @@ //! WarzonePhone relay daemon entry point. //! -//! Accepts client QUIC connections and optionally forwards media to a remote -//! relay. Each client connection spawns two tasks for bidirectional forwarding -//! through the relay pipeline (FEC decode -> jitter -> FEC encode). +//! Accepts client QUIC connections and bridges pairs of clients together. +//! When a --remote relay is configured, forwards traffic to it instead. use std::net::SocketAddr; use std::sync::atomic::{AtomicU64, Ordering}; @@ -17,9 +16,6 @@ use wzp_relay::config::RelayConfig; use wzp_relay::pipeline::{PipelineConfig, RelayPipeline}; use wzp_relay::session_mgr::SessionManager; -/// Parse CLI arguments using std::env::args(). -/// -/// Usage: wzp-relay [--listen ] [--remote ] fn parse_args() -> RelayConfig { let mut config = RelayConfig::default(); let args: Vec = std::env::args().collect(); @@ -28,39 +24,26 @@ fn parse_args() -> RelayConfig { match args[i].as_str() { "--listen" => { i += 1; - if i < args.len() { - config.listen_addr = args[i] - .parse::() - .expect("invalid --listen address"); - } else { - eprintln!("--listen requires an address argument"); - std::process::exit(1); - } + config.listen_addr = args.get(i).expect("--listen requires an address") + .parse().expect("invalid --listen address"); } "--remote" => { i += 1; - if i < args.len() { - config.remote_relay = Some( - args[i] - .parse::() - .expect("invalid --remote address"), - ); - } else { - eprintln!("--remote requires an address argument"); - std::process::exit(1); - } + config.remote_relay = Some( + args.get(i).expect("--remote requires an address") + .parse().expect("invalid --remote address"), + ); } "--help" | "-h" => { eprintln!("Usage: wzp-relay [--listen ] [--remote ]"); eprintln!(); eprintln!("Options:"); - eprintln!(" --listen Listen address (default: 0.0.0.0:4433, use [::]:4433 for IPv6)"); + eprintln!(" --listen Listen address (default: 0.0.0.0:4433)"); eprintln!(" --remote Remote relay address for forwarding"); std::process::exit(0); } other => { eprintln!("unknown argument: {other}"); - eprintln!("Usage: wzp-relay [--listen ] [--remote ]"); std::process::exit(1); } } @@ -69,350 +52,287 @@ fn parse_args() -> RelayConfig { config } -/// Shared packet counters for periodic logging. struct RelayStats { upstream_packets: AtomicU64, downstream_packets: AtomicU64, } -/// Run the upstream forwarding task: client -> pipeline -> remote. +/// Bridge two transports: A's packets go to B, B's go to A. +async fn run_bridge( + a: Arc, + b: Arc, + a_addr: SocketAddr, + b_addr: SocketAddr, +) { + info!(%a_addr, %b_addr, "bridging two clients"); + + let stats = Arc::new(RelayStats { + upstream_packets: AtomicU64::new(0), + downstream_packets: AtomicU64::new(0), + }); + + let stats_log = stats.clone(); + let stats_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + let ab = stats_log.upstream_packets.load(Ordering::Relaxed); + let ba = stats_log.downstream_packets.load(Ordering::Relaxed); + info!(a_to_b = ab, b_to_a = ba, "bridge stats"); + } + }); + + let a1 = a.clone(); + let b1 = b.clone(); + let s1 = stats.clone(); + let a_to_b = tokio::spawn(async move { + loop { + match a1.recv_media().await { + Ok(Some(pkt)) => { + if let Err(e) = b1.send_media(&pkt).await { + error!("A→B send error: {e}"); + break; + } + s1.upstream_packets.fetch_add(1, Ordering::Relaxed); + } + Ok(None) => { info!(%a_addr, "client A disconnected"); break; } + Err(e) => { error!(%a_addr, "A recv error: {e}"); break; } + } + } + }); + + let a2 = a.clone(); + let b2 = b.clone(); + let s2 = stats.clone(); + let b_to_a = tokio::spawn(async move { + loop { + match b2.recv_media().await { + Ok(Some(pkt)) => { + if let Err(e) = a2.send_media(&pkt).await { + error!("B→A send error: {e}"); + break; + } + s2.downstream_packets.fetch_add(1, Ordering::Relaxed); + } + Ok(None) => { info!(%b_addr, "client B disconnected"); break; } + Err(e) => { error!(%b_addr, "B recv error: {e}"); break; } + } + } + }); + + tokio::select! { + _ = a_to_b => {} + _ = b_to_a => {} + } + stats_handle.abort(); + info!(%a_addr, %b_addr, "bridge ended"); +} + +/// Run upstream forwarding: client → pipeline → remote. async fn run_upstream( - client_transport: Arc, - remote_transport: Arc, + client: Arc, + remote: Arc, pipeline: Arc>, stats: Arc, ) { loop { - let packet = match client_transport.recv_media().await { - Ok(Some(pkt)) => pkt, - Ok(None) => { - info!("client connection closed (upstream)"); - break; - } - Err(e) => { - error!("upstream recv error: {e}"); - break; - } - }; - - // Process through pipeline - let outbound = { - let mut pipe = pipeline.lock().await; - let decoded = pipe.ingest(packet); - let mut out = Vec::new(); - for pkt in decoded { - out.extend(pipe.prepare_outbound(pkt)); - } - out - }; - - // Forward to remote - for pkt in &outbound { - if let Err(e) = remote_transport.send_media(pkt).await { - error!("upstream send error: {e}"); - return; + match client.recv_media().await { + Ok(Some(pkt)) => { + let outbound = { + let mut pipe = pipeline.lock().await; + let decoded = pipe.ingest(pkt); + let mut out = Vec::new(); + for p in decoded { out.extend(pipe.prepare_outbound(p)); } + out + }; + for p in &outbound { + if let Err(e) = remote.send_media(p).await { + error!("upstream send: {e}"); + return; + } + } + stats.upstream_packets.fetch_add(outbound.len() as u64, Ordering::Relaxed); } + Ok(None) => { info!("client disconnected (upstream)"); break; } + Err(e) => { error!("upstream recv: {e}"); break; } } - stats - .upstream_packets - .fetch_add(outbound.len() as u64, Ordering::Relaxed); } } -/// Run the downstream forwarding task: remote -> pipeline -> client. +/// Run downstream forwarding: remote → pipeline → client. async fn run_downstream( - client_transport: Arc, - remote_transport: Arc, + client: Arc, + remote: Arc, pipeline: Arc>, stats: Arc, ) { loop { - let packet = match remote_transport.recv_media().await { - Ok(Some(pkt)) => pkt, - Ok(None) => { - info!("remote connection closed (downstream)"); - break; - } - Err(e) => { - error!("downstream recv error: {e}"); - break; - } - }; - - // Process through pipeline - let outbound = { - let mut pipe = pipeline.lock().await; - let decoded = pipe.ingest(packet); - let mut out = Vec::new(); - for pkt in decoded { - out.extend(pipe.prepare_outbound(pkt)); - } - out - }; - - // Forward to client - for pkt in &outbound { - if let Err(e) = client_transport.send_media(pkt).await { - error!("downstream send error: {e}"); - return; + match remote.recv_media().await { + Ok(Some(pkt)) => { + let outbound = { + let mut pipe = pipeline.lock().await; + let decoded = pipe.ingest(pkt); + let mut out = Vec::new(); + for p in decoded { out.extend(pipe.prepare_outbound(p)); } + out + }; + for p in &outbound { + if let Err(e) = client.send_media(p).await { + error!("downstream send: {e}"); + return; + } + } + stats.downstream_packets.fetch_add(outbound.len() as u64, Ordering::Relaxed); } + Ok(None) => { info!("remote disconnected (downstream)"); break; } + Err(e) => { error!("downstream recv: {e}"); break; } } - stats - .downstream_packets - .fetch_add(outbound.len() as u64, Ordering::Relaxed); } } +/// Waiting client: address + transport. +struct WaitingClient { + addr: SocketAddr, + transport: Arc, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let config = parse_args(); - tracing_subscriber::fmt().init(); info!(addr = %config.listen_addr, "WarzonePhone relay starting"); if let Some(remote) = config.remote_relay { - info!(%remote, "will connect to remote relay"); + info!(%remote, "forwarding mode → remote relay"); + } else { + info!("bridge mode — pairs clients together (echo when alone)"); } - let (server_config, _cert_der) = wzp_transport::server_config(); + let (server_config, _cert) = wzp_transport::server_config(); let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?; - let sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); + let _sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); - // If a remote relay is configured, connect to it on startup + // Remote relay transport (forwarding mode only) let remote_transport: Option> = if let Some(remote_addr) = config.remote_relay { - info!(%remote_addr, "connecting to remote relay"); let client_cfg = wzp_transport::client_config(); - let remote_conn = - wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?; - info!(%remote_addr, "connected to remote relay"); - Some(Arc::new(wzp_transport::QuinnTransport::new(remote_conn))) + let conn = wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?; + Some(Arc::new(wzp_transport::QuinnTransport::new(conn))) } else { None }; - // Shared slot for bridge mode: first client waits here for a peer. - let waiting_peer: Arc>>> = - Arc::new(Mutex::new(None)); + // Bridge mode: slot for waiting client + let waiting: Arc>> = Arc::new(Mutex::new(None)); info!("Listening for connections..."); loop { let connection = match wzp_transport::accept(&endpoint).await { Ok(conn) => conn, - Err(e) => { - error!("accept error: {e}"); - continue; - } + Err(e) => { error!("accept: {e}"); continue; } }; - let sessions = sessions.clone(); let remote_transport = remote_transport.clone(); - let waiting_peer = waiting_peer.clone(); + let waiting = waiting.clone(); tokio::spawn(async move { - let remote_addr = connection.remote_address(); - info!(%remote_addr, "new client connection"); + let addr = connection.remote_address(); + let transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); + info!(%addr, "new client"); - let client_transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); + if let Some(remote) = remote_transport { + // Forwarding mode + let stats = Arc::new(RelayStats { + upstream_packets: AtomicU64::new(0), + downstream_packets: AtomicU64::new(0), + }); + let up_pipe = Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default()))); + let dn_pipe = Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default()))); - match remote_transport { - Some(remote_tx) => { - // Create pipelines for both directions - let upstream_pipeline = - Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default()))); - let downstream_pipeline = - Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default()))); + let stats_log = stats.clone(); + let stats_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + info!( + up = stats_log.upstream_packets.load(Ordering::Relaxed), + down = stats_log.downstream_packets.load(Ordering::Relaxed), + "forward stats" + ); + } + }); - // Register session + let up = tokio::spawn(run_upstream(transport.clone(), remote.clone(), up_pipe, stats.clone())); + let dn = tokio::spawn(run_downstream(transport.clone(), remote.clone(), dn_pipe, stats)); + + tokio::select! { _ = up => {} _ = dn => {} } + stats_handle.abort(); + transport.close().await.ok(); + info!(%addr, "forwarding session ended"); + } else { + // Bridge mode — try to pair with a waiting client + let peer = { + let mut slot = waiting.lock().await; + slot.take() + }; + + if let Some(peer_client) = peer { + // Second client — bridge immediately + run_bridge(peer_client.transport.clone(), transport.clone(), peer_client.addr, addr).await; + peer_client.transport.close().await.ok(); + transport.close().await.ok(); + + // After bridge ends, clean up so next pair can form + info!("bridge complete, ready for next pair"); + } else { + // First client — register and wait { - let mut mgr = sessions.lock().await; - let session_id = { - let mut id = [0u8; 16]; - let addr_bytes = remote_addr.to_string(); - let bytes = addr_bytes.as_bytes(); - let len = bytes.len().min(16); - id[..len].copy_from_slice(&bytes[..len]); - id - }; - mgr.create_session(session_id, PipelineConfig::default()); + let mut slot = waiting.lock().await; + *slot = Some(WaitingClient { addr, transport: transport.clone() }); } + info!(%addr, "waiting for peer (echo in meantime)"); - let stats = Arc::new(RelayStats { - upstream_packets: AtomicU64::new(0), - downstream_packets: AtomicU64::new(0), - }); - - // Spawn periodic stats logger - let stats_log = stats.clone(); - let log_remote = remote_addr; - let stats_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - interval.tick().await; - let up = stats_log.upstream_packets.load(Ordering::Relaxed); - let down = stats_log.downstream_packets.load(Ordering::Relaxed); - info!( - client = %log_remote, - upstream = up, - downstream = down, - "relay stats" - ); - } - }); - - // Spawn upstream and downstream tasks - let up_handle = tokio::spawn(run_upstream( - client_transport.clone(), - remote_tx.clone(), - upstream_pipeline, - stats.clone(), - )); - - let down_handle = tokio::spawn(run_downstream( - client_transport.clone(), - remote_tx, - downstream_pipeline, - stats, - )); - - // Wait for either direction to finish, then clean up - tokio::select! { - _ = up_handle => { - info!(%remote_addr, "upstream task ended"); - } - _ = down_handle => { - info!(%remote_addr, "downstream task ended"); - } - } - - // Abort the stats logger and close transport - stats_handle.abort(); - if let Err(e) = client_transport.close().await { - warn!(%remote_addr, "error closing client transport: {e}"); - } - info!(%remote_addr, "session ended"); - } - None => { - // No remote relay — bridge mode: pair two clients together. - // First client waits, second client connects, then they're bridged. - // If only one client, fall back to echo mode. - - // Try to take a waiting peer, or register ourselves as waiting - let peer = { - let mut slot = waiting_peer.lock().await; - slot.take() - }; - - if let Some(peer_transport) = peer { - // We're the second client — bridge with the waiting peer - let peer_addr = peer_transport.connection().remote_address(); - info!(%remote_addr, %peer_addr, "bridging two clients"); - - let stats = Arc::new(RelayStats { - upstream_packets: AtomicU64::new(0), - downstream_packets: AtomicU64::new(0), - }); - - let stats_log = stats.clone(); - let log_a = remote_addr; - let log_b = peer_addr; - let stats_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - interval.tick().await; - let a_to_b = stats_log.upstream_packets.load(Ordering::Relaxed); - let b_to_a = stats_log.downstream_packets.load(Ordering::Relaxed); - info!( - a = %log_a, b = %log_b, - a_to_b, b_to_a, - "bridge stats" - ); - } - }); - - // A→B: recv from us (new client), send to peer - let a_tx = client_transport.clone(); - let b_tx = peer_transport.clone(); - let s1 = stats.clone(); - let a_to_b = tokio::spawn(async move { - loop { - match a_tx.recv_media().await { - Ok(Some(pkt)) => { - if let Err(e) = b_tx.send_media(&pkt).await { - error!("A→B send error: {e}"); - break; - } - s1.upstream_packets.fetch_add(1, Ordering::Relaxed); - } - Ok(None) => { info!("client A disconnected"); break; } - Err(e) => { error!("A recv error: {e}"); break; } - } - } - }); - - // B→A: recv from peer, send to us (new client) - let a_tx2 = client_transport.clone(); - let b_tx2 = peer_transport.clone(); - let s2 = stats.clone(); - let b_to_a = tokio::spawn(async move { - loop { - match b_tx2.recv_media().await { - Ok(Some(pkt)) => { - if let Err(e) = a_tx2.send_media(&pkt).await { - error!("B→A send error: {e}"); - break; - } - s2.downstream_packets.fetch_add(1, Ordering::Relaxed); - } - Ok(None) => { info!("client B disconnected"); break; } - Err(e) => { error!("B recv error: {e}"); break; } - } - } - }); - - tokio::select! { - _ = a_to_b => {} - _ = b_to_a => {} - } - stats_handle.abort(); - info!(%remote_addr, %peer_addr, "bridge ended"); - } else { - // We're the first client — register and wait, or echo + // Echo loop — but check periodically if we've been claimed by a bridge + loop { + // Check if we've been taken from the waiting slot + // (meaning a second client connected and started the bridge) { - let mut slot = waiting_peer.lock().await; - *slot = Some(client_transport.clone()); - } - info!(%remote_addr, "waiting for second client to bridge (echo in meantime)"); - - // Echo while waiting for a peer to connect - let mut echo_count = 0u64; - loop { - match client_transport.recv_media().await { - Ok(Some(packet)) => { - if let Err(e) = client_transport.send_media(&packet).await { - error!("echo send error: {e}"); - break; - } - echo_count += 1; - if echo_count % 250 == 0 { - info!(echoed = echo_count, "echo mode stats"); - } - } - Ok(None) => { - info!(%remote_addr, "connection closed"); - break; - } - Err(e) => { - error!(%remote_addr, "recv error: {e}"); - break; - } + let slot = waiting.lock().await; + if slot.is_none() { + // We were taken — a bridge is running with our transport. + // Just exit this task; the bridge task handles everything. + info!(%addr, "peer connected, exiting echo loop"); + return; + } + } + + // Echo with a short timeout so we can check the slot again + match tokio::time::timeout( + Duration::from_millis(100), + transport.recv_media() + ).await { + Ok(Ok(Some(pkt))) => { + let _ = transport.send_media(&pkt).await; + } + Ok(Ok(None)) => { + info!(%addr, "disconnected while waiting"); + // Clean up our slot + let mut slot = waiting.lock().await; + *slot = None; + return; + } + Ok(Err(e)) => { + error!(%addr, "echo error: {e}"); + let mut slot = waiting.lock().await; + *slot = None; + return; + } + Err(_) => { + // Timeout — loop back and check if we got paired } } - // Clean up waiting slot if we disconnect - let mut slot = waiting_peer.lock().await; - *slot = None; } } } diff --git a/crates/wzp-transport/src/config.rs b/crates/wzp-transport/src/config.rs index 73a826e..6138fd9 100644 --- a/crates/wzp-transport/src/config.rs +++ b/crates/wzp-transport/src/config.rs @@ -139,6 +139,7 @@ mod tests { #[test] fn server_config_creates_without_error() { + let _ = rustls::crypto::ring::default_provider().install_default(); let (cfg, cert_der) = server_config(); assert!(!cert_der.is_empty()); // Verify the config was created (no panic) @@ -147,6 +148,7 @@ mod tests { #[test] fn client_config_creates_without_error() { + let _ = rustls::crypto::ring::default_provider().install_default(); let cfg = client_config(); drop(cfg); } diff --git a/crates/wzp-web/static/index.html b/crates/wzp-web/static/index.html index 8f56593..1611d25 100644 --- a/crates/wzp-web/static/index.html +++ b/crates/wzp-web/static/index.html @@ -99,13 +99,22 @@ async function startCall() { }; ws.onclose = () => { - setStatus('Disconnected'); - stopCall(); + if (active) { + setStatus('Disconnected — reconnecting...'); + setTimeout(() => { if (active) { stopCall(); startCall(); } }, 1000); + } else { + setStatus('Disconnected'); + } }; ws.onerror = (e) => { - setStatus('Connection error'); - stopCall(); + if (active) { + setStatus('Connection error — reconnecting...'); + setTimeout(() => { if (active) { stopCall(); startCall(); } }, 1000); + } else { + setStatus('Connection error'); + stopCall(); + } }; }