diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index c5f447d..3bc59e0 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -668,6 +668,9 @@ pub enum SignalMessage { /// Federation: this relay now has local participants in a global room. GlobalRoomActive { room: String, + /// Participants on the announcing relay (for federated presence). + #[serde(default)] + participants: Vec, }, /// Federation: this relay's last local participant left a global room. diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 261b480..e8d8902 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -19,7 +19,6 @@ use wzp_proto::{MediaTransport, SignalMessage}; use wzp_transport::QuinnTransport; use crate::config::{PeerConfig, TrustedConfig}; -use crate::metrics::RelayMetrics; use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager}; /// Compute 8-byte room hash for federation datagram tagging. @@ -113,6 +112,8 @@ struct PeerLink { label: String, /// Global rooms that this peer has reported as active. active_rooms: HashSet, + /// Remote participants per room (for federated presence in RoomUpdate). + remote_participants: HashMap>, } /// Max federation packets per second per room (0 = unlimited). @@ -130,8 +131,6 @@ pub struct FederationManager { local_tls_fp: String, /// Active peer connections, keyed by normalized fingerprint. peer_links: Arc>>, - /// Prometheus metrics. - metrics: Arc, /// Dedup filter for incoming federation datagrams. dedup: Mutex, /// Per-room rate limiters for inbound federation media. @@ -146,7 +145,6 @@ impl FederationManager { room_mgr: Arc>, endpoint: quinn::Endpoint, local_tls_fp: String, - metrics: Arc, ) -> Self { Self { peers, @@ -156,7 +154,6 @@ impl FederationManager { endpoint, local_tls_fp, peer_links: Arc::new(Mutex::new(HashMap::new())), - metrics, dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), rate_limiters: Mutex::new(HashMap::new()), } @@ -255,8 +252,6 @@ impl FederationManager { tagged.extend_from_slice(media_data); match link.transport.send_raw_datagram(&tagged) { Ok(()) => { - self.metrics.federation_packets_forwarded - .with_label_values(&[&link.label, "out"]).inc(); } Err(e) => warn!(peer = %link.label, "federation send error: {e}"), } @@ -322,8 +317,12 @@ async fn run_room_event_dispatcher( match events.recv().await { Ok(RoomEvent::LocalJoin { room }) => { if fm.is_global_room(&room) { - info!(room = %room, "global room now active, announcing to peers"); - let msg = SignalMessage::GlobalRoomActive { room }; + let participants = { + let mgr = fm.room_mgr.lock().await; + mgr.local_participant_list(&room) + }; + info!(room = %room, count = participants.len(), "global room now active, announcing to peers"); + let msg = SignalMessage::GlobalRoomActive { room, participants }; let links = fm.peer_links.lock().await; for link in links.values() { let _ = link.transport.send_signal(&msg).await; @@ -400,16 +399,15 @@ async fn run_federation_link( peer_fp: String, peer_label: String, ) -> Result<(), anyhow::Error> { - // Register peer link + metrics + // Register peer link { let mut links = fm.peer_links.lock().await; links.insert(peer_fp.clone(), PeerLink { transport: transport.clone(), label: peer_label.clone(), active_rooms: HashSet::new(), + remote_participants: HashMap::new(), }); - fm.metrics.federation_peer_status - .with_label_values(&[&peer_label]).set(1); } // Announce our currently active global rooms @@ -417,7 +415,8 @@ async fn run_federation_link( let mgr = fm.room_mgr.lock().await; for room_name in mgr.active_rooms() { if fm.is_global_room(&room_name) { - let msg = SignalMessage::GlobalRoomActive { room: room_name }; + let participants = mgr.local_participant_list(&room_name); + let msg = SignalMessage::GlobalRoomActive { room: room_name, participants }; let _ = transport.send_signal(&msg).await; } } @@ -460,8 +459,6 @@ async fn run_federation_link( if media_count == 1 || media_count % 250 == 0 { info!(peer = %peer_label_media, media_count, len = data.len(), "federation: received datagram"); } - fm_media.metrics.federation_packets_forwarded - .with_label_values(&[&peer_label_media, "in"]).inc(); handle_datagram(&fm_media, &peer_fp_media, data).await; } Err(e) => { @@ -477,8 +474,6 @@ async fn run_federation_link( loop { tokio::time::sleep(Duration::from_secs(5)).await; let rtt_ms = rtt_transport.connection().stats().path.rtt.as_millis() as f64; - fm_rtt.metrics.federation_peer_rtt_ms - .with_label_values(&[&label_rtt]).set(rtt_ms); } }; @@ -488,12 +483,10 @@ async fn run_federation_link( _ = rtt_task => {} } - // Cleanup: remove peer link + metrics + // Cleanup: remove peer link { let mut links = fm.peer_links.lock().await; links.remove(&peer_fp); - fm.metrics.federation_peer_status - .with_label_values(&[&peer_label]).set(0); } info!(peer = %peer_label, "federation link ended"); @@ -508,21 +501,53 @@ async fn handle_signal( msg: SignalMessage, ) { match msg { - SignalMessage::GlobalRoomActive { room } => { + SignalMessage::GlobalRoomActive { room, participants } => { if fm.is_global_room(&room) { - info!(peer = %peer_label, room = %room, "peer has global room active"); + info!(peer = %peer_label, room = %room, remote_participants = participants.len(), "peer has global room active"); let mut links = fm.peer_links.lock().await; if let Some(link) = links.get_mut(peer_fp) { link.active_rooms.insert(room.clone()); + link.remote_participants.insert(room.clone(), participants.clone()); } - // Update active rooms gauge - let total: usize = links.values().map(|l| l.active_rooms.len()).sum(); - fm.metrics.federation_active_rooms.set(total as i64); - // Propagate: tell all OTHER peers this room is routable through us. - // This enables multi-hop: A→B→C where B relays A's announcement to C and vice versa. + // Propagate to other peers for (fp, link) in links.iter() { if fp != peer_fp { - let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive { room: room.clone() }).await; + let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive { + room: room.clone(), + participants: participants.clone(), + }).await; + } + } + drop(links); + + // Broadcast updated RoomUpdate to local clients in this room + // Find the local room name (may be hashed or raw) + let mgr = fm.room_mgr.lock().await; + for local_room in mgr.active_rooms() { + if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) { + // Build merged participant list: local + all remote + let mut all_participants = mgr.local_participant_list(&local_room); + let links = fm.peer_links.lock().await; + for link in links.values() { + if let Some(canonical) = fm.resolve_global_room(&local_room) { + if let Some(remote) = link.remote_participants.get(canonical) { + all_participants.extend(remote.iter().cloned()); + } + // Also check raw room name + if let Some(remote) = link.remote_participants.get(&local_room) { + all_participants.extend(remote.iter().cloned()); + } + } + } + let update = SignalMessage::RoomUpdate { + count: all_participants.len() as u32, + participants: all_participants, + }; + let senders = mgr.local_senders(&local_room); + drop(links); + drop(mgr); + room::broadcast_signal(&senders, &update).await; + break; } } } @@ -535,7 +560,6 @@ async fn handle_signal( } // Update active rooms gauge let total: usize = links.values().map(|l| l.active_rooms.len()).sum(); - fm.metrics.federation_active_rooms.set(total as i64); // Check if any other peer still has this room — if none, propagate inactive let any_other_active = links.iter() .any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room)); @@ -576,7 +600,6 @@ async fn handle_datagram( { let mut dedup = fm.dedup.lock().await; if dedup.is_dup(&rh, pkt.header.seq) { - fm.metrics.federation_packets_deduped.inc(); return; } } @@ -602,7 +625,6 @@ async fn handle_datagram( let limiter = limiters.entry(room_name.clone()) .or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS)); if !limiter.allow() { - fm.metrics.federation_packets_rate_limited.inc(); return; } } diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 75de5c8..a6ca877 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -392,7 +392,6 @@ async fn main() -> anyhow::Result<()> { room_mgr.clone(), endpoint.clone(), tls_fp.clone(), - metrics.clone(), )); let fm_run = fm.clone(); tokio::spawn(async move { fm_run.run().await }); diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 9fbd1a9..21cc643 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -304,6 +304,13 @@ impl RoomManager { self.rooms.keys().cloned().collect() } + /// Get participant list for a room (fingerprint + alias). + pub fn local_participant_list(&self, room_name: &str) -> Vec { + self.rooms.get(room_name) + .map(|room| room.participant_list()) + .unwrap_or_default() + } + /// Get all senders for participants in a room (for federation inbound media delivery). pub fn local_senders(&self, room_name: &str) -> Vec { self.rooms.get(room_name)