fix: IP-based peer matching for inbound federation + room announcements
- Inbound federation connections now matched by source IP against configured peer URLs (QUIC clients don't present TLS certs, so fingerprint matching fails for inbound direction). - Added periodic room announcement task (1s poll) that sends FederationRoomJoin to peers when new rooms appear with local participants. Handles rooms created after federation link is up. - Added find_peer_by_addr() to FederationManager. Federation link topology: each relay pair has 2 connections (outbound from each side). Outbound sends signals, peer's inbound receives them. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -87,6 +87,17 @@ impl FederationManager {
|
||||
pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> {
|
||||
self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp))
|
||||
}
|
||||
|
||||
/// Find a configured peer by source IP address.
|
||||
/// Used for inbound connections where the client doesn't present a TLS cert.
|
||||
pub fn find_peer_by_addr(&self, addr: SocketAddr) -> Option<&PeerConfig> {
|
||||
let addr_ip = addr.ip();
|
||||
self.peers.iter().find(|p| {
|
||||
p.url.parse::<SocketAddr>()
|
||||
.map(|sa| sa.ip() == addr_ip)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Normalize a fingerprint string (remove colons, lowercase).
|
||||
@@ -156,12 +167,15 @@ async fn run_federation_link(
|
||||
// Map room_hash -> room_name for incoming media demux
|
||||
let mut hash_to_room: HashMap<[u8; 8], String> = HashMap::new();
|
||||
|
||||
// Run two tasks: recv signals + recv media datagrams
|
||||
// Run three tasks: recv signals + recv media + periodic room announcements
|
||||
let signal_transport = transport.clone();
|
||||
let media_transport = transport.clone();
|
||||
let announce_transport = transport.clone();
|
||||
let fm_signal = fm.clone();
|
||||
let fm_media = fm.clone();
|
||||
let fm_announce = fm.clone();
|
||||
let peer_label = peer.label.clone().unwrap_or_else(|| peer.url.clone());
|
||||
let peer_label2 = peer_label.clone();
|
||||
|
||||
let signal_task = async move {
|
||||
loop {
|
||||
@@ -275,9 +289,43 @@ async fn run_federation_link(
|
||||
}
|
||||
};
|
||||
|
||||
// Periodically announce new local rooms to the peer
|
||||
let announce_task = async move {
|
||||
let mut announced: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let rooms = {
|
||||
let mgr = fm_announce.room_mgr.lock().await;
|
||||
mgr.active_rooms()
|
||||
};
|
||||
for room_name in &rooms {
|
||||
if !announced.contains(room_name) {
|
||||
let participants = {
|
||||
let mgr = fm_announce.room_mgr.lock().await;
|
||||
mgr.local_participants(room_name)
|
||||
};
|
||||
if participants.is_empty() {
|
||||
continue; // only virtual participants, skip
|
||||
}
|
||||
let msg = SignalMessage::FederationRoomJoin {
|
||||
room: room_name.clone(),
|
||||
participants,
|
||||
};
|
||||
if announce_transport.send_signal(&msg).await.is_ok() {
|
||||
info!(peer = %peer_label2, room = %room_name, "federation: announced room to peer");
|
||||
announced.insert(room_name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Remove rooms that no longer exist
|
||||
announced.retain(|r| rooms.contains(r));
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = signal_task => {}
|
||||
_ = media_task => {}
|
||||
_ = announce_task => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -501,16 +501,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Federation connections use SNI "_federation"
|
||||
if room_name == "_federation" {
|
||||
if let Some(ref fm) = federation_mgr {
|
||||
// Check if we recognize this peer by TLS fingerprint
|
||||
let peer_fp = wzp_transport::tls_fingerprint(
|
||||
&transport.connection()
|
||||
.peer_identity()
|
||||
.and_then(|id| id.downcast::<Vec<rustls::pki_types::CertificateDer>>().ok())
|
||||
.and_then(|certs| certs.first().cloned())
|
||||
.map(|c| c.to_vec())
|
||||
.unwrap_or_default()
|
||||
);
|
||||
if let Some(peer_config) = fm.find_peer_by_fingerprint(&peer_fp) {
|
||||
// Match inbound peer by source IP (client connections don't present TLS certs)
|
||||
if let Some(peer_config) = fm.find_peer_by_addr(addr) {
|
||||
let peer_config = peer_config.clone();
|
||||
let fm = fm.clone();
|
||||
info!(%addr, label = ?peer_config.label, "inbound federation connection accepted");
|
||||
@@ -520,12 +512,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
info!(" to accept, add to relay.toml:");
|
||||
info!(" [[peers]]");
|
||||
info!(" url = \"{addr}\"");
|
||||
info!(" fingerprint = \"{peer_fp}\"");
|
||||
transport.close().await.ok();
|
||||
}
|
||||
} else {
|
||||
info!(%addr, "federation connection rejected (no peers configured)");
|
||||
transport.close().await.ok();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user