T1.5: Migrate emit/parse sites to v2 wire format

This commit is contained in:
Siavash Sameni
2026-05-11 12:36:45 +04:00
parent 9680b6ff34
commit c93d302656
120 changed files with 5953 additions and 2888 deletions

View File

@@ -7,9 +7,7 @@ fn main() {
.output();
let hash = match output {
Ok(o) if o.status.success() => {
String::from_utf8_lossy(&o.stdout).trim().to_string()
}
Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).trim().to_string(),
_ => "unknown".to_string(),
};

View File

@@ -32,10 +32,7 @@ pub struct AuthenticatedClient {
///
/// Calls `POST {auth_url}` with `{ "token": "..." }`.
/// Returns the client identity if valid, or an error string.
pub async fn validate_token(
auth_url: &str,
token: &str,
) -> Result<AuthenticatedClient, String> {
pub async fn validate_token(auth_url: &str, token: &str) -> Result<AuthenticatedClient, String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()

View File

@@ -83,7 +83,12 @@ impl CallRegistry {
}
/// Create a new pending call. Returns the call_id.
pub fn create_call(&mut self, call_id: String, caller_fp: String, callee_fp: String) -> &DirectCall {
pub fn create_call(
&mut self,
call_id: String,
caller_fp: String,
callee_fp: String,
) -> &DirectCall {
let call = DirectCall {
call_id: call_id.clone(),
caller_fingerprint: caller_fp,
@@ -189,7 +194,12 @@ impl CallRegistry {
}
/// Transition to Active state.
pub fn set_active(&mut self, call_id: &str, mode: wzp_proto::CallAcceptMode, room: String) -> bool {
pub fn set_active(
&mut self,
call_id: &str,
mode: wzp_proto::CallAcceptMode,
room: String,
) -> bool {
if let Some(call) = self.calls.get_mut(call_id) {
if call.state == DirectCallState::Pending || call.state == DirectCallState::Ringing {
call.state = DirectCallState::Active;
@@ -213,7 +223,8 @@ impl CallRegistry {
/// Find active/pending calls involving a fingerprint.
pub fn calls_for_fingerprint(&self, fp: &str) -> Vec<&DirectCall> {
self.calls.values()
self.calls
.values()
.filter(|c| {
c.state != DirectCallState::Ended
&& (c.caller_fingerprint == fp || c.callee_fingerprint == fp)
@@ -236,22 +247,25 @@ impl CallRegistry {
/// Returns call IDs of expired calls.
pub fn expire_stale(&mut self, timeout: Duration) -> Vec<DirectCall> {
let now = Instant::now();
let expired: Vec<String> = self.calls.iter()
let expired: Vec<String> = self
.calls
.iter()
.filter(|(_, c)| {
c.state == DirectCallState::Pending
&& now.duration_since(c.created_at) > timeout
c.state == DirectCallState::Pending && now.duration_since(c.created_at) > timeout
})
.map(|(id, _)| id.clone())
.collect();
expired.into_iter()
expired
.into_iter()
.filter_map(|id| self.calls.remove(&id))
.collect()
}
/// Number of active (non-ended) calls.
pub fn active_count(&self) -> usize {
self.calls.values()
self.calls
.values()
.filter(|c| c.state != DirectCallState::Ended)
.count()
}
@@ -270,9 +284,16 @@ mod tests {
assert!(reg.set_ringing("c1"));
assert_eq!(reg.get("c1").unwrap().state, DirectCallState::Ringing);
assert!(reg.set_active("c1", wzp_proto::CallAcceptMode::AcceptGeneric, "_call:c1".into()));
assert!(reg.set_active(
"c1",
wzp_proto::CallAcceptMode::AcceptGeneric,
"_call:c1".into()
));
assert_eq!(reg.get("c1").unwrap().state, DirectCallState::Active);
assert_eq!(reg.get("c1").unwrap().room_name.as_deref(), Some("_call:c1"));
assert_eq!(
reg.get("c1").unwrap().room_name.as_deref(),
Some("_call:c1")
);
let ended = reg.end_call("c1").unwrap();
assert_eq!(ended.state, DirectCallState::Ended);
@@ -329,10 +350,7 @@ mod tests {
// Both addrs are independently readable — the relay uses
// them to cross-wire peer_direct_addr in CallSetup.
let c = reg.get("c1").unwrap();
assert_eq!(
c.caller_reflexive_addr.as_deref(),
Some("192.0.2.1:4433")
);
assert_eq!(c.caller_reflexive_addr.as_deref(), Some("192.0.2.1:4433"));
assert_eq!(
c.callee_reflexive_addr.as_deref(),
Some("198.51.100.9:4433")

View File

@@ -145,7 +145,10 @@ pub struct RelayInfo {
}
/// Load config from path, or create a personalized example config if it doesn't exist.
pub fn load_or_create_config(path: &str, info: Option<&RelayInfo>) -> Result<RelayConfig, anyhow::Error> {
pub fn load_or_create_config(
path: &str,
info: Option<&RelayInfo>,
) -> Result<RelayConfig, anyhow::Error> {
let p = std::path::Path::new(path);
if p.exists() {
return load_config(path);
@@ -164,7 +167,9 @@ pub fn load_or_create_config(path: &str, info: Option<&RelayInfo>) -> Result<Rel
/// Generate an example TOML config, personalized with this relay's info if available.
fn generate_example_config(info: Option<&RelayInfo>) -> String {
let listen = info.map(|i| i.listen_addr.as_str()).unwrap_or("0.0.0.0:4433");
let listen = info
.map(|i| i.listen_addr.as_str())
.unwrap_or("0.0.0.0:4433");
let peer_example = if let Some(i) = info {
let ip = i.public_ip.as_deref().unwrap_or("this-relay-ip");
format!(

View File

@@ -25,16 +25,13 @@ pub struct Event {
pub src: Option<String>,
/// Packet sequence number.
#[serde(skip_serializing_if = "Option::is_none")]
pub seq: Option<u16>,
pub seq: Option<u32>,
/// Codec identifier.
#[serde(skip_serializing_if = "Option::is_none")]
pub codec: Option<String>,
/// FEC block ID.
/// FEC block ID (low byte) and symbol index (high byte).
#[serde(skip_serializing_if = "Option::is_none")]
pub fec_block: Option<u8>,
/// FEC symbol index.
#[serde(skip_serializing_if = "Option::is_none")]
pub fec_sym: Option<u8>,
pub fec_block: Option<u16>,
/// Is FEC repair packet.
#[serde(skip_serializing_if = "Option::is_none")]
pub repair: Option<bool>,
@@ -60,7 +57,9 @@ pub struct Event {
impl Event {
fn now() -> String {
chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string()
chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.6fZ")
.to_string()
}
/// Create a minimal event with just type and timestamp.
@@ -73,7 +72,6 @@ impl Event {
seq: None,
codec: None,
fec_block: None,
fec_sym: None,
repair: None,
len: None,
to_count: None,
@@ -85,33 +83,59 @@ impl Event {
}
/// Set room.
pub fn room(mut self, room: &str) -> Self { self.room = Some(room.to_string()); self }
pub fn room(mut self, room: &str) -> Self {
self.room = Some(room.to_string());
self
}
/// Set source.
pub fn src(mut self, src: &str) -> Self { self.src = Some(src.to_string()); self }
pub fn src(mut self, src: &str) -> Self {
self.src = Some(src.to_string());
self
}
/// Set packet header fields from a MediaPacket.
pub fn packet(mut self, pkt: &wzp_proto::MediaPacket) -> Self {
self.seq = Some(pkt.header.seq);
self.codec = Some(format!("{:?}", pkt.header.codec_id));
self.fec_block = Some(pkt.header.fec_block);
self.fec_sym = Some(pkt.header.fec_symbol);
self.repair = Some(pkt.header.is_repair);
self.repair = Some(pkt.header.is_repair());
self.len = Some(pkt.payload.len());
self
}
/// Set seq only (when full packet not available).
pub fn seq(mut self, seq: u16) -> Self { self.seq = Some(seq); self }
pub fn seq(mut self, seq: u32) -> Self {
self.seq = Some(seq);
self
}
/// Set payload length.
pub fn len(mut self, len: usize) -> Self { self.len = Some(len); self }
pub fn len(mut self, len: usize) -> Self {
self.len = Some(len);
self
}
/// Set recipient count.
pub fn to_count(mut self, n: usize) -> Self { self.to_count = Some(n); self }
pub fn to_count(mut self, n: usize) -> Self {
self.to_count = Some(n);
self
}
/// Set peer label.
pub fn peer(mut self, peer: &str) -> Self { self.peer = Some(peer.to_string()); self }
pub fn peer(mut self, peer: &str) -> Self {
self.peer = Some(peer.to_string());
self
}
/// Set drop reason.
pub fn reason(mut self, reason: &str) -> Self { self.reason = Some(reason.to_string()); self }
pub fn reason(mut self, reason: &str) -> Self {
self.reason = Some(reason.to_string());
self
}
/// Set presence action.
pub fn action(mut self, action: &str) -> Self { self.action = Some(action.to_string()); self }
pub fn action(mut self, action: &str) -> Self {
self.action = Some(action.to_string());
self
}
/// Set participant count.
pub fn participants(mut self, n: usize) -> Self { self.participants = Some(n); self }
pub fn participants(mut self, n: usize) -> Self {
self.participants = Some(n);
self
}
}
/// Handle for emitting events. Cheap to clone.
@@ -181,8 +205,12 @@ async fn writer_task(path: PathBuf, mut rx: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = rx.recv().await {
match serde_json::to_string(&event) {
Ok(json) => {
if writer.write_all(json.as_bytes()).await.is_err() { break; }
if writer.write_all(b"\n").await.is_err() { break; }
if writer.write_all(json.as_bytes()).await.is_err() {
break;
}
if writer.write_all(b"\n").await.is_err() {
break;
}
count += 1;
// Flush every 100 events
if count % 100 == 0 {

View File

@@ -11,7 +11,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Bytes;
use sha2::{Sha256, Digest};
use sha2::{Digest, Sha256};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
@@ -56,13 +56,14 @@ impl Deduplicator {
}
/// Returns true if this packet is a duplicate (already seen within TTL).
fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16, extra: u64) -> bool {
fn is_dup(&mut self, room_hash: &[u8; 8], seq: u32, extra: u64) -> bool {
let key = u64::from_be_bytes(*room_hash) ^ (seq as u64) ^ extra;
let now = Instant::now();
// Periodic cleanup (every ~256 packets)
if self.entries.len() > 256 {
self.entries.retain(|_, ts| now.duration_since(*ts) < self.ttl);
self.entries
.retain(|_, ts| now.duration_since(*ts) < self.ttl);
}
if let Some(ts) = self.entries.get(&key) {
@@ -215,8 +216,11 @@ impl FederationManager {
pub async fn broadcast_signal(&self, msg: &wzp_proto::SignalMessage) -> usize {
let peers: Vec<(String, String, Arc<QuinnTransport>)> = {
let links = self.peer_links.lock().await;
links.iter().map(|(fp, l)| (fp.clone(), l.label.clone(), l.transport.clone())).collect()
}; // lock released
links
.iter()
.map(|(fp, l)| (fp.clone(), l.label.clone(), l.transport.clone()))
.collect()
}; // lock released
let mut count = 0;
for (fp, label, transport) in &peers {
match transport.send_signal(msg).await {
@@ -249,7 +253,7 @@ impl FederationManager {
let transport = {
let links = self.peer_links.lock().await;
links.get(&normalized).map(|l| l.transport.clone())
}; // lock released
}; // lock released
match transport {
Some(t) => t
.send_signal(msg)
@@ -300,9 +304,10 @@ impl FederationManager {
return Some(room.to_string());
}
// Hashed match (desktop clients hash room names for SNI privacy)
self.global_rooms.iter().find(|name| {
wzp_crypto::hash_room_name(name) == room
}).map(|s| s.to_string())
self.global_rooms
.iter()
.find(|name| wzp_crypto::hash_room_name(name) == room)
.map(|s| s.to_string())
}
/// Get the canonical federation room hash for a room.
@@ -371,7 +376,10 @@ impl FederationManager {
/// Get all remote participants for a room from all peer links.
/// Deduplicates by fingerprint (same participant may appear via multiple links).
pub async fn get_remote_participants(&self, room: &str) -> Vec<wzp_proto::packet::RoomParticipant> {
pub async fn get_remote_participants(
&self,
room: &str,
) -> Vec<wzp_proto::packet::RoomParticipant> {
let canonical = self.resolve_global_room(room);
let links = self.peer_links.lock().await;
let mut result = Vec::new();
@@ -407,12 +415,22 @@ impl FederationManager {
/// the other room-tagged helpers and for future per-room-name logging
/// or rate limiting; the body currently forwards on `room_hash` alone
/// because that's what the wire format carries.
pub async fn forward_to_peers(&self, _room_name: &str, room_hash: &[u8; 8], media_data: &Bytes) {
pub async fn forward_to_peers(
&self,
_room_name: &str,
room_hash: &[u8; 8],
media_data: &Bytes,
) {
let peers: Vec<(String, Arc<QuinnTransport>)> = {
let links = self.peer_links.lock().await;
if links.is_empty() { return; }
links.values().map(|l| (l.label.clone(), l.transport.clone())).collect()
}; // lock released
if links.is_empty() {
return;
}
links
.values()
.map(|l| (l.label.clone(), l.transport.clone()))
.collect()
}; // lock released
for (label, transport) in &peers {
let mut tagged = Vec::with_capacity(8 + media_data.len());
@@ -420,8 +438,10 @@ impl FederationManager {
tagged.extend_from_slice(media_data);
match transport.send_raw_datagram(&tagged) {
Ok(()) => {
self.metrics.federation_packets_forwarded
.with_label_values(&[label, "out"]).inc();
self.metrics
.federation_packets_forwarded
.with_label_values(&[label, "out"])
.inc();
}
Err(e) => warn!(peer = %label, "federation send error: {e}"),
}
@@ -431,20 +451,25 @@ impl FederationManager {
// ── Trust verification (kept from previous implementation) ──
pub fn find_peer_by_fingerprint(&self, fp: &str) -> Option<&PeerConfig> {
self.peers.iter().find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp))
self.peers
.iter()
.find(|p| normalize_fp(&p.fingerprint) == normalize_fp(fp))
}
pub fn find_peer_by_addr(&self, addr: SocketAddr) -> Option<&PeerConfig> {
let addr_ip = addr.ip();
self.peers.iter().find(|p| {
p.url.parse::<SocketAddr>()
p.url
.parse::<SocketAddr>()
.map(|sa| sa.ip() == addr_ip)
.unwrap_or(false)
})
}
pub fn find_trusted_by_fingerprint(&self, fp: &str) -> Option<&TrustedConfig> {
self.trusted.iter().find(|t| normalize_fp(&t.fingerprint) == normalize_fp(fp))
self.trusted
.iter()
.find(|t| normalize_fp(&t.fingerprint) == normalize_fp(fp))
}
pub fn check_inbound_trust(&self, addr: SocketAddr, hello_fp: &str) -> Option<String> {
@@ -452,7 +477,12 @@ impl FederationManager {
return Some(peer.label.clone().unwrap_or_else(|| peer.url.clone()));
}
if let Some(trusted) = self.find_trusted_by_fingerprint(hello_fp) {
return Some(trusted.label.clone().unwrap_or_else(|| hello_fp[..16].to_string()));
return Some(
trusted
.label
.clone()
.unwrap_or_else(|| hello_fp[..16].to_string()),
);
}
None
}
@@ -471,7 +501,8 @@ pub async fn run_federation_media_egress(
if count == 1 || count % 250 == 0 {
info!(room = %out.room_name, count, "federation egress: forwarding media");
}
fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data).await;
fm.forward_to_peers(&out.room_name, &out.room_hash, &out.data)
.await;
}
info!(total = count, "federation egress task ended");
}
@@ -536,7 +567,9 @@ async fn run_stale_presence_sweeper(fm: Arc<FederationManager>) {
let links = fm.peer_links.lock().await;
let mut stale = Vec::new();
for (fp, link) in links.iter() {
if link.last_seen.elapsed() > stale_threshold && !link.remote_participants.is_empty() {
if link.last_seen.elapsed() > stale_threshold
&& !link.remote_participants.is_empty()
{
for room in link.remote_participants.keys() {
stale.push((fp.clone(), room.clone()));
}
@@ -615,7 +648,10 @@ async fn run_peer_loop(fm: Arc<FederationManager>, peer: PeerConfig) {
}
/// Connect to a peer relay and send hello.
async fn connect_to_peer(fm: &FederationManager, peer: &PeerConfig) -> Result<Arc<QuinnTransport>, anyhow::Error> {
async fn connect_to_peer(
fm: &FederationManager,
peer: &PeerConfig,
) -> Result<Arc<QuinnTransport>, anyhow::Error> {
let addr: SocketAddr = peer.url.parse()?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&fm.endpoint, addr, "_federation", client_cfg).await?;
@@ -625,7 +661,9 @@ async fn connect_to_peer(fm: &FederationManager, peer: &PeerConfig) -> Result<Ar
let hello = SignalMessage::FederationHello {
tls_fingerprint: fm.local_tls_fp.clone(),
};
transport.send_signal(&hello).await
transport
.send_signal(&hello)
.await
.map_err(|e| anyhow::anyhow!("federation hello send failed: {e}"))?;
info!(peer_url = %peer.url, label = ?peer.label, "federation: connected (hello sent)");
@@ -642,16 +680,22 @@ async fn run_federation_link(
peer_label: String,
) -> Result<(), anyhow::Error> {
// Register peer link + metrics
fm.metrics.federation_peer_status.with_label_values(&[&peer_label]).set(1);
fm.metrics
.federation_peer_status
.with_label_values(&[&peer_label])
.set(1);
{
let mut links = fm.peer_links.lock().await;
links.insert(peer_fp.clone(), PeerLink {
transport: transport.clone(),
label: peer_label.clone(),
active_rooms: HashSet::new(),
remote_participants: HashMap::new(),
last_seen: Instant::now(),
});
links.insert(
peer_fp.clone(),
PeerLink {
transport: transport.clone(),
label: peer_label.clone(),
active_rooms: HashSet::new(),
remote_participants: HashMap::new(),
last_seen: Instant::now(),
},
);
}
// Announce our currently active global rooms to this new peer
@@ -665,7 +709,10 @@ async fn run_federation_link(
if fm.is_global_room(room_name) {
let participants = fm.room_mgr.local_participant_list(room_name);
info!(peer = %peer_label, room = %room_name, participants = participants.len(), "announcing local global room to new peer");
msgs.push(SignalMessage::GlobalRoomActive { room: room_name.clone(), participants });
msgs.push(SignalMessage::GlobalRoomActive {
room: room_name.clone(),
participants,
});
}
}
@@ -761,7 +808,10 @@ async fn run_federation_link(
}
// Cleanup: remove peer link + metrics
fm.metrics.federation_peer_status.with_label_values(&[&peer_label]).set(0);
fm.metrics
.federation_peer_status
.with_label_values(&[&peer_label])
.set(0);
{
let mut links = fm.peer_links.lock().await;
links.remove(&peer_fp);
@@ -799,34 +849,43 @@ async fn handle_signal(
fm.metrics.federation_active_rooms.set(total as i64);
if let Some(link) = links.get_mut(peer_fp) {
// Tag remote participants with their relay label
let tagged: Vec<_> = participants.iter().map(|p| {
let mut tagged = p.clone();
if tagged.relay_label.is_none() {
tagged.relay_label = Some(link.label.clone());
}
tagged
}).collect();
let tagged: Vec<_> = participants
.iter()
.map(|p| {
let mut tagged = p.clone();
if tagged.relay_label.is_none() {
tagged.relay_label = Some(link.label.clone());
}
tagged
})
.collect();
link.remote_participants.insert(room.clone(), tagged);
}
// Propagate to other peers (with relay labels preserved)
let tagged_for_propagation = if let Some(link) = links.get(peer_fp) {
let label = link.label.clone();
participants.iter().map(|p| {
let mut t = p.clone();
if t.relay_label.is_none() {
t.relay_label = Some(label.clone());
}
t
}).collect::<Vec<_>>()
participants
.iter()
.map(|p| {
let mut t = p.clone();
if t.relay_label.is_none() {
t.relay_label = Some(label.clone());
}
t
})
.collect::<Vec<_>>()
} else {
participants.clone()
};
for (fp, link) in links.iter() {
if fp != peer_fp {
let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive {
room: room.clone(),
participants: tagged_for_propagation.clone(),
}).await;
let _ = link
.transport
.send_signal(&SignalMessage::GlobalRoomActive {
room: room.clone(),
participants: tagged_for_propagation.clone(),
})
.await;
}
}
drop(links);
@@ -835,19 +894,25 @@ async fn handle_signal(
// Find the local room name (may be hashed or raw)
let active = fm.room_mgr.active_rooms();
for local_room in &active {
if fm.is_global_room(local_room) && fm.resolve_global_room(local_room) == fm.resolve_global_room(&room) {
if fm.is_global_room(local_room)
&& fm.resolve_global_room(local_room) == fm.resolve_global_room(&room)
{
// Build merged participant list: local + all remote (deduped)
let mut all_participants = fm.room_mgr.local_participant_list(local_room);
{
let links = fm.peer_links.lock().await;
for link in links.values() {
if let Some(ref canonical) = fm.resolve_global_room(local_room) {
if let Some(remote) = link.remote_participants.get(canonical.as_str()) {
if let Some(remote) =
link.remote_participants.get(canonical.as_str())
{
all_participants.extend(remote.iter().cloned());
}
// Also check raw room name, but only if different from canonical
if canonical != local_room {
if let Some(remote) = link.remote_participants.get(local_room) {
if let Some(remote) =
link.remote_participants.get(local_room)
{
all_participants.extend(remote.iter().cloned());
}
}
@@ -890,7 +955,9 @@ async fn handle_signal(
let canonical = fm.resolve_global_room(&room);
let mut result = Vec::new();
for (fp, link) in links.iter() {
if fp == peer_fp { continue; }
if fp == peer_fp {
continue;
}
if let Some(ref c) = canonical {
if let Some(remote) = link.remote_participants.get(c.as_str()) {
result.extend(remote.iter().cloned());
@@ -904,11 +971,16 @@ async fn handle_signal(
// Propagate to other peers: send updated GlobalRoomActive with revised list,
// or GlobalRoomInactive if no participants remain anywhere
let local_active = fm.room_mgr.active_rooms().iter().any(|r| fm.resolve_global_room(r) == fm.resolve_global_room(&room));
let local_active = fm
.room_mgr
.active_rooms()
.iter()
.any(|r| fm.resolve_global_room(r) == fm.resolve_global_room(&room));
let has_remaining = !remaining_remote.is_empty() || local_active;
// Collect peer transports to send to (avoid holding lock across await)
let peer_sends: Vec<_> = links.iter()
let peer_sends: Vec<_> = links
.iter()
.filter(|(fp, _)| *fp != peer_fp)
.map(|(_, link)| link.transport.clone())
.collect();
@@ -920,7 +992,8 @@ async fn handle_signal(
if local_active {
for local_room in fm.room_mgr.active_rooms() {
if fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
updated_participants.extend(fm.room_mgr.local_participant_list(&local_room));
updated_participants
.extend(fm.room_mgr.local_participant_list(&local_room));
break;
}
}
@@ -943,7 +1016,9 @@ async fn handle_signal(
// Broadcast updated RoomUpdate to local clients (remote participant removed)
let active = fm.room_mgr.active_rooms();
for local_room in &active {
if fm.is_global_room(local_room) && fm.resolve_global_room(local_room) == fm.resolve_global_room(&room) {
if fm.is_global_room(local_room)
&& fm.resolve_global_room(local_room) == fm.resolve_global_room(&room)
{
let mut all_participants = fm.room_mgr.local_participant_list(local_room);
all_participants.extend(remaining_remote.iter().cloned());
// Deduplicate by fingerprint
@@ -972,7 +1047,10 @@ async fn handle_signal(
// Loop prevention: drop any forward whose origin matches
// our own federation TLS fingerprint. With
// broadcast-to-all-peers this prevents A→B→A echo loops.
SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => {
SignalMessage::FederatedSignalForward {
inner,
origin_relay_fp,
} => {
if origin_relay_fp == fm.local_tls_fp {
tracing::debug!(
peer = %peer_label,
@@ -1016,12 +1094,10 @@ async fn handle_signal(
}
/// Handle an incoming federation datagram (room-hash-tagged media).
async fn handle_datagram(
fm: &Arc<FederationManager>,
source_peer_fp: &str,
data: Bytes,
) {
if data.len() < 12 { return; } // 8-byte hash + min packet
async fn handle_datagram(fm: &Arc<FederationManager>, source_peer_fp: &str, data: Bytes) {
if data.len() < 12 {
return;
} // 8-byte hash + min packet
let mut rh = [0u8; 8];
rh.copy_from_slice(&data[..8]);
@@ -1030,7 +1106,8 @@ async fn handle_datagram(
let pkt = match wzp_proto::MediaPacket::from_bytes(media_bytes.clone()) {
Some(pkt) => pkt,
None => {
fm.event_log.emit(Event::new("federation_ingress_malformed").len(data.len()));
fm.event_log
.emit(Event::new("federation_ingress_malformed").len(data.len()));
return;
}
};
@@ -1038,13 +1115,22 @@ async fn handle_datagram(
// Event log: federation ingress
let peer_label = {
let links = fm.peer_links.lock().await;
links.get(source_peer_fp).map(|l| l.label.clone()).unwrap_or_default()
links
.get(source_peer_fp)
.map(|l| l.label.clone())
.unwrap_or_default()
};
fm.event_log.emit(Event::new("federation_ingress").packet(&pkt).peer(&peer_label));
fm.event_log.emit(
Event::new("federation_ingress")
.packet(&pkt)
.peer(&peer_label),
);
// Count inbound federation packet + update last_seen
fm.metrics.federation_packets_forwarded
.with_label_values(&[source_peer_fp, "in"]).inc();
fm.metrics
.federation_packets_forwarded
.with_label_values(&[source_peer_fp, "in"])
.inc();
{
let mut links = fm.peer_links.lock().await;
if let Some(link) = links.get_mut(source_peer_fp) {
@@ -1065,7 +1151,11 @@ async fn handle_datagram(
{
let mut dedup = fm.dedup.lock().await;
if dedup.is_dup(&rh, pkt.header.seq, payload_hash) {
fm.event_log.emit(Event::new("dedup_drop").seq(pkt.header.seq).peer(&peer_label));
fm.event_log.emit(
Event::new("dedup_drop")
.seq(pkt.header.seq)
.peer(&peer_label),
);
return;
}
}
@@ -1074,18 +1164,33 @@ async fn handle_datagram(
let room_name = {
let active = fm.room_mgr.active_rooms();
// First: check local rooms (has participants)
active.iter().find(|r| room_hash(r) == rh).cloned()
.or_else(|| active.iter().find(|r| fm.global_room_hash(r) == rh).cloned())
active
.iter()
.find(|r| room_hash(r) == rh)
.cloned()
.or_else(|| {
active
.iter()
.find(|r| fm.global_room_hash(r) == rh)
.cloned()
})
// Second: check static global room config (hub relay may have no local participants)
.or_else(|| {
fm.global_rooms.iter().find(|name| room_hash(name) == rh).cloned()
fm.global_rooms
.iter()
.find(|name| room_hash(name) == rh)
.cloned()
})
};
let room_name = match room_name {
Some(r) => r,
None => {
fm.event_log.emit(Event::new("room_not_found").seq(pkt.header.seq).peer(&peer_label));
fm.event_log.emit(
Event::new("room_not_found")
.seq(pkt.header.seq)
.peer(&peer_label),
);
// Phase 4.1 diagnostic: log the hash + active rooms
// so we can diagnose cross-relay call-* media routing
// failures. This fires when a peer relay sends media
@@ -1107,10 +1212,15 @@ async fn handle_datagram(
// Rate limit per room
if FEDERATION_RATE_LIMIT_PPS > 0 {
let mut limiters = fm.rate_limiters.lock().await;
let limiter = limiters.entry(room_name.clone())
let limiter = limiters
.entry(room_name.clone())
.or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS));
if !limiter.allow() {
fm.event_log.emit(Event::new("rate_limit_drop").room(&room_name).seq(pkt.header.seq));
fm.event_log.emit(
Event::new("rate_limit_drop")
.room(&room_name)
.seq(pkt.header.seq),
);
return;
}
}
@@ -1122,14 +1232,26 @@ async fn handle_datagram(
match sender {
room::ParticipantSender::Quic(t) => {
if let Err(e) = t.send_raw_datagram(&media_bytes) {
fm.event_log.emit(Event::new("local_deliver_error").room(&room_name).seq(pkt.header.seq).reason(&e.to_string()));
fm.event_log.emit(
Event::new("local_deliver_error")
.room(&room_name)
.seq(pkt.header.seq)
.reason(&e.to_string()),
);
warn!("federation local delivery error: {e}");
}
}
room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; }
room::ParticipantSender::WebSocket(_) => {
let _ = sender.send_raw(&pkt.payload).await;
}
}
}
fm.event_log.emit(Event::new("local_deliver").room(&room_name).seq(pkt.header.seq).to_count(locals.len()));
fm.event_log.emit(
Event::new("local_deliver")
.room(&room_name)
.seq(pkt.header.seq)
.to_count(locals.len()),
);
// Multi-hop: forward to ALL other connected peers (not the source)
// Don't filter by active_rooms — the receiving peer decides whether to deliver

View File

@@ -20,29 +20,48 @@ use wzp_proto::{MediaTransport, QualityProfile, SignalMessage};
pub async fn accept_handshake(
transport: &dyn MediaTransport,
seed: &[u8; 32],
) -> Result<(Box<dyn CryptoSession>, QualityProfile, String, Option<String>), anyhow::Error> {
) -> Result<
(
Box<dyn CryptoSession>,
QualityProfile,
String,
Option<String>,
),
anyhow::Error,
> {
// 1. Receive CallOffer
let offer = transport
.recv_signal()
.await?
.ok_or_else(|| anyhow::anyhow!("connection closed before receiving CallOffer"))?;
let (caller_identity_pub, caller_ephemeral_pub, caller_signature, supported_profiles, caller_alias) =
match offer {
SignalMessage::CallOffer {
identity_pub,
ephemeral_pub,
signature,
supported_profiles,
alias,
} => (identity_pub, ephemeral_pub, signature, supported_profiles, alias),
other => {
return Err(anyhow::anyhow!(
"expected CallOffer, got {:?}",
std::mem::discriminant(&other)
))
}
};
let (
caller_identity_pub,
caller_ephemeral_pub,
caller_signature,
supported_profiles,
caller_alias,
) = match offer {
SignalMessage::CallOffer {
identity_pub,
ephemeral_pub,
signature,
supported_profiles,
alias,
} => (
identity_pub,
ephemeral_pub,
signature,
supported_profiles,
alias,
),
other => {
return Err(anyhow::anyhow!(
"expected CallOffer, got {:?}",
std::mem::discriminant(&other)
));
}
};
// 2. Verify caller's signature over (ephemeral_pub || "call-offer")
let mut verify_data = Vec::with_capacity(32 + 10);
@@ -81,11 +100,11 @@ pub async fn accept_handshake(
// Derive caller fingerprint: SHA-256(Ed25519 pub)[:16], formatted as xxxx:xxxx:...
// Must match the format used in signal registration and presence.
let caller_fp = {
use sha2::{Sha256, Digest};
use sha2::{Digest, Sha256};
let hash = Sha256::digest(&caller_identity_pub);
let fp = wzp_crypto::Fingerprint([
hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7],
hash[8], hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15],
hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], hash[8],
hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15],
]);
fp.to_string()
};

View File

@@ -12,7 +12,6 @@ pub mod call_registry;
pub mod config;
pub mod event_log;
pub mod federation;
pub mod signal_hub;
pub mod handshake;
pub mod metrics;
pub mod pipeline;
@@ -22,6 +21,7 @@ pub mod relay_link;
pub mod room;
pub mod route;
pub mod session_mgr;
pub mod signal_hub;
pub mod trunk;
pub mod ws;

View File

@@ -8,8 +8,8 @@
//! The web bridge connects with room name as SNI.
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use clap::Parser;
@@ -116,7 +116,9 @@ fn parse_args() -> CliResult {
}
// Track if we need to create the config after identity is known
let config_needs_create = args.config_file.as_ref()
let config_needs_create = args
.config_file
.as_ref()
.map(|p| !std::path::Path::new(p).exists())
.unwrap_or(false);
@@ -125,11 +127,10 @@ fn parse_args() -> CliResult {
// Will be re-created with personalized info after identity is loaded
RelayConfig::default()
} else {
wzp_relay::config::load_config(path)
.unwrap_or_else(|e| {
eprintln!("failed to load config from {path}: {e}");
std::process::exit(1);
})
wzp_relay::config::load_config(path).unwrap_or_else(|e| {
eprintln!("failed to load config from {path}: {e}");
std::process::exit(1);
})
}
} else {
RelayConfig::default()
@@ -164,7 +165,9 @@ fn parse_args() -> CliResult {
config.static_dir = Some(dir);
}
for name in args.global_room {
config.global_rooms.push(wzp_relay::config::GlobalRoomConfig { name });
config
.global_rooms
.push(wzp_relay::config::GlobalRoomConfig { name });
}
if let Some(tap) = args.debug_tap {
config.debug_tap = Some(tap);
@@ -199,7 +202,9 @@ async fn run_upstream(
let mut pipe = pipeline.lock().await;
let decoded = pipe.ingest(pkt);
let mut out = Vec::new();
for p in decoded { out.extend(pipe.prepare_outbound(p)); }
for p in decoded {
out.extend(pipe.prepare_outbound(p));
}
out
};
for p in &outbound {
@@ -208,10 +213,18 @@ async fn run_upstream(
return;
}
}
stats.upstream_packets.fetch_add(outbound.len() as u64, Ordering::Relaxed);
stats
.upstream_packets
.fetch_add(outbound.len() as u64, Ordering::Relaxed);
}
Ok(None) => {
info!("client disconnected (upstream)");
break;
}
Err(e) => {
error!("upstream recv: {e}");
break;
}
Ok(None) => { info!("client disconnected (upstream)"); break; }
Err(e) => { error!("upstream recv: {e}"); break; }
}
}
}
@@ -229,7 +242,9 @@ async fn run_downstream(
let mut pipe = pipeline.lock().await;
let decoded = pipe.ingest(pkt);
let mut out = Vec::new();
for p in decoded { out.extend(pipe.prepare_outbound(p)); }
for p in decoded {
out.extend(pipe.prepare_outbound(p));
}
out
};
for p in &outbound {
@@ -238,10 +253,18 @@ async fn run_downstream(
return;
}
}
stats.downstream_packets.fetch_add(outbound.len() as u64, Ordering::Relaxed);
stats
.downstream_packets
.fetch_add(outbound.len() as u64, Ordering::Relaxed);
}
Ok(None) => {
info!("remote disconnected (downstream)");
break;
}
Err(e) => {
error!("downstream recv: {e}");
break;
}
Ok(None) => { info!("remote disconnected (downstream)"); break; }
Err(e) => { error!("downstream recv: {e}"); break; }
}
}
}
@@ -266,7 +289,12 @@ const BUILD_GIT_HASH: &str = env!("WZP_BUILD_HASH");
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let CliResult { config, identity_path, config_file, config_needs_create } = parse_args();
let CliResult {
config,
identity_path,
config_file,
config_needs_create,
} = parse_args();
tracing_subscriber::fmt().init();
info!(version = BUILD_GIT_HASH, "wzp-relay build");
rustls::crypto::ring::default_provider()
@@ -303,7 +331,10 @@ async fn main() -> anyhow::Result<()> {
info!("loaded relay identity from {}", id_path.display());
s
} else {
warn!("corrupt identity file {}, generating new", id_path.display());
warn!(
"corrupt identity file {}, generating new",
id_path.display()
);
let s = wzp_crypto::Seed::generate();
let hex: String = s.0.iter().map(|b| format!("{b:02x}")).collect();
let _ = std::fs::write(&id_path, &hex);
@@ -386,7 +417,7 @@ async fn main() -> anyhow::Result<()> {
} else {
// Probe via a dummy "connected" UDP socket. Never actually sends.
match std::net::UdpSocket::bind("0.0.0.0:0")
.and_then(|s| { s.connect("8.8.8.8:80").map(|_| s) })
.and_then(|s| s.connect("8.8.8.8:80").map(|_| s))
.and_then(|s| s.local_addr())
{
Ok(a) if !a.ip().is_loopback() => a.ip(),
@@ -398,47 +429,48 @@ async fn main() -> anyhow::Result<()> {
info!(%advertised_addr_str, "relay advertised address for CallSetup");
// Forward mode
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> =
if let Some(remote_addr) = config.remote_relay {
info!(%remote_addr, "forward mode → remote relay");
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?;
Some(Arc::new(wzp_transport::QuinnTransport::new(conn)))
} else {
info!("room mode — clients join named rooms (SFU)");
None
};
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> = if let Some(remote_addr) =
config.remote_relay
{
info!(%remote_addr, "forward mode → remote relay");
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, remote_addr, "localhost", client_cfg).await?;
Some(Arc::new(wzp_transport::QuinnTransport::new(conn)))
} else {
info!("room mode — clients join named rooms (SFU)");
None
};
// Room manager (room mode only)
let room_mgr = Arc::new(RoomManager::new());
// Event log for protocol analysis
let event_log = wzp_relay::event_log::start_event_log(
config.event_log.as_ref().map(std::path::PathBuf::from)
config.event_log.as_ref().map(std::path::PathBuf::from),
);
// Federation manager
let global_room_set: std::collections::HashSet<String> = config.global_rooms.iter()
.map(|g| g.name.clone())
.collect();
let global_room_set: std::collections::HashSet<String> =
config.global_rooms.iter().map(|g| g.name.clone()).collect();
let federation_mgr = if !config.peers.is_empty() || !config.trusted.is_empty() || !global_room_set.is_empty() {
let fm = Arc::new(wzp_relay::federation::FederationManager::new(
config.peers.clone(),
config.trusted.clone(),
global_room_set.clone(),
room_mgr.clone(),
endpoint.clone(),
tls_fp.clone(),
metrics.clone(),
event_log.clone(),
));
let fm_run = fm.clone();
tokio::spawn(async move { fm_run.run().await });
Some(fm)
} else {
None
};
let federation_mgr =
if !config.peers.is_empty() || !config.trusted.is_empty() || !global_room_set.is_empty() {
let fm = Arc::new(wzp_relay::federation::FederationManager::new(
config.peers.clone(),
config.trusted.clone(),
global_room_set.clone(),
room_mgr.clone(),
endpoint.clone(),
tls_fp.clone(),
metrics.clone(),
event_log.clone(),
));
let fm_run = fm.clone();
tokio::spawn(async move { fm_run.run().await });
Some(fm)
} else {
None
};
// Session manager — enforces max concurrent sessions
let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions)));
@@ -624,14 +656,15 @@ async fn main() -> anyhow::Result<()> {
// active, then read back everything needed to
// cross-wire into the local CallSetup.
let room_name = format!("call-{call_id}");
let (callee_addr_for_setup, callee_local_for_setup, callee_mapped_for_setup) = {
let (
callee_addr_for_setup,
callee_local_for_setup,
callee_mapped_for_setup,
) = {
let mut reg = call_registry_d.lock().await;
reg.set_active(call_id, accept_mode, room_name.clone());
reg.set_peer_relay_fp(call_id, Some(origin_relay_fp.clone()));
reg.set_callee_reflexive_addr(
call_id,
callee_reflexive_addr.clone(),
);
reg.set_callee_reflexive_addr(call_id, callee_reflexive_addr.clone());
reg.set_callee_local_addrs(call_id, callee_local_addrs.clone());
reg.set_callee_mapped_addr(call_id, callee_mapped_addr.clone());
let c = reg.get(call_id);
@@ -762,7 +795,9 @@ async fn main() -> anyhow::Result<()> {
let relay_seed_bytes = relay_seed.0;
let metrics = metrics.clone();
let trunking_enabled = config.trunking_enabled;
let debug_tap = config.debug_tap.as_ref().map(|filter| room::DebugTap { room_filter: filter.clone() });
let debug_tap = config.debug_tap.as_ref().map(|filter| room::DebugTap {
room_filter: filter.clone(),
});
let presence = presence.clone();
let route_resolver = route_resolver.clone();
let federation_mgr = federation_mgr.clone();
@@ -771,7 +806,9 @@ async fn main() -> anyhow::Result<()> {
let advertised_addr_str = advertised_addr_str.clone();
// Phase 8: relay region + peer addresses for RegisterPresenceAck
let relay_region = config.region.clone();
let relay_peers_for_ack: Vec<String> = config.peers.iter()
let relay_peers_for_ack: Vec<String> = config
.peers
.iter()
.filter_map(|p| {
let label = p.label.as_deref().unwrap_or("peer");
Some(format!("{label}|{}", p.url))
@@ -800,9 +837,7 @@ async fn main() -> anyhow::Result<()> {
let room_name = connection
.handshake_data()
.and_then(|hd| {
hd.downcast::<quinn::crypto::rustls::HandshakeData>().ok()
})
.and_then(|hd| hd.downcast::<quinn::crypto::rustls::HandshakeData>().ok())
.and_then(|hd| hd.server_name.clone())
.unwrap_or_else(|| "default".to_string());
@@ -832,17 +867,23 @@ async fn main() -> anyhow::Result<()> {
loop {
match transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::Ping { timestamp_ms })) => {
if let Err(e) = transport.send_signal(
&wzp_proto::SignalMessage::Pong { timestamp_ms },
).await {
if let Err(e) = transport
.send_signal(&wzp_proto::SignalMessage::Pong { timestamp_ms })
.await
{
error!(%addr, "probe pong send error: {e}");
break;
}
}
Ok(Some(wzp_proto::SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => {
Ok(Some(wzp_proto::SignalMessage::PresenceUpdate {
fingerprints,
relay_addr,
})) => {
// A peer relay is telling us which fingerprints it has
let peer_addr: std::net::SocketAddr = relay_addr.parse().unwrap_or(addr);
let fps: std::collections::HashSet<String> = fingerprints.into_iter().collect();
let peer_addr: std::net::SocketAddr =
relay_addr.parse().unwrap_or(addr);
let fps: std::collections::HashSet<String> =
fingerprints.into_iter().collect();
{
let mut reg = presence.lock().await;
reg.update_peer(peer_addr, fps);
@@ -871,9 +912,13 @@ async fn main() -> anyhow::Result<()> {
wzp_relay::route::Route::Local => {
(true, vec![route_resolver.local_addr().to_string()])
}
wzp_relay::route::Route::DirectPeer(peer_addr) => {
(true, vec![route_resolver.local_addr().to_string(), peer_addr.to_string()])
}
wzp_relay::route::Route::DirectPeer(peer_addr) => (
true,
vec![
route_resolver.local_addr().to_string(),
peer_addr.to_string(),
],
),
_ => {
// Not found locally; if ttl > 0 we could forward
// to other peers (future multi-hop). For now, reply not found.
@@ -918,8 +963,12 @@ async fn main() -> anyhow::Result<()> {
let hello_fp = match tokio::time::timeout(
std::time::Duration::from_secs(5),
transport.recv_signal(),
).await {
Ok(Ok(Some(wzp_proto::SignalMessage::FederationHello { tls_fingerprint }))) => tls_fingerprint,
)
.await
{
Ok(Ok(Some(wzp_proto::SignalMessage::FederationHello {
tls_fingerprint,
}))) => tls_fingerprint,
_ => {
warn!(%addr, "federation: no hello received, closing");
return;
@@ -964,7 +1013,10 @@ async fn main() -> anyhow::Result<()> {
}
}
}
_ => { warn!(%addr, "signal: expected AuthToken"); return; }
_ => {
warn!(%addr, "signal: expected AuthToken");
return;
}
}
} else {
None
@@ -974,15 +1026,22 @@ async fn main() -> anyhow::Result<()> {
let (client_fp, client_alias) = match tokio::time::timeout(
std::time::Duration::from_secs(10),
transport.recv_signal(),
).await {
Ok(Ok(Some(SignalMessage::RegisterPresence { identity_pub, signature: _, alias }))) => {
)
.await
{
Ok(Ok(Some(SignalMessage::RegisterPresence {
identity_pub,
signature: _,
alias,
}))) => {
// Compute fingerprint: SHA-256(Ed25519 pub key)[:16], same as Fingerprint type
let fp = {
use sha2::{Sha256, Digest};
use sha2::{Digest, Sha256};
let hash = Sha256::digest(&identity_pub);
let fingerprint = wzp_crypto::Fingerprint([
hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7],
hash[8], hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15],
hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6],
hash[7], hash[8], hash[9], hash[10], hash[11], hash[12], hash[13],
hash[14], hash[15],
]);
fingerprint.to_string()
};
@@ -1006,13 +1065,15 @@ async fn main() -> anyhow::Result<()> {
}
// Send ack
let _ = transport.send_signal(&SignalMessage::RegisterPresenceAck {
success: true,
error: None,
relay_build: Some(BUILD_GIT_HASH.to_string()),
relay_region: relay_region.clone(),
available_relays: relay_peers_for_ack.clone(),
}).await;
let _ = transport
.send_signal(&SignalMessage::RegisterPresenceAck {
success: true,
error: None,
relay_build: Some(BUILD_GIT_HASH.to_string()),
relay_region: relay_region.clone(),
available_relays: relay_peers_for_ack.clone(),
})
.await;
info!(%addr, fingerprint = %client_fp, alias = ?client_alias, "signal client registered");
@@ -1086,10 +1147,12 @@ async fn main() -> anyhow::Result<()> {
if !forwarded {
info!(%addr, target = %target_fp, "call target not online (no federation route)");
let _ = transport.send_signal(&SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
call_id: None,
}).await;
let _ = transport
.send_signal(&SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
call_id: None,
})
.await;
continue;
}
@@ -1128,9 +1191,11 @@ async fn main() -> anyhow::Result<()> {
// Send ringing to caller immediately
// so the UI shows feedback while the
// federated delivery is in flight.
let _ = transport.send_signal(&SignalMessage::CallRinging {
call_id: call_id.clone(),
}).await;
let _ = transport
.send_signal(&SignalMessage::CallRinging {
call_id: call_id.clone(),
})
.await;
continue;
}
@@ -1141,10 +1206,23 @@ async fn main() -> anyhow::Result<()> {
// injected later into the callee's CallSetup.
{
let mut reg = call_registry.lock().await;
reg.create_call(call_id.clone(), client_fp.clone(), target_fp.clone());
reg.set_caller_reflexive_addr(&call_id, caller_addr_for_registry);
reg.set_caller_local_addrs(&call_id, caller_local_for_registry);
reg.set_caller_mapped_addr(&call_id, caller_mapped_for_registry);
reg.create_call(
call_id.clone(),
client_fp.clone(),
target_fp.clone(),
);
reg.set_caller_reflexive_addr(
&call_id,
caller_addr_for_registry,
);
reg.set_caller_local_addrs(
&call_id,
caller_local_for_registry,
);
reg.set_caller_mapped_addr(
&call_id,
caller_mapped_for_registry,
);
}
// Forward offer to callee
@@ -1156,9 +1234,11 @@ async fn main() -> anyhow::Result<()> {
// Send ringing to caller
drop(hub);
let _ = transport.send_signal(&SignalMessage::CallRinging {
call_id: call_id.clone(),
}).await;
let _ = transport
.send_signal(&SignalMessage::CallRinging {
call_id: call_id.clone(),
})
.await;
}
SignalMessage::DirectCallAnswer {
@@ -1186,7 +1266,10 @@ async fn main() -> anyhow::Result<()> {
let reg = call_registry.lock().await;
match reg.get(&call_id) {
Some(c) => (
Some(reg.peer_fingerprint(&call_id, &client_fp).map(|s| s.to_string())),
Some(
reg.peer_fingerprint(&call_id, &client_fp)
.map(|s| s.to_string()),
),
c.peer_relay_fp.clone(),
),
None => (None, None),
@@ -1213,20 +1296,29 @@ async fn main() -> anyhow::Result<()> {
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
};
let forward = SignalMessage::FederatedSignalForward {
inner: Box::new(hangup),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm.send_signal_to_peer(origin_fp, &forward).await {
let forward =
SignalMessage::FederatedSignalForward {
inner: Box::new(hangup),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm
.send_signal_to_peer(origin_fp, &forward)
.await
{
warn!(%call_id, %origin_fp, error = %e, "cross-relay reject forward failed");
}
}
} else {
let hub = signal_hub.lock().await;
let _ = hub.send_to(&peer_fp, &SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
}).await;
let _ = hub
.send_to(
&peer_fp,
&SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
},
)
.await;
}
} else {
// Accept — create private room + stash the
@@ -1236,18 +1328,36 @@ async fn main() -> anyhow::Result<()> {
// BOTH parties' addrs so we can cross-wire
// peer_direct_addr on the CallSetups below.
let room = format!("call-{call_id}");
let (caller_addr, callee_addr, caller_local, callee_local, caller_mapped, callee_mapped) = {
let (
caller_addr,
callee_addr,
caller_local,
callee_local,
caller_mapped,
callee_mapped,
) = {
let mut reg = call_registry.lock().await;
reg.set_active(&call_id, mode, room.clone());
reg.set_callee_reflexive_addr(&call_id, callee_addr_for_registry);
reg.set_callee_local_addrs(&call_id, callee_local_for_registry.clone());
reg.set_callee_mapped_addr(&call_id, callee_mapped_for_registry);
reg.set_callee_reflexive_addr(
&call_id,
callee_addr_for_registry,
);
reg.set_callee_local_addrs(
&call_id,
callee_local_for_registry.clone(),
);
reg.set_callee_mapped_addr(
&call_id,
callee_mapped_for_registry,
);
let call = reg.get(&call_id);
(
call.and_then(|c| c.caller_reflexive_addr.clone()),
call.and_then(|c| c.callee_reflexive_addr.clone()),
call.map(|c| c.caller_local_addrs.clone()).unwrap_or_default(),
call.map(|c| c.callee_local_addrs.clone()).unwrap_or_default(),
call.map(|c| c.caller_local_addrs.clone())
.unwrap_or_default(),
call.map(|c| c.callee_local_addrs.clone())
.unwrap_or_default(),
call.and_then(|c| c.caller_mapped_addr.clone()),
call.and_then(|c| c.callee_mapped_addr.clone()),
)
@@ -1278,11 +1388,15 @@ async fn main() -> anyhow::Result<()> {
// CallSetup (to our callee) with
// peer_direct_addr = caller_addr.
if let Some(ref fm) = federation_mgr {
let forward = SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm.send_signal_to_peer(origin_fp, &forward).await {
let forward =
SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm
.send_signal_to_peer(origin_fp, &forward)
.await
{
warn!(
%call_id,
%origin_fp,
@@ -1301,7 +1415,8 @@ async fn main() -> anyhow::Result<()> {
peer_mapped_addr: caller_mapped.clone(),
};
let hub = signal_hub.lock().await;
let _ = hub.send_to(&client_fp, &setup_for_callee).await;
let _ =
hub.send_to(&client_fp, &setup_for_callee).await;
} else {
// Local call (existing Phase 3 path).
// Forward answer to caller
@@ -1331,7 +1446,8 @@ async fn main() -> anyhow::Result<()> {
};
let hub = signal_hub.lock().await;
let _ = hub.send_to(&peer_fp, &setup_for_caller).await;
let _ = hub.send_to(&client_fp, &setup_for_callee).await;
let _ =
hub.send_to(&client_fp, &setup_for_callee).await;
}
}
}
@@ -1346,21 +1462,31 @@ async fn main() -> anyhow::Result<()> {
if let Some(cid) = call_id {
// Targeted hangup: only the named call
reg.get(cid)
.map(|c| vec![(c.call_id.clone(), if c.caller_fingerprint == client_fp {
c.callee_fingerprint.clone()
} else {
c.caller_fingerprint.clone()
})])
.map(|c| {
vec![(
c.call_id.clone(),
if c.caller_fingerprint == client_fp {
c.callee_fingerprint.clone()
} else {
c.caller_fingerprint.clone()
},
)]
})
.unwrap_or_default()
} else {
// Legacy: end all calls for this user
reg.calls_for_fingerprint(&client_fp)
.iter()
.map(|c| (c.call_id.clone(), if c.caller_fingerprint == client_fp {
c.callee_fingerprint.clone()
} else {
c.caller_fingerprint.clone()
}))
.map(|c| {
(
c.call_id.clone(),
if c.caller_fingerprint == client_fp {
c.callee_fingerprint.clone()
} else {
c.caller_fingerprint.clone()
},
)
})
.collect::<Vec<_>>()
}
};
@@ -1396,11 +1522,15 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref origin_fp) = peer_relay_fp {
// Cross-relay: wrap and forward
if let Some(ref fm) = federation_mgr {
let forward = SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm.send_signal_to_peer(origin_fp, &forward).await {
let forward =
SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm
.send_signal_to_peer(origin_fp, &forward)
.await
{
warn!(
%call_id,
%origin_fp,
@@ -1436,11 +1566,15 @@ async fn main() -> anyhow::Result<()> {
if let Some(fp) = peer_fp {
if let Some(ref origin_fp) = peer_relay_fp {
if let Some(ref fm) = federation_mgr {
let forward = SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm.send_signal_to_peer(origin_fp, &forward).await {
let forward =
SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
if let Err(e) = fm
.send_signal_to_peer(origin_fp, &forward)
.await
{
warn!(
%call_id,
%origin_fp,
@@ -1458,12 +1592,12 @@ async fn main() -> anyhow::Result<()> {
// Hard NAT: forward HardNatProbe + HardNatBirthdayStart
// to call peer (same pattern as CandidateUpdate).
SignalMessage::HardNatBirthdayStart { ref call_id, .. } |
SignalMessage::HardNatProbe { ref call_id, .. } |
SignalMessage::UpgradeProposal { ref call_id, .. } |
SignalMessage::UpgradeResponse { ref call_id, .. } |
SignalMessage::UpgradeConfirm { ref call_id, .. } |
SignalMessage::QualityCapability { ref call_id, .. } => {
SignalMessage::HardNatBirthdayStart { ref call_id, .. }
| SignalMessage::HardNatProbe { ref call_id, .. }
| SignalMessage::UpgradeProposal { ref call_id, .. }
| SignalMessage::UpgradeResponse { ref call_id, .. }
| SignalMessage::UpgradeConfirm { ref call_id, .. }
| SignalMessage::QualityCapability { ref call_id, .. } => {
let (peer_fp, peer_relay_fp) = {
let reg = call_registry.lock().await;
match reg.get(call_id) {
@@ -1479,11 +1613,14 @@ async fn main() -> anyhow::Result<()> {
if let Some(fp) = peer_fp {
if let Some(ref origin_fp) = peer_relay_fp {
if let Some(ref fm) = federation_mgr {
let forward = SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
let _ = fm.send_signal_to_peer(origin_fp, &forward).await;
let forward =
SignalMessage::FederatedSignalForward {
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
let _ = fm
.send_signal_to_peer(origin_fp, &forward)
.await;
}
} else {
let hub = signal_hub.lock().await;
@@ -1493,7 +1630,9 @@ async fn main() -> anyhow::Result<()> {
}
SignalMessage::Ping { timestamp_ms } => {
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
let _ = transport
.send_signal(&SignalMessage::Pong { timestamp_ms })
.await;
}
// QUIC-native NAT reflection ("STUN for QUIC").
@@ -1510,11 +1649,12 @@ async fn main() -> anyhow::Result<()> {
// reaches this match arm.
SignalMessage::Reflect => {
let observed_addr = addr.to_string();
if let Err(e) = transport.send_signal(
&SignalMessage::ReflectResponse {
if let Err(e) = transport
.send_signal(&SignalMessage::ReflectResponse {
observed_addr: observed_addr.clone(),
},
).await {
})
.await
{
warn!(%addr, error = %e, "reflect: failed to send response");
} else {
debug!(%addr, %observed_addr, "reflect: responded");
@@ -1552,19 +1692,29 @@ async fn main() -> anyhow::Result<()> {
let reg = call_registry.lock().await;
reg.calls_for_fingerprint(&client_fp)
.iter()
.map(|c| (c.call_id.clone(), if c.caller_fingerprint == client_fp {
c.callee_fingerprint.clone()
} else {
c.caller_fingerprint.clone()
}))
.map(|c| {
(
c.call_id.clone(),
if c.caller_fingerprint == client_fp {
c.callee_fingerprint.clone()
} else {
c.caller_fingerprint.clone()
},
)
})
.collect::<Vec<_>>()
};
for (call_id, peer_fp) in &active_calls {
let hub = signal_hub.lock().await;
let _ = hub.send_to(peer_fp, &SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
}).await;
let _ = hub
.send_to(
peer_fp,
&SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
},
)
.await;
drop(hub);
let mut reg = call_registry.lock().await;
reg.end_call(call_id);
@@ -1632,22 +1782,20 @@ async fn main() -> anyhow::Result<()> {
// Crypto handshake: verify client identity + negotiate quality profile
let handshake_start = std::time::Instant::now();
let (_crypto_session, _chosen_profile, caller_fp, caller_alias) = match wzp_relay::handshake::accept_handshake(
&*transport,
&relay_seed_bytes,
).await {
Ok(result) => {
let elapsed = handshake_start.elapsed().as_secs_f64();
metrics.handshake_duration.observe(elapsed);
info!(%addr, elapsed_ms = %(elapsed * 1000.0), "crypto handshake complete");
result
}
Err(e) => {
error!(%addr, "handshake failed: {e}");
close_transport(&*transport, "cleanup").await;
return;
}
};
let (_crypto_session, _chosen_profile, caller_fp, caller_alias) =
match wzp_relay::handshake::accept_handshake(&*transport, &relay_seed_bytes).await {
Ok(result) => {
let elapsed = handshake_start.elapsed().as_secs_f64();
metrics.handshake_duration.observe(elapsed);
info!(%addr, elapsed_ms = %(elapsed * 1000.0), "crypto handshake complete");
result
}
Err(e) => {
error!(%addr, "handshake failed: {e}");
close_transport(&*transport, "cleanup").await;
return;
}
};
// Use the caller's identity fingerprint from the handshake
let participant_fp = authenticated_fp.clone().unwrap_or(caller_fp);
@@ -1704,8 +1852,18 @@ async fn main() -> anyhow::Result<()> {
}
});
let up = tokio::spawn(run_upstream(transport.clone(), remote.clone(), up_pipe, stats.clone()));
let dn = tokio::spawn(run_downstream(transport.clone(), remote.clone(), dn_pipe, stats));
let up = tokio::spawn(run_upstream(
transport.clone(),
remote.clone(),
up_pipe,
stats.clone(),
));
let dn = tokio::spawn(run_downstream(
transport.clone(),
remote.clone(),
dn_pipe,
stats,
));
tokio::select! { _ = up => {} _ = dn => {} }
stats_handle.abort();
@@ -1752,7 +1910,11 @@ async fn main() -> anyhow::Result<()> {
// Merge federated participants into RoomUpdate if this is a global room
let merged_update = if let Some(ref fm) = federation_mgr {
if fm.is_global_room(&room_name) {
if let SignalMessage::RoomUpdate { count: _, participants: mut local_parts } = update {
if let SignalMessage::RoomUpdate {
count: _,
participants: mut local_parts,
} = update
{
let remote = fm.get_remote_participants(&room_name).await;
local_parts.extend(remote);
// Deduplicate by fingerprint
@@ -1762,17 +1924,27 @@ async fn main() -> anyhow::Result<()> {
count: local_parts.len() as u32,
participants: local_parts,
}
} else { update }
} else { update }
} else { update };
} else {
update
}
} else {
update
}
} else {
update
};
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_signal(&room_name, &merged_update);
tap.log_event(&room_name, "join", &format!(
"participant={id} addr={addr} alias={}",
caller_alias.as_deref().unwrap_or("?")
));
tap.log_event(
&room_name,
"join",
&format!(
"participant={id} addr={addr} alias={}",
caller_alias.as_deref().unwrap_or("?")
),
);
}
}
room::broadcast_signal(&senders, &merged_update).await;
@@ -1789,10 +1961,8 @@ async fn main() -> anyhow::Result<()> {
}
};
let session_id_str: String = session_id
.iter()
.map(|b| format!("{b:02x}"))
.collect();
let session_id_str: String =
session_id.iter().map(|b| format!("{b:02x}")).collect();
// Set up federation media channel if this is a global room
let (federation_tx, federation_room_hash) = if let Some(ref fm) = federation_mgr {
let is_global = fm.is_global_room(&room_name);
@@ -1823,7 +1993,8 @@ async fn main() -> anyhow::Result<()> {
debug_tap,
federation_tx,
federation_room_hash,
).await;
)
.await;
// Participant disconnected — clean up presence + per-session metrics
if let Some(ref fp) = authenticated_fp {

View File

@@ -4,8 +4,8 @@ use prometheus::{
Encoder, GaugeVec, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Opts, Registry, TextEncoder,
};
use wzp_proto::packet::QualityReport;
use std::sync::Arc;
use wzp_proto::packet::QualityReport;
/// All relay-level Prometheus metrics.
#[derive(Clone)]
@@ -40,21 +40,23 @@ impl RelayMetrics {
pub fn new() -> Self {
let registry = Registry::new();
let active_sessions = IntGauge::with_opts(
Opts::new("wzp_relay_active_sessions", "Current active sessions"),
)
let active_sessions = IntGauge::with_opts(Opts::new(
"wzp_relay_active_sessions",
"Current active sessions",
))
.expect("metric");
let active_rooms = IntGauge::with_opts(
Opts::new("wzp_relay_active_rooms", "Current active rooms"),
)
let active_rooms =
IntGauge::with_opts(Opts::new("wzp_relay_active_rooms", "Current active rooms"))
.expect("metric");
let packets_forwarded = IntCounter::with_opts(Opts::new(
"wzp_relay_packets_forwarded_total",
"Total packets forwarded",
))
.expect("metric");
let packets_forwarded = IntCounter::with_opts(
Opts::new("wzp_relay_packets_forwarded_total", "Total packets forwarded"),
)
.expect("metric");
let bytes_forwarded = IntCounter::with_opts(
Opts::new("wzp_relay_bytes_forwarded_total", "Total bytes forwarded"),
)
let bytes_forwarded = IntCounter::with_opts(Opts::new(
"wzp_relay_bytes_forwarded_total",
"Total bytes forwarded",
))
.expect("metric");
let auth_attempts = IntCounterVec::new(
Opts::new("wzp_relay_auth_attempts_total", "Auth validation attempts"),
@@ -66,31 +68,51 @@ impl RelayMetrics {
"wzp_relay_handshake_duration_seconds",
"Crypto handshake time",
)
.buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]),
.buckets(vec![
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5,
]),
)
.expect("metric");
let federation_peer_status = IntGaugeVec::new(
Opts::new("wzp_federation_peer_status", "Peer connection status (0=disconnected, 1=connected)"),
Opts::new(
"wzp_federation_peer_status",
"Peer connection status (0=disconnected, 1=connected)",
),
&["peer"],
).expect("metric");
)
.expect("metric");
let federation_peer_rtt_ms = GaugeVec::new(
Opts::new("wzp_federation_peer_rtt_ms", "QUIC RTT to federated peer in milliseconds"),
Opts::new(
"wzp_federation_peer_rtt_ms",
"QUIC RTT to federated peer in milliseconds",
),
&["peer"],
).expect("metric");
)
.expect("metric");
let federation_packets_forwarded = IntCounterVec::new(
Opts::new("wzp_federation_packets_forwarded_total", "Packets forwarded to/from federated peers"),
Opts::new(
"wzp_federation_packets_forwarded_total",
"Packets forwarded to/from federated peers",
),
&["peer", "direction"],
).expect("metric");
let federation_packets_deduped = IntCounter::with_opts(
Opts::new("wzp_federation_packets_deduped_total", "Duplicate federation packets dropped"),
).expect("metric");
let federation_packets_rate_limited = IntCounter::with_opts(
Opts::new("wzp_federation_packets_rate_limited_total", "Federation packets dropped by rate limiter"),
).expect("metric");
let federation_active_rooms = IntGauge::with_opts(
Opts::new("wzp_federation_active_rooms", "Number of federated rooms currently active"),
).expect("metric");
)
.expect("metric");
let federation_packets_deduped = IntCounter::with_opts(Opts::new(
"wzp_federation_packets_deduped_total",
"Duplicate federation packets dropped",
))
.expect("metric");
let federation_packets_rate_limited = IntCounter::with_opts(Opts::new(
"wzp_federation_packets_rate_limited_total",
"Federation packets dropped by rate limiter",
))
.expect("metric");
let federation_active_rooms = IntGauge::with_opts(Opts::new(
"wzp_federation_active_rooms",
"Number of federated rooms currently active",
))
.expect("metric");
let session_buffer_depth = IntGaugeVec::new(
Opts::new(
@@ -109,10 +131,7 @@ impl RelayMetrics {
)
.expect("metric");
let session_rtt_ms = GaugeVec::new(
Opts::new(
"wzp_relay_session_rtt_ms",
"Round-trip time per session",
),
Opts::new("wzp_relay_session_rtt_ms", "Round-trip time per session"),
&["session_id"],
)
.expect("metric");
@@ -150,25 +169,63 @@ impl RelayMetrics {
)
.expect("metric");
registry.register(Box::new(active_sessions.clone())).expect("register");
registry.register(Box::new(active_rooms.clone())).expect("register");
registry.register(Box::new(packets_forwarded.clone())).expect("register");
registry.register(Box::new(bytes_forwarded.clone())).expect("register");
registry.register(Box::new(auth_attempts.clone())).expect("register");
registry.register(Box::new(handshake_duration.clone())).expect("register");
registry.register(Box::new(federation_peer_status.clone())).expect("register");
registry.register(Box::new(federation_peer_rtt_ms.clone())).expect("register");
registry.register(Box::new(federation_packets_forwarded.clone())).expect("register");
registry.register(Box::new(federation_packets_deduped.clone())).expect("register");
registry.register(Box::new(federation_packets_rate_limited.clone())).expect("register");
registry.register(Box::new(federation_active_rooms.clone())).expect("register");
registry.register(Box::new(session_buffer_depth.clone())).expect("register");
registry.register(Box::new(session_loss_pct.clone())).expect("register");
registry.register(Box::new(session_rtt_ms.clone())).expect("register");
registry.register(Box::new(session_underruns.clone())).expect("register");
registry.register(Box::new(session_overruns.clone())).expect("register");
registry.register(Box::new(session_dred_reconstructions.clone())).expect("register");
registry.register(Box::new(session_classical_plc.clone())).expect("register");
registry
.register(Box::new(active_sessions.clone()))
.expect("register");
registry
.register(Box::new(active_rooms.clone()))
.expect("register");
registry
.register(Box::new(packets_forwarded.clone()))
.expect("register");
registry
.register(Box::new(bytes_forwarded.clone()))
.expect("register");
registry
.register(Box::new(auth_attempts.clone()))
.expect("register");
registry
.register(Box::new(handshake_duration.clone()))
.expect("register");
registry
.register(Box::new(federation_peer_status.clone()))
.expect("register");
registry
.register(Box::new(federation_peer_rtt_ms.clone()))
.expect("register");
registry
.register(Box::new(federation_packets_forwarded.clone()))
.expect("register");
registry
.register(Box::new(federation_packets_deduped.clone()))
.expect("register");
registry
.register(Box::new(federation_packets_rate_limited.clone()))
.expect("register");
registry
.register(Box::new(federation_active_rooms.clone()))
.expect("register");
registry
.register(Box::new(session_buffer_depth.clone()))
.expect("register");
registry
.register(Box::new(session_loss_pct.clone()))
.expect("register");
registry
.register(Box::new(session_rtt_ms.clone()))
.expect("register");
registry
.register(Box::new(session_underruns.clone()))
.expect("register");
registry
.register(Box::new(session_overruns.clone()))
.expect("register");
registry
.register(Box::new(session_dred_reconstructions.clone()))
.expect("register");
registry
.register(Box::new(session_classical_plc.clone()))
.expect("register");
Self {
active_sessions,
@@ -230,10 +287,7 @@ impl RelayMetrics {
.with_label_values(&[session_id])
.inc_by(underruns - cur_underruns as u64);
}
let cur_overruns = self
.session_overruns
.with_label_values(&[session_id])
.get();
let cur_overruns = self.session_overruns.with_label_values(&[session_id]).get();
if overruns > cur_overruns as u64 {
self.session_overruns
.with_label_values(&[session_id])
@@ -284,7 +338,9 @@ impl RelayMetrics {
let _ = self
.session_dred_reconstructions
.remove_label_values(&[session_id]);
let _ = self.session_classical_plc.remove_label_values(&[session_id]);
let _ = self
.session_classical_plc
.remove_label_values(&[session_id]);
}
/// Get a reference to the underlying Prometheus registry.
@@ -298,7 +354,9 @@ impl RelayMetrics {
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).expect("encode");
encoder
.encode(&metric_families, &mut buffer)
.expect("encode");
String::from_utf8(buffer).expect("utf8")
}
}
@@ -310,7 +368,7 @@ pub async fn serve_metrics(
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
route_resolver: Option<Arc<crate::route::RouteResolver>>,
) {
use axum::{extract::Path, routing::get, Router};
use axum::{Router, extract::Path, routing::get};
let metrics_clone = metrics.clone();
let presence_all = presence.clone();
@@ -454,8 +512,8 @@ mod tests {
fn session_quality_update() {
let m = RelayMetrics::new();
let report = QualityReport {
loss_pct: 128, // ~50%
rtt_4ms: 25, // 100ms
loss_pct: 128, // ~50%
rtt_4ms: 25, // 100ms
jitter_ms: 10,
bitrate_cap_kbps: 200,
};

View File

@@ -11,11 +11,11 @@
use tracing::{debug, info};
use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder};
use wzp_proto::QualityProfile;
use wzp_proto::jitter::{JitterBuffer, PlayoutResult};
use wzp_proto::packet::{MediaHeader, MediaPacket};
use wzp_proto::quality::AdaptiveQualityController;
use wzp_proto::traits::{FecDecoder, FecEncoder, QualityController};
use wzp_proto::QualityProfile;
/// Configuration for a relay pipeline instance.
pub struct PipelineConfig {
@@ -51,7 +51,7 @@ pub struct RelayPipeline {
/// Current quality profile.
profile: QualityProfile,
/// Outbound sequence counter.
out_seq: u16,
out_seq: u32,
/// Packets processed count.
stats: PipelineStats,
}
@@ -110,15 +110,15 @@ impl RelayPipeline {
// Feed packet into FEC decoder
let header = &packet.header;
let _ = self.fec_decoder.add_symbol(
header.fec_block,
header.fec_symbol,
header.is_repair,
(header.fec_block & 0xFF) as u8,
(header.fec_block >> 8) as u8,
header.is_repair(),
&packet.payload,
);
// Try to decode the FEC block
let mut output = Vec::new();
if let Ok(Some(frames)) = self.fec_decoder.try_decode(header.fec_block) {
if let Ok(Some(frames)) = self.fec_decoder.try_decode((header.fec_block & 0xFF) as u8) {
debug!(
block = header.fec_block,
frames = frames.len(),
@@ -128,22 +128,21 @@ impl RelayPipeline {
for (i, frame) in frames.into_iter().enumerate() {
let reconstructed = MediaPacket {
header: MediaHeader {
version: 0,
is_repair: false,
version: 2,
flags: 0,
media_type: wzp_proto::MediaType::Audio,
codec_id: header.codec_id,
has_quality_report: false,
fec_ratio_encoded: header.fec_ratio_encoded,
stream_id: 0,
fec_ratio: header.fec_ratio,
// Reconstruct seq from block + symbol index
seq: (header.fec_block as u16)
.wrapping_mul(self.profile.frames_per_block as u16)
.wrapping_add(i as u16),
timestamp: header
.timestamp
.wrapping_add((i as u32) * (header.codec_id.frame_duration_ms() as u32)),
fec_block: header.fec_block,
fec_symbol: i as u8,
reserved: 0,
csrc_count: 0,
seq: (header.fec_block as u32)
.wrapping_mul(self.profile.frames_per_block as u32)
.wrapping_add(i as u32),
timestamp: header.timestamp.wrapping_add(
(i as u32) * (header.codec_id.frame_duration_ms() as u32),
),
fec_block: u16::from((header.fec_block & 0xFF) as u8)
| (u16::from(i as u8) << 8),
},
payload: bytes::Bytes::from(frame),
quality_report: None,
@@ -191,19 +190,16 @@ impl RelayPipeline {
for (sym_idx, repair_data) in repairs {
let repair_packet = MediaPacket {
header: MediaHeader {
version: 0,
is_repair: true,
version: 2,
flags: MediaHeader::FLAG_REPAIR,
media_type: wzp_proto::MediaType::Audio,
codec_id: packet.header.codec_id,
has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(
self.profile.fec_ratio,
),
stream_id: 0,
fec_ratio: MediaHeader::encode_fec_ratio(self.profile.fec_ratio),
seq: self.out_seq,
timestamp: packet.header.timestamp,
fec_block: self.fec_encoder.current_block_id(),
fec_symbol: sym_idx,
reserved: 0,
csrc_count: 0,
fec_block: u16::from(self.fec_encoder.current_block_id())
| (u16::from(sym_idx) << 8),
},
payload: bytes::Bytes::from(repair_data),
quality_report: None,
@@ -232,23 +228,21 @@ impl RelayPipeline {
#[cfg(test)]
mod tests {
use super::*;
use wzp_proto::CodecId;
use bytes::Bytes;
use wzp_proto::CodecId;
fn make_media_packet(seq: u16, block: u8, symbol: u8) -> MediaPacket {
fn make_media_packet(seq: u32, block: u8, symbol: u8) -> MediaPacket {
MediaPacket {
header: MediaHeader {
version: 0,
is_repair: false,
version: 2,
flags: 0,
media_type: wzp_proto::MediaType::Audio,
codec_id: CodecId::Opus24k,
has_quality_report: false,
fec_ratio_encoded: 0,
stream_id: 0,
fec_ratio: 0,
seq,
timestamp: seq as u32 * 20,
fec_block: block,
fec_symbol: symbol,
reserved: 0,
csrc_count: 0,
timestamp: seq * 20,
fec_block: u16::from(block) | (u16::from(symbol) << 8),
},
payload: Bytes::from(vec![seq as u8; 60]),
quality_report: None,
@@ -283,7 +277,7 @@ mod tests {
// Feed 5 packets (one full block)
let mut total_out = 0;
for i in 0..5u16 {
for i in 0..5u32 {
let pkt = make_media_packet(i, 0, i as u8);
let out = pipeline.prepare_outbound(pkt);
total_out += out.len();

View File

@@ -74,13 +74,21 @@ impl PresenceRegistry {
}
/// Register a fingerprint as locally connected (called after auth + handshake).
pub fn register_local(&mut self, fingerprint: &str, alias: Option<String>, room: Option<String>) {
self.local.insert(fingerprint.to_string(), LocalPresence {
fingerprint: fingerprint.to_string(),
alias,
connected_at: Instant::now(),
room,
});
pub fn register_local(
&mut self,
fingerprint: &str,
alias: Option<String>,
room: Option<String>,
) {
self.local.insert(
fingerprint.to_string(),
LocalPresence {
fingerprint: fingerprint.to_string(),
alias,
connected_at: Instant::now(),
room,
},
);
}
/// Unregister a locally connected fingerprint (called on disconnect).
@@ -98,11 +106,14 @@ impl PresenceRegistry {
// Insert new remote entries
for fp in &fingerprints {
self.remote.insert(fp.clone(), RemotePresence {
fingerprint: fp.clone(),
relay_addr: addr,
last_seen: now,
});
self.remote.insert(
fp.clone(),
RemotePresence {
fingerprint: fp.clone(),
relay_addr: addr,
last_seen: now,
},
);
}
// Update the peer record
@@ -156,7 +167,8 @@ impl PresenceRegistry {
self.remote.retain(|_, rp| rp.last_seen > cutoff);
// Expire peer relay records and their fingerprint sets
let stale_peers: Vec<SocketAddr> = self.peers
let stale_peers: Vec<SocketAddr> = self
.peers
.iter()
.filter(|(_, p)| p.last_update <= cutoff)
.map(|(addr, _)| *addr)
@@ -280,13 +292,15 @@ mod tests {
let all = reg.all_known();
assert_eq!(all.len(), 2);
let local_entries: Vec<_> = all.iter()
let local_entries: Vec<_> = all
.iter()
.filter(|(_, loc)| *loc == PresenceLocation::Local)
.collect();
assert_eq!(local_entries.len(), 1);
assert_eq!(local_entries[0].0, "local1");
let remote_entries: Vec<_> = all.iter()
let remote_entries: Vec<_> = all
.iter()
.filter(|(_, loc)| matches!(loc, PresenceLocation::Remote(_)))
.collect();
assert_eq!(remote_entries.len(), 1);

View File

@@ -43,8 +43,7 @@ impl ProbeMetrics {
/// Register probe metrics with the given `target` label value.
pub fn register(target: &str, registry: &Registry) -> Self {
let rtt_ms = Gauge::with_opts(
Opts::new("wzp_probe_rtt_ms", "RTT to peer relay in ms")
.const_label("target", target),
Opts::new("wzp_probe_rtt_ms", "RTT to peer relay in ms").const_label("target", target),
)
.expect("probe metric");
@@ -66,9 +65,15 @@ impl ProbeMetrics {
)
.expect("probe metric");
registry.register(Box::new(rtt_ms.clone())).expect("register");
registry.register(Box::new(loss_pct.clone())).expect("register");
registry.register(Box::new(jitter_ms.clone())).expect("register");
registry
.register(Box::new(rtt_ms.clone()))
.expect("register");
registry
.register(Box::new(loss_pct.clone()))
.expect("register");
registry
.register(Box::new(jitter_ms.clone()))
.expect("register");
registry.register(Box::new(up.clone())).expect("register");
Self {
@@ -168,7 +173,11 @@ impl ProbeRunner {
) -> Self {
let target_str = config.target.to_string();
let metrics = ProbeMetrics::register(&target_str, registry);
Self { config, metrics, presence }
Self {
config,
metrics,
presence,
}
}
/// Run the probe forever. This function never returns under normal operation.
@@ -198,13 +207,8 @@ impl ProbeRunner {
let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = wzp_transport::create_endpoint(bind_addr, None)?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(
&endpoint,
self.config.target,
"_probe",
client_cfg,
)
.await?;
let conn =
wzp_transport::connect(&endpoint, self.config.target, "_probe", client_cfg).await?;
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
self.metrics.up.set(1);
@@ -237,11 +241,15 @@ impl ProbeRunner {
loss_gauge.set(w.loss_pct());
jitter_gauge.set(w.jitter_ms());
}
Ok(Some(SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => {
Ok(Some(SignalMessage::PresenceUpdate {
fingerprints,
relay_addr,
})) => {
if let Some(ref reg) = recv_presence {
// Parse the relay_addr; fall back to the connection target
let addr = relay_addr.parse().unwrap_or(recv_target);
let fps: std::collections::HashSet<String> = fingerprints.into_iter().collect();
let fps: std::collections::HashSet<String> =
fingerprints.into_iter().collect();
let mut r = reg.lock().await;
r.update_peer(addr, fps);
}
@@ -374,10 +382,7 @@ pub fn mesh_summary(registry: &Registry) -> String {
let name = family.get_name();
for metric in family.get_metric() {
// Find the "target" label
let target_label = metric
.get_label()
.iter()
.find(|l| l.get_name() == "target");
let target_label = metric.get_label().iter().find(|l| l.get_name() == "target");
let target = match target_label {
Some(l) => l.get_value().to_string(),
None => continue,
@@ -420,10 +425,7 @@ pub fn mesh_summary(registry: &Registry) -> String {
/// Handle an incoming Ping signal by replying with a Pong carrying the same timestamp.
/// Returns true if the message was a Ping and was handled, false otherwise.
pub async fn handle_ping(
transport: &wzp_transport::QuinnTransport,
msg: &SignalMessage,
) -> bool {
pub async fn handle_ping(transport: &wzp_transport::QuinnTransport, msg: &SignalMessage) -> bool {
if let SignalMessage::Ping { timestamp_ms } = msg {
if let Err(e) = transport
.send_signal(&SignalMessage::Pong {
@@ -456,9 +458,18 @@ mod tests {
encoder.encode(&families, &mut buf).unwrap();
let output = String::from_utf8(buf).unwrap();
assert!(output.contains("wzp_probe_rtt_ms"), "missing wzp_probe_rtt_ms");
assert!(output.contains("wzp_probe_loss_pct"), "missing wzp_probe_loss_pct");
assert!(output.contains("wzp_probe_jitter_ms"), "missing wzp_probe_jitter_ms");
assert!(
output.contains("wzp_probe_rtt_ms"),
"missing wzp_probe_rtt_ms"
);
assert!(
output.contains("wzp_probe_loss_pct"),
"missing wzp_probe_loss_pct"
);
assert!(
output.contains("wzp_probe_jitter_ms"),
"missing wzp_probe_jitter_ms"
);
assert!(output.contains("wzp_probe_up"), "missing wzp_probe_up");
assert!(
output.contains("target=\"127.0.0.1:4433\""),

View File

@@ -40,10 +40,7 @@ impl RelayLink {
/// should skip normal client auth/handshake for relay-SNI connections.
pub async fn connect(target: SocketAddr) -> Result<Self, anyhow::Error> {
// Create a client-only endpoint on an OS-assigned port.
let endpoint = wzp_transport::create_endpoint(
"0.0.0.0:0".parse().unwrap(),
None,
)?;
let endpoint = wzp_transport::create_endpoint("0.0.0.0:0".parse().unwrap(), None)?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, target, "_relay", client_cfg).await?;
@@ -457,17 +454,15 @@ mod tests {
let pkt = MediaPacket {
header: wzp_proto::packet::MediaHeader {
version: 0,
is_repair: false,
version: 2,
flags: 0,
media_type: wzp_proto::MediaType::Audio,
codec_id: wzp_proto::CodecId::Opus16k,
has_quality_report: false,
fec_ratio_encoded: 0,
stream_id: 0,
fec_ratio: 0,
seq: 1,
timestamp: 100,
fec_block: 0,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
},
payload: bytes::Bytes::from_static(b"test"),
quality_report: None,

View File

@@ -4,18 +4,18 @@
//! the relay forwards it to all other participants in the room (SFU model).
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use bytes::Bytes;
use dashmap::DashMap;
use tracing::{error, info, warn};
use wzp_proto::MediaTransport;
use wzp_proto::packet::TrunkFrame;
use wzp_proto::quality::{AdaptiveQualityController, Tier};
use wzp_proto::traits::QualityController;
use wzp_proto::MediaTransport;
use crate::metrics::RelayMetrics;
use crate::trunk::TrunkBatcher;
@@ -32,7 +32,14 @@ impl DebugTap {
self.room_filter == "*" || self.room_filter == room_name
}
pub fn log_packet(&self, room: &str, dir: &str, addr: &std::net::SocketAddr, pkt: &wzp_proto::MediaPacket, fan_out: usize) {
pub fn log_packet(
&self,
room: &str,
dir: &str,
addr: &std::net::SocketAddr,
pkt: &wzp_proto::MediaPacket,
fan_out: usize,
) {
let h = &pkt.header;
info!(
target: "debug_tap",
@@ -43,8 +50,7 @@ impl DebugTap {
codec = ?h.codec_id,
ts = h.timestamp,
fec_block = h.fec_block,
fec_sym = h.fec_symbol,
repair = h.is_repair,
repair = h.is_repair(),
len = pkt.payload.len(),
fan_out,
"TAP"
@@ -53,8 +59,12 @@ impl DebugTap {
pub fn log_signal(&self, room: &str, signal: &wzp_proto::SignalMessage) {
match signal {
wzp_proto::SignalMessage::RoomUpdate { count, participants } => {
let names: Vec<&str> = participants.iter()
wzp_proto::SignalMessage::RoomUpdate {
count,
participants,
} => {
let names: Vec<&str> = participants
.iter()
.map(|p| p.alias.as_deref().unwrap_or("?"))
.collect();
info!(
@@ -66,7 +76,10 @@ impl DebugTap {
"TAP SIGNAL"
);
}
wzp_proto::SignalMessage::QualityDirective { recommended_profile, reason } => {
wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
} => {
info!(
target: "debug_tap",
room = %room,
@@ -119,7 +132,7 @@ pub struct TapStats {
pub out_pkts: u64,
pub seq_gaps: u64,
pub codecs_seen: std::collections::HashSet<wzp_proto::CodecId>,
last_seq: Option<u16>,
last_seq: Option<u32>,
}
impl TapStats {
@@ -225,17 +238,29 @@ impl ParticipantSender {
/// Send raw bytes to this participant.
pub async fn send_raw(&self, data: &[u8]) -> Result<(), String> {
match self {
ParticipantSender::WebSocket(tx) => {
tx.try_send(Bytes::copy_from_slice(data))
.map_err(|e| format!("ws send: {e}"))
}
ParticipantSender::WebSocket(tx) => tx
.try_send(Bytes::copy_from_slice(data))
.map_err(|e| format!("ws send: {e}")),
ParticipantSender::Quic(transport) => {
let pkt = wzp_proto::MediaPacket {
header: wzp_proto::packet::MediaHeader::default_pcm(),
header: wzp_proto::packet::MediaHeader {
version: 2,
flags: 0,
media_type: wzp_proto::MediaType::Audio,
codec_id: wzp_proto::CodecId::Opus24k,
stream_id: 0,
fec_ratio: 0,
seq: 0,
timestamp: 0,
fec_block: 0,
},
payload: Bytes::copy_from_slice(data),
quality_report: None,
};
transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}"))
transport
.send_media(&pkt)
.await
.map_err(|e| format!("quic send: {e}"))
}
}
}
@@ -301,13 +326,23 @@ impl Room {
) -> ParticipantId {
let id = next_id();
info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room");
self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias });
self.participants.push(Participant {
id,
_addr: addr,
sender,
fingerprint,
alias,
});
id
}
fn remove(&mut self, id: ParticipantId) {
self.participants.retain(|p| p.id != id);
info!(room_size = self.participants.len(), participant = id, "left room");
info!(
room_size = self.participants.len(),
participant = id,
"left room"
);
}
fn others(&self, exclude_id: ParticipantId) -> Vec<ParticipantSender> {
@@ -387,7 +422,8 @@ impl RoomManager {
/// Grant a fingerprint access to a room.
pub fn allow(&self, room_name: &str, fingerprint: &str) {
if let Some(ref acl) = self.acl {
acl.lock().unwrap()
acl.lock()
.unwrap()
.entry(room_name.to_string())
.or_default()
.insert(fingerprint.to_string());
@@ -398,7 +434,7 @@ impl RoomManager {
/// Returns true if ACL is disabled (open mode) or the fingerprint is in the allow list.
pub fn is_authorized(&self, room_name: &str, fingerprint: Option<&str>) -> bool {
match (&self.acl, fingerprint) {
(None, _) => true, // no ACL = open
(None, _) => true, // no ACL = open
(Some(_), None) => false, // ACL enabled but no fingerprint
(Some(acl), Some(fp)) => {
let acl = acl.lock().unwrap();
@@ -419,14 +455,29 @@ impl RoomManager {
sender: ParticipantSender,
fingerprint: Option<&str>,
alias: Option<&str>,
) -> Result<(ParticipantId, wzp_proto::SignalMessage, Vec<ParticipantSender>), String> {
) -> Result<
(
ParticipantId,
wzp_proto::SignalMessage,
Vec<ParticipantSender>,
),
String,
> {
if !self.is_authorized(room_name, fingerprint) {
warn!(room = room_name, fingerprint = ?fingerprint, "unauthorized room join attempt");
return Err("not authorized for this room".to_string());
}
let was_empty = self.rooms.get(room_name).map_or(true, |r| r.is_empty());
let mut room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string()));
let mut room = self
.rooms
.entry(room_name.to_string())
.or_insert_with(Room::new);
let id = room.add(
addr,
sender,
fingerprint.map(|s| s.to_string()),
alias.map(|s| s.to_string()),
);
room.qualities.insert(id, ParticipantQuality::new());
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
@@ -435,7 +486,9 @@ impl RoomManager {
let senders = room.all_senders();
drop(room); // release DashMap guard before event_tx send (not async, but good practice)
if was_empty {
let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() });
let _ = self.event_tx.send(RoomEvent::LocalJoin {
room: room_name.to_string(),
});
}
Ok((id, update, senders))
}
@@ -448,7 +501,13 @@ impl RoomManager {
sender: tokio::sync::mpsc::Sender<Bytes>,
fingerprint: Option<&str>,
) -> Result<ParticipantId, String> {
let (id, _update, _senders) = self.join(room_name, addr, ParticipantSender::WebSocket(sender), fingerprint, None)?;
let (id, _update, _senders) = self.join(
room_name,
addr,
ParticipantSender::WebSocket(sender),
fingerprint,
None,
)?;
Ok(id)
}
@@ -458,23 +517,30 @@ impl RoomManager {
}
/// Get participant list for a room (fingerprint + alias).
pub fn local_participant_list(&self, room_name: &str) -> Vec<wzp_proto::packet::RoomParticipant> {
self.rooms.get(room_name)
pub fn local_participant_list(
&self,
room_name: &str,
) -> Vec<wzp_proto::packet::RoomParticipant> {
self.rooms
.get(room_name)
.map(|room| room.participant_list())
.unwrap_or_default()
}
/// Get all senders for participants in a room (for federation inbound media delivery).
pub fn local_senders(&self, room_name: &str) -> Vec<ParticipantSender> {
self.rooms.get(room_name)
.map(|room| room.participants.iter()
.map(|p| p.sender.clone())
.collect())
self.rooms
.get(room_name)
.map(|room| room.participants.iter().map(|p| p.sender.clone()).collect())
.unwrap_or_default()
}
/// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty.
pub fn leave(&self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
pub fn leave(
&self,
room_name: &str,
participant_id: ParticipantId,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
let result = {
if let Some(mut room) = self.rooms.get_mut(room_name) {
room.qualities.remove(&participant_id);
@@ -482,7 +548,9 @@ impl RoomManager {
if room.is_empty() {
drop(room); // release write guard before remove
self.rooms.remove(room_name);
let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() });
let _ = self.event_tx.send(RoomEvent::LocalLeave {
room: room_name.to_string(),
});
info!(room = room_name, "room closed (empty)");
return None;
}
@@ -500,11 +568,7 @@ impl RoomManager {
}
/// Get senders for all OTHER participants in a room.
pub fn others(
&self,
room_name: &str,
participant_id: ParticipantId,
) -> Vec<ParticipantSender> {
pub fn others(&self, room_name: &str, participant_id: ParticipantId) -> Vec<ParticipantSender> {
self.rooms
.get(room_name)
.map(|r| r.others(participant_id))
@@ -523,7 +587,10 @@ impl RoomManager {
/// List all rooms with their sizes.
pub fn list(&self) -> Vec<(String, usize)> {
self.rooms.iter().map(|r| (r.key().clone(), r.len())).collect()
self.rooms
.iter()
.map(|r| (r.key().clone(), r.len()))
.collect()
}
/// Feed a quality report from a participant. If the room-wide weakest
@@ -537,7 +604,8 @@ impl RoomManager {
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
let mut room = self.rooms.get_mut(room_name)?;
let tier_changed = room.qualities
let tier_changed = room
.qualities
.get_mut(&participant_id)
.and_then(|pq| pq.observe(report))
.is_some();
@@ -639,7 +707,9 @@ impl TrunkedForwarder {
}
fn send_frame(&self, frame: &TrunkFrame) -> anyhow::Result<()> {
self.transport.send_trunk(frame).map_err(|e| anyhow::anyhow!(e))
self.transport
.send_trunk(frame)
.map_err(|e| anyhow::anyhow!(e))
}
}
@@ -667,12 +737,25 @@ pub async fn run_participant(
) {
if trunking_enabled {
run_participant_trunked(
room_mgr, room_name, participant_id, transport, metrics, session_id,
room_mgr,
room_name,
participant_id,
transport,
metrics,
session_id,
)
.await;
} else {
run_participant_plain(
room_mgr, room_name, participant_id, transport, metrics, session_id, debug_tap, federation_tx, federation_room_hash,
room_mgr,
room_name,
participant_id,
transport,
metrics,
session_id,
debug_tap,
federation_tx,
federation_room_hash,
)
.await;
}
@@ -822,7 +905,8 @@ async fn run_participant_plain(
let data = pkt.to_bytes();
let _ = fed_tx.try_send(FederationMediaOut {
room_name: room_name.clone(),
room_hash: federation_room_hash.unwrap_or_else(|| crate::federation::room_hash(&room_name)),
room_hash: federation_room_hash
.unwrap_or_else(|| crate::federation::room_hash(&room_name)),
data,
});
}
@@ -874,18 +958,24 @@ async fn run_participant_plain(
if let Some((update, senders)) = room_mgr.leave(&room_name, participant_id) {
if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_event(&room_name, "leave", &format!(
"participant={participant_id} addr={addr} forwarded={packets_forwarded}"
));
tap.log_event(
&room_name,
"leave",
&format!(
"participant={participant_id} addr={addr} forwarded={packets_forwarded}"
),
);
tap.log_signal(&room_name, &update);
}
}
broadcast_signal(&senders, &update).await;
} else if let Some(ref tap) = debug_tap {
if tap.matches(&room_name) {
tap.log_event(&room_name, "leave", &format!(
"participant={participant_id} addr={addr} (room closed)"
));
tap.log_event(
&room_name,
"leave",
&format!("participant={participant_id} addr={addr} (room closed)"),
);
}
}
}
@@ -1146,17 +1236,15 @@ mod tests {
fn make_test_packet(payload: &[u8]) -> wzp_proto::MediaPacket {
wzp_proto::MediaPacket {
header: wzp_proto::packet::MediaHeader {
version: 0,
is_repair: false,
version: 2,
flags: 0,
media_type: wzp_proto::MediaType::Audio,
codec_id: wzp_proto::CodecId::Opus16k,
has_quality_report: false,
fec_ratio_encoded: 0,
stream_id: 0,
fec_ratio: 0,
seq: 1,
timestamp: 100,
fec_block: 0,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
},
payload: Bytes::from(payload.to_vec()),
quality_report: None,
@@ -1266,6 +1354,10 @@ mod tests {
let participants = vec![good, bad];
let weakest = weakest_tier(participants.iter());
assert_ne!(weakest, Tier::Good, "weakest should not be Good when one participant is bad");
assert_ne!(
weakest,
Tier::Good,
"weakest should not be Good when one participant is bad"
);
}
}

View File

@@ -97,14 +97,13 @@ impl RouteResolver {
}
/// Build a JSON-serializable route response for the HTTP API.
pub fn route_json(
&self,
fingerprint: &str,
route: &Route,
) -> serde_json::Value {
pub fn route_json(&self, fingerprint: &str, route: &Route) -> serde_json::Value {
let (route_type, relay_chain) = match route {
Route::Local => ("local", vec![self.local_addr.to_string()]),
Route::DirectPeer(addr) => ("direct_peer", vec![self.local_addr.to_string(), addr.to_string()]),
Route::DirectPeer(addr) => (
"direct_peer",
vec![self.local_addr.to_string(), addr.to_string()],
),
Route::Chain(chain) => {
let mut addrs = vec![self.local_addr.to_string()];
addrs.extend(chain.iter().map(|a| a.to_string()));
@@ -184,7 +183,10 @@ mod tests {
reg.update_peer(peer, fps);
// Local lookup works via multi-hop
assert_eq!(resolver.resolve_multi_hop(&reg, "local_fp", 3), Route::Local);
assert_eq!(
resolver.resolve_multi_hop(&reg, "local_fp", 3),
Route::Local
);
// Remote lookup works via multi-hop
assert_eq!(
resolver.resolve_multi_hop(&reg, "remote_fp", 3),

View File

@@ -143,18 +143,18 @@ impl SessionManager {
fingerprint: Option<String>,
) -> Result<SessionId, String> {
if self.total_count() >= self.max_sessions {
return Err(format!(
"max sessions ({}) exceeded",
self.max_sessions
));
return Err(format!("max sessions ({}) exceeded", self.max_sessions));
}
let id = rand_session_id();
self.tracked.insert(id, SessionInfo {
room_name: room.to_string(),
fingerprint,
connected_at: Instant::now(),
state: SessionState::Active,
});
self.tracked.insert(
id,
SessionInfo {
room_name: room.to_string(),
fingerprint,
connected_at: Instant::now(),
state: SessionState::Active,
},
);
Ok(id)
}
@@ -165,7 +165,10 @@ impl SessionManager {
/// Number of currently tracked (room-mode) sessions.
pub fn active_count(&self) -> usize {
self.tracked.values().filter(|s| s.state == SessionState::Active).count()
self.tracked
.values()
.filter(|s| s.state == SessionState::Active)
.count()
}
/// Return all session IDs that belong to a given room.
@@ -278,7 +281,9 @@ mod tests {
#[test]
fn session_info_returns_correct_data() {
let mut mgr = SessionManager::new(10);
let id = mgr.create_session("room-x", Some("alice-fp".into())).unwrap();
let id = mgr
.create_session("room-x", Some("alice-fp".into()))
.unwrap();
let info = mgr.session_info(id).expect("session should exist");
assert_eq!(info.room_name, "room-x");
@@ -297,6 +302,9 @@ mod tests {
mgr.create_session("room", None).unwrap();
// Both layers should now reject
assert!(mgr.create_session("room", None).is_err());
assert!(mgr.create_pipeline_session([2u8; 16], PipelineConfig::default()).is_none());
assert!(
mgr.create_pipeline_session([2u8; 16], PipelineConfig::default())
.is_none()
);
}
}

View File

@@ -34,12 +34,15 @@ impl SignalHub {
/// Register a new signaling client.
pub fn register(&mut self, fp: String, transport: Arc<QuinnTransport>, alias: Option<String>) {
info!(fingerprint = %fp, alias = ?alias, "signal client registered");
self.clients.insert(fp.clone(), SignalClient {
fingerprint: fp,
alias,
transport,
connected_at: Instant::now(),
});
self.clients.insert(
fp.clone(),
SignalClient {
fingerprint: fp,
alias,
transport,
connected_at: Instant::now(),
},
);
}
/// Unregister a signaling client. Returns the client if found.
@@ -64,10 +67,11 @@ impl SignalHub {
/// Send a signal message to a client by fingerprint.
pub async fn send_to(&self, fp: &str, msg: &SignalMessage) -> Result<(), String> {
match self.clients.get(fp) {
Some(client) => {
client.transport.send_signal(msg).await
.map_err(|e| format!("send to {fp}: {e}"))
}
Some(client) => client
.transport
.send_signal(msg)
.await
.map_err(|e| format!("send to {fp}: {e}")),
None => Err(format!("{fp} not online")),
}
}

View File

@@ -8,17 +8,17 @@ use std::net::SocketAddr;
use std::sync::Arc;
use axum::{
Router,
extract::{
ws::{Message, WebSocket},
Path, State, WebSocketUpgrade,
ws::{Message, WebSocket},
},
response::IntoResponse,
routing::get,
Router,
};
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{Mutex, mpsc};
use tower_http::services::ServeDir;
use tracing::{error, info, warn};
@@ -143,9 +143,15 @@ async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) {
// 4. Join room with WS sender
let addr: SocketAddr = ([0, 0, 0, 0], 0).into();
let participant_id = {
match state.room_mgr.join_ws(&room, addr, tx, fingerprint.as_deref()) {
match state
.room_mgr
.join_ws(&room, addr, tx, fingerprint.as_deref())
{
Ok(id) => {
state.metrics.active_rooms.set(state.room_mgr.list().len() as i64);
state
.metrics
.active_rooms
.set(state.room_mgr.list().len() as i64);
id
}
Err(e) => {
@@ -187,10 +193,7 @@ async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) {
for other in &others {
let _ = other.send_raw(&data).await;
}
state
.metrics
.packets_forwarded
.inc_by(others.len() as u64);
state.metrics.packets_forwarded.inc_by(others.len() as u64);
state
.metrics
.bytes_forwarded
@@ -211,7 +214,10 @@ async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) {
}
state.room_mgr.leave(&room, participant_id);
state.metrics.active_rooms.set(state.room_mgr.list().len() as i64);
state
.metrics
.active_rooms
.set(state.room_mgr.list().len() as i64);
let session_id_str: String = session_id.iter().map(|b| format!("{b:02x}")).collect();
state.metrics.remove_session_metrics(&session_id_str);

View File

@@ -94,9 +94,10 @@ fn relay_a_handle_offer(reg_a: &mut CallRegistry, offer: &SignalMessage) -> Sign
/// reproduced here for the test.
fn relay_b_handle_forwarded_offer(reg_b: &mut CallRegistry, forward: &SignalMessage) {
let (inner, origin_relay_fp) = match forward {
SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => {
(inner.as_ref().clone(), origin_relay_fp.clone())
}
SignalMessage::FederatedSignalForward {
inner,
origin_relay_fp,
} => (inner.as_ref().clone(), origin_relay_fp.clone()),
_ => panic!("not a forward"),
};
// Loop-prevention: drop self-sourced.
@@ -114,11 +115,7 @@ fn relay_b_handle_forwarded_offer(reg_b: &mut CallRegistry, forward: &SignalMess
};
// Simulated: target is local to B (Bob is registered here).
reg_b.create_call(
call_id.clone(),
caller_fingerprint,
target_fingerprint,
);
reg_b.create_call(call_id.clone(), caller_fingerprint, target_fingerprint);
reg_b.set_caller_reflexive_addr(&call_id, caller_reflexive_addr);
reg_b.set_peer_relay_fp(&call_id, Some(origin_relay_fp));
}
@@ -194,9 +191,10 @@ fn relay_a_handle_forwarded_answer(
forward: &SignalMessage,
) -> SignalMessage {
let (inner, origin_relay_fp) = match forward {
SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => {
(inner.as_ref().clone(), origin_relay_fp.clone())
}
SignalMessage::FederatedSignalForward {
inner,
origin_relay_fp,
} => (inner.as_ref().clone(), origin_relay_fp.clone()),
_ => panic!("not a forward"),
};
assert_ne!(origin_relay_fp, RELAY_A_TLS_FP);
@@ -270,12 +268,15 @@ fn cross_relay_answer_crosswires_peer_direct_addrs() {
// Bob answers on Relay B.
let answer = bob_answer("c-xrelay-2");
let (answer_forward, setup_for_bob) =
relay_b_handle_local_answer(&mut reg_b, &answer);
let (answer_forward, setup_for_bob) = relay_b_handle_local_answer(&mut reg_b, &answer);
// Bob's CallSetup carries Alice's addr.
match setup_for_bob {
SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr,
relay_addr,
..
} => {
assert_eq!(peer_direct_addr.as_deref(), Some(ALICE_ADDR));
assert_eq!(relay_addr, RELAY_B_ADDR);
}
@@ -286,7 +287,11 @@ fn cross_relay_answer_crosswires_peer_direct_addrs() {
// her CallSetup.
let setup_for_alice = relay_a_handle_forwarded_answer(&mut reg_a, &answer_forward);
match setup_for_alice {
SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr,
relay_addr,
..
} => {
assert_eq!(peer_direct_addr.as_deref(), Some(BOB_ADDR));
assert_eq!(relay_addr, RELAY_A_ADDR);
}
@@ -313,9 +318,14 @@ fn cross_relay_loop_prevention_drops_self_sourced_forward() {
// The dispatcher in main.rs calls this explicit check before
// doing any work. Reproduce it inline.
let origin = match &forward {
SignalMessage::FederatedSignalForward { origin_relay_fp, .. } => origin_relay_fp.clone(),
SignalMessage::FederatedSignalForward {
origin_relay_fp, ..
} => origin_relay_fp.clone(),
_ => unreachable!(),
};
// Relay B sees origin == its own fp → drop.
assert_eq!(origin, RELAY_B_TLS_FP, "loop-prevention triggers on self-fp");
assert_eq!(
origin, RELAY_B_TLS_FP,
"loop-prevention triggers on self-fp"
);
}

View File

@@ -21,10 +21,10 @@ use bytes::Bytes;
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_relay::config::{PeerConfig, TrustedConfig};
use wzp_relay::event_log::EventLogger;
use wzp_relay::federation::{room_hash, FederationManager};
use wzp_relay::federation::{FederationManager, room_hash};
use wzp_relay::metrics::RelayMetrics;
use wzp_relay::room::RoomManager;
use wzp_transport::{client_config, create_endpoint, server_config, QuinnTransport};
use wzp_transport::{QuinnTransport, client_config, create_endpoint, server_config};
// ───────────────────────────── helpers ──────────────────────────────
@@ -41,8 +41,7 @@ fn create_test_fm_full(
) -> Arc<FederationManager> {
let _ = rustls::crypto::ring::default_provider().install_default();
let (sc, _cert) = server_config();
let ep = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), Some(sc))
.expect("test endpoint");
let ep = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), Some(sc)).expect("test endpoint");
let room_mgr = Arc::new(RoomManager::new());
let metrics = Arc::new(RelayMetrics::new());
let event_log = EventLogger::Noop;
@@ -219,7 +218,10 @@ async fn forward_to_peers_empty_returns_immediately() {
fm.forward_to_peers("room", &hash, &data),
)
.await;
assert!(result.is_ok(), "forward_to_peers should return immediately with no peers");
assert!(
result.is_ok(),
"forward_to_peers should return immediately with no peers"
);
}
// ─────────── 4. forward_to_peers with live QUIC peer links ──────────
@@ -339,20 +341,20 @@ async fn broadcast_signal_sends_to_all_peers() {
.expect("FM should connect to mock peer within 5s");
// The FM sends FederationHello as the first signal. Read it.
let hello = tokio::time::timeout(
Duration::from_secs(2),
peer_transport.recv_signal(),
)
.await
.expect("hello timeout")
.expect("recv ok")
.expect("some message");
let hello = tokio::time::timeout(Duration::from_secs(2), peer_transport.recv_signal())
.await
.expect("hello timeout")
.expect("recv ok")
.expect("some message");
match hello {
SignalMessage::FederationHello { tls_fingerprint } => {
assert_eq!(tls_fingerprint, "test-relay-fp-abc123");
}
other => panic!("expected FederationHello, got: {:?}", std::mem::discriminant(&other)),
other => panic!(
"expected FederationHello, got: {:?}",
std::mem::discriminant(&other)
),
}
// Now the FM's run_federation_link registered the peer in peer_links
@@ -372,20 +374,22 @@ async fn broadcast_signal_sends_to_all_peers() {
assert_eq!(count, 1, "should have broadcast to exactly 1 peer");
// Read the signal on the peer side
let received = tokio::time::timeout(
Duration::from_secs(2),
peer_transport.recv_signal(),
)
.await
.expect("broadcast signal timeout")
.expect("recv ok")
.expect("some message");
let received = tokio::time::timeout(Duration::from_secs(2), peer_transport.recv_signal())
.await
.expect("broadcast signal timeout")
.expect("recv ok")
.expect("some message");
match received {
SignalMessage::FederatedSignalForward { origin_relay_fp, .. } => {
SignalMessage::FederatedSignalForward {
origin_relay_fp, ..
} => {
assert_eq!(origin_relay_fp, "other-relay-fp");
}
other => panic!("expected FederatedSignalForward, got: {:?}", std::mem::discriminant(&other)),
other => panic!(
"expected FederatedSignalForward, got: {:?}",
std::mem::discriminant(&other)
),
}
drop(peer_transport);
@@ -585,14 +589,11 @@ async fn federation_media_egress_forwards_to_peer() {
.expect("FM should connect within 5s");
// Read the FederationHello
let _hello = tokio::time::timeout(
Duration::from_secs(2),
peer_transport.recv_signal(),
)
.await
.expect("hello timeout")
.expect("recv ok")
.expect("some message");
let _hello = tokio::time::timeout(Duration::from_secs(2), peer_transport.recv_signal())
.await
.expect("hello timeout")
.expect("recv ok")
.expect("some message");
// Wait for link setup
tokio::time::sleep(Duration::from_millis(100)).await;

View File

@@ -11,14 +11,18 @@ use wzp_client::perform_handshake;
use wzp_crypto::{KeyExchange, WarzoneKeyExchange};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_relay::handshake::accept_handshake;
use wzp_transport::{client_config, create_endpoint, server_config, QuinnTransport};
use wzp_transport::{QuinnTransport, client_config, create_endpoint, server_config};
/// Establish a QUIC connection and wrap both sides in `QuinnTransport`.
///
/// Returns (client_transport, server_transport, _endpoints) where the endpoint
/// tuple must be kept alive for the duration of the test to avoid premature
/// connection teardown.
async fn connected_pair() -> (Arc<QuinnTransport>, Arc<QuinnTransport>, (quinn::Endpoint, quinn::Endpoint)) {
async fn connected_pair() -> (
Arc<QuinnTransport>,
Arc<QuinnTransport>,
(quinn::Endpoint, quinn::Endpoint),
) {
let _ = rustls::crypto::ring::default_provider().install_default();
let (sc, _cert_der) = server_config();
@@ -31,7 +35,9 @@ async fn connected_pair() -> (Arc<QuinnTransport>, Arc<QuinnTransport>, (quinn::
let server_ep_clone = server_ep.clone();
let accept_fut = tokio::spawn(async move {
let conn = wzp_transport::accept(&server_ep_clone).await.expect("accept");
let conn = wzp_transport::accept(&server_ep_clone)
.await
.expect("accept");
Arc::new(QuinnTransport::new(conn))
});
@@ -59,9 +65,8 @@ async fn handshake_succeeds() {
// Clone Arc so the server transport stays alive in the main task too.
let server_t = Arc::clone(&server_transport);
let callee_handle = tokio::spawn(async move {
accept_handshake(server_t.as_ref(), &callee_seed).await
});
let callee_handle =
tokio::spawn(async move { accept_handshake(server_t.as_ref(), &callee_seed).await });
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
.await
@@ -120,9 +125,8 @@ async fn handshake_verifies_identity() {
);
let server_t = Arc::clone(&server_transport);
let callee_handle = tokio::spawn(async move {
accept_handshake(server_t.as_ref(), &callee_seed).await
});
let callee_handle =
tokio::spawn(async move { accept_handshake(server_t.as_ref(), &callee_seed).await });
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
.await
@@ -179,13 +183,17 @@ async fn auth_then_handshake() {
let token = match auth_msg {
SignalMessage::AuthToken { token } => token,
other => panic!("expected AuthToken, got {:?}", std::mem::discriminant(&other)),
other => panic!(
"expected AuthToken, got {:?}",
std::mem::discriminant(&other)
),
};
// 2. Run the cryptographic handshake
let (session, profile, _caller_fp, _caller_alias) = accept_handshake(server_t.as_ref(), &callee_seed)
.await
.expect("accept_handshake after auth");
let (session, profile, _caller_fp, _caller_alias) =
accept_handshake(server_t.as_ref(), &callee_seed)
.await
.expect("accept_handshake after auth");
(token, session, profile)
});
@@ -203,9 +211,7 @@ async fn auth_then_handshake() {
.await
.expect("perform_handshake after auth");
let (received_token, callee_session, _profile) = callee_handle
.await
.expect("join callee task");
let (received_token, callee_session, _profile) = callee_handle.await.expect("join callee task");
// Verify the auth token was received correctly.
assert_eq!(received_token, "bearer-test-token-12345");
@@ -246,9 +252,8 @@ async fn handshake_rejects_bad_signature() {
// Spawn callee -- it should reject the tampered CallOffer.
let server_t = Arc::clone(&server_transport);
let callee_handle = tokio::spawn(async move {
accept_handshake(server_t.as_ref(), &callee_seed).await
});
let callee_handle =
tokio::spawn(async move { accept_handshake(server_t.as_ref(), &callee_seed).await });
// Manually build a CallOffer with a corrupted signature.
let mut kx = WarzoneKeyExchange::from_identity_seed(&caller_seed);

View File

@@ -151,12 +151,13 @@ fn both_peers_advertise_reflex_addrs_cross_wire_in_setup() {
);
let answer = mk_answer("c1", CallAcceptMode::AcceptTrusted, Some(callee_addr));
let (setup_caller, setup_callee) =
handle_answer_and_build_setups(&mut reg, &answer);
let (setup_caller, setup_callee) = handle_answer_and_build_setups(&mut reg, &answer);
// The CALLER's setup should carry the CALLEE's addr as peer_direct_addr.
match setup_caller {
SignalMessage::CallSetup { peer_direct_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr, ..
} => {
assert_eq!(
peer_direct_addr.as_deref(),
Some(callee_addr),
@@ -168,7 +169,9 @@ fn both_peers_advertise_reflex_addrs_cross_wire_in_setup() {
// The CALLEE's setup should carry the CALLER's addr.
match setup_callee {
SignalMessage::CallSetup { peer_direct_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr, ..
} => {
assert_eq!(
peer_direct_addr.as_deref(),
Some(caller_addr),
@@ -193,12 +196,13 @@ fn privacy_mode_answer_omits_callee_addr_from_setup() {
// AcceptGeneric explicitly passes None for callee_reflexive_addr —
// the whole point is to hide the callee's IP from the caller.
let answer = mk_answer("c2", CallAcceptMode::AcceptGeneric, None);
let (setup_caller, setup_callee) =
handle_answer_and_build_setups(&mut reg, &answer);
let (setup_caller, setup_callee) = handle_answer_and_build_setups(&mut reg, &answer);
// CALLER should see peer_direct_addr = None (privacy preserved).
match setup_caller {
SignalMessage::CallSetup { peer_direct_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr, ..
} => {
assert!(
peer_direct_addr.is_none(),
"privacy mode must not leak callee addr to caller"
@@ -210,7 +214,9 @@ fn privacy_mode_answer_omits_callee_addr_from_setup() {
// CALLEE still gets the caller's addr — only the callee opted for
// privacy, the caller already volunteered its addr in the offer.
match setup_callee {
SignalMessage::CallSetup { peer_direct_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr, ..
} => {
assert_eq!(
peer_direct_addr.as_deref(),
Some(caller_addr),
@@ -242,11 +248,12 @@ fn pre_phase3_caller_leaves_both_setups_relay_only() {
CallAcceptMode::AcceptTrusted,
Some("198.51.100.9:4433"),
);
let (setup_caller, setup_callee) =
handle_answer_and_build_setups(&mut reg, &answer);
let (setup_caller, setup_callee) = handle_answer_and_build_setups(&mut reg, &answer);
match setup_caller {
SignalMessage::CallSetup { peer_direct_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr, ..
} => {
// Phase 3 relay behavior: we always inject whatever
// addrs are in the registry, regardless of who
// advertised. The caller here gets the callee's addr
@@ -258,7 +265,9 @@ fn pre_phase3_caller_leaves_both_setups_relay_only() {
// The callee's setup has no caller addr (pre-Phase-3 offer).
match setup_callee {
SignalMessage::CallSetup { peer_direct_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr, ..
} => {
assert!(
peer_direct_addr.is_none(),
"callee should see no caller addr when offer was pre-Phase-3"
@@ -278,12 +287,15 @@ fn neither_peer_advertises_both_setups_are_relay_only() {
handle_offer(&mut reg, &mk_offer("c4", None));
let answer = mk_answer("c4", CallAcceptMode::AcceptTrusted, None);
let (setup_caller, setup_callee) =
handle_answer_and_build_setups(&mut reg, &answer);
let (setup_caller, setup_callee) = handle_answer_and_build_setups(&mut reg, &answer);
for (label, setup) in [("caller", setup_caller), ("callee", setup_callee)] {
match setup {
SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => {
SignalMessage::CallSetup {
peer_direct_addr,
relay_addr,
..
} => {
assert!(
peer_direct_addr.is_none(),
"{label}'s CallSetup must have no peer_direct_addr"

View File

@@ -24,9 +24,9 @@ use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use wzp_client::reflect::{detect_nat_type, probe_reflect_addr, NatType};
use wzp_client::reflect::{NatType, detect_nat_type, probe_reflect_addr};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::{create_endpoint, server_config, QuinnTransport};
use wzp_transport::{QuinnTransport, create_endpoint, server_config};
/// Minimal mock relay that loops accepting connections, handles
/// RegisterPresence + Reflect, and responds correctly. Mirrors the
@@ -136,10 +136,7 @@ async fn detect_nat_type_two_loopback_relays_probes_work_but_classify_unknown()
let (addr_b, _h_b) = spawn_mock_relay().await;
let detection = detect_nat_type(
vec![
("RelayA".into(), addr_a),
("RelayB".into(), addr_b),
],
vec![("RelayA".into(), addr_a), ("RelayB".into(), addr_b)],
2000,
None,
)
@@ -194,10 +191,7 @@ async fn detect_nat_type_dead_relay_is_unknown() {
let dead_addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
let detection = detect_nat_type(
vec![
("Alive".into(), alive_addr),
("Dead".into(), dead_addr),
],
vec![("Alive".into(), alive_addr), ("Dead".into(), dead_addr)],
600, // tight timeout so the dead probe fails fast
None,
)
@@ -207,8 +201,16 @@ async fn detect_nat_type_dead_relay_is_unknown() {
// Find the alive and dead probes by name (order of JoinSet
// completions is not guaranteed).
let alive = detection.probes.iter().find(|p| p.relay_name == "Alive").unwrap();
let dead = detection.probes.iter().find(|p| p.relay_name == "Dead").unwrap();
let alive = detection
.probes
.iter()
.find(|p| p.relay_name == "Alive")
.unwrap();
let dead = detection
.probes
.iter()
.find(|p| p.relay_name == "Dead")
.unwrap();
assert!(
alive.observed_addr.is_some(),

View File

@@ -31,7 +31,7 @@ use std::sync::Arc;
use std::time::Duration;
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::{client_config, create_endpoint, server_config, QuinnTransport};
use wzp_transport::{QuinnTransport, client_config, create_endpoint, server_config};
/// Spawn a minimal mock relay that loops over `recv_signal`,
/// matches on `Reflect`, and responds with `ReflectResponse` using
@@ -94,7 +94,11 @@ async fn spawn_mock_relay_without_reflect(
/// distinct-ports test).
async fn connected_pair_with_port(
_client_port_hint: u16,
) -> (Arc<QuinnTransport>, Arc<QuinnTransport>, (quinn::Endpoint, quinn::Endpoint)) {
) -> (
Arc<QuinnTransport>,
Arc<QuinnTransport>,
(quinn::Endpoint, quinn::Endpoint),
) {
let _ = rustls::crypto::ring::default_provider().install_default();
let (sc, _cert_der) = server_config();
@@ -109,7 +113,9 @@ async fn connected_pair_with_port(
let server_ep_clone = server_ep.clone();
let accept_fut = tokio::spawn(async move {
let conn = wzp_transport::accept(&server_ep_clone).await.expect("accept");
let conn = wzp_transport::accept(&server_ep_clone)
.await
.expect("accept");
Arc::new(QuinnTransport::new(conn))
});
@@ -134,10 +140,7 @@ async fn reflect_happy_path() {
// Grab the client's actual bound port so we can cross-check
// against the reflected response.
let client_port = client_ep
.local_addr()
.expect("client local addr")
.port();
let client_port = client_ep.local_addr().expect("client local addr").port();
assert_ne!(client_port, 0, "client must have a real bound port");
// Start the mock relay's reflect handler.
@@ -162,7 +165,10 @@ async fn reflect_happy_path() {
let observed_addr = match resp {
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
other => panic!("expected ReflectResponse, got {:?}", std::mem::discriminant(&other)),
other => panic!(
"expected ReflectResponse, got {:?}",
std::mem::discriminant(&other)
),
};
let parsed: SocketAddr = observed_addr
@@ -210,19 +216,17 @@ async fn reflect_two_clients_distinct_ports() {
// Client A
let client_ep_a = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep A");
let conn_a =
wzp_transport::connect(&client_ep_a, server_listen, "localhost", client_config())
.await
.expect("connect A");
let conn_a = wzp_transport::connect(&client_ep_a, server_listen, "localhost", client_config())
.await
.expect("connect A");
let client_a = Arc::new(QuinnTransport::new(conn_a));
let port_a = client_ep_a.local_addr().unwrap().port();
// Client B
let client_ep_b = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep B");
let conn_b =
wzp_transport::connect(&client_ep_b, server_listen, "localhost", client_config())
.await
.expect("connect B");
let conn_b = wzp_transport::connect(&client_ep_b, server_listen, "localhost", client_config())
.await
.expect("connect B");
let client_b = Arc::new(QuinnTransport::new(conn_b));
let port_b = client_ep_b.local_addr().unwrap().port();
@@ -252,7 +256,8 @@ async fn reflect_two_clients_distinct_ports() {
}
};
let (addr_a, addr_b) = tokio::join!(reflect_for(client_a.clone()), reflect_for(client_b.clone()));
let (addr_a, addr_b) =
tokio::join!(reflect_for(client_a.clone()), reflect_for(client_b.clone()));
let parsed_a: SocketAddr = addr_a.parse().unwrap();
let parsed_b: SocketAddr = addr_b.parse().unwrap();
@@ -277,12 +282,10 @@ async fn reflect_two_clients_distinct_ports() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reflect_old_relay_times_out() {
let (client_transport, server_transport, _endpoints) =
connected_pair_with_port(0).await;
let (client_transport, server_transport, _endpoints) = connected_pair_with_port(0).await;
// Mock relay that ignores Reflect — simulates a pre-Phase-1 build.
let _relay_handle =
spawn_mock_relay_without_reflect(Arc::clone(&server_transport)).await;
let _relay_handle = spawn_mock_relay_without_reflect(Arc::clone(&server_transport)).await;
client_transport
.send_signal(&SignalMessage::Reflect)