From e50925e05aaf1edb429077a0093a0200451805c6 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Wed, 8 Apr 2026 05:49:37 +0400 Subject: [PATCH] fix: IP-based peer matching for inbound federation + room announcements - Inbound federation connections now matched by source IP against configured peer URLs (QUIC clients don't present TLS certs, so fingerprint matching fails for inbound direction). - Added periodic room announcement task (1s poll) that sends FederationRoomJoin to peers when new rooms appear with local participants. Handles rooms created after federation link is up. - Added find_peer_by_addr() to FederationManager. Federation link topology: each relay pair has 2 connections (outbound from each side). Outbound sends signals, peer's inbound receives them. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-relay/src/federation.rs | 50 +++++++++++++++++++++++++++++- crates/wzp-relay/src/main.rs | 15 ++------- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index a51dd31..e34f5fa 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -87,6 +87,17 @@ impl FederationManager { pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> { self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp)) } + + /// Find a configured peer by source IP address. + /// Used for inbound connections where the client doesn't present a TLS cert. + pub fn find_peer_by_addr(&self, addr: SocketAddr) -> Option<&PeerConfig> { + let addr_ip = addr.ip(); + self.peers.iter().find(|p| { + p.url.parse::() + .map(|sa| sa.ip() == addr_ip) + .unwrap_or(false) + }) + } } /// Normalize a fingerprint string (remove colons, lowercase). @@ -156,12 +167,15 @@ async fn run_federation_link( // Map room_hash -> room_name for incoming media demux let mut hash_to_room: HashMap<[u8; 8], String> = HashMap::new(); - // Run two tasks: recv signals + recv media datagrams + // Run three tasks: recv signals + recv media + periodic room announcements let signal_transport = transport.clone(); let media_transport = transport.clone(); + let announce_transport = transport.clone(); let fm_signal = fm.clone(); let fm_media = fm.clone(); + let fm_announce = fm.clone(); let peer_label = peer.label.clone().unwrap_or_else(|| peer.url.clone()); + let peer_label2 = peer_label.clone(); let signal_task = async move { loop { @@ -275,9 +289,43 @@ async fn run_federation_link( } }; + // Periodically announce new local rooms to the peer + let announce_task = async move { + let mut announced: std::collections::HashSet = std::collections::HashSet::new(); + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let rooms = { + let mgr = fm_announce.room_mgr.lock().await; + mgr.active_rooms() + }; + for room_name in &rooms { + if !announced.contains(room_name) { + let participants = { + let mgr = fm_announce.room_mgr.lock().await; + mgr.local_participants(room_name) + }; + if participants.is_empty() { + continue; // only virtual participants, skip + } + let msg = SignalMessage::FederationRoomJoin { + room: room_name.clone(), + participants, + }; + if announce_transport.send_signal(&msg).await.is_ok() { + info!(peer = %peer_label2, room = %room_name, "federation: announced room to peer"); + announced.insert(room_name.clone()); + } + } + } + // Remove rooms that no longer exist + announced.retain(|r| rooms.contains(r)); + } + }; + tokio::select! { _ = signal_task => {} _ = media_task => {} + _ = announce_task => {} } Ok(()) diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index d45ee50..ab8cce1 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -501,16 +501,8 @@ async fn main() -> anyhow::Result<()> { // Federation connections use SNI "_federation" if room_name == "_federation" { if let Some(ref fm) = federation_mgr { - // Check if we recognize this peer by TLS fingerprint - let peer_fp = wzp_transport::tls_fingerprint( - &transport.connection() - .peer_identity() - .and_then(|id| id.downcast::>().ok()) - .and_then(|certs| certs.first().cloned()) - .map(|c| c.to_vec()) - .unwrap_or_default() - ); - if let Some(peer_config) = fm.find_peer_by_fingerprint(&peer_fp) { + // Match inbound peer by source IP (client connections don't present TLS certs) + if let Some(peer_config) = fm.find_peer_by_addr(addr) { let peer_config = peer_config.clone(); let fm = fm.clone(); info!(%addr, label = ?peer_config.label, "inbound federation connection accepted"); @@ -520,12 +512,9 @@ async fn main() -> anyhow::Result<()> { info!(" to accept, add to relay.toml:"); info!(" [[peers]]"); info!(" url = \"{addr}\""); - info!(" fingerprint = \"{peer_fp}\""); - transport.close().await.ok(); } } else { info!(%addr, "federation connection rejected (no peers configured)"); - transport.close().await.ok(); } return; }