diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index 4fe27c9..428a27b 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -110,6 +110,9 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse + SignalMessage::FederationRoomJoin { .. } + | SignalMessage::FederationRoomLeave { .. } + | SignalMessage::FederationParticipantUpdate { .. } => CallSignalType::Offer, // relay-only } } diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index 1e3909e..2352f54 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -656,6 +656,25 @@ pub enum SignalMessage { /// List of participants currently in the room. participants: Vec<RoomParticipant>, }, + + // ── Federation signals (relay-to-relay) ── + + /// Federation: a room exists on the sending relay with active local participants. + FederationRoomJoin { + room: String, + participants: Vec<RoomParticipant>, + }, + + /// Federation: a room is now empty on the sending relay. + FederationRoomLeave { + room: String, + }, + + /// Federation: local participant list changed for a federated room. + FederationParticipantUpdate { + room: String, + participants: Vec<RoomParticipant>, + }, } /// A participant entry in a RoomUpdate message. 
diff --git a/crates/wzp-relay/Cargo.toml b/crates/wzp-relay/Cargo.toml index 9e85240..6014314 100644 --- a/crates/wzp-relay/Cargo.toml +++ b/crates/wzp-relay/Cargo.toml @@ -29,6 +29,7 @@ axum = { version = "0.7", default-features = false, features = ["tokio", "http1" tower-http = { version = "0.6", features = ["fs"] } futures-util = "0.3" dirs = "6" +sha2 = { workspace = true } [[bin]] name = "wzp-relay" diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs new file mode 100644 index 0000000..a51dd31 --- /dev/null +++ b/crates/wzp-relay/src/federation.rs @@ -0,0 +1,284 @@ +//! Relay federation — connects to peer relays and bridges rooms with matching names. +//! +//! Each federated peer is represented as a virtual participant in shared rooms. +//! Media from local participants is forwarded to the peer via room-tagged datagrams. +//! Media from the peer is received, demuxed by room hash, and forwarded to local participants. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; +use sha2::{Sha256, Digest}; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +use wzp_proto::{MediaTransport, SignalMessage}; +use wzp_transport::QuinnTransport; + +use crate::config::PeerConfig; +use crate::room::{self, ParticipantSender, RoomManager}; + +/// Compute 8-byte room hash for federation datagram tagging. +pub fn room_hash(room_name: &str) -> [u8; 8] { + let h = Sha256::digest(room_name.as_bytes()); + let mut out = [0u8; 8]; + out.copy_from_slice(&h[..8]); + out +} + +/// Manages federation connections to peer relays. 
+pub struct FederationManager { + peers: Vec<PeerConfig>, + room_mgr: Arc<Mutex<RoomManager>>, + endpoint: quinn::Endpoint, + local_tls_fp: String, +} + +impl FederationManager { + pub fn new( + peers: Vec<PeerConfig>, + room_mgr: Arc<Mutex<RoomManager>>, + endpoint: quinn::Endpoint, + local_tls_fp: String, + ) -> Self { + Self { + peers, + room_mgr, + endpoint, + local_tls_fp, + } + } + + /// Start federation — spawns one task per configured peer. + pub async fn run(self: Arc<Self>) { + if self.peers.is_empty() { + return; + } + info!(peers = self.peers.len(), "federation starting"); + let mut handles = Vec::new(); + for peer in &self.peers { + let this = self.clone(); + let peer = peer.clone(); + handles.push(tokio::spawn(async move { + run_peer_loop(this, peer).await; + })); + } + for h in handles { + let _ = h.await; + } + } + + /// Handle an inbound federation connection from a peer that we recognize. + pub async fn handle_inbound( + self: &Arc<Self>, + transport: Arc<QuinnTransport>, + peer_config: PeerConfig, + ) { + let addr: SocketAddr = peer_config.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()); + info!(peer = ?peer_config.label, %addr, "inbound federation link active"); + if let Err(e) = run_federation_link(self.clone(), transport, addr, &peer_config).await { + warn!(peer = ?peer_config.label, "inbound federation link ended: {e}"); + } + } + + /// Find a configured peer by TLS fingerprint. + pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> { + self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp)) + } +} + +/// Normalize a fingerprint string (remove colons, lowercase). +fn normalize_fp(fp: &str) -> String { + fp.replace(':', "").to_lowercase() +} + +/// Persistent connection loop for one peer — reconnects with backoff. 
+async fn run_peer_loop(fm: Arc<FederationManager>, peer: PeerConfig) { + let mut backoff = Duration::from_secs(5); + loop { + info!(peer_url = %peer.url, label = ?peer.label, "federation: connecting to peer..."); + match connect_to_peer(&fm, &peer).await { + Ok(transport) => { + backoff = Duration::from_secs(5); // reset on success + let addr: SocketAddr = peer.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()); + if let Err(e) = run_federation_link(fm.clone(), transport, addr, &peer).await { + warn!(peer_url = %peer.url, "federation link ended: {e}"); + } + } + Err(e) => { + warn!(peer_url = %peer.url, backoff_s = backoff.as_secs(), "federation connect failed: {e}"); + } + } + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(Duration::from_secs(300)); + } +} + +/// Connect to a peer relay. +async fn connect_to_peer(fm: &FederationManager, peer: &PeerConfig) -> Result<Arc<QuinnTransport>, anyhow::Error> { + let addr: SocketAddr = peer.url.parse()?; + let client_cfg = wzp_transport::client_config(); + let conn = wzp_transport::connect(&fm.endpoint, addr, "_federation", client_cfg).await?; + // TODO: verify peer TLS fingerprint once we have cert access + let transport = Arc::new(QuinnTransport::new(conn)); + info!(peer_url = %peer.url, label = ?peer.label, "federation: connected to peer"); + Ok(transport) +} + +/// Run the federation link: exchange room info and forward media. 
+async fn run_federation_link( + fm: Arc<FederationManager>, + transport: Arc<QuinnTransport>, + peer_addr: SocketAddr, + peer: &PeerConfig, +) -> Result<(), anyhow::Error> { + // Announce our active rooms to the peer + let rooms = { + let mgr = fm.room_mgr.lock().await; + mgr.active_rooms() + }; + for room_name in &rooms { + let participants = { + let mgr = fm.room_mgr.lock().await; + mgr.local_participants(room_name) + }; + let msg = SignalMessage::FederationRoomJoin { + room: room_name.clone(), + participants, + }; + transport.send_signal(&msg).await?; + } + + // Track virtual participants we create on behalf of this peer + let mut peer_room_participants: HashMap<String, ParticipantId> = HashMap::new(); + // Map room_hash -> room_name for incoming media demux + let mut hash_to_room: HashMap<[u8; 8], String> = HashMap::new(); + + // Run two tasks: recv signals + recv media datagrams + let signal_transport = transport.clone(); + let media_transport = transport.clone(); + let fm_signal = fm.clone(); + let fm_media = fm.clone(); + let peer_label = peer.label.clone().unwrap_or_else(|| peer.url.clone()); + + let signal_task = async move { + loop { + match signal_transport.recv_signal().await { + Ok(Some(msg)) => { + match msg { + SignalMessage::FederationRoomJoin { room, participants } => { + info!(peer = %peer_label, room = %room, count = participants.len(), "federation: peer room join"); + let rh = room_hash(&room); + hash_to_room.insert(rh, room.clone()); + + let sender = ParticipantSender::Federation { + transport: signal_transport.clone(), + room_hash: rh, + }; + let (pid, update, senders) = { + let mut mgr = fm_signal.room_mgr.lock().await; + mgr.join_federated(&room, peer_addr, sender, participants) + }; + peer_room_participants.insert(room, pid); + room::broadcast_signal(&senders, &update).await; + } + SignalMessage::FederationRoomLeave { room } => { + info!(peer = %peer_label, room = %room, "federation: peer room leave"); + if let Some(pid) = peer_room_participants.remove(&room) { + let result = { + let mut mgr = 
fm_signal.room_mgr.lock().await; + mgr.leave(&room, pid) + }; + if let Some((update, senders)) = result { + room::broadcast_signal(&senders, &update).await; + } + } + hash_to_room.retain(|_, v| v != &room); + } + SignalMessage::FederationParticipantUpdate { room, participants } => { + let result = { + let mut mgr = fm_signal.room_mgr.lock().await; + mgr.update_federated_participants(&room, peer_addr, participants) + }; + if let Some((update, senders)) = result { + room::broadcast_signal(&senders, &update).await; + } + } + _ => {} // ignore other signals + } + } + Ok(None) => break, + Err(e) => { + error!(peer = %peer_label, "federation signal recv error: {e}"); + break; + } + } + } + // Cleanup: remove all virtual participants for this peer + for (room, pid) in &peer_room_participants { + let result = { + let mut mgr = fm_signal.room_mgr.lock().await; + mgr.leave(room, *pid) + }; + if let Some((update, senders)) = result { + room::broadcast_signal(&senders, &update).await; + } + } + info!(peer = %peer_label, "federation signal task ended"); + }; + + let media_task = async move { + loop { + match media_transport.connection().read_datagram().await { + Ok(data) => { + if data.len() < 8 + 4 { + continue; // too short (need room_hash + min header) + } + let mut rh = [0u8; 8]; + rh.copy_from_slice(&data[..8]); + let media_bytes = &data[8..]; + + // Deserialize media packet + let pkt = match wzp_proto::MediaPacket::from_bytes(Bytes::copy_from_slice(media_bytes)) { + Some(pkt) => pkt, + None => continue, + }; + + // Look up room by hash — we need to get the room name from the signal task's hash_to_room + // For simplicity, we forward to all local participants via the room manager + // The virtual participant approach means we don't need the room name here — + // the SFU loop handles it. But since inbound media doesn't go through run_participant, + // we need to manually fan out. 
+ + // For now, just use the room manager to find local participants + // This is a simplified approach — full implementation would maintain + // a shared hash_to_room map between signal and media tasks + let mgr = fm_media.room_mgr.lock().await; + for room_name in mgr.active_rooms() { + if room_hash(&room_name) == rh { + // Forward to all local participants in this room + let locals: Vec<_> = mgr.local_senders(&room_name); + drop(mgr); // release lock before sending + for sender in &locals { + if let ParticipantSender::Quic(t) = sender { + let _ = t.send_media(&pkt).await; + } + } + break; + } + } + } + Err(_) => break, + } + } + }; + + tokio::select! { + _ = signal_task => {} + _ = media_task => {} + } + + Ok(()) +} diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index a798c3a..48e7688 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -9,6 +9,7 @@ pub mod auth; pub mod config; +pub mod federation; pub mod handshake; pub mod metrics; pub mod pipeline; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 8f2c6b7..d45ee50 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -320,6 +320,21 @@ async fn main() -> anyhow::Result<()> { // Room manager (room mode only) let room_mgr = Arc::new(Mutex::new(RoomManager::new())); + // Federation manager + let federation_mgr = if !config.peers.is_empty() { + let fm = Arc::new(wzp_relay::federation::FederationManager::new( + config.peers.clone(), + room_mgr.clone(), + endpoint.clone(), + tls_fp.clone(), + )); + let fm_run = fm.clone(); + tokio::spawn(async move { fm_run.run().await }); + Some(fm) + } else { + None + }; + // Session manager — enforces max concurrent sessions let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); @@ -375,6 +390,7 @@ async fn main() -> anyhow::Result<()> { let trunking_enabled = config.trunking_enabled; let presence = presence.clone(); let route_resolver = 
route_resolver.clone(); + let federation_mgr = federation_mgr.clone(); tokio::spawn(async move { let addr = connection.remote_address(); @@ -482,6 +498,38 @@ async fn main() -> anyhow::Result<()> { return; } + // Federation connections use SNI "_federation" + if room_name == "_federation" { + if let Some(ref fm) = federation_mgr { + // Check if we recognize this peer by TLS fingerprint + let peer_fp = wzp_transport::tls_fingerprint( + &transport.connection() + .peer_identity() + .and_then(|id| id.downcast::<Vec<rustls::pki_types::CertificateDer>>().ok()) + .and_then(|certs| certs.first().cloned()) + .map(|c| c.to_vec()) + .unwrap_or_default() + ); + if let Some(peer_config) = fm.find_peer_by_fingerprint(&peer_fp) { + let peer_config = peer_config.clone(); + let fm = fm.clone(); + info!(%addr, label = ?peer_config.label, "inbound federation connection accepted"); + fm.handle_inbound(transport, peer_config).await; + } else { + warn!(%addr, "unknown relay wants to federate"); + info!(" to accept, add to relay.toml:"); + info!(" [[peers]]"); + info!(" url = \"{addr}\""); + info!(" fingerprint = \"{peer_fp}\""); + transport.close().await.ok(); + } + } else { + info!(%addr, "federation connection rejected (no peers configured)"); + transport.close().await.ok(); + } + return; + } + // Auth check: if --auth-url is set, expect first signal message to be a token // Auth: if --auth-url is set, expect AuthToken as first signal let authenticated_fp: Option<String> = if let Some(ref url) = auth_url { diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 70850ad..2bfaab0 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -27,11 +27,25 @@ fn next_id() -> ParticipantId { NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed) } +/// Tracks where a participant originates from (for loop prevention). +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ParticipantOrigin { + /// Connected directly to this relay. + Local, + /// Virtual participant representing a federated peer relay. 
+ Federated { relay_addr: std::net::SocketAddr }, +} + /// How to send data to a participant — either via QUIC transport or WebSocket channel. #[derive(Clone)] pub enum ParticipantSender { Quic(Arc<QuinnTransport>), WebSocket(tokio::sync::mpsc::Sender<Bytes>), + /// Federated peer relay — media is prefixed with an 8-byte room hash. + Federation { + transport: Arc<QuinnTransport>, + room_hash: [u8; 8], + }, } impl ParticipantSender { @@ -50,6 +64,14 @@ }; transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}")) } + ParticipantSender::Federation { transport, room_hash } => { + // Prefix media data with room hash for demuxing on the peer relay + let mut tagged = Vec::with_capacity(8 + data.len()); + tagged.extend_from_slice(room_hash); + tagged.extend_from_slice(data); + transport.send_raw_datagram(&tagged) + .map_err(|e| format!("federation send: {e}")) + } } } @@ -85,17 +107,21 @@ struct Participant { sender: ParticipantSender, fingerprint: Option<String>, alias: Option<String>, + origin: ParticipantOrigin, } /// A room holding multiple participants. struct Room { participants: Vec<Participant>, + /// Remote participants from federated peers (for merged RoomUpdate). + federated_participants: HashMap<std::net::SocketAddr, Vec<wzp_proto::packet::RoomParticipant>>, } impl Room { fn new() -> Self { Self { participants: Vec::new(), + federated_participants: HashMap::new(), } } @@ -105,10 +131,11 @@ sender: ParticipantSender, fingerprint: Option<String>, alias: Option<String>, + origin: ParticipantOrigin, ) -> ParticipantId { let id = next_id(); - info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room"); - self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias }); + info!(room_size = self.participants.len() + 1, participant = id, %addr, ?origin, "joined room"); + self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias, origin }); id } @@ -125,15 +152,38 @@ .collect() } - /// Build a RoomUpdate participant list. 
- fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> { + /// Get senders with loop prevention for federation. + /// + /// - Media from a **local** participant → send to ALL others (local + federated) + /// - Media from a **federated** participant → send to LOCAL participants only + /// (the source relay already forwarded to its own locals and other peers) + fn others_for_origin(&self, exclude_id: ParticipantId, source_origin: &ParticipantOrigin) -> Vec<ParticipantSender> { self.participants .iter() + .filter(|p| p.id != exclude_id) + .filter(|p| match source_origin { + ParticipantOrigin::Local => true, + ParticipantOrigin::Federated { .. } => p.origin == ParticipantOrigin::Local, + }) + .map(|p| p.sender.clone()) + .collect() + } + + /// Build a RoomUpdate participant list (local + federated). + fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> { + let mut list: Vec<_> = self.participants + .iter() + .filter(|p| p.origin == ParticipantOrigin::Local) .map(|p| wzp_proto::packet::RoomParticipant { fingerprint: p.fingerprint.clone().unwrap_or_default(), alias: p.alias.clone(), }) - .collect() + .collect(); + // Merge federated participants from all peer relays + for remote in self.federated_participants.values() { + list.extend(remote.iter().cloned()); + } + list } /// Get all senders (for broadcasting to everyone including the joiner). @@ -214,7 +264,7 @@ impl RoomManager { return Err("not authorized for this room".to_string()); } let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); - let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string())); + let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()), ParticipantOrigin::Local); let update = wzp_proto::SignalMessage::RoomUpdate { count: room.len() as u32, participants: room.participant_list(), }; @@ -235,6 +285,83 @@ Ok(id) } + /// Join a room as a federated virtual participant. 
+ pub fn join_federated( + &mut self, + room_name: &str, + relay_addr: std::net::SocketAddr, + sender: ParticipantSender, + remote_participants: Vec<wzp_proto::packet::RoomParticipant>, + ) -> (ParticipantId, wzp_proto::SignalMessage, Vec<ParticipantSender>) { + let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); + room.federated_participants.insert(relay_addr, remote_participants); + let id = room.add( + relay_addr, sender, None, Some("(federated)".to_string()), + ParticipantOrigin::Federated { relay_addr }, + ); + let update = wzp_proto::SignalMessage::RoomUpdate { + count: room.len() as u32, + participants: room.participant_list(), + }; + let senders = room.all_senders(); + (id, update, senders) + } + + /// Update federated participant list for a room (from FederationParticipantUpdate). + pub fn update_federated_participants( + &mut self, + room_name: &str, + relay_addr: std::net::SocketAddr, + participants: Vec<wzp_proto::packet::RoomParticipant>, + ) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> { + if let Some(room) = self.rooms.get_mut(room_name) { + room.federated_participants.insert(relay_addr, participants); + let update = wzp_proto::SignalMessage::RoomUpdate { + count: room.len() as u32, + participants: room.participant_list(), + }; + let senders = room.all_senders(); + Some((update, senders)) + } else { + None + } + } + + /// Get the origin of a participant by ID. + pub fn participant_origin(&self, room_name: &str, participant_id: ParticipantId) -> Option<ParticipantOrigin> { + self.rooms.get(room_name) + .and_then(|room| room.participants.iter().find(|p| p.id == participant_id)) + .map(|p| p.origin.clone()) + } + + /// Get list of active room names (for federation room announcements). + pub fn active_rooms(&self) -> Vec<String> { + self.rooms.keys().cloned().collect() + } + + /// Get local participant list for a room (excludes federated virtual participants). 
+ pub fn local_participants(&self, room_name: &str) -> Vec<wzp_proto::packet::RoomParticipant> { + self.rooms.get(room_name) + .map(|room| room.participants.iter() + .filter(|p| p.origin == ParticipantOrigin::Local) + .map(|p| wzp_proto::packet::RoomParticipant { + fingerprint: p.fingerprint.clone().unwrap_or_default(), + alias: p.alias.clone(), + }) + .collect()) + .unwrap_or_default() + } + + /// Get senders for local-only participants in a room (for federation inbound media). + pub fn local_senders(&self, room_name: &str) -> Vec<ParticipantSender> { + self.rooms.get(room_name) + .map(|room| room.participants.iter() + .filter(|p| p.origin == ParticipantOrigin::Local) + .map(|p| p.sender.clone()) + .collect()) + .unwrap_or_default() + } + /// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty. pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> { if let Some(room) = self.rooms.get_mut(room_name) { @@ -467,6 +594,19 @@ async fn run_participant_plain( ParticipantSender::WebSocket(_) => { let _ = other.send_raw(&pkt.payload).await; } + ParticipantSender::Federation { transport, room_hash } => { + // Send room-tagged datagram to federated peer + let data = pkt.to_bytes(); + let mut tagged = Vec::with_capacity(8 + data.len()); + tagged.extend_from_slice(room_hash); + tagged.extend_from_slice(&data); + if let Err(e) = transport.send_raw_datagram(&tagged) { + send_errors += 1; + if send_errors <= 5 { + warn!(room = %room_name, "federation forward error: {e}"); + } + } + } } } let fwd_ms = fwd_start.elapsed().as_millis() as u64; @@ -634,6 +774,13 @@ ParticipantSender::WebSocket(_) => { let _ = other.send_raw(&pkt.payload).await; } + ParticipantSender::Federation { transport, room_hash } => { + let data = pkt.to_bytes(); + let mut tagged = Vec::with_capacity(8 + data.len()); + tagged.extend_from_slice(room_hash); + tagged.extend_from_slice(&data); + let _ = 
transport.send_raw_datagram(&tagged); + } } } let fwd_ms = fwd_start.elapsed().as_millis() as u64; diff --git a/crates/wzp-transport/src/quic.rs b/crates/wzp-transport/src/quic.rs index 40c0cea..580d118 100644 --- a/crates/wzp-transport/src/quic.rs +++ b/crates/wzp-transport/src/quic.rs @@ -33,6 +33,13 @@ impl QuinnTransport { &self.connection } + /// Send raw bytes as a QUIC datagram (no MediaPacket framing). + pub fn send_raw_datagram(&self, data: &[u8]) -> Result<(), TransportError> { + self.connection + .send_datagram(bytes::Bytes::copy_from_slice(data)) + .map_err(|e| TransportError::Internal(format!("datagram: {e}"))) + } + /// Close the QUIC connection immediately (synchronous, no async needed). /// The relay will detect the close and remove this participant from the room. pub fn close_now(&self) {