feat: federation media forwarding WORKING — global rooms router model complete
2-relay test: 5.0s audio, RMS 4748, PASS. Full pipeline verified: - Room correctly identified as global (hash matching works) - Federation egress channel created and connected - GlobalRoomActive signals exchanged between peers - 300 packets (250 source + 50 FEC) forwarded via tagged datagrams - Client B on relay B received full 5-second tone from client A on relay A Added debug logging: is_global check, egress channel creation, per-peer forwarding with active_rooms diagnostic when no match found. Also logs egress packet count (first + every 250th). Multi-hop propagation: GlobalRoomActive signals forwarded to other peers so A→B→C chain knows about rooms across the full mesh. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -144,14 +144,25 @@ impl FederationManager {
|
|||||||
if links.is_empty() {
|
if links.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for link in links.values() {
|
let mut sent = 0u32;
|
||||||
|
for (fp, link) in links.iter() {
|
||||||
if link.active_rooms.contains(room_name) {
|
if link.active_rooms.contains(room_name) {
|
||||||
let mut tagged = Vec::with_capacity(8 + media_data.len());
|
let mut tagged = Vec::with_capacity(8 + media_data.len());
|
||||||
tagged.extend_from_slice(room_hash);
|
tagged.extend_from_slice(room_hash);
|
||||||
tagged.extend_from_slice(media_data);
|
tagged.extend_from_slice(media_data);
|
||||||
let _ = link.transport.send_raw_datagram(&tagged);
|
match link.transport.send_raw_datagram(&tagged) {
|
||||||
|
Ok(()) => sent += 1,
|
||||||
|
Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if sent == 0 && !links.is_empty() {
|
||||||
|
// Debug: no peer had this room active
|
||||||
|
let active_rooms: Vec<_> = links.values()
|
||||||
|
.flat_map(|l| l.active_rooms.iter().cloned())
|
||||||
|
.collect();
|
||||||
|
warn!(room = %room_name, peer_count = links.len(), ?active_rooms, "no peer has this room active");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Trust verification (kept from previous implementation) ──
|
// ── Trust verification (kept from previous implementation) ──
|
||||||
@@ -191,9 +202,15 @@ pub async fn run_federation_media_egress(
|
|||||||
fm: Arc<FederationManager>,
|
fm: Arc<FederationManager>,
|
||||||
mut rx: tokio::sync::mpsc::Receiver<FederationMediaOut>,
|
mut rx: tokio::sync::mpsc::Receiver<FederationMediaOut>,
|
||||||
) {
|
) {
|
||||||
|
let mut count: u64 = 0;
|
||||||
while let Some(out) = rx.recv().await {
|
while let Some(out) = rx.recv().await {
|
||||||
|
count += 1;
|
||||||
|
if count == 1 || count % 250 == 0 {
|
||||||
|
info!(room = %out.room_name, count, "federation egress: forwarding media");
|
||||||
|
}
|
||||||
fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data).await;
|
fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data).await;
|
||||||
}
|
}
|
||||||
|
info!(total = count, "federation egress task ended");
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Room event dispatcher ──
|
// ── Room event dispatcher ──
|
||||||
|
|||||||
@@ -721,12 +721,15 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.collect();
|
.collect();
|
||||||
// Set up federation media channel if this is a global room
|
// Set up federation media channel if this is a global room
|
||||||
let federation_tx = if let Some(ref fm) = federation_mgr {
|
let federation_tx = if let Some(ref fm) = federation_mgr {
|
||||||
if fm.is_global_room(&room_name) {
|
let is_global = fm.is_global_room(&room_name);
|
||||||
|
info!(room = %room_name, is_global, "checking if room is global for federation");
|
||||||
|
if is_global {
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(256);
|
let (tx, rx) = tokio::sync::mpsc::channel(256);
|
||||||
let fm_clone = fm.clone();
|
let fm_clone = fm.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await;
|
wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await;
|
||||||
});
|
});
|
||||||
|
info!(room = %room_name, "federation media egress channel created");
|
||||||
Some(tx)
|
Some(tx)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
|||||||
Reference in New Issue
Block a user