3 Commits

Author SHA1 Message Date
Siavash Sameni
6be36e43c2 feat: relay federation infrastructure — room bridging, loop prevention, peer connections
Some checks failed
Mirror to GitHub / mirror (push) Failing after 36s
Build Release Binaries / build-amd64 (push) Failing after 2m1s
Phase 1 of relay federation:

1. Signal messages: FederationRoomJoin/Leave/ParticipantUpdate added
   to SignalMessage enum for relay-to-relay room coordination.

2. Room changes: ParticipantOrigin (Local/Federated) tracking, loop
   prevention (federated media only forwards to local participants),
   ParticipantSender::Federation with 8-byte room-hash prefixed
   datagrams, merged participant lists (local + remote), new methods:
   join_federated(), update_federated_participants(), local_senders(),
   active_rooms(), local_participants().

3. FederationManager: connects to configured peers via QUIC with SNI
   "_federation", reconnects with exponential backoff (5s-300s),
   exchanges FederationRoomJoin signals, runs recv loops for both
   signals and media datagrams, creates virtual participants in rooms.

4. Accept-side: _federation SNI handling in main.rs, unknown peer
   gets helpful "add to relay.toml" log message, recognized peers
   handed off to FederationManager.

TODO: TLS fingerprint verification — currently outbound connections
use client_config() which doesn't present a cert, so inbound
verification fails. Need mutual TLS or URL-based peer matching.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 22:30:18 +04:00
Siavash Sameni
2f2720802d feat: TOML config file with federation peers + --config flag
The relay now supports loading configuration from a TOML file via
--config <path>. CLI flags override TOML values. All fields have
serde defaults so a minimal config only needs what you want to change.

Example relay.toml:
  listen_addr = "0.0.0.0:4433"
  [[peers]]
  url = "193.180.213.68:4433"
  fingerprint = "1a:39:38:..."
  label = "Pangolin EU"

Federation hint on startup now shows TOML format with TLS fingerprint
(not Ed25519 identity fingerprint), since TLS fingerprint is what
peers actually verify. Configured peers are logged on startup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 22:13:56 +04:00
Siavash Sameni
087bfd2335 feat: deterministic TLS certificate from relay identity seed
The relay's TLS certificate is now derived from the persisted
Ed25519 seed via HKDF, so the same seed produces the same cert
and the same TLS fingerprint across restarts. This fixes the
"Server Key Changed" warnings on every relay restart.

Implementation: HKDF-SHA256(seed, "wzp-tls-ed25519") → Ed25519
signing key → PKCS8 DER → rcgen KeyPair → self-signed cert.

Also adds tls_fingerprint() helper (SHA-256 of DER cert, hex with
colons) and prints it on startup. This is the prerequisite for
relay federation (peers verify each other by TLS fingerprint).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 22:10:08 +04:00
13 changed files with 651 additions and 22 deletions

View File

@@ -40,7 +40,7 @@ codec2 = "0.3"
# Crypto # Crypto
x25519-dalek = { version = "2", features = ["static_secrets"] } x25519-dalek = { version = "2", features = ["static_secrets"] }
ed25519-dalek = { version = "2", features = ["rand_core"] } ed25519-dalek = { version = "2", features = ["rand_core", "pkcs8"] }
chacha20poly1305 = "0.10" chacha20poly1305 = "0.10"
hkdf = "0.12" hkdf = "0.12"
sha2 = "0.10" sha2 = "0.10"

View File

@@ -110,6 +110,9 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse
SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse
SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse
SignalMessage::FederationRoomJoin { .. }
| SignalMessage::FederationRoomLeave { .. }
| SignalMessage::FederationParticipantUpdate { .. } => CallSignalType::Offer, // relay-only
} }
} }

View File

@@ -656,6 +656,25 @@ pub enum SignalMessage {
/// List of participants currently in the room. /// List of participants currently in the room.
participants: Vec<RoomParticipant>, participants: Vec<RoomParticipant>,
}, },
// ── Federation signals (relay-to-relay) ──
/// Federation: a room exists on the sending relay with active local participants.
FederationRoomJoin {
room: String,
participants: Vec<RoomParticipant>,
},
/// Federation: a room is now empty on the sending relay.
FederationRoomLeave {
room: String,
},
/// Federation: local participant list changed for a federated room.
FederationParticipantUpdate {
room: String,
participants: Vec<RoomParticipant>,
},
} }
/// A participant entry in a RoomUpdate message. /// A participant entry in a RoomUpdate message.

View File

@@ -29,6 +29,7 @@ axum = { version = "0.7", default-features = false, features = ["tokio", "http1"
tower-http = { version = "0.6", features = ["fs"] } tower-http = { version = "0.6", features = ["fs"] }
futures-util = "0.3" futures-util = "0.3"
dirs = "6" dirs = "6"
sha2 = { workspace = true }
[[bin]] [[bin]]
name = "wzp-relay" name = "wzp-relay"

View File

@@ -3,8 +3,24 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::SocketAddr; use std::net::SocketAddr;
/// Configuration for the relay daemon. /// A federated peer relay.
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerConfig {
/// Address of the peer relay (e.g., "193.180.213.68:4433").
pub url: String,
/// Expected TLS certificate fingerprint (hex, with colons).
pub fingerprint: String,
/// Optional human-readable label.
#[serde(default)]
pub label: Option<String>,
}
/// Configuration for the relay daemon.
///
/// All fields have defaults, so a minimal TOML file only needs the
/// fields you want to override (e.g., just `[[peers]]`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RelayConfig { pub struct RelayConfig {
/// Address to listen on for incoming connections (client-facing). /// Address to listen on for incoming connections (client-facing).
pub listen_addr: SocketAddr, pub listen_addr: SocketAddr,
@@ -44,6 +60,9 @@ pub struct RelayConfig {
pub ws_port: Option<u16>, pub ws_port: Option<u16>,
/// Directory to serve static files from (HTML/JS/WASM for web clients). /// Directory to serve static files from (HTML/JS/WASM for web clients).
pub static_dir: Option<String>, pub static_dir: Option<String>,
/// Federation peer relays.
#[serde(default)]
pub peers: Vec<PeerConfig>,
} }
impl Default for RelayConfig { impl Default for RelayConfig {
@@ -62,6 +81,14 @@ impl Default for RelayConfig {
trunking_enabled: false, trunking_enabled: false,
ws_port: None, ws_port: None,
static_dir: None, static_dir: None,
peers: Vec::new(),
} }
} }
} }
/// Load relay configuration from a TOML file.
pub fn load_config(path: &str) -> Result<RelayConfig, anyhow::Error> {
let content = std::fs::read_to_string(path)?;
let config: RelayConfig = toml::from_str(&content)?;
Ok(config)
}

View File

@@ -0,0 +1,284 @@
//! Relay federation — connects to peer relays and bridges rooms with matching names.
//!
//! Each federated peer is represented as a virtual participant in shared rooms.
//! Media from local participants is forwarded to the peer via room-tagged datagrams.
//! Media from the peer is received, demuxed by room hash, and forwarded to local participants.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use sha2::{Sha256, Digest};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::QuinnTransport;
use crate::config::PeerConfig;
use crate::room::{self, ParticipantSender, RoomManager};
/// Compute 8-byte room hash for federation datagram tagging.
pub fn room_hash(room_name: &str) -> [u8; 8] {
let h = Sha256::digest(room_name.as_bytes());
let mut out = [0u8; 8];
out.copy_from_slice(&h[..8]);
out
}
/// Manages federation connections to peer relays.
pub struct FederationManager {
peers: Vec<PeerConfig>,
room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint,
local_tls_fp: String,
}
impl FederationManager {
pub fn new(
peers: Vec<PeerConfig>,
room_mgr: Arc<Mutex<RoomManager>>,
endpoint: quinn::Endpoint,
local_tls_fp: String,
) -> Self {
Self {
peers,
room_mgr,
endpoint,
local_tls_fp,
}
}
/// Start federation — spawns one task per configured peer.
pub async fn run(self: Arc<Self>) {
if self.peers.is_empty() {
return;
}
info!(peers = self.peers.len(), "federation starting");
let mut handles = Vec::new();
for peer in &self.peers {
let this = self.clone();
let peer = peer.clone();
handles.push(tokio::spawn(async move {
run_peer_loop(this, peer).await;
}));
}
for h in handles {
let _ = h.await;
}
}
/// Handle an inbound federation connection from a peer that we recognize.
pub async fn handle_inbound(
self: &Arc<Self>,
transport: Arc<QuinnTransport>,
peer_config: PeerConfig,
) {
let addr: SocketAddr = peer_config.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap());
info!(peer = ?peer_config.label, %addr, "inbound federation link active");
if let Err(e) = run_federation_link(self.clone(), transport, addr, &peer_config).await {
warn!(peer = ?peer_config.label, "inbound federation link ended: {e}");
}
}
/// Find a configured peer by TLS fingerprint.
pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> {
self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp))
}
}
/// Normalize a fingerprint string (remove colons, lowercase).
fn normalize_fp(fp: &str) -> String {
fp.replace(':', "").to_lowercase()
}
/// Persistent connection loop for one peer — reconnects with backoff.
async fn run_peer_loop(fm: Arc<FederationManager>, peer: PeerConfig) {
let mut backoff = Duration::from_secs(5);
loop {
info!(peer_url = %peer.url, label = ?peer.label, "federation: connecting to peer...");
match connect_to_peer(&fm, &peer).await {
Ok(transport) => {
backoff = Duration::from_secs(5); // reset on success
let addr: SocketAddr = peer.url.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap());
if let Err(e) = run_federation_link(fm.clone(), transport, addr, &peer).await {
warn!(peer_url = %peer.url, "federation link ended: {e}");
}
}
Err(e) => {
warn!(peer_url = %peer.url, backoff_s = backoff.as_secs(), "federation connect failed: {e}");
}
}
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(300));
}
}
/// Connect to a peer relay.
async fn connect_to_peer(fm: &FederationManager, peer: &PeerConfig) -> Result<Arc<QuinnTransport>, anyhow::Error> {
let addr: SocketAddr = peer.url.parse()?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&fm.endpoint, addr, "_federation", client_cfg).await?;
// TODO: verify peer TLS fingerprint once we have cert access
let transport = Arc::new(QuinnTransport::new(conn));
info!(peer_url = %peer.url, label = ?peer.label, "federation: connected to peer");
Ok(transport)
}
/// Run the federation link: exchange room info and forward media.
async fn run_federation_link(
fm: Arc<FederationManager>,
transport: Arc<QuinnTransport>,
peer_addr: SocketAddr,
peer: &PeerConfig,
) -> Result<(), anyhow::Error> {
// Announce our active rooms to the peer
let rooms = {
let mgr = fm.room_mgr.lock().await;
mgr.active_rooms()
};
for room_name in &rooms {
let participants = {
let mgr = fm.room_mgr.lock().await;
mgr.local_participants(room_name)
};
let msg = SignalMessage::FederationRoomJoin {
room: room_name.clone(),
participants,
};
transport.send_signal(&msg).await?;
}
// Track virtual participants we create on behalf of this peer
let mut peer_room_participants: HashMap<String, room::ParticipantId> = HashMap::new();
// Map room_hash -> room_name for incoming media demux
let mut hash_to_room: HashMap<[u8; 8], String> = HashMap::new();
// Run two tasks: recv signals + recv media datagrams
let signal_transport = transport.clone();
let media_transport = transport.clone();
let fm_signal = fm.clone();
let fm_media = fm.clone();
let peer_label = peer.label.clone().unwrap_or_else(|| peer.url.clone());
let signal_task = async move {
loop {
match signal_transport.recv_signal().await {
Ok(Some(msg)) => {
match msg {
SignalMessage::FederationRoomJoin { room, participants } => {
info!(peer = %peer_label, room = %room, count = participants.len(), "federation: peer room join");
let rh = room_hash(&room);
hash_to_room.insert(rh, room.clone());
let sender = ParticipantSender::Federation {
transport: signal_transport.clone(),
room_hash: rh,
};
let (pid, update, senders) = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.join_federated(&room, peer_addr, sender, participants)
};
peer_room_participants.insert(room, pid);
room::broadcast_signal(&senders, &update).await;
}
SignalMessage::FederationRoomLeave { room } => {
info!(peer = %peer_label, room = %room, "federation: peer room leave");
if let Some(pid) = peer_room_participants.remove(&room) {
let result = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.leave(&room, pid)
};
if let Some((update, senders)) = result {
room::broadcast_signal(&senders, &update).await;
}
}
hash_to_room.retain(|_, v| v != &room);
}
SignalMessage::FederationParticipantUpdate { room, participants } => {
let result = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.update_federated_participants(&room, peer_addr, participants)
};
if let Some((update, senders)) = result {
room::broadcast_signal(&senders, &update).await;
}
}
_ => {} // ignore other signals
}
}
Ok(None) => break,
Err(e) => {
error!(peer = %peer_label, "federation signal recv error: {e}");
break;
}
}
}
// Cleanup: remove all virtual participants for this peer
for (room, pid) in &peer_room_participants {
let result = {
let mut mgr = fm_signal.room_mgr.lock().await;
mgr.leave(room, *pid)
};
if let Some((update, senders)) = result {
room::broadcast_signal(&senders, &update).await;
}
}
info!(peer = %peer_label, "federation signal task ended");
};
let media_task = async move {
loop {
match media_transport.connection().read_datagram().await {
Ok(data) => {
if data.len() < 8 + 4 {
continue; // too short (need room_hash + min header)
}
let mut rh = [0u8; 8];
rh.copy_from_slice(&data[..8]);
let media_bytes = &data[8..];
// Deserialize media packet
let pkt = match wzp_proto::MediaPacket::from_bytes(Bytes::copy_from_slice(media_bytes)) {
Some(pkt) => pkt,
None => continue,
};
// Look up room by hash — we need to get the room name from the signal task's hash_to_room
// For simplicity, we forward to all local participants via the room manager
// The virtual participant approach means we don't need the room name here —
// the SFU loop handles it. But since inbound media doesn't go through run_participant,
// we need to manually fan out.
// For now, just use the room manager to find local participants
// This is a simplified approach — full implementation would maintain
// a shared hash_to_room map between signal and media tasks
let mgr = fm_media.room_mgr.lock().await;
for room_name in mgr.active_rooms() {
if room_hash(&room_name) == rh {
// Forward to all local participants in this room
let locals: Vec<_> = mgr.local_senders(&room_name);
drop(mgr); // release lock before sending
for sender in &locals {
if let ParticipantSender::Quic(t) = sender {
let _ = t.send_media(&pkt).await;
}
}
break;
}
}
}
Err(_) => break,
}
}
};
tokio::select! {
_ = signal_task => {}
_ = media_task => {}
}
Ok(())
}

View File

@@ -9,6 +9,7 @@
pub mod auth; pub mod auth;
pub mod config; pub mod config;
pub mod federation;
pub mod handshake; pub mod handshake;
pub mod metrics; pub mod metrics;
pub mod pipeline; pub mod pipeline;

View File

@@ -24,11 +24,34 @@ use wzp_relay::room::{self, RoomManager};
use wzp_relay::session_mgr::SessionManager; use wzp_relay::session_mgr::SessionManager;
fn parse_args() -> RelayConfig { fn parse_args() -> RelayConfig {
let mut config = RelayConfig::default();
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
// Check for --config first to use as base
let mut config_file = None;
let mut i = 1;
while i < args.len() {
if args[i] == "--config" {
i += 1;
config_file = args.get(i).cloned();
}
i += 1;
}
let mut config = if let Some(ref path) = config_file {
wzp_relay::config::load_config(path)
.unwrap_or_else(|e| {
eprintln!("failed to load config from {path}: {e}");
std::process::exit(1);
})
} else {
RelayConfig::default()
};
// CLI flags override config file values
let mut i = 1; let mut i = 1;
while i < args.len() { while i < args.len() {
match args[i].as_str() { match args[i].as_str() {
"--config" => { i += 1; } // already handled
"--listen" => { "--listen" => {
i += 1; i += 1;
config.listen_addr = args.get(i).expect("--listen requires an address") config.listen_addr = args.get(i).expect("--listen requires an address")
@@ -90,9 +113,10 @@ fn parse_args() -> RelayConfig {
std::process::exit(0); std::process::exit(0);
} }
"--help" | "-h" => { "--help" | "-h" => {
eprintln!("Usage: wzp-relay [--listen <addr>] [--remote <addr>] [--auth-url <url>] [--metrics-port <port>] [--probe <addr>]... [--probe-mesh] [--mesh-status]"); eprintln!("Usage: wzp-relay [--config <path>] [--listen <addr>] [--remote <addr>] [--auth-url <url>] [--metrics-port <port>] [--probe <addr>]... [--probe-mesh] [--mesh-status]");
eprintln!(); eprintln!();
eprintln!("Options:"); eprintln!("Options:");
eprintln!(" --config <path> Load configuration from TOML file (peers, listen, etc.)");
eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433)"); eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433)");
eprintln!(" --remote <addr> Remote relay for forwarding (disables room mode)"); eprintln!(" --remote <addr> Remote relay for forwarding (disables room mode)");
eprintln!(" --auth-url <url> featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)"); eprintln!(" --auth-url <url> featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)");
@@ -258,16 +282,27 @@ async fn main() -> anyhow::Result<()> {
let relay_fp = relay_seed.derive_identity().public_identity().fingerprint; let relay_fp = relay_seed.derive_identity().public_identity().fingerprint;
info!(addr = %config.listen_addr, fingerprint = %relay_fp, "WarzonePhone relay starting"); info!(addr = %config.listen_addr, fingerprint = %relay_fp, "WarzonePhone relay starting");
// Print federation hint with our public IP + listen port let (server_config, cert_der) = wzp_transport::server_config_from_seed(&relay_seed.0);
let tls_fp = wzp_transport::tls_fingerprint(&cert_der);
info!(tls_fingerprint = %tls_fp, "TLS certificate (deterministic from relay identity)");
// Print federation hint with our public IP + listen port + TLS fingerprint
let listen_port = config.listen_addr.port(); let listen_port = config.listen_addr.port();
let public_ip = detect_public_ip(); let public_ip = detect_public_ip();
if let Some(ip) = &public_ip { if let Some(ip) = &public_ip {
info!("federation: to peer with this relay, add to peers config:"); info!("federation: to peer with this relay, add to relay.toml:");
info!(" - url: \"{ip}:{listen_port}\""); info!(" [[peers]]");
info!(" fingerprint: \"{relay_fp}\""); info!(" url = \"{ip}:{listen_port}\"");
info!(" fingerprint = \"{tls_fp}\"");
} }
let (server_config, _cert) = wzp_transport::server_config(); // Log configured peers
if !config.peers.is_empty() {
info!(count = config.peers.len(), "federation peers configured");
for p in &config.peers {
info!(url = %p.url, label = ?p.label, " peer");
}
}
let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?; let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?;
// Forward mode // Forward mode
@@ -285,6 +320,21 @@ async fn main() -> anyhow::Result<()> {
// Room manager (room mode only) // Room manager (room mode only)
let room_mgr = Arc::new(Mutex::new(RoomManager::new())); let room_mgr = Arc::new(Mutex::new(RoomManager::new()));
// Federation manager
let federation_mgr = if !config.peers.is_empty() {
let fm = Arc::new(wzp_relay::federation::FederationManager::new(
config.peers.clone(),
room_mgr.clone(),
endpoint.clone(),
tls_fp.clone(),
));
let fm_run = fm.clone();
tokio::spawn(async move { fm_run.run().await });
Some(fm)
} else {
None
};
// Session manager — enforces max concurrent sessions // Session manager — enforces max concurrent sessions
let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions)));
@@ -340,6 +390,7 @@ async fn main() -> anyhow::Result<()> {
let trunking_enabled = config.trunking_enabled; let trunking_enabled = config.trunking_enabled;
let presence = presence.clone(); let presence = presence.clone();
let route_resolver = route_resolver.clone(); let route_resolver = route_resolver.clone();
let federation_mgr = federation_mgr.clone();
tokio::spawn(async move { tokio::spawn(async move {
let addr = connection.remote_address(); let addr = connection.remote_address();
@@ -447,6 +498,38 @@ async fn main() -> anyhow::Result<()> {
return; return;
} }
// Federation connections use SNI "_federation"
if room_name == "_federation" {
if let Some(ref fm) = federation_mgr {
// Check if we recognize this peer by TLS fingerprint
let peer_fp = wzp_transport::tls_fingerprint(
&transport.connection()
.peer_identity()
.and_then(|id| id.downcast::<Vec<rustls::pki_types::CertificateDer>>().ok())
.and_then(|certs| certs.first().cloned())
.map(|c| c.to_vec())
.unwrap_or_default()
);
if let Some(peer_config) = fm.find_peer_by_fingerprint(&peer_fp) {
let peer_config = peer_config.clone();
let fm = fm.clone();
info!(%addr, label = ?peer_config.label, "inbound federation connection accepted");
fm.handle_inbound(transport, peer_config).await;
} else {
warn!(%addr, "unknown relay wants to federate");
info!(" to accept, add to relay.toml:");
info!(" [[peers]]");
info!(" url = \"{addr}\"");
info!(" fingerprint = \"{peer_fp}\"");
transport.close().await.ok();
}
} else {
info!(%addr, "federation connection rejected (no peers configured)");
transport.close().await.ok();
}
return;
}
// Auth check: if --auth-url is set, expect first signal message to be a token // Auth check: if --auth-url is set, expect first signal message to be a token
// Auth: if --auth-url is set, expect AuthToken as first signal // Auth: if --auth-url is set, expect AuthToken as first signal
let authenticated_fp: Option<String> = if let Some(ref url) = auth_url { let authenticated_fp: Option<String> = if let Some(ref url) = auth_url {

View File

@@ -27,11 +27,25 @@ fn next_id() -> ParticipantId {
NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed) NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed)
} }
/// Tracks where a participant originates from (for loop prevention).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ParticipantOrigin {
/// Connected directly to this relay.
Local,
/// Virtual participant representing a federated peer relay.
Federated { relay_addr: std::net::SocketAddr },
}
/// How to send data to a participant — either via QUIC transport or WebSocket channel. /// How to send data to a participant — either via QUIC transport or WebSocket channel.
#[derive(Clone)] #[derive(Clone)]
pub enum ParticipantSender { pub enum ParticipantSender {
Quic(Arc<wzp_transport::QuinnTransport>), Quic(Arc<wzp_transport::QuinnTransport>),
WebSocket(tokio::sync::mpsc::Sender<Bytes>), WebSocket(tokio::sync::mpsc::Sender<Bytes>),
/// Federated peer relay — media is prefixed with an 8-byte room hash.
Federation {
transport: Arc<wzp_transport::QuinnTransport>,
room_hash: [u8; 8],
},
} }
impl ParticipantSender { impl ParticipantSender {
@@ -50,6 +64,14 @@ impl ParticipantSender {
}; };
transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}")) transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}"))
} }
ParticipantSender::Federation { transport, room_hash } => {
// Prefix media data with room hash for demuxing on the peer relay
let mut tagged = Vec::with_capacity(8 + data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(data);
transport.send_raw_datagram(&tagged)
.map_err(|e| format!("federation send: {e}"))
}
} }
} }
@@ -85,17 +107,21 @@ struct Participant {
sender: ParticipantSender, sender: ParticipantSender,
fingerprint: Option<String>, fingerprint: Option<String>,
alias: Option<String>, alias: Option<String>,
origin: ParticipantOrigin,
} }
/// A room holding multiple participants. /// A room holding multiple participants.
struct Room { struct Room {
participants: Vec<Participant>, participants: Vec<Participant>,
/// Remote participants from federated peers (for merged RoomUpdate).
federated_participants: HashMap<std::net::SocketAddr, Vec<wzp_proto::packet::RoomParticipant>>,
} }
impl Room { impl Room {
fn new() -> Self { fn new() -> Self {
Self { Self {
participants: Vec::new(), participants: Vec::new(),
federated_participants: HashMap::new(),
} }
} }
@@ -105,10 +131,11 @@ impl Room {
sender: ParticipantSender, sender: ParticipantSender,
fingerprint: Option<String>, fingerprint: Option<String>,
alias: Option<String>, alias: Option<String>,
origin: ParticipantOrigin,
) -> ParticipantId { ) -> ParticipantId {
let id = next_id(); let id = next_id();
info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room"); info!(room_size = self.participants.len() + 1, participant = id, %addr, ?origin, "joined room");
self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias }); self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias, origin });
id id
} }
@@ -125,15 +152,38 @@ impl Room {
.collect() .collect()
} }
/// Build a RoomUpdate participant list. /// Get senders with loop prevention for federation.
fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> { ///
/// - Media from a **local** participant → send to ALL others (local + federated)
/// - Media from a **federated** participant → send to LOCAL participants only
/// (the source relay already forwarded to its own locals and other peers)
fn others_for_origin(&self, exclude_id: ParticipantId, source_origin: &ParticipantOrigin) -> Vec<ParticipantSender> {
self.participants self.participants
.iter() .iter()
.filter(|p| p.id != exclude_id)
.filter(|p| match source_origin {
ParticipantOrigin::Local => true,
ParticipantOrigin::Federated { .. } => p.origin == ParticipantOrigin::Local,
})
.map(|p| p.sender.clone())
.collect()
}
/// Build a RoomUpdate participant list (local + federated).
fn participant_list(&self) -> Vec<wzp_proto::packet::RoomParticipant> {
let mut list: Vec<_> = self.participants
.iter()
.filter(|p| p.origin == ParticipantOrigin::Local)
.map(|p| wzp_proto::packet::RoomParticipant { .map(|p| wzp_proto::packet::RoomParticipant {
fingerprint: p.fingerprint.clone().unwrap_or_default(), fingerprint: p.fingerprint.clone().unwrap_or_default(),
alias: p.alias.clone(), alias: p.alias.clone(),
}) })
.collect() .collect();
// Merge federated participants from all peer relays
for remote in self.federated_participants.values() {
list.extend(remote.iter().cloned());
}
list
} }
/// Get all senders (for broadcasting to everyone including the joiner). /// Get all senders (for broadcasting to everyone including the joiner).
@@ -214,7 +264,7 @@ impl RoomManager {
return Err("not authorized for this room".to_string()); return Err("not authorized for this room".to_string());
} }
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string())); let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()), ParticipantOrigin::Local);
let update = wzp_proto::SignalMessage::RoomUpdate { let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32, count: room.len() as u32,
participants: room.participant_list(), participants: room.participant_list(),
@@ -235,6 +285,83 @@ impl RoomManager {
Ok(id) Ok(id)
} }
/// Join a room as a federated virtual participant.
pub fn join_federated(
&mut self,
room_name: &str,
relay_addr: std::net::SocketAddr,
sender: ParticipantSender,
remote_participants: Vec<wzp_proto::packet::RoomParticipant>,
) -> (ParticipantId, wzp_proto::SignalMessage, Vec<ParticipantSender>) {
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
room.federated_participants.insert(relay_addr, remote_participants);
let id = room.add(
relay_addr, sender, None, Some("(federated)".to_string()),
ParticipantOrigin::Federated { relay_addr },
);
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
(id, update, senders)
}
/// Update federated participant list for a room (from FederationParticipantUpdate).
pub fn update_federated_participants(
&mut self,
room_name: &str,
relay_addr: std::net::SocketAddr,
participants: Vec<wzp_proto::packet::RoomParticipant>,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
if let Some(room) = self.rooms.get_mut(room_name) {
room.federated_participants.insert(relay_addr, participants);
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
Some((update, senders))
} else {
None
}
}
/// Get the origin of a participant by ID.
pub fn participant_origin(&self, room_name: &str, participant_id: ParticipantId) -> Option<ParticipantOrigin> {
self.rooms.get(room_name)
.and_then(|room| room.participants.iter().find(|p| p.id == participant_id))
.map(|p| p.origin.clone())
}
/// Get list of active room names (for federation room announcements).
pub fn active_rooms(&self) -> Vec<String> {
self.rooms.keys().cloned().collect()
}
/// Get local participant list for a room (excludes federated virtual participants).
pub fn local_participants(&self, room_name: &str) -> Vec<wzp_proto::packet::RoomParticipant> {
self.rooms.get(room_name)
.map(|room| room.participants.iter()
.filter(|p| p.origin == ParticipantOrigin::Local)
.map(|p| wzp_proto::packet::RoomParticipant {
fingerprint: p.fingerprint.clone().unwrap_or_default(),
alias: p.alias.clone(),
})
.collect())
.unwrap_or_default()
}
/// Get senders for local-only participants in a room (for federation inbound media).
pub fn local_senders(&self, room_name: &str) -> Vec<ParticipantSender> {
self.rooms.get(room_name)
.map(|room| room.participants.iter()
.filter(|p| p.origin == ParticipantOrigin::Local)
.map(|p| p.sender.clone())
.collect())
.unwrap_or_default()
}
/// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty. /// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty.
pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> { pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
if let Some(room) = self.rooms.get_mut(room_name) { if let Some(room) = self.rooms.get_mut(room_name) {
@@ -467,6 +594,19 @@ async fn run_participant_plain(
ParticipantSender::WebSocket(_) => { ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await; let _ = other.send_raw(&pkt.payload).await;
} }
ParticipantSender::Federation { transport, room_hash } => {
// Send room-tagged datagram to federated peer
let data = pkt.to_bytes();
let mut tagged = Vec::with_capacity(8 + data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(&data);
if let Err(e) = transport.send_raw_datagram(&tagged) {
send_errors += 1;
if send_errors <= 5 {
warn!(room = %room_name, "federation forward error: {e}");
}
}
}
} }
} }
let fwd_ms = fwd_start.elapsed().as_millis() as u64; let fwd_ms = fwd_start.elapsed().as_millis() as u64;
@@ -634,6 +774,13 @@ async fn run_participant_trunked(
ParticipantSender::WebSocket(_) => { ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await; let _ = other.send_raw(&pkt.payload).await;
} }
ParticipantSender::Federation { transport, room_hash } => {
let data = pkt.to_bytes();
let mut tagged = Vec::with_capacity(8 + data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(&data);
let _ = transport.send_raw_datagram(&tagged);
}
} }
} }
let fwd_ms = fwd_start.elapsed().as_millis() as u64; let fwd_ms = fwd_start.elapsed().as_millis() as u64;

View File

@@ -16,6 +16,9 @@ async-trait = { workspace = true }
serde_json = "1" serde_json = "1"
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
rcgen = "0.13" rcgen = "0.13"
ed25519-dalek = { workspace = true }
hkdf = { workspace = true }
sha2 = { workspace = true }
[dev-dependencies] [dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@@ -6,20 +6,74 @@ use std::time::Duration;
use quinn::crypto::rustls::QuicClientConfig; use quinn::crypto::rustls::QuicClientConfig;
use quinn::crypto::rustls::QuicServerConfig; use quinn::crypto::rustls::QuicServerConfig;
/// Create a server configuration with a self-signed certificate (for testing). /// Create a server configuration with a self-signed certificate (random keypair).
/// ///
/// Tunes QUIC transport parameters for lossy VoIP: /// The certificate changes on every call. Use `server_config_from_seed` for
/// - 30s idle timeout /// a deterministic certificate that survives relay restarts.
/// - 5s keep-alive interval
/// - DATAGRAM extension enabled
/// - Conservative flow control for bandwidth-constrained links
pub fn server_config() -> (quinn::ServerConfig, Vec<u8>) { pub fn server_config() -> (quinn::ServerConfig, Vec<u8>) {
let cert_key = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]) let cert_key = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
.expect("failed to generate self-signed cert"); .expect("failed to generate self-signed cert");
let cert_der = rustls::pki_types::CertificateDer::from(cert_key.cert); let cert_der = rustls::pki_types::CertificateDer::from(cert_key.cert);
let key_der = let key_der =
rustls::pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der()).unwrap(); rustls::pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der()).unwrap();
build_server_config(cert_der, key_der)
}
/// Create a server configuration with a deterministic self-signed certificate
/// derived from a 32-byte seed. Same seed = same cert = same TLS fingerprint.
pub fn server_config_from_seed(seed: &[u8; 32]) -> (quinn::ServerConfig, Vec<u8>) {
use ed25519_dalek::pkcs8::EncodePrivateKey;
use ed25519_dalek::SigningKey;
use hkdf::Hkdf;
use sha2::Sha256;
// Derive Ed25519 key bytes from seed via HKDF
let hk = Hkdf::<Sha256>::new(None, seed);
let mut ed_bytes = [0u8; 32];
hk.expand(b"wzp-tls-ed25519", &mut ed_bytes)
.expect("HKDF expand failed");
// Create Ed25519 signing key and export as PKCS8 DER
let signing_key = SigningKey::from_bytes(&ed_bytes);
let pkcs8_doc = signing_key.to_pkcs8_der()
.expect("failed to encode Ed25519 key as PKCS8");
let key_der_for_rcgen = rustls::pki_types::PrivateKeyDer::try_from(pkcs8_doc.as_bytes().to_vec())
.expect("failed to wrap PKCS8 DER");
// Create rcgen KeyPair from DER
let key_pair = rcgen::KeyPair::from_der_and_sign_algo(
&key_der_for_rcgen,
&rcgen::PKCS_ED25519,
)
.expect("failed to create KeyPair from seed-derived Ed25519 key");
// Build self-signed cert with this deterministic keypair
let params = rcgen::CertificateParams::new(vec!["localhost".to_string()])
.expect("failed to create CertificateParams");
let cert = params.self_signed(&key_pair).expect("failed to self-sign cert");
let cert_der = rustls::pki_types::CertificateDer::from(cert.der().to_vec());
let key_der = rustls::pki_types::PrivateKeyDer::try_from(key_pair.serialize_der())
.expect("failed to serialize key DER");
build_server_config(cert_der, key_der)
}
/// Compute a hex-formatted SHA-256 fingerprint of a DER-encoded certificate.
///
/// Format: `xx:xx:xx:xx:...` (32 bytes = 64 hex chars with colons).
pub fn tls_fingerprint(cert_der: &[u8]) -> String {
use sha2::{Sha256, Digest};
let hash = Sha256::digest(cert_der);
hash.iter()
.map(|b| format!("{b:02x}"))
.collect::<Vec<_>>()
.join(":")
}
fn build_server_config(
cert_der: rustls::pki_types::CertificateDer<'static>,
key_der: rustls::pki_types::PrivateKeyDer<'static>,
) -> (quinn::ServerConfig, Vec<u8>) {
let mut server_crypto = rustls::ServerConfig::builder() let mut server_crypto = rustls::ServerConfig::builder()
.with_no_client_auth() .with_no_client_auth()
.with_single_cert(vec![cert_der.clone()], key_der) .with_single_cert(vec![cert_der.clone()], key_der)

View File

@@ -22,7 +22,7 @@ pub mod path_monitor;
pub mod quic; pub mod quic;
pub mod reliable; pub mod reliable;
pub use config::{client_config, server_config}; pub use config::{client_config, server_config, server_config_from_seed, tls_fingerprint};
pub use connection::{accept, connect, create_endpoint}; pub use connection::{accept, connect, create_endpoint};
pub use path_monitor::PathMonitor; pub use path_monitor::PathMonitor;
pub use quic::QuinnTransport; pub use quic::QuinnTransport;

View File

@@ -33,6 +33,13 @@ impl QuinnTransport {
&self.connection &self.connection
} }
/// Send raw bytes as a QUIC datagram (no MediaPacket framing).
pub fn send_raw_datagram(&self, data: &[u8]) -> Result<(), TransportError> {
self.connection
.send_datagram(bytes::Bytes::copy_from_slice(data))
.map_err(|e| TransportError::Internal(format!("datagram: {e}")))
}
/// Close the QUIC connection immediately (synchronous, no async needed). /// Close the QUIC connection immediately (synchronous, no async needed).
/// The relay will detect the close and remove this participant from the room. /// The relay will detect the close and remove this participant from the room.
pub fn close_now(&self) { pub fn close_now(&self) {