diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index e8d8902..6587b8e 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -129,6 +129,7 @@ pub struct FederationManager { room_mgr: Arc>, endpoint: quinn::Endpoint, local_tls_fp: String, + metrics: Arc, /// Active peer connections, keyed by normalized fingerprint. peer_links: Arc>>, /// Dedup filter for incoming federation datagrams. @@ -145,6 +146,7 @@ impl FederationManager { room_mgr: Arc>, endpoint: quinn::Endpoint, local_tls_fp: String, + metrics: Arc, ) -> Self { Self { peers, @@ -153,6 +155,7 @@ impl FederationManager { room_mgr, endpoint, local_tls_fp, + metrics, peer_links: Arc::new(Mutex::new(HashMap::new())), dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), rate_limiters: Mutex::new(HashMap::new()), @@ -252,6 +255,8 @@ impl FederationManager { tagged.extend_from_slice(media_data); match link.transport.send_raw_datagram(&tagged) { Ok(()) => { + self.metrics.federation_packets_forwarded + .with_label_values(&[&link.label, "out"]).inc(); } Err(e) => warn!(peer = %link.label, "federation send error: {e}"), } @@ -399,7 +404,8 @@ async fn run_federation_link( peer_fp: String, peer_label: String, ) -> Result<(), anyhow::Error> { - // Register peer link + // Register peer link + metrics + fm.metrics.federation_peer_status.with_label_values(&[&peer_label]).set(1); { let mut links = fm.peer_links.lock().await; links.insert(peer_fp.clone(), PeerLink { @@ -483,7 +489,8 @@ async fn run_federation_link( _ = rtt_task => {} } - // Cleanup: remove peer link + // Cleanup: remove peer link + metrics + fm.metrics.federation_peer_status.with_label_values(&[&peer_label]).set(0); { let mut links = fm.peer_links.lock().await; links.remove(&peer_fp); @@ -507,6 +514,11 @@ async fn handle_signal( let mut links = fm.peer_links.lock().await; if let Some(link) = links.get_mut(peer_fp) { link.active_rooms.insert(room.clone()); + } + // Update active rooms metric + let total: usize = links.values().map(|l| l.active_rooms.len()).sum(); + fm.metrics.federation_active_rooms.set(total as i64); + if let Some(link) = links.get_mut(peer_fp) { link.remote_participants.insert(room.clone(), participants.clone()); } // Propagate to other peers @@ -596,6 +608,10 @@ async fn handle_datagram( None => return, }; + // Count inbound federation packet + fm.metrics.federation_packets_forwarded + .with_label_values(&[source_peer_fp, "in"]).inc(); + // Dedup: drop packets we've already seen (multi-path duplicates) { let mut dedup = fm.dedup.lock().await; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index a6ca877..75de5c8 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -392,6 +392,7 @@ async fn main() -> anyhow::Result<()> { room_mgr.clone(), endpoint.clone(), tls_fp.clone(), + metrics.clone(), )); let fm_run = fm.clone(); tokio::spawn(async move { fm_run.run().await });