diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index aabea26..261b480 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -8,7 +8,7 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use bytes::Bytes; use sha2::{Sha256, Digest}; @@ -19,6 +19,7 @@ use wzp_proto::{MediaTransport, SignalMessage}; use wzp_transport::QuinnTransport; use crate::config::{PeerConfig, TrustedConfig}; +use crate::metrics::RelayMetrics; use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager}; /// Compute 8-byte room hash for federation datagram tagging. @@ -34,6 +35,78 @@ fn normalize_fp(fp: &str) -> String { fp.replace(':', "").to_lowercase() } +/// Sliding-window dedup filter for federation datagrams. +/// Tracks recently seen (room_hash, seq) pairs to discard duplicates +/// arriving via multiple federation paths (e.g., A↔B↔C and A↔C). +struct Deduplicator { + /// Ring buffer of recent packet fingerprints (room_hash XOR'd with seq). + seen: HashSet, + /// Ordered list for eviction. + order: std::collections::VecDeque, + capacity: usize, +} + +impl Deduplicator { + fn new(capacity: usize) -> Self { + Self { + seen: HashSet::with_capacity(capacity), + order: std::collections::VecDeque::with_capacity(capacity), + capacity, + } + } + + /// Returns true if this packet is a duplicate (already seen). + fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16) -> bool { + let key = u64::from_be_bytes(*room_hash) ^ (seq as u64); + if self.seen.contains(&key) { + return true; + } + if self.order.len() >= self.capacity { + if let Some(old) = self.order.pop_front() { + self.seen.remove(&old); + } + } + self.seen.insert(key); + self.order.push_back(key); + false + } +} + +/// Per-room token bucket rate limiter for federation forwarding. +struct RateLimiter { + /// Max packets per second per room. + max_pps: u32, + /// Tokens remaining in current window. + tokens: u32, + /// When the current window started. + window_start: Instant, +} + +impl RateLimiter { + fn new(max_pps: u32) -> Self { + Self { + max_pps, + tokens: max_pps, + window_start: Instant::now(), + } + } + + /// Returns true if the packet should be allowed through. + fn allow(&mut self) -> bool { + let elapsed = self.window_start.elapsed(); + if elapsed >= Duration::from_secs(1) { + self.tokens = self.max_pps; + self.window_start = Instant::now(); + } + if self.tokens > 0 { + self.tokens -= 1; + true + } else { + false + } + } +} + /// Active link to a peer relay. struct PeerLink { transport: Arc, @@ -42,6 +115,11 @@ struct PeerLink { active_rooms: HashSet, } +/// Max federation packets per second per room (0 = unlimited). +const FEDERATION_RATE_LIMIT_PPS: u32 = 500; +/// Dedup window size (number of recent packets to remember). +const DEDUP_WINDOW_SIZE: usize = 4096; + /// Manages federation connections and global room forwarding. pub struct FederationManager { peers: Vec, @@ -52,6 +130,12 @@ pub struct FederationManager { local_tls_fp: String, /// Active peer connections, keyed by normalized fingerprint. peer_links: Arc>>, + /// Prometheus metrics. + metrics: Arc, + /// Dedup filter for incoming federation datagrams. + dedup: Mutex, + /// Per-room rate limiters for inbound federation media. + rate_limiters: Mutex>, } impl FederationManager { @@ -62,6 +146,7 @@ impl FederationManager { room_mgr: Arc>, endpoint: quinn::Endpoint, local_tls_fp: String, + metrics: Arc, ) -> Self { Self { peers, @@ -71,20 +156,38 @@ impl FederationManager { endpoint, local_tls_fp, peer_links: Arc::new(Mutex::new(HashMap::new())), + metrics, + dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), + rate_limiters: Mutex::new(HashMap::new()), } } /// Check if a room name (which may be hashed) is a global room. pub fn is_global_room(&self, room: &str) -> bool { - // Check both the raw name and the hashed version + self.resolve_global_room(room).is_some() + } + + /// Resolve a room name (raw or hashed) to the canonical global room name. + /// Returns the configured global room name if it matches. + pub fn resolve_global_room(&self, room: &str) -> Option<&str> { + // Direct match (raw room name, e.g. Android clients) if self.global_rooms.contains(room) { - return true; + return Some(self.global_rooms.iter().find(|n| n.as_str() == room).unwrap()); } - // The room name in the room manager is the hashed SNI. - // Check if any configured global room hashes to this value. - self.global_rooms.iter().any(|name| { + // Hashed match (desktop clients hash room names for SNI privacy) + self.global_rooms.iter().find(|name| { wzp_crypto::hash_room_name(name) == room - }) + }).map(|s| s.as_str()) + } + + /// Get the canonical federation room hash for a room. + /// Always uses the configured global room name, not the client-provided name. + pub fn global_room_hash(&self, room: &str) -> [u8; 8] { + if let Some(canonical) = self.resolve_global_room(room) { + room_hash(canonical) + } else { + room_hash(room) + } } /// Start federation — spawns connection loops + event dispatcher. @@ -146,18 +249,16 @@ impl FederationManager { if links.is_empty() { return; } - let mut sent = 0u32; - for (fp, link) in links.iter() { - // Send to all connected peers — they have the global room configured - // and will deliver to local participants or forward further - { - let mut tagged = Vec::with_capacity(8 + media_data.len()); - tagged.extend_from_slice(room_hash); - tagged.extend_from_slice(media_data); - match link.transport.send_raw_datagram(&tagged) { - Ok(()) => sent += 1, - Err(e) => warn!(peer = %link.label, "federation send error: {e}"), + for (_fp, link) in links.iter() { + let mut tagged = Vec::with_capacity(8 + media_data.len()); + tagged.extend_from_slice(room_hash); + tagged.extend_from_slice(media_data); + match link.transport.send_raw_datagram(&tagged) { + Ok(()) => { + self.metrics.federation_packets_forwarded + .with_label_values(&[&link.label, "out"]).inc(); } + Err(e) => warn!(peer = %link.label, "federation send error: {e}"), } } } @@ -299,7 +400,7 @@ async fn run_federation_link( peer_fp: String, peer_label: String, ) -> Result<(), anyhow::Error> { - // Register peer link + // Register peer link + metrics { let mut links = fm.peer_links.lock().await; links.insert(peer_fp.clone(), PeerLink { @@ -307,6 +408,8 @@ async fn run_federation_link( label: peer_label.clone(), active_rooms: HashSet::new(), }); + fm.metrics.federation_peer_status + .with_label_values(&[&peer_label]).set(1); } // Announce our currently active global rooms @@ -320,14 +423,17 @@ async fn run_federation_link( } } - // Two concurrent tasks: signal recv + media recv + // Three concurrent tasks: signal recv + media recv + RTT monitor let signal_transport = transport.clone(); let media_transport = transport.clone(); + let rtt_transport = transport.clone(); let fm_signal = fm.clone(); let fm_media = fm.clone(); + let fm_rtt = fm.clone(); let peer_fp_signal = peer_fp.clone(); let peer_fp_media = peer_fp.clone(); let label_signal = peer_label.clone(); + let label_rtt = peer_label.clone(); let signal_task = async move { loop { @@ -354,6 +460,8 @@ async fn run_federation_link( if media_count == 1 || media_count % 250 == 0 { info!(peer = %peer_label_media, media_count, len = data.len(), "federation: received datagram"); } + fm_media.metrics.federation_packets_forwarded + .with_label_values(&[&peer_label_media, "in"]).inc(); handle_datagram(&fm_media, &peer_fp_media, data).await; } Err(e) => { @@ -364,15 +472,28 @@ async fn run_federation_link( } }; + // RTT monitor: periodically sample QUIC RTT for this peer + let rtt_task = async move { + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + let rtt_ms = rtt_transport.connection().stats().path.rtt.as_millis() as f64; + fm_rtt.metrics.federation_peer_rtt_ms + .with_label_values(&[&label_rtt]).set(rtt_ms); + } + }; + tokio::select! { _ = signal_task => {} _ = media_task => {} + _ = rtt_task => {} } - // Cleanup: remove peer link + // Cleanup: remove peer link + metrics { let mut links = fm.peer_links.lock().await; links.remove(&peer_fp); + fm.metrics.federation_peer_status + .with_label_values(&[&peer_label]).set(0); } info!(peer = %peer_label, "federation link ended"); @@ -394,6 +515,9 @@ async fn handle_signal( if let Some(link) = links.get_mut(peer_fp) { link.active_rooms.insert(room.clone()); } + // Update active rooms gauge + let total: usize = links.values().map(|l| l.active_rooms.len()).sum(); + fm.metrics.federation_active_rooms.set(total as i64); // Propagate: tell all OTHER peers this room is routable through us. // This enables multi-hop: A→B→C where B relays A's announcement to C and vice versa. for (fp, link) in links.iter() { @@ -409,6 +533,9 @@ async fn handle_signal( if let Some(link) = links.get_mut(peer_fp) { link.active_rooms.remove(&room); } + // Update active rooms gauge + let total: usize = links.values().map(|l| l.active_rooms.len()).sum(); + fm.metrics.federation_active_rooms.set(total as i64); // Check if any other peer still has this room — if none, propagate inactive let any_other_active = links.iter() .any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room)); @@ -445,10 +572,23 @@ async fn handle_datagram( None => return, }; + // Dedup: drop packets we've already seen (multi-path duplicates) + { + let mut dedup = fm.dedup.lock().await; + if dedup.is_dup(&rh, pkt.header.seq) { + fm.metrics.federation_packets_deduped.inc(); + return; + } + } + // Find room by hash let room_name = { let mgr = fm.room_mgr.lock().await; - mgr.active_rooms().into_iter().find(|r| room_hash(r) == rh) + { + let active = mgr.active_rooms(); + active.iter().find(|r| room_hash(r) == rh).cloned() + .or_else(|| active.iter().find(|r| fm.global_room_hash(r) == rh).cloned()) + } }; let room_name = match room_name { @@ -456,6 +596,17 @@ async fn handle_datagram( None => return, // room not active locally }; + // Rate limit per room + if FEDERATION_RATE_LIMIT_PPS > 0 { + let mut limiters = fm.rate_limiters.lock().await; + let limiter = limiters.entry(room_name.clone()) + .or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS)); + if !limiter.allow() { + fm.metrics.federation_packets_rate_limited.inc(); + return; + } + } + // Deliver to all local participants let locals = { let mgr = fm.room_mgr.lock().await; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index d439440..75de5c8 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -392,6 +392,7 @@ async fn main() -> anyhow::Result<()> { room_mgr.clone(), endpoint.clone(), tls_fp.clone(), + metrics.clone(), )); let fm_run = fm.clone(); tokio::spawn(async move { fm_run.run().await }); @@ -759,22 +760,22 @@ async fn main() -> anyhow::Result<()> { .map(|b| format!("{b:02x}")) .collect(); // Set up federation media channel if this is a global room - let federation_tx = if let Some(ref fm) = federation_mgr { + let (federation_tx, federation_room_hash) = if let Some(ref fm) = federation_mgr { let is_global = fm.is_global_room(&room_name); - info!(room = %room_name, is_global, "checking if room is global for federation"); if is_global { + let canonical_hash = fm.global_room_hash(&room_name); let (tx, rx) = tokio::sync::mpsc::channel(256); let fm_clone = fm.clone(); tokio::spawn(async move { wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await; }); - info!(room = %room_name, "federation media egress channel created"); - Some(tx) + info!(room = %room_name, canonical = ?fm.resolve_global_room(&room_name), "federation egress created (global room)"); + (Some(tx), Some(canonical_hash)) } else { - None + (None, None) } } else { - None + (None, None) }; room::run_participant( @@ -787,6 +788,7 @@ async fn main() -> anyhow::Result<()> { trunking_enabled, debug_tap, federation_tx, + federation_room_hash, ).await; // Participant disconnected — clean up presence + per-session metrics diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 9fb84f7..9fbd1a9 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -431,6 +431,7 @@ pub async fn run_participant( trunking_enabled: bool, debug_tap: Option, federation_tx: Option>, + federation_room_hash: Option<[u8; 8]>, ) { if trunking_enabled { run_participant_trunked( @@ -439,7 +440,7 @@ pub async fn run_participant( .await; } else { run_participant_plain( - room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx, + room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx, federation_room_hash, ) .await; } @@ -455,6 +456,7 @@ async fn run_participant_plain( session_id: &str, debug_tap: Option, federation_tx: Option>, + federation_room_hash: Option<[u8; 8]>, ) { let addr = transport.connection().remote_address(); let mut packets_forwarded = 0u64; @@ -565,7 +567,7 @@ async fn run_participant_plain( let data = pkt.to_bytes(); let _ = fed_tx.try_send(FederationMediaOut { room_name: room_name.clone(), - room_hash: crate::federation::room_hash(&room_name), + room_hash: federation_room_hash.unwrap_or_else(|| crate::federation::room_hash(&room_name)), data, }); }