fix: bridge pairing + auto-reconnect + test stability

Bridge mode rewrite:
- First client echoes while waiting, checks every 100ms if paired
- Second client triggers bridge immediately, first exits echo loop
- After bridge ends, slot is cleared for the next pair
- No more two tasks competing for the same transport recv

Web client auto-reconnect:
- On WebSocket close/error, automatically reconnects after 1s
- Keeps retrying as long as the user hasn't clicked Disconnect

Test fix:
- Install rustls crypto provider in transport config tests
  (fixes race condition when running full workspace tests)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-27 19:49:27 +04:00
parent 38ae62b542
commit ce6aacb25f
3 changed files with 241 additions and 310 deletions

View File

@@ -1,8 +1,7 @@
//! WarzonePhone relay daemon entry point. //! WarzonePhone relay daemon entry point.
//! //!
//! Accepts client QUIC connections and optionally forwards media to a remote //! Accepts client QUIC connections and bridges pairs of clients together.
//! relay. Each client connection spawns two tasks for bidirectional forwarding //! When a --remote relay is configured, forwards traffic to it instead.
//! through the relay pipeline (FEC decode -> jitter -> FEC encode).
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
@@ -17,9 +16,6 @@ use wzp_relay::config::RelayConfig;
use wzp_relay::pipeline::{PipelineConfig, RelayPipeline}; use wzp_relay::pipeline::{PipelineConfig, RelayPipeline};
use wzp_relay::session_mgr::SessionManager; use wzp_relay::session_mgr::SessionManager;
/// Parse CLI arguments using std::env::args().
///
/// Usage: wzp-relay [--listen <addr>] [--remote <addr>]
fn parse_args() -> RelayConfig { fn parse_args() -> RelayConfig {
let mut config = RelayConfig::default(); let mut config = RelayConfig::default();
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
@@ -28,39 +24,26 @@ fn parse_args() -> RelayConfig {
match args[i].as_str() { match args[i].as_str() {
"--listen" => { "--listen" => {
i += 1; i += 1;
if i < args.len() { config.listen_addr = args.get(i).expect("--listen requires an address")
config.listen_addr = args[i] .parse().expect("invalid --listen address");
.parse::<SocketAddr>()
.expect("invalid --listen address");
} else {
eprintln!("--listen requires an address argument");
std::process::exit(1);
}
} }
"--remote" => { "--remote" => {
i += 1; i += 1;
if i < args.len() {
config.remote_relay = Some( config.remote_relay = Some(
args[i] args.get(i).expect("--remote requires an address")
.parse::<SocketAddr>() .parse().expect("invalid --remote address"),
.expect("invalid --remote address"),
); );
} else {
eprintln!("--remote requires an address argument");
std::process::exit(1);
}
} }
"--help" | "-h" => { "--help" | "-h" => {
eprintln!("Usage: wzp-relay [--listen <addr>] [--remote <addr>]"); eprintln!("Usage: wzp-relay [--listen <addr>] [--remote <addr>]");
eprintln!(); eprintln!();
eprintln!("Options:"); eprintln!("Options:");
eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433, use [::]:4433 for IPv6)"); eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433)");
eprintln!(" --remote <addr> Remote relay address for forwarding"); eprintln!(" --remote <addr> Remote relay address for forwarding");
std::process::exit(0); std::process::exit(0);
} }
other => { other => {
eprintln!("unknown argument: {other}"); eprintln!("unknown argument: {other}");
eprintln!("Usage: wzp-relay [--listen <addr>] [--remote <addr>]");
std::process::exit(1); std::process::exit(1);
} }
} }
@@ -69,246 +52,19 @@ fn parse_args() -> RelayConfig {
config config
} }
/// Shared packet counters for periodic logging.
struct RelayStats { struct RelayStats {
upstream_packets: AtomicU64, upstream_packets: AtomicU64,
downstream_packets: AtomicU64, downstream_packets: AtomicU64,
} }
/// Run the upstream forwarding task: client -> pipeline -> remote. /// Bridge two transports: A's packets go to B, B's go to A.
async fn run_upstream( async fn run_bridge(
client_transport: Arc<wzp_transport::QuinnTransport>, a: Arc<wzp_transport::QuinnTransport>,
remote_transport: Arc<wzp_transport::QuinnTransport>, b: Arc<wzp_transport::QuinnTransport>,
pipeline: Arc<Mutex<RelayPipeline>>, a_addr: SocketAddr,
stats: Arc<RelayStats>, b_addr: SocketAddr,
) { ) {
loop { info!(%a_addr, %b_addr, "bridging two clients");
let packet = match client_transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
info!("client connection closed (upstream)");
break;
}
Err(e) => {
error!("upstream recv error: {e}");
break;
}
};
// Process through pipeline
let outbound = {
let mut pipe = pipeline.lock().await;
let decoded = pipe.ingest(packet);
let mut out = Vec::new();
for pkt in decoded {
out.extend(pipe.prepare_outbound(pkt));
}
out
};
// Forward to remote
for pkt in &outbound {
if let Err(e) = remote_transport.send_media(pkt).await {
error!("upstream send error: {e}");
return;
}
}
stats
.upstream_packets
.fetch_add(outbound.len() as u64, Ordering::Relaxed);
}
}
/// Run the downstream forwarding task: remote -> pipeline -> client.
async fn run_downstream(
client_transport: Arc<wzp_transport::QuinnTransport>,
remote_transport: Arc<wzp_transport::QuinnTransport>,
pipeline: Arc<Mutex<RelayPipeline>>,
stats: Arc<RelayStats>,
) {
loop {
let packet = match remote_transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
info!("remote connection closed (downstream)");
break;
}
Err(e) => {
error!("downstream recv error: {e}");
break;
}
};
// Process through pipeline
let outbound = {
let mut pipe = pipeline.lock().await;
let decoded = pipe.ingest(packet);
let mut out = Vec::new();
for pkt in decoded {
out.extend(pipe.prepare_outbound(pkt));
}
out
};
// Forward to client
for pkt in &outbound {
if let Err(e) = client_transport.send_media(pkt).await {
error!("downstream send error: {e}");
return;
}
}
stats
.downstream_packets
.fetch_add(outbound.len() as u64, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = parse_args();
tracing_subscriber::fmt().init();
info!(addr = %config.listen_addr, "WarzonePhone relay starting");
if let Some(remote) = config.remote_relay {
info!(%remote, "will connect to remote relay");
}
let (server_config, _cert_der) = wzp_transport::server_config();
let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?;
let sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions)));
// If a remote relay is configured, connect to it on startup
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> =
if let Some(remote_addr) = config.remote_relay {
info!(%remote_addr, "connecting to remote relay");
let client_cfg = wzp_transport::client_config();
let remote_conn =
wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?;
info!(%remote_addr, "connected to remote relay");
Some(Arc::new(wzp_transport::QuinnTransport::new(remote_conn)))
} else {
None
};
// Shared slot for bridge mode: first client waits here for a peer.
let waiting_peer: Arc<Mutex<Option<Arc<wzp_transport::QuinnTransport>>>> =
Arc::new(Mutex::new(None));
info!("Listening for connections...");
loop {
let connection = match wzp_transport::accept(&endpoint).await {
Ok(conn) => conn,
Err(e) => {
error!("accept error: {e}");
continue;
}
};
let sessions = sessions.clone();
let remote_transport = remote_transport.clone();
let waiting_peer = waiting_peer.clone();
tokio::spawn(async move {
let remote_addr = connection.remote_address();
info!(%remote_addr, "new client connection");
let client_transport = Arc::new(wzp_transport::QuinnTransport::new(connection));
match remote_transport {
Some(remote_tx) => {
// Create pipelines for both directions
let upstream_pipeline =
Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default())));
let downstream_pipeline =
Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default())));
// Register session
{
let mut mgr = sessions.lock().await;
let session_id = {
let mut id = [0u8; 16];
let addr_bytes = remote_addr.to_string();
let bytes = addr_bytes.as_bytes();
let len = bytes.len().min(16);
id[..len].copy_from_slice(&bytes[..len]);
id
};
mgr.create_session(session_id, PipelineConfig::default());
}
let stats = Arc::new(RelayStats {
upstream_packets: AtomicU64::new(0),
downstream_packets: AtomicU64::new(0),
});
// Spawn periodic stats logger
let stats_log = stats.clone();
let log_remote = remote_addr;
let stats_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let up = stats_log.upstream_packets.load(Ordering::Relaxed);
let down = stats_log.downstream_packets.load(Ordering::Relaxed);
info!(
client = %log_remote,
upstream = up,
downstream = down,
"relay stats"
);
}
});
// Spawn upstream and downstream tasks
let up_handle = tokio::spawn(run_upstream(
client_transport.clone(),
remote_tx.clone(),
upstream_pipeline,
stats.clone(),
));
let down_handle = tokio::spawn(run_downstream(
client_transport.clone(),
remote_tx,
downstream_pipeline,
stats,
));
// Wait for either direction to finish, then clean up
tokio::select! {
_ = up_handle => {
info!(%remote_addr, "upstream task ended");
}
_ = down_handle => {
info!(%remote_addr, "downstream task ended");
}
}
// Abort the stats logger and close transport
stats_handle.abort();
if let Err(e) = client_transport.close().await {
warn!(%remote_addr, "error closing client transport: {e}");
}
info!(%remote_addr, "session ended");
}
None => {
// No remote relay — bridge mode: pair two clients together.
// First client waits, second client connects, then they're bridged.
// If only one client, fall back to echo mode.
// Try to take a waiting peer, or register ourselves as waiting
let peer = {
let mut slot = waiting_peer.lock().await;
slot.take()
};
if let Some(peer_transport) = peer {
// We're the second client — bridge with the waiting peer
let peer_addr = peer_transport.connection().remote_address();
info!(%remote_addr, %peer_addr, "bridging two clients");
let stats = Arc::new(RelayStats { let stats = Arc::new(RelayStats {
upstream_packets: AtomicU64::new(0), upstream_packets: AtomicU64::new(0),
@@ -316,58 +72,50 @@ async fn main() -> anyhow::Result<()> {
}); });
let stats_log = stats.clone(); let stats_log = stats.clone();
let log_a = remote_addr;
let log_b = peer_addr;
let stats_handle = tokio::spawn(async move { let stats_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
loop { loop {
interval.tick().await; interval.tick().await;
let a_to_b = stats_log.upstream_packets.load(Ordering::Relaxed); let ab = stats_log.upstream_packets.load(Ordering::Relaxed);
let b_to_a = stats_log.downstream_packets.load(Ordering::Relaxed); let ba = stats_log.downstream_packets.load(Ordering::Relaxed);
info!( info!(a_to_b = ab, b_to_a = ba, "bridge stats");
a = %log_a, b = %log_b,
a_to_b, b_to_a,
"bridge stats"
);
} }
}); });
// A→B: recv from us (new client), send to peer let a1 = a.clone();
let a_tx = client_transport.clone(); let b1 = b.clone();
let b_tx = peer_transport.clone();
let s1 = stats.clone(); let s1 = stats.clone();
let a_to_b = tokio::spawn(async move { let a_to_b = tokio::spawn(async move {
loop { loop {
match a_tx.recv_media().await { match a1.recv_media().await {
Ok(Some(pkt)) => { Ok(Some(pkt)) => {
if let Err(e) = b_tx.send_media(&pkt).await { if let Err(e) = b1.send_media(&pkt).await {
error!("A→B send error: {e}"); error!("A→B send error: {e}");
break; break;
} }
s1.upstream_packets.fetch_add(1, Ordering::Relaxed); s1.upstream_packets.fetch_add(1, Ordering::Relaxed);
} }
Ok(None) => { info!("client A disconnected"); break; } Ok(None) => { info!(%a_addr, "client A disconnected"); break; }
Err(e) => { error!("A recv error: {e}"); break; } Err(e) => { error!(%a_addr, "A recv error: {e}"); break; }
} }
} }
}); });
// B→A: recv from peer, send to us (new client) let a2 = a.clone();
let a_tx2 = client_transport.clone(); let b2 = b.clone();
let b_tx2 = peer_transport.clone();
let s2 = stats.clone(); let s2 = stats.clone();
let b_to_a = tokio::spawn(async move { let b_to_a = tokio::spawn(async move {
loop { loop {
match b_tx2.recv_media().await { match b2.recv_media().await {
Ok(Some(pkt)) => { Ok(Some(pkt)) => {
if let Err(e) = a_tx2.send_media(&pkt).await { if let Err(e) = a2.send_media(&pkt).await {
error!("B→A send error: {e}"); error!("B→A send error: {e}");
break; break;
} }
s2.downstream_packets.fetch_add(1, Ordering::Relaxed); s2.downstream_packets.fetch_add(1, Ordering::Relaxed);
} }
Ok(None) => { info!("client B disconnected"); break; } Ok(None) => { info!(%b_addr, "client B disconnected"); break; }
Err(e) => { error!("B recv error: {e}"); break; } Err(e) => { error!(%b_addr, "B recv error: {e}"); break; }
} }
} }
}); });
@@ -377,42 +125,214 @@ async fn main() -> anyhow::Result<()> {
_ = b_to_a => {} _ = b_to_a => {}
} }
stats_handle.abort(); stats_handle.abort();
info!(%remote_addr, %peer_addr, "bridge ended"); info!(%a_addr, %b_addr, "bridge ended");
} else {
// We're the first client — register and wait, or echo
{
let mut slot = waiting_peer.lock().await;
*slot = Some(client_transport.clone());
} }
info!(%remote_addr, "waiting for second client to bridge (echo in meantime)");
// Echo while waiting for a peer to connect /// Run upstream forwarding: client → pipeline → remote.
let mut echo_count = 0u64; async fn run_upstream(
client: Arc<wzp_transport::QuinnTransport>,
remote: Arc<wzp_transport::QuinnTransport>,
pipeline: Arc<Mutex<RelayPipeline>>,
stats: Arc<RelayStats>,
) {
loop { loop {
match client_transport.recv_media().await { match client.recv_media().await {
Ok(Some(packet)) => { Ok(Some(pkt)) => {
if let Err(e) = client_transport.send_media(&packet).await { let outbound = {
error!("echo send error: {e}"); let mut pipe = pipeline.lock().await;
break; let decoded = pipe.ingest(pkt);
} let mut out = Vec::new();
echo_count += 1; for p in decoded { out.extend(pipe.prepare_outbound(p)); }
if echo_count % 250 == 0 { out
info!(echoed = echo_count, "echo mode stats"); };
for p in &outbound {
if let Err(e) = remote.send_media(p).await {
error!("upstream send: {e}");
return;
} }
} }
Ok(None) => { stats.upstream_packets.fetch_add(outbound.len() as u64, Ordering::Relaxed);
info!(%remote_addr, "connection closed");
break;
} }
Err(e) => { Ok(None) => { info!("client disconnected (upstream)"); break; }
error!(%remote_addr, "recv error: {e}"); Err(e) => { error!("upstream recv: {e}"); break; }
break;
} }
} }
} }
// Clean up waiting slot if we disconnect
let mut slot = waiting_peer.lock().await; /// Run downstream forwarding: remote → pipeline → client.
async fn run_downstream(
client: Arc<wzp_transport::QuinnTransport>,
remote: Arc<wzp_transport::QuinnTransport>,
pipeline: Arc<Mutex<RelayPipeline>>,
stats: Arc<RelayStats>,
) {
loop {
match remote.recv_media().await {
Ok(Some(pkt)) => {
let outbound = {
let mut pipe = pipeline.lock().await;
let decoded = pipe.ingest(pkt);
let mut out = Vec::new();
for p in decoded { out.extend(pipe.prepare_outbound(p)); }
out
};
for p in &outbound {
if let Err(e) = client.send_media(p).await {
error!("downstream send: {e}");
return;
}
}
stats.downstream_packets.fetch_add(outbound.len() as u64, Ordering::Relaxed);
}
Ok(None) => { info!("remote disconnected (downstream)"); break; }
Err(e) => { error!("downstream recv: {e}"); break; }
}
}
}
/// Waiting client: address + transport.
struct WaitingClient {
addr: SocketAddr,
transport: Arc<wzp_transport::QuinnTransport>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = parse_args();
tracing_subscriber::fmt().init();
info!(addr = %config.listen_addr, "WarzonePhone relay starting");
if let Some(remote) = config.remote_relay {
info!(%remote, "forwarding mode → remote relay");
} else {
info!("bridge mode — pairs clients together (echo when alone)");
}
let (server_config, _cert) = wzp_transport::server_config();
let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?;
let _sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions)));
// Remote relay transport (forwarding mode only)
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> =
if let Some(remote_addr) = config.remote_relay {
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?;
Some(Arc::new(wzp_transport::QuinnTransport::new(conn)))
} else {
None
};
// Bridge mode: slot for waiting client
let waiting: Arc<Mutex<Option<WaitingClient>>> = Arc::new(Mutex::new(None));
info!("Listening for connections...");
loop {
let connection = match wzp_transport::accept(&endpoint).await {
Ok(conn) => conn,
Err(e) => { error!("accept: {e}"); continue; }
};
let remote_transport = remote_transport.clone();
let waiting = waiting.clone();
tokio::spawn(async move {
let addr = connection.remote_address();
let transport = Arc::new(wzp_transport::QuinnTransport::new(connection));
info!(%addr, "new client");
if let Some(remote) = remote_transport {
// Forwarding mode
let stats = Arc::new(RelayStats {
upstream_packets: AtomicU64::new(0),
downstream_packets: AtomicU64::new(0),
});
let up_pipe = Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default())));
let dn_pipe = Arc::new(Mutex::new(RelayPipeline::new(PipelineConfig::default())));
let stats_log = stats.clone();
let stats_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
info!(
up = stats_log.upstream_packets.load(Ordering::Relaxed),
down = stats_log.downstream_packets.load(Ordering::Relaxed),
"forward stats"
);
}
});
let up = tokio::spawn(run_upstream(transport.clone(), remote.clone(), up_pipe, stats.clone()));
let dn = tokio::spawn(run_downstream(transport.clone(), remote.clone(), dn_pipe, stats));
tokio::select! { _ = up => {} _ = dn => {} }
stats_handle.abort();
transport.close().await.ok();
info!(%addr, "forwarding session ended");
} else {
// Bridge mode — try to pair with a waiting client
let peer = {
let mut slot = waiting.lock().await;
slot.take()
};
if let Some(peer_client) = peer {
// Second client — bridge immediately
run_bridge(peer_client.transport.clone(), transport.clone(), peer_client.addr, addr).await;
peer_client.transport.close().await.ok();
transport.close().await.ok();
// After bridge ends, clean up so next pair can form
info!("bridge complete, ready for next pair");
} else {
// First client — register and wait
{
let mut slot = waiting.lock().await;
*slot = Some(WaitingClient { addr, transport: transport.clone() });
}
info!(%addr, "waiting for peer (echo in meantime)");
// Echo loop — but check periodically if we've been claimed by a bridge
loop {
// Check if we've been taken from the waiting slot
// (meaning a second client connected and started the bridge)
{
let slot = waiting.lock().await;
if slot.is_none() {
// We were taken — a bridge is running with our transport.
// Just exit this task; the bridge task handles everything.
info!(%addr, "peer connected, exiting echo loop");
return;
}
}
// Echo with a short timeout so we can check the slot again
match tokio::time::timeout(
Duration::from_millis(100),
transport.recv_media()
).await {
Ok(Ok(Some(pkt))) => {
let _ = transport.send_media(&pkt).await;
}
Ok(Ok(None)) => {
info!(%addr, "disconnected while waiting");
// Clean up our slot
let mut slot = waiting.lock().await;
*slot = None; *slot = None;
return;
}
Ok(Err(e)) => {
error!(%addr, "echo error: {e}");
let mut slot = waiting.lock().await;
*slot = None;
return;
}
Err(_) => {
// Timeout — loop back and check if we got paired
}
}
} }
} }
} }

View File

@@ -139,6 +139,7 @@ mod tests {
#[test] #[test]
fn server_config_creates_without_error() { fn server_config_creates_without_error() {
let _ = rustls::crypto::ring::default_provider().install_default();
let (cfg, cert_der) = server_config(); let (cfg, cert_der) = server_config();
assert!(!cert_der.is_empty()); assert!(!cert_der.is_empty());
// Verify the config was created (no panic) // Verify the config was created (no panic)
@@ -147,6 +148,7 @@ mod tests {
#[test] #[test]
fn client_config_creates_without_error() { fn client_config_creates_without_error() {
let _ = rustls::crypto::ring::default_provider().install_default();
let cfg = client_config(); let cfg = client_config();
drop(cfg); drop(cfg);
} }

View File

@@ -99,13 +99,22 @@ async function startCall() {
}; };
ws.onclose = () => { ws.onclose = () => {
if (active) {
setStatus('Disconnected — reconnecting...');
setTimeout(() => { if (active) { stopCall(); startCall(); } }, 1000);
} else {
setStatus('Disconnected'); setStatus('Disconnected');
stopCall(); }
}; };
ws.onerror = (e) => { ws.onerror = (e) => {
if (active) {
setStatus('Connection error — reconnecting...');
setTimeout(() => { if (active) { stopCall(); startCall(); } }, 1000);
} else {
setStatus('Connection error'); setStatus('Connection error');
stopCall(); stopCall();
}
}; };
} }