From 26ed015cca48c6c620b1b8cc0973b3f01ee3ef3a Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Fri, 27 Mar 2026 17:21:12 +0400 Subject: [PATCH] =?UTF-8?q?feat:=20relay=20bridge=20mode=20=E2=80=94=20pai?= =?UTF-8?q?rs=20two=20clients=20for=20real=20calls?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When no --remote is configured, the relay now operates in bridge mode: - First client connects → echoes while waiting for a peer - Second client connects → both clients are bridged bidirectionally - A's packets go to B, B's packets go to A - Stats logged every 5 seconds (a_to_b / b_to_a packet counts) - Falls back to echo if only one client connects This enables the core use case: two clients on different networks calling each other through a single relay. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-relay/src/main.rs | 142 +++++++++++++++++++++++++++++------ 1 file changed, 119 insertions(+), 23 deletions(-) diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 63a3a7a..0c35137 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -9,7 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, watch}; use tracing::{error, info, warn}; use wzp_proto::MediaTransport; @@ -192,6 +192,10 @@ async fn main() -> anyhow::Result<()> { None }; + // Shared slot for bridge mode: first client waits here for a peer. + let waiting_peer: Arc>>> = + Arc::new(Mutex::new(None)); + info!("Listening for connections..."); loop { @@ -205,6 +209,7 @@ async fn main() -> anyhow::Result<()> { let sessions = sessions.clone(); let remote_transport = remote_transport.clone(); + let waiting_peer = waiting_peer.clone(); tokio::spawn(async move { let remote_addr = connection.remote_address(); @@ -290,33 +295,124 @@ async fn main() -> anyhow::Result<()> { info!(%remote_addr, "session ended"); } None => { - // No remote relay — echo mode: send packets back to the sender. - // This lets two clients on the same relay talk to each other, - // or a single client hear its own audio looped back. - info!("no remote relay configured, running in echo mode"); - let mut echo_count = 0u64; - loop { - match client_transport.recv_media().await { - Ok(Some(packet)) => { - // Echo the packet back to the sender - if let Err(e) = client_transport.send_media(&packet).await { - error!("echo send error: {e}"); + // No remote relay — bridge mode: pair two clients together. + // First client waits, second client connects, then they're bridged. + // If only one client, fall back to echo mode. + + // Try to take a waiting peer, or register ourselves as waiting + let peer = { + let mut slot = waiting_peer.lock().await; + slot.take() + }; + + if let Some(peer_transport) = peer { + // We're the second client — bridge with the waiting peer + let peer_addr = peer_transport.connection().remote_address(); + info!(%remote_addr, %peer_addr, "bridging two clients"); + + let stats = Arc::new(RelayStats { + upstream_packets: AtomicU64::new(0), + downstream_packets: AtomicU64::new(0), + }); + + let stats_log = stats.clone(); + let log_a = remote_addr; + let log_b = peer_addr; + let stats_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + let a_to_b = stats_log.upstream_packets.load(Ordering::Relaxed); + let b_to_a = stats_log.downstream_packets.load(Ordering::Relaxed); + info!( + a = %log_a, b = %log_b, + a_to_b, b_to_a, + "bridge stats" + ); + } + }); + + // A→B: recv from us (new client), send to peer + let a_tx = client_transport.clone(); + let b_tx = peer_transport.clone(); + let s1 = stats.clone(); + let a_to_b = tokio::spawn(async move { + loop { + match a_tx.recv_media().await { + Ok(Some(pkt)) => { + if let Err(e) = b_tx.send_media(&pkt).await { + error!("A→B send error: {e}"); + break; + } + s1.upstream_packets.fetch_add(1, Ordering::Relaxed); + } + Ok(None) => { info!("client A disconnected"); break; } + Err(e) => { error!("A recv error: {e}"); break; } + } + } + }); + + // B→A: recv from peer, send to us (new client) + let a_tx2 = client_transport.clone(); + let b_tx2 = peer_transport.clone(); + let s2 = stats.clone(); + let b_to_a = tokio::spawn(async move { + loop { + match b_tx2.recv_media().await { + Ok(Some(pkt)) => { + if let Err(e) = a_tx2.send_media(&pkt).await { + error!("B→A send error: {e}"); + break; + } + s2.downstream_packets.fetch_add(1, Ordering::Relaxed); + } + Ok(None) => { info!("client B disconnected"); break; } + Err(e) => { error!("B recv error: {e}"); break; } + } + } + }); + + tokio::select! { + _ = a_to_b => {} + _ = b_to_a => {} + } + stats_handle.abort(); + info!(%remote_addr, %peer_addr, "bridge ended"); + } else { + // We're the first client — register and wait, or echo + { + let mut slot = waiting_peer.lock().await; + *slot = Some(client_transport.clone()); + } + info!(%remote_addr, "waiting for second client to bridge (echo in meantime)"); + + // Echo while waiting for a peer to connect + let mut echo_count = 0u64; + loop { + match client_transport.recv_media().await { + Ok(Some(packet)) => { + if let Err(e) = client_transport.send_media(&packet).await { + error!("echo send error: {e}"); + break; + } + echo_count += 1; + if echo_count % 250 == 0 { + info!(echoed = echo_count, "echo mode stats"); + } + } + Ok(None) => { + info!(%remote_addr, "connection closed"); break; } - echo_count += 1; - if echo_count % 250 == 0 { - info!(echoed = echo_count, "echo mode stats"); + Err(e) => { + error!(%remote_addr, "recv error: {e}"); + break; } } - Ok(None) => { - info!(%remote_addr, "connection closed"); - break; - } - Err(e) => { - error!(%remote_addr, "recv error: {e}"); - break; - } } + // Clean up waiting slot if we disconnect + let mut slot = waiting_peer.lock().await; + *slot = None; } } }