feat: relay bridge mode — pairs two clients for real calls

When no --remote is configured, the relay now operates in bridge mode:
- First client connects → echoes while waiting for a peer
- Second client connects → both clients are bridged bidirectionally
- A's packets go to B, B's packets go to A
- Stats logged every 5 seconds (a_to_b / b_to_a packet counts)
- Falls back to echo if only one client connects

This enables the core use case: two clients on different networks
calling each other through a single relay.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-27 17:21:12 +04:00
parent 0723f52d76
commit 26ed015cca

View File

@@ -9,7 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, watch};
use tracing::{error, info, warn};
use wzp_proto::MediaTransport;
@@ -192,6 +192,10 @@ async fn main() -> anyhow::Result<()> {
None
};
// Shared slot for bridge mode: first client waits here for a peer.
let waiting_peer: Arc<Mutex<Option<Arc<wzp_transport::QuinnTransport>>>> =
Arc::new(Mutex::new(None));
info!("Listening for connections...");
loop {
@@ -205,6 +209,7 @@ async fn main() -> anyhow::Result<()> {
let sessions = sessions.clone();
let remote_transport = remote_transport.clone();
let waiting_peer = waiting_peer.clone();
tokio::spawn(async move {
let remote_addr = connection.remote_address();
@@ -290,33 +295,124 @@ async fn main() -> anyhow::Result<()> {
info!(%remote_addr, "session ended");
}
None => {
// No remote relay — echo mode: send packets back to the sender.
// This lets two clients on the same relay talk to each other,
// or a single client hear its own audio looped back.
info!("no remote relay configured, running in echo mode");
let mut echo_count = 0u64;
loop {
match client_transport.recv_media().await {
Ok(Some(packet)) => {
// Echo the packet back to the sender
if let Err(e) = client_transport.send_media(&packet).await {
error!("echo send error: {e}");
// No remote relay — bridge mode: pair two clients together.
// First client waits, second client connects, then they're bridged.
// If only one client, fall back to echo mode.
// Try to take a waiting peer, or register ourselves as waiting
let peer = {
let mut slot = waiting_peer.lock().await;
slot.take()
};
if let Some(peer_transport) = peer {
// We're the second client — bridge with the waiting peer
let peer_addr = peer_transport.connection().remote_address();
info!(%remote_addr, %peer_addr, "bridging two clients");
let stats = Arc::new(RelayStats {
upstream_packets: AtomicU64::new(0),
downstream_packets: AtomicU64::new(0),
});
let stats_log = stats.clone();
let log_a = remote_addr;
let log_b = peer_addr;
let stats_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let a_to_b = stats_log.upstream_packets.load(Ordering::Relaxed);
let b_to_a = stats_log.downstream_packets.load(Ordering::Relaxed);
info!(
a = %log_a, b = %log_b,
a_to_b, b_to_a,
"bridge stats"
);
}
});
// A→B: recv from us (new client), send to peer
let a_tx = client_transport.clone();
let b_tx = peer_transport.clone();
let s1 = stats.clone();
let a_to_b = tokio::spawn(async move {
loop {
match a_tx.recv_media().await {
Ok(Some(pkt)) => {
if let Err(e) = b_tx.send_media(&pkt).await {
error!("A→B send error: {e}");
break;
}
s1.upstream_packets.fetch_add(1, Ordering::Relaxed);
}
Ok(None) => { info!("client A disconnected"); break; }
Err(e) => { error!("A recv error: {e}"); break; }
}
}
});
// B→A: recv from peer, send to us (new client)
let a_tx2 = client_transport.clone();
let b_tx2 = peer_transport.clone();
let s2 = stats.clone();
let b_to_a = tokio::spawn(async move {
loop {
match b_tx2.recv_media().await {
Ok(Some(pkt)) => {
if let Err(e) = a_tx2.send_media(&pkt).await {
error!("B→A send error: {e}");
break;
}
s2.downstream_packets.fetch_add(1, Ordering::Relaxed);
}
Ok(None) => { info!("client B disconnected"); break; }
Err(e) => { error!("B recv error: {e}"); break; }
}
}
});
tokio::select! {
_ = a_to_b => {}
_ = b_to_a => {}
}
stats_handle.abort();
info!(%remote_addr, %peer_addr, "bridge ended");
} else {
// We're the first client — register and wait, or echo
{
let mut slot = waiting_peer.lock().await;
*slot = Some(client_transport.clone());
}
info!(%remote_addr, "waiting for second client to bridge (echo in meantime)");
// Echo while waiting for a peer to connect
let mut echo_count = 0u64;
loop {
match client_transport.recv_media().await {
Ok(Some(packet)) => {
if let Err(e) = client_transport.send_media(&packet).await {
error!("echo send error: {e}");
break;
}
echo_count += 1;
if echo_count % 250 == 0 {
info!(echoed = echo_count, "echo mode stats");
}
}
Ok(None) => {
info!(%remote_addr, "connection closed");
break;
}
echo_count += 1;
if echo_count % 250 == 0 {
info!(echoed = echo_count, "echo mode stats");
Err(e) => {
error!(%remote_addr, "recv error: {e}");
break;
}
}
Ok(None) => {
info!(%remote_addr, "connection closed");
break;
}
Err(e) => {
error!(%remote_addr, "recv error: {e}");
break;
}
}
// Clean up waiting slot if we disconnect
let mut slot = waiting_peer.lock().await;
*slot = None;
}
}
}