feat: federation Prometheus metrics — peer status, packets, active rooms
Some checks failed
Mirror to GitHub / mirror (push) Failing after 35s
Build Release Binaries / build-amd64 (push) Failing after 2m8s

Wires up the existing RelayMetrics federation fields:
- wzp_federation_peer_status{peer} — 1=connected, 0=disconnected
- wzp_federation_packets_forwarded_total{peer,direction} — per-peer packet counts, direction="in" or "out"
- wzp_federation_active_rooms — number of active federated rooms, summed across all peer links

These are critical for monitoring federation health and will feed into
the adaptive codec selection system (PRD-coordinated-codec.md).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-08 11:00:13 +04:00
parent 8080713098
commit ff6d0444c0
2 changed files with 19 additions and 2 deletions

View File

@@ -129,6 +129,7 @@ pub struct FederationManager {
room_mgr: Arc<Mutex<RoomManager>>, room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint, endpoint: quinn::Endpoint,
local_tls_fp: String, local_tls_fp: String,
metrics: Arc<crate::metrics::RelayMetrics>,
/// Active peer connections, keyed by normalized fingerprint. /// Active peer connections, keyed by normalized fingerprint.
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>, peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
/// Dedup filter for incoming federation datagrams. /// Dedup filter for incoming federation datagrams.
@@ -145,6 +146,7 @@ impl FederationManager {
room_mgr: Arc<Mutex<RoomManager>>, room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint, endpoint: quinn::Endpoint,
local_tls_fp: String, local_tls_fp: String,
metrics: Arc<crate::metrics::RelayMetrics>,
) -> Self { ) -> Self {
Self { Self {
peers, peers,
@@ -153,6 +155,7 @@ impl FederationManager {
room_mgr, room_mgr,
endpoint, endpoint,
local_tls_fp, local_tls_fp,
metrics,
peer_links: Arc::new(Mutex::new(HashMap::new())), peer_links: Arc::new(Mutex::new(HashMap::new())),
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
rate_limiters: Mutex::new(HashMap::new()), rate_limiters: Mutex::new(HashMap::new()),
@@ -252,6 +255,8 @@ impl FederationManager {
tagged.extend_from_slice(media_data); tagged.extend_from_slice(media_data);
match link.transport.send_raw_datagram(&tagged) { match link.transport.send_raw_datagram(&tagged) {
Ok(()) => { Ok(()) => {
self.metrics.federation_packets_forwarded
.with_label_values(&[&link.label, "out"]).inc();
} }
Err(e) => warn!(peer = %link.label, "federation send error: {e}"), Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
} }
@@ -399,7 +404,8 @@ async fn run_federation_link(
peer_fp: String, peer_fp: String,
peer_label: String, peer_label: String,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
// Register peer link // Register peer link + metrics
fm.metrics.federation_peer_status.with_label_values(&[&peer_label]).set(1);
{ {
let mut links = fm.peer_links.lock().await; let mut links = fm.peer_links.lock().await;
links.insert(peer_fp.clone(), PeerLink { links.insert(peer_fp.clone(), PeerLink {
@@ -483,7 +489,8 @@ async fn run_federation_link(
_ = rtt_task => {} _ = rtt_task => {}
} }
// Cleanup: remove peer link // Cleanup: remove peer link + metrics
fm.metrics.federation_peer_status.with_label_values(&[&peer_label]).set(0);
{ {
let mut links = fm.peer_links.lock().await; let mut links = fm.peer_links.lock().await;
links.remove(&peer_fp); links.remove(&peer_fp);
@@ -507,6 +514,11 @@ async fn handle_signal(
let mut links = fm.peer_links.lock().await; let mut links = fm.peer_links.lock().await;
if let Some(link) = links.get_mut(peer_fp) { if let Some(link) = links.get_mut(peer_fp) {
link.active_rooms.insert(room.clone()); link.active_rooms.insert(room.clone());
}
// Update active rooms metric
let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
fm.metrics.federation_active_rooms.set(total as i64);
if let Some(link) = links.get_mut(peer_fp) {
link.remote_participants.insert(room.clone(), participants.clone()); link.remote_participants.insert(room.clone(), participants.clone());
} }
// Propagate to other peers // Propagate to other peers
@@ -596,6 +608,10 @@ async fn handle_datagram(
None => return, None => return,
}; };
// Count inbound federation packet
fm.metrics.federation_packets_forwarded
.with_label_values(&[source_peer_fp, "in"]).inc();
// Dedup: drop packets we've already seen (multi-path duplicates) // Dedup: drop packets we've already seen (multi-path duplicates)
{ {
let mut dedup = fm.dedup.lock().await; let mut dedup = fm.dedup.lock().await;

View File

@@ -392,6 +392,7 @@ async fn main() -> anyhow::Result<()> {
room_mgr.clone(), room_mgr.clone(),
endpoint.clone(), endpoint.clone(),
tls_fp.clone(), tls_fp.clone(),
metrics.clone(),
)); ));
let fm_run = fm.clone(); let fm_run = fm.clone();
tokio::spawn(async move { fm_run.run().await }); tokio::spawn(async move { fm_run.run().await });