diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index abb3d84..46ce2ab 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -111,9 +111,8 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse SignalMessage::FederationHello { .. } - | SignalMessage::FederationRoomJoin { .. } - | SignalMessage::FederationRoomLeave { .. } - | SignalMessage::FederationParticipantUpdate { .. } => CallSignalType::Offer, // relay-only + | SignalMessage::GlobalRoomActive { .. } + | SignalMessage::GlobalRoomInactive { .. } => CallSignalType::Offer, // relay-only } } diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index a4d7bfb..c5f447d 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -665,21 +665,14 @@ pub enum SignalMessage { tls_fingerprint: String, }, - /// Federation: a room exists on the sending relay with active local participants. - FederationRoomJoin { - room: String, - participants: Vec, - }, - - /// Federation: a room is now empty on the sending relay. - FederationRoomLeave { + /// Federation: this relay now has local participants in a global room. + GlobalRoomActive { room: String, }, - /// Federation: local participant list changed for a federated room. - FederationParticipantUpdate { + /// Federation: this relay's last local participant left a global room. + GlobalRoomInactive { room: String, - participants: Vec, }, } diff --git a/crates/wzp-relay/src/config.rs b/crates/wzp-relay/src/config.rs index d18db41..7dfb077 100644 --- a/crates/wzp-relay/src/config.rs +++ b/crates/wzp-relay/src/config.rs @@ -25,6 +25,13 @@ pub struct TrustedConfig { pub label: Option, } +/// A room declared global — bridged across all federated peers. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GlobalRoomConfig { + /// Room name to bridge (e.g., "android"). + pub name: String, +} + /// Configuration for the relay daemon. /// /// All fields have defaults, so a minimal TOML file only needs the @@ -73,6 +80,9 @@ pub struct RelayConfig { /// Federation peer relays. #[serde(default)] pub peers: Vec, + /// Global rooms bridged across federation. + #[serde(default)] + pub global_rooms: Vec, /// Trusted relay fingerprints — accept inbound federation from these relays. /// Unlike [[peers]], no url is needed — the peer connects to us. #[serde(default)] @@ -99,6 +109,7 @@ impl Default for RelayConfig { ws_port: None, static_dir: None, peers: Vec::new(), + global_rooms: Vec::new(), trusted: Vec::new(), debug_tap: None, } diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 1acf019..7ac1fb0 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -1,10 +1,11 @@ -//! Relay federation — connects to peer relays and bridges rooms with matching names. +//! Relay federation — global room routing between peer relays. //! -//! Each federated peer is represented as a virtual participant in shared rooms. -//! Media from local participants is forwarded to the peer via room-tagged datagrams. -//! Media from the peer is received, demuxed by room hash, and forwarded to local participants. +//! Each relay maintains a forwarding table per global room. When a local participant +//! sends media in a global room, it's forwarded to all peer relays that have the room +//! active. Incoming federated media is delivered to local participants and optionally +//! forwarded to other active peers (multi-hop). -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -18,7 +19,7 @@ use wzp_proto::{MediaTransport, SignalMessage}; use wzp_transport::QuinnTransport; use crate::config::{PeerConfig, TrustedConfig}; -use crate::room::{self, ParticipantSender, RoomManager}; +use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager}; /// Compute 8-byte room hash for federation datagram tagging. pub fn room_hash(room_name: &str) -> [u8; 8] { @@ -28,19 +29,36 @@ pub fn room_hash(room_name: &str) -> [u8; 8] { out } -/// Manages federation connections to peer relays. +/// Normalize a fingerprint string (remove colons, lowercase). +fn normalize_fp(fp: &str) -> String { + fp.replace(':', "").to_lowercase() +} + +/// Active link to a peer relay. +struct PeerLink { + transport: Arc, + label: String, + /// Global rooms that this peer has reported as active. + active_rooms: HashSet, +} + +/// Manages federation connections and global room forwarding. pub struct FederationManager { peers: Vec, trusted: Vec, + global_rooms: HashSet, room_mgr: Arc>, endpoint: quinn::Endpoint, local_tls_fp: String, + /// Active peer connections, keyed by normalized fingerprint. + peer_links: Arc>>, } impl FederationManager { pub fn new( peers: Vec, trusted: Vec, + global_rooms: HashSet, room_mgr: Arc>, endpoint: quinn::Endpoint, local_tls_fp: String, @@ -48,19 +66,41 @@ impl FederationManager { Self { peers, trusted, + global_rooms, room_mgr, endpoint, local_tls_fp, + peer_links: Arc::new(Mutex::new(HashMap::new())), } } - /// Start federation — spawns one task per configured peer. + /// Check if a room name (which may be hashed) is a global room. + pub fn is_global_room(&self, room: &str) -> bool { + // Check both the raw name and the hashed version + if self.global_rooms.contains(room) { + return true; + } + // The room name in the room manager is the hashed SNI. + // Check if any configured global room hashes to this value. + self.global_rooms.iter().any(|name| { + wzp_crypto::hash_room_name(name) == room + }) + } + + /// Start federation — spawns connection loops + event dispatcher. pub async fn run(self: Arc) { - if self.peers.is_empty() { + if self.peers.is_empty() && self.global_rooms.is_empty() { return; } - info!(peers = self.peers.len(), "federation starting"); + info!( + peers = self.peers.len(), + global_rooms = self.global_rooms.len(), + "federation starting" + ); + let mut handles = Vec::new(); + + // Per-peer outbound connection loops for peer in &self.peers { let this = self.clone(); let peer = peer.clone(); @@ -68,30 +108,58 @@ impl FederationManager { run_peer_loop(this, peer).await; })); } + + // Room event dispatcher + let room_events = { + let mgr = self.room_mgr.lock().await; + mgr.subscribe_events() + }; + let this = self.clone(); + handles.push(tokio::spawn(async move { + run_room_event_dispatcher(this, room_events).await; + })); + for h in handles { let _ = h.await; } } - /// Handle an inbound federation connection from a peer that we recognize. + /// Handle an inbound federation connection from a recognized peer. pub async fn handle_inbound( self: &Arc, transport: Arc, peer_config: PeerConfig, ) { - let addr: SocketAddr = peer_config.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()); - info!(peer = ?peer_config.label, %addr, "inbound federation link active"); - if let Err(e) = run_federation_link(self.clone(), transport, addr, &peer_config).await { - warn!(peer = ?peer_config.label, "inbound federation link ended: {e}"); + let peer_fp = normalize_fp(&peer_config.fingerprint); + let label = peer_config.label.unwrap_or_else(|| peer_config.url.clone()); + info!(peer = %label, "inbound federation link active"); + if let Err(e) = run_federation_link(self.clone(), transport, peer_fp, label.clone()).await { + warn!(peer = %label, "inbound federation link ended: {e}"); } } - /// Find a configured peer by TLS fingerprint. + /// Forward locally-generated media to active peers for a global room. + pub async fn forward_to_peers(&self, room_name: &str, room_hash: &[u8; 8], media_data: &Bytes) { + let links = self.peer_links.lock().await; + if links.is_empty() { + return; + } + for link in links.values() { + if link.active_rooms.contains(room_name) { + let mut tagged = Vec::with_capacity(8 + media_data.len()); + tagged.extend_from_slice(room_hash); + tagged.extend_from_slice(media_data); + let _ = link.transport.send_raw_datagram(&tagged); + } + } + } + + // ── Trust verification (kept from previous implementation) ── + pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> { self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp)) } - /// Find a configured peer by source IP address. pub fn find_peer_by_addr(&self, addr: SocketAddr) -> Option<&PeerConfig> { let addr_ip = addr.ip(); self.peers.iter().find(|p| { @@ -101,19 +169,14 @@ impl FederationManager { }) } - /// Find a trusted relay by TLS fingerprint. pub fn find_trusted_by_fingerprint(&self, fp: &str) -> Option<&TrustedConfig> { self.trusted.iter().find(|t| normalize_fp(&t.fingerprint) == normalize_fp(fp)) } - /// Check if an inbound federation connection is trusted (by IP match in [[peers]] or fingerprint in [[trusted]]). - /// Returns the label for logging. pub fn check_inbound_trust(&self, addr: SocketAddr, hello_fp: &str) -> Option { - // Check [[peers]] by IP if let Some(peer) = self.find_peer_by_addr(addr) { return Some(peer.label.clone().unwrap_or_else(|| peer.url.clone())); } - // Check [[trusted]] by fingerprint if let Some(trusted) = self.find_trusted_by_fingerprint(hello_fp) { return Some(trusted.label.clone().unwrap_or_else(|| hello_fp[..16].to_string())); } @@ -121,11 +184,57 @@ impl FederationManager { } } -/// Normalize a fingerprint string (remove colons, lowercase). -fn normalize_fp(fp: &str) -> String { - fp.replace(':', "").to_lowercase() +// ── Outbound media egress task ── + +/// Drains the federation media channel and forwards to active peers. +pub async fn run_federation_media_egress( + fm: Arc, + mut rx: tokio::sync::mpsc::Receiver, +) { + while let Some(out) = rx.recv().await { + fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data).await; + } } +// ── Room event dispatcher ── + +/// Watches RoomManager events and sends GlobalRoomActive/Inactive to peers. +async fn run_room_event_dispatcher( + fm: Arc, + mut events: tokio::sync::broadcast::Receiver, +) { + loop { + match events.recv().await { + Ok(RoomEvent::LocalJoin { room }) => { + if fm.is_global_room(&room) { + info!(room = %room, "global room now active, announcing to peers"); + let msg = SignalMessage::GlobalRoomActive { room }; + let links = fm.peer_links.lock().await; + for link in links.values() { + let _ = link.transport.send_signal(&msg).await; + } + } + } + Ok(RoomEvent::LocalLeave { room }) => { + if fm.is_global_room(&room) { + info!(room = %room, "global room now inactive, announcing to peers"); + let msg = SignalMessage::GlobalRoomInactive { room }; + let links = fm.peer_links.lock().await; + for link in links.values() { + let _ = link.transport.send_signal(&msg).await; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!(missed = n, "room event receiver lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } +} + +// ── Peer connection management ── + /// Persistent connection loop for one peer — reconnects with backoff. async fn run_peer_loop(fm: Arc, peer: PeerConfig) { let mut backoff = Duration::from_secs(5); @@ -133,9 +242,10 @@ async fn run_peer_loop(fm: Arc, peer: PeerConfig) { info!(peer_url = %peer.url, label = ?peer.label, "federation: connecting to peer..."); match connect_to_peer(&fm, &peer).await { Ok(transport) => { - backoff = Duration::from_secs(5); // reset on success - let addr: SocketAddr = peer.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()); - if let Err(e) = run_federation_link(fm.clone(), transport, addr, &peer).await { + backoff = Duration::from_secs(5); + let peer_fp = normalize_fp(&peer.fingerprint); + let label = peer.label.clone().unwrap_or_else(|| peer.url.clone()); + if let Err(e) = run_federation_link(fm.clone(), transport, peer_fp, label).await { warn!(peer_url = %peer.url, "federation link ended: {e}"); } } @@ -148,219 +258,201 @@ async fn run_peer_loop(fm: Arc, peer: PeerConfig) { } } -/// Connect to a peer relay. +/// Connect to a peer relay and send hello. async fn connect_to_peer(fm: &FederationManager, peer: &PeerConfig) -> Result, anyhow::Error> { let addr: SocketAddr = peer.url.parse()?; let client_cfg = wzp_transport::client_config(); let conn = wzp_transport::connect(&fm.endpoint, addr, "_federation", client_cfg).await?; - // TODO: verify peer TLS fingerprint once we have cert access let transport = Arc::new(QuinnTransport::new(conn)); - // Send hello with our TLS fingerprint so the peer can verify us + + // Send hello with our TLS fingerprint let hello = SignalMessage::FederationHello { tls_fingerprint: fm.local_tls_fp.clone(), }; transport.send_signal(&hello).await .map_err(|e| anyhow::anyhow!("federation hello send failed: {e}"))?; - info!(peer_url = %peer.url, label = ?peer.label, "federation: connected to peer (hello sent)"); + + info!(peer_url = %peer.url, label = ?peer.label, "federation: connected (hello sent)"); Ok(transport) } -/// Run the federation link: exchange room info and forward media. +// ── Federation link (runs on a single QUIC connection) ── + +/// Run the federation link: exchange global room state and forward media. async fn run_federation_link( fm: Arc, transport: Arc, - peer_addr: SocketAddr, - peer: &PeerConfig, + peer_fp: String, + peer_label: String, ) -> Result<(), anyhow::Error> { - // Announce our active rooms to the peer - let rooms = { - let mgr = fm.room_mgr.lock().await; - mgr.active_rooms() - }; - for room_name in &rooms { - let participants = { - let mgr = fm.room_mgr.lock().await; - mgr.local_participants(room_name) - }; - let msg = SignalMessage::FederationRoomJoin { - room: room_name.clone(), - participants, - }; - transport.send_signal(&msg).await?; + // Register peer link + { + let mut links = fm.peer_links.lock().await; + links.insert(peer_fp.clone(), PeerLink { + transport: transport.clone(), + label: peer_label.clone(), + active_rooms: HashSet::new(), + }); } - // Track virtual participants we create on behalf of this peer - let mut peer_room_participants: HashMap = HashMap::new(); - // Map room_hash -> room_name for incoming media demux - let mut hash_to_room: HashMap<[u8; 8], String> = HashMap::new(); + // Announce our currently active global rooms + { + let mgr = fm.room_mgr.lock().await; + for room_name in mgr.active_rooms() { + if fm.is_global_room(&room_name) { + let msg = SignalMessage::GlobalRoomActive { room: room_name }; + let _ = transport.send_signal(&msg).await; + } + } + } - // Run three tasks: recv signals + recv media + periodic room announcements + // Two concurrent tasks: signal recv + media recv let signal_transport = transport.clone(); let media_transport = transport.clone(); - let announce_transport = transport.clone(); let fm_signal = fm.clone(); let fm_media = fm.clone(); - let fm_announce = fm.clone(); - let peer_label = peer.label.clone().unwrap_or_else(|| peer.url.clone()); - let peer_label2 = peer_label.clone(); + let peer_fp_signal = peer_fp.clone(); + let peer_fp_media = peer_fp.clone(); + let label_signal = peer_label.clone(); let signal_task = async move { loop { match signal_transport.recv_signal().await { Ok(Some(msg)) => { - info!(peer = %peer_label, "federation: received signal {:?}", std::mem::discriminant(&msg)); - match msg { - SignalMessage::FederationRoomJoin { room, participants } => { - info!(peer = %peer_label, room = %room, count = participants.len(), "federation: peer room join"); - let rh = room_hash(&room); - hash_to_room.insert(rh, room.clone()); - - let sender = ParticipantSender::Federation { - transport: signal_transport.clone(), - room_hash: rh, - }; - let (pid, update, senders) = { - let mut mgr = fm_signal.room_mgr.lock().await; - mgr.join_federated(&room, peer_addr, sender, participants) - }; - peer_room_participants.insert(room, pid); - room::broadcast_signal(&senders, &update).await; - } - SignalMessage::FederationRoomLeave { room } => { - info!(peer = %peer_label, room = %room, "federation: peer room leave"); - if let Some(pid) = peer_room_participants.remove(&room) { - let result = { - let mut mgr = fm_signal.room_mgr.lock().await; - mgr.leave(&room, pid) - }; - if let Some((update, senders)) = result { - room::broadcast_signal(&senders, &update).await; - } - } - hash_to_room.retain(|_, v| v != &room); - } - SignalMessage::FederationParticipantUpdate { room, participants } => { - let result = { - let mut mgr = fm_signal.room_mgr.lock().await; - mgr.update_federated_participants(&room, peer_addr, participants) - }; - if let Some((update, senders)) = result { - room::broadcast_signal(&senders, &update).await; - } - } - _ => {} // ignore other signals - } + handle_signal(&fm_signal, &peer_fp_signal, &label_signal, msg).await; } Ok(None) => break, Err(e) => { - error!(peer = %peer_label, "federation signal recv error: {e}"); + error!(peer = %label_signal, "federation signal error: {e}"); break; } } } - // Cleanup: remove all virtual participants for this peer - for (room, pid) in &peer_room_participants { - let result = { - let mut mgr = fm_signal.room_mgr.lock().await; - mgr.leave(room, *pid) - }; - if let Some((update, senders)) = result { - room::broadcast_signal(&senders, &update).await; - } - } - info!(peer = %peer_label, "federation signal task ended"); }; let media_task = async move { loop { match media_transport.connection().read_datagram().await { Ok(data) => { - if data.len() < 8 + 4 { - continue; // too short (need room_hash + min header) - } - let mut rh = [0u8; 8]; - rh.copy_from_slice(&data[..8]); - let media_bytes = &data[8..]; - - // Deserialize media packet - let pkt = match wzp_proto::MediaPacket::from_bytes(Bytes::copy_from_slice(media_bytes)) { - Some(pkt) => pkt, - None => continue, - }; - - // Look up room by hash — we need to get the room name from the signal task's hash_to_room - // For simplicity, we forward to all local participants via the room manager - // The virtual participant approach means we don't need the room name here — - // the SFU loop handles it. But since inbound media doesn't go through run_participant, - // we need to manually fan out. - - // For now, just use the room manager to find local participants - // This is a simplified approach — full implementation would maintain - // a shared hash_to_room map between signal and media tasks - let mgr = fm_media.room_mgr.lock().await; - for room_name in mgr.active_rooms() { - if room_hash(&room_name) == rh { - // Forward to all local participants in this room - let locals: Vec<_> = mgr.local_senders(&room_name); - drop(mgr); // release lock before sending - for sender in &locals { - if let ParticipantSender::Quic(t) = sender { - let _ = t.send_media(&pkt).await; - } - } - break; - } - } + handle_datagram(&fm_media, &peer_fp_media, data).await; } Err(_) => break, } } }; - // Periodically announce new local rooms to the peer - let announce_task = async move { - let mut announced: std::collections::HashSet = std::collections::HashSet::new(); - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - let rooms = { - let mgr = fm_announce.room_mgr.lock().await; - mgr.active_rooms() - }; - for room_name in &rooms { - if !announced.contains(room_name) { - let participants = { - let mgr = fm_announce.room_mgr.lock().await; - mgr.local_participants(room_name) - }; - if participants.is_empty() { - continue; // only virtual participants, skip - } - info!(peer = %peer_label2, room = %room_name, local_count = participants.len(), "federation: announcing room to peer"); - let msg = SignalMessage::FederationRoomJoin { - room: room_name.clone(), - participants, - }; - match announce_transport.send_signal(&msg).await { - Ok(()) => { - info!(peer = %peer_label2, room = %room_name, "federation: room announced successfully"); - announced.insert(room_name.clone()); - } - Err(e) => { - warn!(peer = %peer_label2, room = %room_name, "federation: announce send failed: {e}"); - } - } - } - } - // Remove rooms that no longer exist - announced.retain(|r| rooms.contains(r)); - } - }; - tokio::select! { _ = signal_task => {} _ = media_task => {} - _ = announce_task => {} } + // Cleanup: remove peer link + { + let mut links = fm.peer_links.lock().await; + links.remove(&peer_fp); + } + info!(peer = %peer_label, "federation link ended"); + Ok(()) } + +/// Handle an incoming federation signal. +async fn handle_signal( + fm: &Arc, + peer_fp: &str, + peer_label: &str, + msg: SignalMessage, +) { + match msg { + SignalMessage::GlobalRoomActive { room } => { + if fm.is_global_room(&room) { + info!(peer = %peer_label, room = %room, "peer has global room active"); + let mut links = fm.peer_links.lock().await; + if let Some(link) = links.get_mut(peer_fp) { + link.active_rooms.insert(room.clone()); + } + // Propagate: tell all OTHER peers this room is routable through us. + // This enables multi-hop: A→B→C where B relays A's announcement to C and vice versa. + for (fp, link) in links.iter() { + if fp != peer_fp { + let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive { room: room.clone() }).await; + } + } + } + } + SignalMessage::GlobalRoomInactive { room } => { + info!(peer = %peer_label, room = %room, "peer global room now inactive"); + let mut links = fm.peer_links.lock().await; + if let Some(link) = links.get_mut(peer_fp) { + link.active_rooms.remove(&room); + } + // Check if any other peer still has this room — if none, propagate inactive + let any_other_active = links.iter() + .any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room)); + let local_active = { + let mgr = fm.room_mgr.lock().await; + mgr.active_rooms().iter().any(|r| r == &room) + }; + if !any_other_active && !local_active { + for (fp, link) in links.iter() { + if fp != peer_fp { + let _ = link.transport.send_signal(&SignalMessage::GlobalRoomInactive { room: room.clone() }).await; + } + } + } + } + _ => {} // ignore other signals + } +} + +/// Handle an incoming federation datagram (room-hash-tagged media). +async fn handle_datagram( + fm: &Arc, + source_peer_fp: &str, + data: Bytes, +) { + if data.len() < 12 { return; } // 8-byte hash + min packet + + let mut rh = [0u8; 8]; + rh.copy_from_slice(&data[..8]); + let media_bytes = data.slice(8..); + + let pkt = match wzp_proto::MediaPacket::from_bytes(media_bytes.clone()) { + Some(pkt) => pkt, + None => return, + }; + + // Find room by hash + let room_name = { + let mgr = fm.room_mgr.lock().await; + mgr.active_rooms().into_iter().find(|r| room_hash(r) == rh) + }; + + let room_name = match room_name { + Some(r) => r, + None => return, // room not active locally + }; + + // Deliver to all local participants + let locals = { + let mgr = fm.room_mgr.lock().await; + mgr.local_senders(&room_name) + }; + for sender in &locals { + match sender { + room::ParticipantSender::Quic(t) => { let _ = t.send_media(&pkt).await; } + room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; } + } + } + + // Multi-hop: forward to OTHER active peers (not the source) + let links = fm.peer_links.lock().await; + for (fp, link) in links.iter() { + if fp != source_peer_fp && link.active_rooms.contains(&room_name) { + let mut tagged = Vec::with_capacity(8 + media_bytes.len()); + tagged.extend_from_slice(&rh); + tagged.extend_from_slice(&media_bytes); + let _ = link.transport.send_raw_datagram(&tagged); + } + } +} diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 1fcd2ab..4be8e4a 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -104,6 +104,12 @@ fn parse_args() -> RelayConfig { args.get(i).expect("--static-dir requires a directory path").to_string(), ); } + "--global-room" => { + i += 1; + config.global_rooms.push(wzp_relay::config::GlobalRoomConfig { + name: args.get(i).expect("--global-room requires a room name").to_string(), + }); + } "--debug-tap" => { i += 1; config.debug_tap = Some( @@ -132,6 +138,7 @@ fn parse_args() -> RelayConfig { eprintln!(" --probe-mesh Enable mesh mode (mark config flag, probes all --probe targets)."); eprintln!(" --mesh-status Print mesh health table and exit (diagnostic)."); eprintln!(" --trunking Enable trunk batching for outgoing media in room mode."); + eprintln!(" --global-room Declare a room as global (bridged across federation). Repeatable."); eprintln!(" --debug-tap Log packet headers for a room ('*' for all rooms)."); eprintln!(" --ws-port WebSocket listener port for browser clients (e.g., 8080)."); eprintln!(" --static-dir Directory to serve static files from (HTML/JS/WASM)."); @@ -334,10 +341,15 @@ async fn main() -> anyhow::Result<()> { let room_mgr = Arc::new(Mutex::new(RoomManager::new())); // Federation manager - let federation_mgr = if !config.peers.is_empty() || !config.trusted.is_empty() { + let global_room_set: std::collections::HashSet = config.global_rooms.iter() + .map(|g| g.name.clone()) + .collect(); + + let federation_mgr = if !config.peers.is_empty() || !config.trusted.is_empty() || !global_room_set.is_empty() { let fm = Arc::new(wzp_relay::federation::FederationManager::new( config.peers.clone(), config.trusted.clone(), + global_room_set.clone(), room_mgr.clone(), endpoint.clone(), tls_fp.clone(), @@ -386,6 +398,12 @@ async fn main() -> anyhow::Result<()> { } else { info!("auth disabled — any client can connect (use --auth-url to enable)"); } + if !config.global_rooms.is_empty() { + info!(count = config.global_rooms.len(), "global rooms configured"); + for g in &config.global_rooms { + info!(name = %g.name, " global room"); + } + } if let Some(ref tap) = config.debug_tap { info!(filter = %tap, "debug tap enabled — logging packet headers"); } @@ -701,6 +719,22 @@ async fn main() -> anyhow::Result<()> { .iter() .map(|b| format!("{b:02x}")) .collect(); + // Set up federation media channel if this is a global room + let federation_tx = if let Some(ref fm) = federation_mgr { + if fm.is_global_room(&room_name) { + let (tx, rx) = tokio::sync::mpsc::channel(256); + let fm_clone = fm.clone(); + tokio::spawn(async move { + wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await; + }); + Some(tx) + } else { + None + } + } else { + None + }; + room::run_participant( room_mgr.clone(), room_name, @@ -710,6 +744,7 @@ async fn main() -> anyhow::Result<()> { &session_id_str, trunking_enabled, debug_tap, + federation_tx, ).await; // Participant disconnected — clean up presence + per-session metrics diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 7ffeb70..9fb84f7 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -59,13 +59,20 @@ fn next_id() -> ParticipantId { NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed) } -/// Tracks where a participant originates from (for loop prevention). -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum ParticipantOrigin { - /// Connected directly to this relay. - Local, - /// Virtual participant representing a federated peer relay. - Federated { relay_addr: std::net::SocketAddr }, +/// Events emitted by RoomManager for federation to observe. +#[derive(Clone, Debug)] +pub enum RoomEvent { + /// First local participant joined this room. + LocalJoin { room: String }, + /// Last local participant left this room. + LocalLeave { room: String }, +} + +/// Outbound federation media from a local participant. +pub struct FederationMediaOut { + pub room_name: String, + pub room_hash: [u8; 8], + pub data: Bytes, } /// How to send data to a participant — either via QUIC transport or WebSocket channel. @@ -73,11 +80,6 @@ pub enum ParticipantOrigin { pub enum ParticipantSender { Quic(Arc), WebSocket(tokio::sync::mpsc::Sender), - /// Federated peer relay — media is prefixed with an 8-byte room hash. - Federation { - transport: Arc, - room_hash: [u8; 8], - }, } impl ParticipantSender { @@ -96,14 +98,6 @@ impl ParticipantSender { }; transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}")) } - ParticipantSender::Federation { transport, room_hash } => { - // Prefix media data with room hash for demuxing on the peer relay - let mut tagged = Vec::with_capacity(8 + data.len()); - tagged.extend_from_slice(room_hash); - tagged.extend_from_slice(data); - transport.send_raw_datagram(&tagged) - .map_err(|e| format!("federation send: {e}")) - } } } @@ -139,21 +133,17 @@ struct Participant { sender: ParticipantSender, fingerprint: Option, alias: Option, - origin: ParticipantOrigin, } /// A room holding multiple participants. struct Room { participants: Vec, - /// Remote participants from federated peers (for merged RoomUpdate). - federated_participants: HashMap>, } impl Room { fn new() -> Self { Self { participants: Vec::new(), - federated_participants: HashMap::new(), } } @@ -163,11 +153,10 @@ impl Room { sender: ParticipantSender, fingerprint: Option, alias: Option, - origin: ParticipantOrigin, ) -> ParticipantId { let id = next_id(); - info!(room_size = self.participants.len() + 1, participant = id, %addr, ?origin, "joined room"); - self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias, origin }); + info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room"); + self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias }); id } @@ -184,38 +173,15 @@ impl Room { .collect() } - /// Get senders with loop prevention for federation. - /// - /// - Media from a **local** participant → send to ALL others (local + federated) - /// - Media from a **federated** participant → send to LOCAL participants only - /// (the source relay already forwarded to its own locals and other peers) - fn others_for_origin(&self, exclude_id: ParticipantId, source_origin: &ParticipantOrigin) -> Vec { + /// Build a RoomUpdate participant list. + fn participant_list(&self) -> Vec { self.participants .iter() - .filter(|p| p.id != exclude_id) - .filter(|p| match source_origin { - ParticipantOrigin::Local => true, - ParticipantOrigin::Federated { .. } => p.origin == ParticipantOrigin::Local, - }) - .map(|p| p.sender.clone()) - .collect() - } - - /// Build a RoomUpdate participant list (local + federated). - fn participant_list(&self) -> Vec { - let mut list: Vec<_> = self.participants - .iter() - .filter(|p| p.origin == ParticipantOrigin::Local) .map(|p| wzp_proto::packet::RoomParticipant { fingerprint: p.fingerprint.clone().unwrap_or_default(), alias: p.alias.clone(), }) - .collect(); - // Merge federated participants from all peer relays - for remote in self.federated_participants.values() { - list.extend(remote.iter().cloned()); - } - list + .collect() } /// Get all senders (for broadcasting to everyone including the joiner). @@ -239,24 +205,35 @@ pub struct RoomManager { /// When `None`, rooms are open (no auth mode). When `Some`, only listed /// fingerprints can join the corresponding room. acl: Option>>, + /// Channel for room lifecycle events (federation subscribes). + event_tx: tokio::sync::broadcast::Sender, } impl RoomManager { pub fn new() -> Self { + let (event_tx, _) = tokio::sync::broadcast::channel(64); Self { rooms: HashMap::new(), acl: None, + event_tx, } } /// Create a room manager with ACL enforcement enabled. pub fn with_acl() -> Self { + let (event_tx, _) = tokio::sync::broadcast::channel(64); Self { rooms: HashMap::new(), acl: Some(HashMap::new()), + event_tx, } } + /// Subscribe to room lifecycle events (for federation). + pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver { + self.event_tx.subscribe() + } + /// Grant a fingerprint access to a room. pub fn allow(&mut self, room_name: &str, fingerprint: &str) { if let Some(ref mut acl) = self.acl { @@ -295,8 +272,13 @@ impl RoomManager { warn!(room = room_name, fingerprint = ?fingerprint, "unauthorized room join attempt"); return Err("not authorized for this room".to_string()); } + let was_empty = !self.rooms.contains_key(room_name) + || self.rooms.get(room_name).map_or(true, |r| r.is_empty()); let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); - let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()), ParticipantOrigin::Local); + let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string())); + if was_empty { + let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() }); + } let update = wzp_proto::SignalMessage::RoomUpdate { count: room.len() as u32, participants: room.participant_list(), @@ -317,78 +299,15 @@ impl RoomManager { Ok(id) } - /// Join a room as a federated virtual participant. - pub fn join_federated( - &mut self, - room_name: &str, - relay_addr: std::net::SocketAddr, - sender: ParticipantSender, - remote_participants: Vec, - ) -> (ParticipantId, wzp_proto::SignalMessage, Vec) { - let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); - room.federated_participants.insert(relay_addr, remote_participants); - let id = room.add( - relay_addr, sender, None, Some("(federated)".to_string()), - ParticipantOrigin::Federated { relay_addr }, - ); - let update = wzp_proto::SignalMessage::RoomUpdate { - count: room.len() as u32, - participants: room.participant_list(), - }; - let senders = room.all_senders(); - (id, update, senders) - } - - /// Update federated participant list for a room (from FederationParticipantUpdate). - pub fn update_federated_participants( - &mut self, - room_name: &str, - relay_addr: std::net::SocketAddr, - participants: Vec, - ) -> Option<(wzp_proto::SignalMessage, Vec)> { - if let Some(room) = self.rooms.get_mut(room_name) { - room.federated_participants.insert(relay_addr, participants); - let update = wzp_proto::SignalMessage::RoomUpdate { - count: room.len() as u32, - participants: room.participant_list(), - }; - let senders = room.all_senders(); - Some((update, senders)) - } else { - None - } - } - - /// Get the origin of a participant by ID. - pub fn participant_origin(&self, room_name: &str, participant_id: ParticipantId) -> Option { - self.rooms.get(room_name) - .and_then(|room| room.participants.iter().find(|p| p.id == participant_id)) - .map(|p| p.origin.clone()) - } - - /// Get list of active room names (for federation room announcements). + /// Get list of active room names. pub fn active_rooms(&self) -> Vec { self.rooms.keys().cloned().collect() } - /// Get local participant list for a room (excludes federated virtual participants). - pub fn local_participants(&self, room_name: &str) -> Vec { - self.rooms.get(room_name) - .map(|room| room.participants.iter() - .filter(|p| p.origin == ParticipantOrigin::Local) - .map(|p| wzp_proto::packet::RoomParticipant { - fingerprint: p.fingerprint.clone().unwrap_or_default(), - alias: p.alias.clone(), - }) - .collect()) - .unwrap_or_default() - } - - /// Get senders for local-only participants in a room (for federation inbound media). + /// Get all senders for participants in a room (for federation inbound media delivery). pub fn local_senders(&self, room_name: &str) -> Vec { self.rooms.get(room_name) .map(|room| room.participants.iter() - .filter(|p| p.origin == ParticipantOrigin::Local) .map(|p| p.sender.clone()) .collect()) .unwrap_or_default() @@ -400,6 +319,7 @@ impl RoomManager { room.remove(participant_id); if room.is_empty() { self.rooms.remove(room_name); + let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() }); info!(room = room_name, "room closed (empty)"); return None; } @@ -510,6 +430,7 @@ pub async fn run_participant( session_id: &str, trunking_enabled: bool, debug_tap: Option, + federation_tx: Option>, ) { if trunking_enabled { run_participant_trunked( @@ -518,7 +439,7 @@ pub async fn run_participant( .await; } else { run_participant_plain( - room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, + room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx, ) .await; } @@ -533,6 +454,7 @@ async fn run_participant_plain( metrics: Arc, session_id: &str, debug_tap: Option, + federation_tx: Option>, ) { let addr = transport.connection().remote_address(); let mut packets_forwarded = 0u64; @@ -635,21 +557,19 @@ async fn run_participant_plain( ParticipantSender::WebSocket(_) => { let _ = other.send_raw(&pkt.payload).await; } - ParticipantSender::Federation { transport, room_hash } => { - // Send room-tagged datagram to federated peer - let data = pkt.to_bytes(); - let mut tagged = Vec::with_capacity(8 + data.len()); - tagged.extend_from_slice(room_hash); - tagged.extend_from_slice(&data); - if let Err(e) = transport.send_raw_datagram(&tagged) { - send_errors += 1; - if send_errors <= 5 { - warn!(room = %room_name, "federation forward error: {e}"); - } - } - } } } + + // Federation: forward to active peer relays via channel + if let Some(ref fed_tx) = federation_tx { + let data = pkt.to_bytes(); + let _ = fed_tx.try_send(FederationMediaOut { + room_name: room_name.clone(), + room_hash: crate::federation::room_hash(&room_name), + data, + }); + } + let fwd_ms = fwd_start.elapsed().as_millis() as u64; if fwd_ms > max_forward_ms { max_forward_ms = fwd_ms; @@ -815,13 +735,6 @@ async fn run_participant_trunked( ParticipantSender::WebSocket(_) => { let _ = other.send_raw(&pkt.payload).await; } - ParticipantSender::Federation { transport, room_hash } => { - let data = pkt.to_bytes(); - let mut tagged = Vec::with_capacity(8 + data.len()); - tagged.extend_from_slice(room_hash); - tagged.extend_from_slice(&data); - let _ = transport.send_raw_datagram(&tagged); - } } } let fwd_ms = fwd_start.elapsed().as_millis() as u64;