feat: federation rewrite — global rooms router model

Major rewrite of relay federation replacing virtual participants with
a clean router model:

1. Global rooms: [[global_rooms]] in TOML config declares rooms that
   are bridged across federation (see the config sketch after this
   list). Each relay is a router + local SFU.

2. Room events: RoomManager emits LocalJoin/LocalLeave via broadcast
   channel when rooms transition between empty and non-empty.

3. GlobalRoomActive/Inactive signals: relays announce when they have
   local participants in global rooms. Peers track active state and
   forward media accordingly. Announcements propagate for multi-hop.

4. Media forwarding: separated from SFU loop. Local participant sends
   via mpsc channel → egress task → forward_to_peers() → room-hash
   tagged datagrams to active peer links. Inbound datagrams delivered
   to local participants + forwarded to other active peers (multi-hop).

5. Loop prevention: don't forward back to source relay.

6. Room name hashing: is_global_room() checks both plain name and
   hash (clients hash room names for SNI privacy).
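
A minimal TOML sketch of the new config (values are illustrative; field
names follow the GlobalRoomConfig struct and the PeerConfig fields
referenced in this diff):

    [[global_rooms]]
    name = "android"

    [[peers]]
    url = "203.0.113.7:4433"        # peer relay address (illustrative)
    fingerprint = "ab:cd:ef:..."    # peer TLS fingerprint (placeholder)
    label = "relay-b"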

Removed: ParticipantSender::Federation, federated_participants, virtual
participant join/leave, periodic room polling. Rooms now only contain
local participants.

Signaling tested: a 3-relay chain (A→B←C) correctly propagates
GlobalRoomActive through B to both A and C. Media forwarding plumbing
is in place but needs final debugging (see the framing sketch below).
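
For reference, forwarded media datagrams are framed as an 8-byte room
hash followed by the serialized MediaPacket. A minimal encode/decode
sketch (helper names are illustrative; the real logic lives in
forward_to_peers and handle_datagram):

    // Sketch only — mirrors the framing used in federation.rs.
    fn tag_datagram(room_hash: &[u8; 8], media: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(8 + media.len());
        out.extend_from_slice(room_hash); // 8-byte room hash prefix
        out.extend_from_slice(media);     // serialized MediaPacket bytes
        out
    }

    fn split_datagram(data: &[u8]) -> Option<([u8; 8], &[u8])> {
        if data.len() < 12 {
            return None; // need 8-byte hash + a minimal packet
        }
        let mut rh = [0u8; 8];
        rh.copy_from_slice(&data[..8]);
        Some((rh, &data[8..]))
    }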

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Siavash Sameni
2026-04-08 07:54:38 +04:00
parent bc8bb3d790
commit b00db5dfdc
6 changed files with 387 additions and 344 deletions

View File

@@ -111,9 +111,8 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse
SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse
SignalMessage::FederationHello { .. }
| SignalMessage::FederationRoomJoin { .. }
| SignalMessage::FederationRoomLeave { .. }
| SignalMessage::FederationParticipantUpdate { .. } => CallSignalType::Offer, // relay-only
| SignalMessage::GlobalRoomActive { .. }
| SignalMessage::GlobalRoomInactive { .. } => CallSignalType::Offer, // relay-only
}
}

View File

@@ -665,21 +665,14 @@ pub enum SignalMessage {
tls_fingerprint: String,
},
/// Federation: a room exists on the sending relay with active local participants.
FederationRoomJoin {
room: String,
participants: Vec<RoomParticipant>,
},
/// Federation: a room is now empty on the sending relay.
FederationRoomLeave {
/// Federation: this relay now has local participants in a global room.
GlobalRoomActive {
room: String,
},
/// Federation: local participant list changed for a federated room.
FederationParticipantUpdate {
/// Federation: this relay's last local participant left a global room.
GlobalRoomInactive {
room: String,
participants: Vec<RoomParticipant>,
},
}

View File

@@ -25,6 +25,13 @@ pub struct TrustedConfig {
pub label: Option<String>,
}
/// A room declared global — bridged across all federated peers.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GlobalRoomConfig {
/// Room name to bridge (e.g., "android").
pub name: String,
}
/// Configuration for the relay daemon.
///
/// All fields have defaults, so a minimal TOML file only needs the
@@ -73,6 +80,9 @@ pub struct RelayConfig {
/// Federation peer relays.
#[serde(default)]
pub peers: Vec<PeerConfig>,
/// Global rooms bridged across federation.
#[serde(default)]
pub global_rooms: Vec<GlobalRoomConfig>,
/// Trusted relay fingerprints — accept inbound federation from these relays.
/// Unlike [[peers]], no url is needed — the peer connects to us.
#[serde(default)]
@@ -99,6 +109,7 @@ impl Default for RelayConfig {
ws_port: None,
static_dir: None,
peers: Vec::new(),
global_rooms: Vec::new(),
trusted: Vec::new(),
debug_tap: None,
}

View File

@@ -1,10 +1,11 @@
//! Relay federation — connects to peer relays and bridges rooms with matching names.
//! Relay federation — global room routing between peer relays.
//!
//! Each federated peer is represented as a virtual participant in shared rooms.
//! Media from local participants is forwarded to the peer via room-tagged datagrams.
//! Media from the peer is received, demuxed by room hash, and forwarded to local participants.
//! Each relay maintains a forwarding table per global room. When a local participant
//! sends media in a global room, it's forwarded to all peer relays that have the room
//! active. Incoming federated media is delivered to local participants and optionally
//! forwarded to other active peers (multi-hop).
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
@@ -18,7 +19,7 @@ use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::QuinnTransport;
use crate::config::{PeerConfig, TrustedConfig};
use crate::room::{self, ParticipantSender, RoomManager};
use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager};
/// Compute 8-byte room hash for federation datagram tagging.
pub fn room_hash(room_name: &str) -> [u8; 8] {
@@ -28,19 +29,36 @@ pub fn room_hash(room_name: &str) -> [u8; 8] {
out
}
/// Manages federation connections to peer relays.
/// Normalize a fingerprint string (remove colons, lowercase).
fn normalize_fp(fp: &str) -> String {
fp.replace(':', "").to_lowercase()
}
/// Active link to a peer relay.
struct PeerLink {
transport: Arc<QuinnTransport>,
label: String,
/// Global rooms that this peer has reported as active.
active_rooms: HashSet<String>,
}
/// Manages federation connections and global room forwarding.
pub struct FederationManager {
peers: Vec<PeerConfig>,
trusted: Vec<TrustedConfig>,
global_rooms: HashSet<String>,
room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint,
local_tls_fp: String,
/// Active peer connections, keyed by normalized fingerprint.
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
}
impl FederationManager {
pub fn new(
peers: Vec<PeerConfig>,
trusted: Vec<TrustedConfig>,
global_rooms: HashSet<String>,
room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint,
local_tls_fp: String,
@@ -48,19 +66,41 @@ impl FederationManager {
Self {
peers,
trusted,
global_rooms,
room_mgr,
endpoint,
local_tls_fp,
peer_links: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Start federation — spawns one task per configured peer.
/// Check if a room name (which may be hashed) is a global room.
pub fn is_global_room(&self, room: &str) -> bool {
// Check both the raw name and the hashed version
if self.global_rooms.contains(room) {
return true;
}
// The room name in the room manager is the hashed SNI.
// Check if any configured global room hashes to this value.
self.global_rooms.iter().any(|name| {
wzp_crypto::hash_room_name(name) == room
})
}
/// Start federation — spawns connection loops + event dispatcher.
pub async fn run(self: Arc<Self>) {
if self.peers.is_empty() {
if self.peers.is_empty() && self.global_rooms.is_empty() {
return;
}
info!(peers = self.peers.len(), "federation starting");
info!(
peers = self.peers.len(),
global_rooms = self.global_rooms.len(),
"federation starting"
);
let mut handles = Vec::new();
// Per-peer outbound connection loops
for peer in &self.peers {
let this = self.clone();
let peer = peer.clone();
@@ -68,30 +108,58 @@ impl FederationManager {
run_peer_loop(this, peer).await;
}));
}
// Room event dispatcher
let room_events = {
let mgr = self.room_mgr.lock().await;
mgr.subscribe_events()
};
let this = self.clone();
handles.push(tokio::spawn(async move {
run_room_event_dispatcher(this, room_events).await;
}));
for h in handles {
let _ = h.await;
}
}
/// Handle an inbound federation connection from a peer that we recognize.
/// Handle an inbound federation connection from a recognized peer.
pub async fn handle_inbound(
self: &Arc<Self>,
transport: Arc<QuinnTransport>,
peer_config: PeerConfig,
) {
let addr: SocketAddr = peer_config.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap());
info!(peer = ?peer_config.label, %addr, "inbound federation link active");
if let Err(e) = run_federation_link(self.clone(), transport, addr, &peer_config).await {
warn!(peer = ?peer_config.label, "inbound federation link ended: {e}");
let peer_fp = normalize_fp(&peer_config.fingerprint);
let label = peer_config.label.unwrap_or_else(|| peer_config.url.clone());
info!(peer = %label, "inbound federation link active");
if let Err(e) = run_federation_link(self.clone(), transport, peer_fp, label.clone()).await {
warn!(peer = %label, "inbound federation link ended: {e}");
}
}
/// Find a configured peer by TLS fingerprint.
/// Forward locally-generated media to active peers for a global room.
pub async fn forward_to_peers(&self, room_name: &str, room_hash: &[u8; 8], media_data: &Bytes) {
let links = self.peer_links.lock().await;
if links.is_empty() {
return;
}
for link in links.values() {
if link.active_rooms.contains(room_name) {
let mut tagged = Vec::with_capacity(8 + media_data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(media_data);
let _ = link.transport.send_raw_datagram(&tagged);
}
}
}
// ── Trust verification (kept from previous implementation) ──
pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> {
self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp))
}
/// Find a configured peer by source IP address.
pub fn find_peer_by_addr(&self, addr: SocketAddr) -> Option<&PeerConfig> {
let addr_ip = addr.ip();
self.peers.iter().find(|p| {
@@ -101,19 +169,14 @@ impl FederationManager {
})
}
/// Find a trusted relay by TLS fingerprint.
pub fn find_trusted_by_fingerprint(&self, fp: &str) -> Option<&TrustedConfig> {
self.trusted.iter().find(|t| normalize_fp(&t.fingerprint) == normalize_fp(fp))
}
/// Check if an inbound federation connection is trusted (by IP match in [[peers]] or fingerprint in [[trusted]]).
/// Returns the label for logging.
pub fn check_inbound_trust(&self, addr: SocketAddr, hello_fp: &str) -> Option<String> {
// Check [[peers]] by IP
if let Some(peer) = self.find_peer_by_addr(addr) {
return Some(peer.label.clone().unwrap_or_else(|| peer.url.clone()));
}
// Check [[trusted]] by fingerprint
if let Some(trusted) = self.find_trusted_by_fingerprint(hello_fp) {
return Some(trusted.label.clone().unwrap_or_else(|| hello_fp[..16].to_string()));
}
@@ -121,10 +184,56 @@ impl FederationManager {
}
}
/// Normalize a fingerprint string (remove colons, lowercase).
fn normalize_fp(fp: &str) -> String {
fp.replace(':', "").to_lowercase()
// ── Outbound media egress task ──
/// Drains the federation media channel and forwards to active peers.
pub async fn run_federation_media_egress(
fm: Arc<FederationManager>,
mut rx: tokio::sync::mpsc::Receiver<FederationMediaOut>,
) {
while let Some(out) = rx.recv().await {
fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data).await;
}
}
// ── Room event dispatcher ──
/// Watches RoomManager events and sends GlobalRoomActive/Inactive to peers.
async fn run_room_event_dispatcher(
fm: Arc<FederationManager>,
mut events: tokio::sync::broadcast::Receiver<RoomEvent>,
) {
loop {
match events.recv().await {
Ok(RoomEvent::LocalJoin { room }) => {
if fm.is_global_room(&room) {
info!(room = %room, "global room now active, announcing to peers");
let msg = SignalMessage::GlobalRoomActive { room };
let links = fm.peer_links.lock().await;
for link in links.values() {
let _ = link.transport.send_signal(&msg).await;
}
}
}
Ok(RoomEvent::LocalLeave { room }) => {
if fm.is_global_room(&room) {
info!(room = %room, "global room now inactive, announcing to peers");
let msg = SignalMessage::GlobalRoomInactive { room };
let links = fm.peer_links.lock().await;
for link in links.values() {
let _ = link.transport.send_signal(&msg).await;
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(missed = n, "room event receiver lagged");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
}
// ── Peer connection management ──
/// Persistent connection loop for one peer — reconnects with backoff.
async fn run_peer_loop(fm: Arc<FederationManager>, peer: PeerConfig) {
@@ -133,9 +242,10 @@ async fn run_peer_loop(fm: Arc<FederationManager>, peer: PeerConfig) {
info!(peer_url = %peer.url, label = ?peer.label, "federation: connecting to peer...");
match connect_to_peer(&fm, &peer).await {
Ok(transport) => {
backoff = Duration::from_secs(5); // reset on success
let addr: SocketAddr = peer.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap());
if let Err(e) = run_federation_link(fm.clone(), transport, addr, &peer).await {
backoff = Duration::from_secs(5);
let peer_fp = normalize_fp(&peer.fingerprint);
let label = peer.label.clone().unwrap_or_else(|| peer.url.clone());
if let Err(e) = run_federation_link(fm.clone(), transport, peer_fp, label).await {
warn!(peer_url = %peer.url, "federation link ended: {e}");
}
}
@@ -148,219 +258,201 @@ async fn run_peer_loop(fm: Arc<FederationManager>, peer: PeerConfig) {
}
}
/// Connect to a peer relay.
/// Connect to a peer relay and send hello.
async fn connect_to_peer(fm: &FederationManager, peer: &PeerConfig) -> Result<Arc<QuinnTransport>, anyhow::Error> {
let addr: SocketAddr = peer.url.parse()?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&fm.endpoint, addr, "_federation", client_cfg).await?;
// TODO: verify peer TLS fingerprint once we have cert access
let transport = Arc::new(QuinnTransport::new(conn));
// Send hello with our TLS fingerprint so the peer can verify us
// Send hello with our TLS fingerprint
let hello = SignalMessage::FederationHello {
tls_fingerprint: fm.local_tls_fp.clone(),
};
transport.send_signal(&hello).await
.map_err(|e| anyhow::anyhow!("federation hello send failed: {e}"))?;
info!(peer_url = %peer.url, label = ?peer.label, "federation: connected to peer (hello sent)");
info!(peer_url = %peer.url, label = ?peer.label, "federation: connected (hello sent)");
Ok(transport)
}
/// Run the federation link: exchange room info and forward media.
// ── Federation link (runs on a single QUIC connection) ──
/// Run the federation link: exchange global room state and forward media.
async fn run_federation_link(
fm: Arc<FederationManager>,
transport: Arc<QuinnTransport>,
peer_addr: SocketAddr,
peer: &PeerConfig,
peer_fp: String,
peer_label: String,
) -> Result<(), anyhow::Error> {
// Announce our active rooms to the peer
let rooms = {
let mgr = fm.room_mgr.lock().await;
mgr.active_rooms()
};
for room_name in &rooms {
let participants = {
let mgr = fm.room_mgr.lock().await;
mgr.local_participants(room_name)
};
let msg = SignalMessage::FederationRoomJoin {
room: room_name.clone(),
participants,
};
transport.send_signal(&msg).await?;
// Register peer link
{
let mut links = fm.peer_links.lock().await;
links.insert(peer_fp.clone(), PeerLink {
transport: transport.clone(),
label: peer_label.clone(),
active_rooms: HashSet::new(),
});
}
// Track virtual participants we create on behalf of this peer
let mut peer_room_participants: HashMap<String, room::ParticipantId> = HashMap::new();
// Map room_hash -> room_name for incoming media demux
let mut hash_to_room: HashMap<[u8; 8], String> = HashMap::new();
// Announce our currently active global rooms
{
let mgr = fm.room_mgr.lock().await;
for room_name in mgr.active_rooms() {
if fm.is_global_room(&room_name) {
let msg = SignalMessage::GlobalRoomActive { room: room_name };
let _ = transport.send_signal(&msg).await;
}
}
}
// Run three tasks: recv signals + recv media + periodic room announcements
// Two concurrent tasks: signal recv + media recv
let signal_transport = transport.clone();
let media_transport = transport.clone();
let announce_transport = transport.clone();
let fm_signal = fm.clone();
let fm_media = fm.clone();
let fm_announce = fm.clone();
let peer_label = peer.label.clone().unwrap_or_else(|| peer.url.clone());
let peer_label2 = peer_label.clone();
let peer_fp_signal = peer_fp.clone();
let peer_fp_media = peer_fp.clone();
let label_signal = peer_label.clone();
let signal_task = async move {
loop {
match signal_transport.recv_signal().await {
Ok(Some(msg)) => {
info!(peer = %peer_label, "federation: received signal {:?}", std::mem::discriminant(&msg));
match msg {
SignalMessage::FederationRoomJoin { room, participants } => {
info!(peer = %peer_label, room = %room, count = participants.len(), "federation: peer room join");
let rh = room_hash(&room);
hash_to_room.insert(rh, room.clone());
let sender = ParticipantSender::Federation {
transport: signal_transport.clone(),
room_hash: rh,
};
let (pid, update, senders) = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.join_federated(&room, peer_addr, sender, participants)
};
peer_room_participants.insert(room, pid);
room::broadcast_signal(&senders, &update).await;
}
SignalMessage::FederationRoomLeave { room } => {
info!(peer = %peer_label, room = %room, "federation: peer room leave");
if let Some(pid) = peer_room_participants.remove(&room) {
let result = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.leave(&room, pid)
};
if let Some((update, senders)) = result {
room::broadcast_signal(&senders, &update).await;
}
}
hash_to_room.retain(|_, v| v != &room);
}
SignalMessage::FederationParticipantUpdate { room, participants } => {
let result = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.update_federated_participants(&room, peer_addr, participants)
};
if let Some((update, senders)) = result {
room::broadcast_signal(&senders, &update).await;
}
}
_ => {} // ignore other signals
}
handle_signal(&fm_signal, &peer_fp_signal, &label_signal, msg).await;
}
Ok(None) => break,
Err(e) => {
error!(peer = %peer_label, "federation signal recv error: {e}");
error!(peer = %label_signal, "federation signal error: {e}");
break;
}
}
}
// Cleanup: remove all virtual participants for this peer
for (room, pid) in &peer_room_participants {
let result = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.leave(room, *pid)
};
if let Some((update, senders)) = result {
room::broadcast_signal(&senders, &update).await;
}
}
info!(peer = %peer_label, "federation signal task ended");
};
let media_task = async move {
loop {
match media_transport.connection().read_datagram().await {
Ok(data) => {
if data.len() < 8 + 4 {
continue; // too short (need room_hash + min header)
}
let mut rh = [0u8; 8];
rh.copy_from_slice(&data[..8]);
let media_bytes = &data[8..];
// Deserialize media packet
let pkt = match wzp_proto::MediaPacket::from_bytes(Bytes::copy_from_slice(media_bytes)) {
Some(pkt) => pkt,
None => continue,
};
// Look up room by hash — we need to get the room name from the signal task's hash_to_room
// For simplicity, we forward to all local participants via the room manager
// The virtual participant approach means we don't need the room name here —
// the SFU loop handles it. But since inbound media doesn't go through run_participant,
// we need to manually fan out.
// For now, just use the room manager to find local participants
// This is a simplified approach — full implementation would maintain
// a shared hash_to_room map between signal and media tasks
let mgr = fm_media.room_mgr.lock().await;
for room_name in mgr.active_rooms() {
if room_hash(&room_name) == rh {
// Forward to all local participants in this room
let locals: Vec<_> = mgr.local_senders(&room_name);
drop(mgr); // release lock before sending
for sender in &locals {
if let ParticipantSender::Quic(t) = sender {
let _ = t.send_media(&pkt).await;
}
}
break;
}
}
handle_datagram(&fm_media, &peer_fp_media, data).await;
}
Err(_) => break,
}
}
};
// Periodically announce new local rooms to the peer
let announce_task = async move {
let mut announced: std::collections::HashSet<String> = std::collections::HashSet::new();
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let rooms = {
let mgr = fm_announce.room_mgr.lock().await;
mgr.active_rooms()
};
for room_name in &rooms {
if !announced.contains(room_name) {
let participants = {
let mgr = fm_announce.room_mgr.lock().await;
mgr.local_participants(room_name)
};
if participants.is_empty() {
continue; // only virtual participants, skip
}
info!(peer = %peer_label2, room = %room_name, local_count = participants.len(), "federation: announcing room to peer");
let msg = SignalMessage::FederationRoomJoin {
room: room_name.clone(),
participants,
};
match announce_transport.send_signal(&msg).await {
Ok(()) => {
info!(peer = %peer_label2, room = %room_name, "federation: room announced successfully");
announced.insert(room_name.clone());
}
Err(e) => {
warn!(peer = %peer_label2, room = %room_name, "federation: announce send failed: {e}");
}
}
}
}
// Remove rooms that no longer exist
announced.retain(|r| rooms.contains(r));
}
};
tokio::select! {
_ = signal_task => {}
_ = media_task => {}
_ = announce_task => {}
}
// Cleanup: remove peer link
{
let mut links = fm.peer_links.lock().await;
links.remove(&peer_fp);
}
info!(peer = %peer_label, "federation link ended");
Ok(())
}
/// Handle an incoming federation signal.
async fn handle_signal(
fm: &Arc<FederationManager>,
peer_fp: &str,
peer_label: &str,
msg: SignalMessage,
) {
match msg {
SignalMessage::GlobalRoomActive { room } => {
if fm.is_global_room(&room) {
info!(peer = %peer_label, room = %room, "peer has global room active");
let mut links = fm.peer_links.lock().await;
if let Some(link) = links.get_mut(peer_fp) {
link.active_rooms.insert(room.clone());
}
// Propagate: tell all OTHER peers this room is routable through us.
// This enables multi-hop: A→B→C where B relays A's announcement to C and vice versa.
for (fp, link) in links.iter() {
if fp != peer_fp {
let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive { room: room.clone() }).await;
}
}
}
}
SignalMessage::GlobalRoomInactive { room } => {
info!(peer = %peer_label, room = %room, "peer global room now inactive");
let mut links = fm.peer_links.lock().await;
if let Some(link) = links.get_mut(peer_fp) {
link.active_rooms.remove(&room);
}
// Check if any other peer still has this room — if none, propagate inactive
let any_other_active = links.iter()
.any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room));
let local_active = {
let mgr = fm.room_mgr.lock().await;
mgr.active_rooms().iter().any(|r| r == &room)
};
if !any_other_active && !local_active {
for (fp, link) in links.iter() {
if fp != peer_fp {
let _ = link.transport.send_signal(&SignalMessage::GlobalRoomInactive { room: room.clone() }).await;
}
}
}
}
_ => {} // ignore other signals
}
}
/// Handle an incoming federation datagram (room-hash-tagged media).
async fn handle_datagram(
fm: &Arc<FederationManager>,
source_peer_fp: &str,
data: Bytes,
) {
if data.len() < 12 { return; } // 8-byte hash + min packet
let mut rh = [0u8; 8];
rh.copy_from_slice(&data[..8]);
let media_bytes = data.slice(8..);
let pkt = match wzp_proto::MediaPacket::from_bytes(media_bytes.clone()) {
Some(pkt) => pkt,
None => return,
};
// Find room by hash
let room_name = {
let mgr = fm.room_mgr.lock().await;
mgr.active_rooms().into_iter().find(|r| room_hash(r) == rh)
};
let room_name = match room_name {
Some(r) => r,
None => return, // room not active locally
};
// Deliver to all local participants
let locals = {
let mgr = fm.room_mgr.lock().await;
mgr.local_senders(&room_name)
};
for sender in &locals {
match sender {
room::ParticipantSender::Quic(t) => { let _ = t.send_media(&pkt).await; }
room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; }
}
}
// Multi-hop: forward to OTHER active peers (not the source)
let links = fm.peer_links.lock().await;
for (fp, link) in links.iter() {
if fp != source_peer_fp && link.active_rooms.contains(&room_name) {
let mut tagged = Vec::with_capacity(8 + media_bytes.len());
tagged.extend_from_slice(&rh);
tagged.extend_from_slice(&media_bytes);
let _ = link.transport.send_raw_datagram(&tagged);
}
}
}

View File

@@ -104,6 +104,12 @@ fn parse_args() -> RelayConfig {
args.get(i).expect("--static-dir requires a directory path").to_string(),
);
}
"--global-room" => {
i += 1;
config.global_rooms.push(wzp_relay::config::GlobalRoomConfig {
name: args.get(i).expect("--global-room requires a room name").to_string(),
});
}
"--debug-tap" => {
i += 1;
config.debug_tap = Some(
@@ -132,6 +138,7 @@ fn parse_args() -> RelayConfig {
eprintln!(" --probe-mesh Enable mesh mode (mark config flag, probes all --probe targets).");
eprintln!(" --mesh-status Print mesh health table and exit (diagnostic).");
eprintln!(" --trunking Enable trunk batching for outgoing media in room mode.");
eprintln!(" --global-room <name> Declare a room as global (bridged across federation). Repeatable.");
eprintln!(" --debug-tap <room> Log packet headers for a room ('*' for all rooms).");
eprintln!(" --ws-port <port> WebSocket listener port for browser clients (e.g., 8080).");
eprintln!(" --static-dir <dir> Directory to serve static files from (HTML/JS/WASM).");
@@ -334,10 +341,15 @@ async fn main() -> anyhow::Result<()> {
let room_mgr = Arc::new(Mutex::new(RoomManager::new()));
// Federation manager
let federation_mgr = if !config.peers.is_empty() || !config.trusted.is_empty() {
let global_room_set: std::collections::HashSet<String> = config.global_rooms.iter()
.map(|g| g.name.clone())
.collect();
let federation_mgr = if !config.peers.is_empty() || !config.trusted.is_empty() || !global_room_set.is_empty() {
let fm = Arc::new(wzp_relay::federation::FederationManager::new(
config.peers.clone(),
config.trusted.clone(),
global_room_set.clone(),
room_mgr.clone(),
endpoint.clone(),
tls_fp.clone(),
@@ -386,6 +398,12 @@ async fn main() -> anyhow::Result<()> {
} else {
info!("auth disabled — any client can connect (use --auth-url to enable)");
}
if !config.global_rooms.is_empty() {
info!(count = config.global_rooms.len(), "global rooms configured");
for g in &config.global_rooms {
info!(name = %g.name, " global room");
}
}
if let Some(ref tap) = config.debug_tap {
info!(filter = %tap, "debug tap enabled — logging packet headers");
}
@@ -701,6 +719,22 @@ async fn main() -> anyhow::Result<()> {
.iter()
.map(|b| format!("{b:02x}"))
.collect();
// Set up federation media channel if this is a global room
let federation_tx = if let Some(ref fm) = federation_mgr {
if fm.is_global_room(&room_name) {
let (tx, rx) = tokio::sync::mpsc::channel(256);
let fm_clone = fm.clone();
tokio::spawn(async move {
wzp_relay::federation::run_federation_media_egress(fm_clone, rx).await;
});
Some(tx)
} else {
None
}
} else {
None
};
room::run_participant(
room_mgr.clone(),
room_name,
@@ -710,6 +744,7 @@ async fn main() -> anyhow::Result<()> {
&session_id_str,
trunking_enabled,
debug_tap,
federation_tx,
).await;
// Participant disconnected — clean up presence + per-session metrics

View File

@@ -59,13 +59,20 @@ fn next_id() -> ParticipantId {
NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed)
}
/// Tracks where a participant originates from (for loop prevention).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ParticipantOrigin {
/// Connected directly to this relay.
Local,
/// Virtual participant representing a federated peer relay.
Federated { relay_addr: std::net::SocketAddr },
/// Events emitted by RoomManager for federation to observe.
#[derive(Clone, Debug)]
pub enum RoomEvent {
/// First local participant joined this room.
LocalJoin { room: String },
/// Last local participant left this room.
LocalLeave { room: String },
}
/// Outbound federation media from a local participant.
pub struct FederationMediaOut {
pub room_name: String,
pub room_hash: [u8; 8],
pub data: Bytes,
}
/// How to send data to a participant — either via QUIC transport or WebSocket channel.
@@ -73,11 +80,6 @@ pub enum ParticipantOrigin {
pub enum ParticipantSender {
Quic(Arc<wzp_transport::QuinnTransport>),
WebSocket(tokio::sync::mpsc::Sender<Bytes>),
/// Federated peer relay — media is prefixed with an 8-byte room hash.
Federation {
transport: Arc<wzp_transport::QuinnTransport>,
room_hash: [u8; 8],
},
}
impl ParticipantSender {
@@ -96,14 +98,6 @@ impl ParticipantSender {
};
transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}"))
}
ParticipantSender::Federation { transport, room_hash } => {
// Prefix media data with room hash for demuxing on the peer relay
let mut tagged = Vec::with_capacity(8 + data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(data);
transport.send_raw_datagram(&tagged)
.map_err(|e| format!("federation send: {e}"))
}
}
}
@@ -139,21 +133,17 @@ struct Participant {
sender: ParticipantSender,
fingerprint: Option<String>,
alias: Option<String>,
origin: ParticipantOrigin,
}
/// A room holding multiple participants.
struct Room {
participants: Vec<Participant>,
/// Remote participants from federated peers (for merged RoomUpdate).
federated_participants: HashMap<std::net::SocketAddr, Vec<wzp_proto::packet::RoomParticipant>>,
}
impl Room {
fn new() -> Self {
Self {
participants: Vec::new(),
federated_participants: HashMap::new(),
}
}
@@ -163,11 +153,10 @@ impl Room {
sender: ParticipantSender,
fingerprint: Option<String>,
alias: Option<String>,
origin: ParticipantOrigin,
) -> ParticipantId {
let id = next_id();
info!(room_size = self.participants.len() + 1, participant = id, %addr, ?origin, "joined room");
self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias, origin });
info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room");
self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias });
id
}
@@ -184,38 +173,15 @@ impl Room {
.collect()
}
/// Get senders with loop prevention for federation.
///
/// - Media from a **local** participant → send to ALL others (local + federated)
/// - Media from a **federated** participant → send to LOCAL participants only
/// (the source relay already forwarded to its own locals and other peers)
fn others_for_origin(&self, exclude_id: ParticipantId, source_origin: &ParticipantOrigin) -> Vec<ParticipantSender> {
/// Build a RoomUpdate participant list.
fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> {
self.participants
.iter()
.filter(|p| p.id != exclude_id)
.filter(|p| match source_origin {
ParticipantOrigin::Local => true,
ParticipantOrigin::Federated { .. } => p.origin == ParticipantOrigin::Local,
})
.map(|p| p.sender.clone())
.collect()
}
/// Build a RoomUpdate participant list (local + federated).
fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> {
let mut list: Vec<_> = self.participants
.iter()
.filter(|p| p.origin == ParticipantOrigin::Local)
.map(|p| wzp_proto::packet::RoomParticipant {
fingerprint: p.fingerprint.clone().unwrap_or_default(),
alias: p.alias.clone(),
})
.collect();
// Merge federated participants from all peer relays
for remote in self.federated_participants.values() {
list.extend(remote.iter().cloned());
}
list
.collect()
}
/// Get all senders (for broadcasting to everyone including the joiner).
@@ -239,24 +205,35 @@ pub struct RoomManager {
/// When `None`, rooms are open (no auth mode). When `Some`, only listed
/// fingerprints can join the corresponding room.
acl: Option<HashMap<String, HashSet<String>>>,
/// Channel for room lifecycle events (federation subscribes).
event_tx: tokio::sync::broadcast::Sender<RoomEvent>,
}
impl RoomManager {
pub fn new() -> Self {
let (event_tx, _) = tokio::sync::broadcast::channel(64);
Self {
rooms: HashMap::new(),
acl: None,
event_tx,
}
}
/// Create a room manager with ACL enforcement enabled.
pub fn with_acl() -> Self {
let (event_tx, _) = tokio::sync::broadcast::channel(64);
Self {
rooms: HashMap::new(),
acl: Some(HashMap::new()),
event_tx,
}
}
/// Subscribe to room lifecycle events (for federation).
pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<RoomEvent> {
self.event_tx.subscribe()
}
/// Grant a fingerprint access to a room.
pub fn allow(&mut self, room_name: &str, fingerprint: &str) {
if let Some(ref mut acl) = self.acl {
@@ -295,8 +272,13 @@ impl RoomManager {
warn!(room = room_name, fingerprint = ?fingerprint, "unauthorized room join attempt");
return Err("not authorized for this room".to_string());
}
let was_empty = !self.rooms.contains_key(room_name)
|| self.rooms.get(room_name).map_or(true, |r| r.is_empty());
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()), ParticipantOrigin::Local);
let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()));
if was_empty {
let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() });
}
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
@@ -317,78 +299,15 @@ impl RoomManager {
Ok(id)
}
/// Join a room as a federated virtual participant.
pub fn join_federated(
&mut self,
room_name: &str,
relay_addr: std::net::SocketAddr,
sender: ParticipantSender,
remote_participants: Vec<wzp_proto::packet::RoomParticipant>,
) -> (ParticipantId, wzp_proto::SignalMessage, Vec<ParticipantSender>) {
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
room.federated_participants.insert(relay_addr, remote_participants);
let id = room.add(
relay_addr, sender, None, Some("(federated)".to_string()),
ParticipantOrigin::Federated { relay_addr },
);
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
(id, update, senders)
}
/// Update federated participant list for a room (from FederationParticipantUpdate).
pub fn update_federated_participants(
&mut self,
room_name: &str,
relay_addr: std::net::SocketAddr,
participants: Vec<wzp_proto::packet::RoomParticipant>,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
if let Some(room) = self.rooms.get_mut(room_name) {
room.federated_participants.insert(relay_addr, participants);
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
Some((update, senders))
} else {
None
}
}
/// Get the origin of a participant by ID.
pub fn participant_origin(&self, room_name: &str, participant_id: ParticipantId) -> Option<ParticipantOrigin> {
self.rooms.get(room_name)
.and_then(|room| room.participants.iter().find(|p| p.id == participant_id))
.map(|p| p.origin.clone())
}
/// Get list of active room names (for federation room announcements).
/// Get list of active room names.
pub fn active_rooms(&self) -> Vec<String> {
self.rooms.keys().cloned().collect()
}
/// Get local participant list for a room (excludes federated virtual participants).
pub fn local_participants(&self, room_name: &str) -> Vec<wzp_proto::packet::RoomParticipant> {
self.rooms.get(room_name)
.map(|room| room.participants.iter()
.filter(|p| p.origin == ParticipantOrigin::Local)
.map(|p| wzp_proto::packet::RoomParticipant {
fingerprint: p.fingerprint.clone().unwrap_or_default(),
alias: p.alias.clone(),
})
.collect())
.unwrap_or_default()
}
/// Get senders for local-only participants in a room (for federation inbound media).
/// Get all senders for participants in a room (for federation inbound media delivery).
pub fn local_senders(&self, room_name: &str) -> Vec<ParticipantSender> {
self.rooms.get(room_name)
.map(|room| room.participants.iter()
.filter(|p| p.origin == ParticipantOrigin::Local)
.map(|p| p.sender.clone())
.collect())
.unwrap_or_default()
@@ -400,6 +319,7 @@ impl RoomManager {
room.remove(participant_id);
if room.is_empty() {
self.rooms.remove(room_name);
let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() });
info!(room = room_name, "room closed (empty)");
return None;
}
@@ -510,6 +430,7 @@ pub async fn run_participant(
session_id: &str,
trunking_enabled: bool,
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
) {
if trunking_enabled {
run_participant_trunked(
@@ -518,7 +439,7 @@ pub async fn run_participant(
.await;
} else {
run_participant_plain(
room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap,
room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx,
)
.await;
}
@@ -533,6 +454,7 @@ async fn run_participant_plain(
metrics: Arc<RelayMetrics>,
session_id: &str,
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
@@ -635,21 +557,19 @@ async fn run_participant_plain(
ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await;
}
ParticipantSender::Federation { transport, room_hash } => {
// Send room-tagged datagram to federated peer
}
}
// Federation: forward to active peer relays via channel
if let Some(ref fed_tx) = federation_tx {
let data = pkt.to_bytes();
let mut tagged = Vec::with_capacity(8 + data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(&data);
if let Err(e) = transport.send_raw_datagram(&tagged) {
send_errors += 1;
if send_errors <= 5 {
warn!(room = %room_name, "federation forward error: {e}");
}
}
}
}
let _ = fed_tx.try_send(FederationMediaOut {
room_name: room_name.clone(),
room_hash: crate::federation::room_hash(&room_name),
data,
});
}
let fwd_ms = fwd_start.elapsed().as_millis() as u64;
if fwd_ms > max_forward_ms {
max_forward_ms = fwd_ms;
@@ -815,13 +735,6 @@ async fn run_participant_trunked(
ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await;
}
ParticipantSender::Federation { transport, room_hash } => {
let data = pkt.to_bytes();
let mut tagged = Vec::with_capacity(8 + data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(&data);
let _ = transport.send_raw_datagram(&tagged);
}
}
}
let fwd_ms = fwd_start.elapsed().as_millis() as u64;