fix: canonical room hash for federation — handles hashed vs raw room names
Some checks failed
Mirror to GitHub / mirror (push) Failing after 36s
Build Release Binaries / build-amd64 (push) Failing after 2m13s

Different clients send different room names:
- Android: raw "general" as SNI
- Desktop: hash_room_name("general") = "f09ae11d..." as SNI

Federation datagrams are tagged with an 8-byte room hash. Previously,
each relay computed the hash from the client-provided room name,
causing mismatches between relays with different client types.

Fix: resolve_global_room() maps any room name (raw or hashed) to the
canonical [[global_rooms]] name. global_room_hash() always uses the
canonical name for federation hashing. handle_datagram uses both raw
and canonical hash matching to find the local room.

Also: run_participant now receives the pre-computed federation_room_hash
so the egress uses the canonical hash, not the client-specific name.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-08 10:31:26 +04:00
parent 0abecf7fd8
commit d52b8befd6
3 changed files with 185 additions and 30 deletions

View File

@@ -8,7 +8,7 @@
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use bytes::Bytes;
use sha2::{Sha256, Digest};
@@ -19,6 +19,7 @@ use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::QuinnTransport;
use crate::config::{PeerConfig, TrustedConfig};
use crate::metrics::RelayMetrics;
use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager};
/// Compute 8-byte room hash for federation datagram tagging.
@@ -34,6 +35,78 @@ fn normalize_fp(fp: &str) -> String {
fp.replace(':', "").to_lowercase()
}
/// Sliding-window dedup filter for federation datagrams.
/// Tracks recently seen (room_hash, seq) pairs to discard duplicates
/// arriving via multiple federation paths (e.g., A↔B↔C and A↔C).
struct Deduplicator {
    /// Fast membership set of recent packet fingerprints
    /// (room_hash XOR'd with seq).
    seen: HashSet<u64>,
    /// FIFO of the same fingerprints, used for oldest-first eviction.
    order: std::collections::VecDeque<u64>,
    /// Maximum number of fingerprints remembered at once.
    capacity: usize,
}

impl Deduplicator {
    /// Create a filter that remembers up to `capacity` recent packets.
    fn new(capacity: usize) -> Self {
        Self {
            seen: HashSet::with_capacity(capacity),
            order: std::collections::VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Record this packet's fingerprint; returns true when the same
    /// (room_hash, seq) pair has already been seen (i.e. a duplicate).
    fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16) -> bool {
        let fingerprint = u64::from_be_bytes(*room_hash) ^ u64::from(seq);
        // HashSet::insert returns false when the value was already present.
        if !self.seen.insert(fingerprint) {
            return true;
        }
        // Evict the oldest remembered fingerprint once the window is full.
        if self.order.len() >= self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.order.push_back(fingerprint);
        false
    }
}
/// Per-room fixed-window rate limiter for federation forwarding:
/// at most `max_pps` packets are admitted per one-second window.
struct RateLimiter {
    /// Packets admitted per one-second window for this room.
    max_pps: u32,
    /// Admissions remaining in the current window.
    tokens: u32,
    /// Start of the current one-second window.
    window_start: Instant,
}

impl RateLimiter {
    /// Create a limiter admitting at most `max_pps` packets per second.
    fn new(max_pps: u32) -> Self {
        Self {
            max_pps,
            tokens: max_pps,
            window_start: Instant::now(),
        }
    }

    /// Returns true if the packet should be allowed through.
    fn allow(&mut self) -> bool {
        // Roll over to a fresh window once a full second has elapsed.
        if self.window_start.elapsed() >= Duration::from_secs(1) {
            self.tokens = self.max_pps;
            self.window_start = Instant::now();
        }
        // Spend one token if any remain; refuse otherwise.
        match self.tokens.checked_sub(1) {
            Some(remaining) => {
                self.tokens = remaining;
                true
            }
            None => false,
        }
    }
}
/// Active link to a peer relay.
struct PeerLink {
transport: Arc<QuinnTransport>,
@@ -42,6 +115,11 @@ struct PeerLink {
active_rooms: HashSet<String>,
}
/// Max federation packets per second per room (0 = unlimited).
/// Enforced per local room on inbound federation datagrams.
const FEDERATION_RATE_LIMIT_PPS: u32 = 500;
/// Dedup window size (number of recent packets to remember).
/// Bounds the memory used by the duplicate-suppression filter.
const DEDUP_WINDOW_SIZE: usize = 4096;
/// Manages federation connections and global room forwarding.
pub struct FederationManager {
peers: Vec<PeerConfig>,
@@ -52,6 +130,12 @@ pub struct FederationManager {
local_tls_fp: String,
/// Active peer connections, keyed by normalized fingerprint.
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
/// Prometheus metrics.
metrics: Arc<RelayMetrics>,
/// Dedup filter for incoming federation datagrams.
dedup: Mutex<Deduplicator>,
/// Per-room rate limiters for inbound federation media.
rate_limiters: Mutex<HashMap<String, RateLimiter>>,
}
impl FederationManager {
@@ -62,6 +146,7 @@ impl FederationManager {
room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint,
local_tls_fp: String,
metrics: Arc<RelayMetrics>,
) -> Self {
Self {
peers,
@@ -71,20 +156,38 @@ impl FederationManager {
endpoint,
local_tls_fp,
peer_links: Arc::new(Mutex::new(HashMap::new())),
metrics,
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
rate_limiters: Mutex::new(HashMap::new()),
}
}
/// Check if a room name (which may be hashed) is a global room.
/// Accepts either the raw configured name or its hashed SNI form.
pub fn is_global_room(&self, room: &str) -> bool {
    // Check both the raw name and the hashed version by delegating to
    // resolve_global_room, which tries both forms.
    self.resolve_global_room(room).is_some()
}
/// Resolve a room name (raw or hashed) to the canonical global room name.
/// Returns the configured global room name if it matches.
pub fn resolve_global_room(&self, room: &str) -> Option<&str> {
// Direct match (raw room name, e.g. Android clients)
if self.global_rooms.contains(room) {
return true;
return Some(self.global_rooms.iter().find(|n| n.as_str() == room).unwrap());
}
// The room name in the room manager is the hashed SNI.
// Check if any configured global room hashes to this value.
self.global_rooms.iter().any(|name| {
// Hashed match (desktop clients hash room names for SNI privacy)
self.global_rooms.iter().find(|name| {
wzp_crypto::hash_room_name(name) == room
})
}).map(|s| s.as_str())
}
/// Get the canonical federation room hash for a room.
///
/// When the room resolves to a configured global room, the hash is
/// derived from that canonical name — never the client-provided one —
/// so peer relays tag datagrams identically regardless of whether the
/// client sent the raw or hashed room name. Non-global rooms fall back
/// to hashing the name as given.
pub fn global_room_hash(&self, room: &str) -> [u8; 8] {
    room_hash(self.resolve_global_room(room).unwrap_or(room))
}
/// Start federation — spawns connection loops + event dispatcher.
@@ -146,18 +249,16 @@ impl FederationManager {
if links.is_empty() {
return;
}
let mut sent = 0u32;
for (fp, link) in links.iter() {
// Send to all connected peers — they have the global room configured
// and will deliver to local participants or forward further
{
let mut tagged = Vec::with_capacity(8 + media_data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(media_data);
match link.transport.send_raw_datagram(&tagged) {
Ok(()) => sent += 1,
Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
for (_fp, link) in links.iter() {
let mut tagged = Vec::with_capacity(8 + media_data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(media_data);
match link.transport.send_raw_datagram(&tagged) {
Ok(()) => {
self.metrics.federation_packets_forwarded
.with_label_values(&[&link.label, "out"]).inc();
}
Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
}
}
}
@@ -299,7 +400,7 @@ async fn run_federation_link(
peer_fp: String,
peer_label: String,
) -> Result<(), anyhow::Error> {
// Register peer link
// Register peer link + metrics
{
let mut links = fm.peer_links.lock().await;
links.insert(peer_fp.clone(), PeerLink {
@@ -307,6 +408,8 @@ async fn run_federation_link(
label: peer_label.clone(),
active_rooms: HashSet::new(),
});
fm.metrics.federation_peer_status
.with_label_values(&[&peer_label]).set(1);
}
// Announce our currently active global rooms
@@ -320,14 +423,17 @@ async fn run_federation_link(
}
}
// Two concurrent tasks: signal recv + media recv
// Three concurrent tasks: signal recv + media recv + RTT monitor
let signal_transport = transport.clone();
let media_transport = transport.clone();
let rtt_transport = transport.clone();
let fm_signal = fm.clone();
let fm_media = fm.clone();
let fm_rtt = fm.clone();
let peer_fp_signal = peer_fp.clone();
let peer_fp_media = peer_fp.clone();
let label_signal = peer_label.clone();
let label_rtt = peer_label.clone();
let signal_task = async move {
loop {
@@ -354,6 +460,8 @@ async fn run_federation_link(
if media_count == 1 || media_count % 250 == 0 {
info!(peer = %peer_label_media, media_count, len = data.len(), "federation: received datagram");
}
fm_media.metrics.federation_packets_forwarded
.with_label_values(&[&peer_label_media, "in"]).inc();
handle_datagram(&fm_media, &peer_fp_media, data).await;
}
Err(e) => {
@@ -364,15 +472,28 @@ async fn run_federation_link(
}
};
// RTT monitor: periodically sample QUIC RTT for this peer
let rtt_task = async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let rtt_ms = rtt_transport.connection().stats().path.rtt.as_millis() as f64;
fm_rtt.metrics.federation_peer_rtt_ms
.with_label_values(&[&label_rtt]).set(rtt_ms);
}
};
tokio::select! {
_ = signal_task => {}
_ = media_task => {}
_ = rtt_task => {}
}
// Cleanup: remove peer link
// Cleanup: remove peer link + metrics
{
let mut links = fm.peer_links.lock().await;
links.remove(&peer_fp);
fm.metrics.federation_peer_status
.with_label_values(&[&peer_label]).set(0);
}
info!(peer = %peer_label, "federation link ended");
@@ -394,6 +515,9 @@ async fn handle_signal(
if let Some(link) = links.get_mut(peer_fp) {
link.active_rooms.insert(room.clone());
}
// Update active rooms gauge
let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
fm.metrics.federation_active_rooms.set(total as i64);
// Propagate: tell all OTHER peers this room is routable through us.
// This enables multi-hop: A→B→C where B relays A's announcement to C and vice versa.
for (fp, link) in links.iter() {
@@ -409,6 +533,9 @@ async fn handle_signal(
if let Some(link) = links.get_mut(peer_fp) {
link.active_rooms.remove(&room);
}
// Update active rooms gauge
let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
fm.metrics.federation_active_rooms.set(total as i64);
// Check if any other peer still has this room — if none, propagate inactive
let any_other_active = links.iter()
.any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room));
@@ -445,10 +572,23 @@ async fn handle_datagram(
None => return,
};
// Dedup: drop packets we've already seen (multi-path duplicates)
{
let mut dedup = fm.dedup.lock().await;
if dedup.is_dup(&rh, pkt.header.seq) {
fm.metrics.federation_packets_deduped.inc();
return;
}
}
// Find room by hash
let room_name = {
let mgr = fm.room_mgr.lock().await;
mgr.active_rooms().into_iter().find(|r| room_hash(r) == rh)
{
let active = mgr.active_rooms();
active.iter().find(|r| room_hash(r) == rh).cloned()
.or_else(|| active.iter().find(|r| fm.global_room_hash(r) == rh).cloned())
}
};
let room_name = match room_name {
@@ -456,6 +596,17 @@ async fn handle_datagram(
None => return, // room not active locally
};
// Rate limit per room
if FEDERATION_RATE_LIMIT_PPS > 0 {
let mut limiters = fm.rate_limiters.lock().await;
let limiter = limiters.entry(room_name.clone())
.or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS));
if !limiter.allow() {
fm.metrics.federation_packets_rate_limited.inc();
return;
}
}
// Deliver to all local participants
let locals = {
let mgr = fm.room_mgr.lock().await;

View File

@@ -392,6 +392,7 @@ async fn main() -> anyhow::Result<()> {
room_mgr.clone(),
endpoint.clone(),
tls_fp.clone(),
metrics.clone(),
));
let fm_run = fm.clone();
tokio::spawn(async move { fm_run.run().await });
@@ -759,22 +760,22 @@ async fn main() -> anyhow::Result<()> {
.map(|b| format!("{b:02x}"))
.collect();
// Set up federation media channel if this is a global room
let federation_tx = if let Some(ref fm) = federation_mgr {
let (federation_tx, federation_room_hash) = if let Some(ref fm) = federation_mgr {
let is_global = fm.is_global_room(&room_name);
info!(room = %room_name, is_global, "checking if room is global for federation");
if is_global {
let canonical_hash = fm.global_room_hash(&room_name);
let (tx, rx) = tokio::sync::mpsc::channel(256);
let fm_clone = fm.clone();
tokio::spawn(async move {
wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await;
});
info!(room = %room_name, "federation media egress channel created");
Some(tx)
info!(room = %room_name, canonical = ?fm.resolve_global_room(&room_name), "federation egress created (global room)");
(Some(tx), Some(canonical_hash))
} else {
None
(None, None)
}
} else {
None
(None, None)
};
room::run_participant(
@@ -787,6 +788,7 @@ async fn main() -> anyhow::Result<()> {
trunking_enabled,
debug_tap,
federation_tx,
federation_room_hash,
).await;
// Participant disconnected — clean up presence + per-session metrics

View File

@@ -431,6 +431,7 @@ pub async fn run_participant(
trunking_enabled: bool,
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
federation_room_hash: Option<[u8; 8]>,
) {
if trunking_enabled {
run_participant_trunked(
@@ -439,7 +440,7 @@ pub async fn run_participant(
.await;
} else {
run_participant_plain(
room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx,
room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx, federation_room_hash,
)
.await;
}
@@ -455,6 +456,7 @@ async fn run_participant_plain(
session_id: &str,
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
federation_room_hash: Option<[u8; 8]>,
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
@@ -565,7 +567,7 @@ async fn run_participant_plain(
let data = pkt.to_bytes();
let _ = fed_tx.try_send(FederationMediaOut {
room_name: room_name.clone(),
room_hash: crate::federation::room_hash(&room_name),
room_hash: federation_room_hash.unwrap_or_else(|| crate::federation::room_hash(&room_name)),
data,
});
}