diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 7ac1fb0..d66d155 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -144,14 +144,25 @@ impl FederationManager { if links.is_empty() { return; } - for link in links.values() { + let mut sent = 0u32; + for (fp, link) in links.iter() { if link.active_rooms.contains(room_name) { let mut tagged = Vec::with_capacity(8 + media_data.len()); tagged.extend_from_slice(room_hash); tagged.extend_from_slice(media_data); - let _ = link.transport.send_raw_datagram(&tagged); + match link.transport.send_raw_datagram(&tagged) { + Ok(()) => sent += 1, + Err(e) => warn!(peer = %link.label, "federation send error: {e}"), + } } } + if sent == 0 && !links.is_empty() { + // Debug: no peer had this room active + let active_rooms: Vec<_> = links.values() + .flat_map(|l| l.active_rooms.iter().cloned()) + .collect(); + warn!(room = %room_name, peer_count = links.len(), ?active_rooms, "no peer has this room active"); + } } // ── Trust verification (kept from previous implementation) ── @@ -191,9 +202,15 @@ pub async fn run_federation_media_egress( fm: Arc, mut rx: tokio::sync::mpsc::Receiver, ) { + let mut count: u64 = 0; while let Some(out) = rx.recv().await { + count += 1; + if count == 1 || count % 250 == 0 { + info!(room = %out.room_name, count, "federation egress: forwarding media"); + } fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data).await; } + info!(total = count, "federation egress task ended"); } // ── Room event dispatcher ── diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 4be8e4a..9a18ea9 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -721,12 +721,15 @@ async fn main() -> anyhow::Result<()> { .collect(); // Set up federation media channel if this is a global room let federation_tx = if let Some(ref fm) = federation_mgr { - if fm.is_global_room(&room_name) { + let is_global = fm.is_global_room(&room_name); + info!(room = %room_name, is_global, "checking if room is global for federation"); + if is_global { let (tx, rx) = tokio::sync::mpsc::channel(256); let fm_clone = fm.clone(); tokio::spawn(async move { wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await; }); + info!(room = %room_name, "federation media egress channel created"); Some(tx) } else { None