From f935bd69cd630bf10ff1f2b4f5bf096d365aadcd Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Wed, 8 Apr 2026 21:55:06 +0400 Subject: [PATCH] fix: rewrite seq/fec for federation-delivered packets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Time-based dedup (2s TTL) replaces fixed-window dedup — consecutive senders with same seq numbers no longer collide - Raw byte forwarding for federation local delivery (no re-serialization) - Jitter buffer resets on large backward seq jumps (>100) - recv_media skips malformed datagrams instead of returning connection-closed - SIGTERM handler for clean QUIC shutdown on wzp-client - JSONL event log infrastructure (--event-log flag) for protocol analysis - FEC disabled on GOOD profile for federation debugging (fec_ratio=0.0) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/cli.rs | 20 +++ crates/wzp-proto/src/codec_id.rs | 4 +- crates/wzp-proto/src/jitter.rs | 34 ++++- crates/wzp-relay/Cargo.toml | 1 + crates/wzp-relay/src/config.rs | 4 + crates/wzp-relay/src/event_log.rs | 201 +++++++++++++++++++++++++++++ crates/wzp-relay/src/federation.rs | 110 +++++++++------- crates/wzp-relay/src/lib.rs | 1 + crates/wzp-relay/src/main.rs | 12 ++ crates/wzp-transport/src/quic.rs | 8 +- 10 files changed, 338 insertions(+), 57 deletions(-) create mode 100644 crates/wzp-relay/src/event_log.rs diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index 7928e9c..4ec253c 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -297,6 +297,26 @@ async fn main() -> anyhow::Result<()> { let transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); + // Register shutdown handler so SIGTERM/SIGINT always closes QUIC cleanly. + // Without this, killed clients leave zombie connections on the relay for ~30s. 
+ { + let shutdown_transport = transport.clone(); + tokio::spawn(async move { + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register SIGTERM handler"); + let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("failed to register SIGINT handler"); + tokio::select! { + _ = sigterm.recv() => { info!("SIGTERM received, closing connection..."); } + _ = sigint.recv() => { info!("SIGINT received, closing connection..."); } + } + // Close the QUIC connection immediately (APPLICATION_CLOSE frame). + // Don't call process::exit — let the main task detect the closed + // connection and perform clean shutdown (e.g., save recordings). + shutdown_transport.connection().close(0u32.into(), b"shutdown"); + }); + } + // Send auth token if provided (relay with --auth-url expects this first) if let Some(ref token) = cli.token { let auth = wzp_proto::SignalMessage::AuthToken { diff --git a/crates/wzp-proto/src/codec_id.rs b/crates/wzp-proto/src/codec_id.rs index d90c3a0..a8692bc 100644 --- a/crates/wzp-proto/src/codec_id.rs +++ b/crates/wzp-proto/src/codec_id.rs @@ -105,10 +105,10 @@ pub struct QualityProfile { } impl QualityProfile { - /// Good conditions: Opus 24kbps, light FEC. + /// Good conditions: Opus 24kbps, FEC disabled for federation debugging. pub const GOOD: Self = Self { codec: CodecId::Opus24k, - fec_ratio: 0.2, + fec_ratio: 0.0, frame_duration_ms: 20, frames_per_block: 5, }; diff --git a/crates/wzp-proto/src/jitter.rs b/crates/wzp-proto/src/jitter.rs index 383b3d5..b63a71a 100644 --- a/crates/wzp-proto/src/jitter.rs +++ b/crates/wzp-proto/src/jitter.rs @@ -273,10 +273,21 @@ impl JitterBuffer { return; } - // Check if packet is too old (already played out) + // Check if packet is too old (already played out). + // A backward jump of >100 seq (~2s at 50fps) indicates a new sender in a + // federation room — reset instead of dropping. 
if self.stats.packets_played > 0 && seq_before(seq, self.next_playout_seq) { - self.stats.packets_late += 1; - return; + let backward_distance = self.next_playout_seq.wrapping_sub(seq); + tracing::warn!(seq, next = self.next_playout_seq, backward_distance, "jitter: backward seq detected"); + if backward_distance > 100 { + tracing::info!(seq, next = self.next_playout_seq, "jitter: RESET — new sender detected"); + self.buffer.clear(); + self.next_playout_seq = seq; + self.stats.packets_late = 0; + } else { + self.stats.packets_late += 1; + return; + } } // If we haven't started playout yet, adjust next_playout_seq to earliest known @@ -412,10 +423,21 @@ impl JitterBuffer { return; } - // Check if packet is too old (already played out) + // Check if packet is too old (already played out). + // A backward jump of >100 seq (~2s at 50fps) indicates a new sender in a + // federation room — reset instead of dropping. if self.stats.packets_played > 0 && seq_before(seq, self.next_playout_seq) { - self.stats.packets_late += 1; - return; + let backward_distance = self.next_playout_seq.wrapping_sub(seq); + tracing::warn!(seq, next = self.next_playout_seq, backward_distance, "jitter: backward seq detected"); + if backward_distance > 100 { + tracing::info!(seq, next = self.next_playout_seq, "jitter: RESET — new sender detected"); + self.buffer.clear(); + self.next_playout_seq = seq; + self.stats.packets_late = 0; + } else { + self.stats.packets_late += 1; + return; + } } // If we haven't started playout yet, adjust next_playout_seq to earliest known diff --git a/crates/wzp-relay/Cargo.toml b/crates/wzp-relay/Cargo.toml index 6014314..8e7a3e0 100644 --- a/crates/wzp-relay/Cargo.toml +++ b/crates/wzp-relay/Cargo.toml @@ -30,6 +30,7 @@ tower-http = { version = "0.6", features = ["fs"] } futures-util = "0.3" dirs = "6" sha2 = { workspace = true } +chrono = "0.4" [[bin]] name = "wzp-relay" diff --git a/crates/wzp-relay/src/config.rs b/crates/wzp-relay/src/config.rs index 
27c959c..a299144 100644 --- a/crates/wzp-relay/src/config.rs +++ b/crates/wzp-relay/src/config.rs @@ -90,6 +90,9 @@ pub struct RelayConfig { /// Debug tap: log packet headers for matching rooms ("*" = all rooms). /// Activated via --debug-tap or debug_tap = "room" in TOML. pub debug_tap: Option<String>, + /// JSONL event log path for protocol analysis (--event-log). + #[serde(skip)] + pub event_log: Option<String>, } impl Default for RelayConfig { @@ -112,6 +115,7 @@ impl Default for RelayConfig { global_rooms: Vec::new(), trusted: Vec::new(), debug_tap: None, + event_log: None, } } } diff --git a/crates/wzp-relay/src/event_log.rs b/crates/wzp-relay/src/event_log.rs new file mode 100644 index 0000000..fb4fcb8 --- /dev/null +++ b/crates/wzp-relay/src/event_log.rs @@ -0,0 +1,201 @@ +//! JSONL event log for protocol analysis. +//! +//! When `--event-log <path>` is set, every media packet emits a structured +//! event at each decision point (recv, forward, drop, deliver). +//! Use `wzp-analyzer` to correlate events across multiple relays. + +use std::path::PathBuf; +use std::sync::Arc; + +use serde::Serialize; +use tokio::sync::mpsc; +use tracing::{error, info}; + +/// A single protocol event for JSONL output. +#[derive(Debug, Serialize)] +pub struct Event { + /// ISO 8601 timestamp with microseconds. + pub ts: String, + /// Event type. + pub event: &'static str, + /// Room name. + #[serde(skip_serializing_if = "Option::is_none")] + pub room: Option<String>, + /// Source address or peer label. + #[serde(skip_serializing_if = "Option::is_none")] + pub src: Option<String>, + /// Packet sequence number. + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option<u16>, + /// Codec identifier. + #[serde(skip_serializing_if = "Option::is_none")] + pub codec: Option<String>, + /// FEC block ID. + #[serde(skip_serializing_if = "Option::is_none")] + pub fec_block: Option<u8>, + /// FEC symbol index. + #[serde(skip_serializing_if = "Option::is_none")] + pub fec_sym: Option<u8>, + /// Is FEC repair packet. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub repair: Option<bool>, + /// Payload length in bytes. + #[serde(skip_serializing_if = "Option::is_none")] + pub len: Option<usize>, + /// Number of recipients. + #[serde(skip_serializing_if = "Option::is_none")] + pub to_count: Option<usize>, + /// Peer label (for federation events). + #[serde(skip_serializing_if = "Option::is_none")] + pub peer: Option<String>, + /// Drop/error reason. + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, + /// Presence action (active/inactive). + #[serde(skip_serializing_if = "Option::is_none")] + pub action: Option<String>, + /// Participant count (presence events). + #[serde(skip_serializing_if = "Option::is_none")] + pub participants: Option<usize>, +} + +impl Event { + fn now() -> String { + chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() + } + + /// Create a minimal event with just type and timestamp. + pub fn new(event: &'static str) -> Self { + Self { + ts: Self::now(), + event, + room: None, + src: None, + seq: None, + codec: None, + fec_block: None, + fec_sym: None, + repair: None, + len: None, + to_count: None, + peer: None, + reason: None, + action: None, + participants: None, + } + } + + /// Set room. + pub fn room(mut self, room: &str) -> Self { self.room = Some(room.to_string()); self } + /// Set source. + pub fn src(mut self, src: &str) -> Self { self.src = Some(src.to_string()); self } + /// Set packet header fields from a MediaPacket. + pub fn packet(mut self, pkt: &wzp_proto::MediaPacket) -> Self { + self.seq = Some(pkt.header.seq); + self.codec = Some(format!("{:?}", pkt.header.codec_id)); + self.fec_block = Some(pkt.header.fec_block); + self.fec_sym = Some(pkt.header.fec_symbol); + self.repair = Some(pkt.header.is_repair); + self.len = Some(pkt.payload.len()); + self + } + /// Set seq only (when full packet not available). + pub fn seq(mut self, seq: u16) -> Self { self.seq = Some(seq); self } + /// Set payload length. 
+ pub fn len(mut self, len: usize) -> Self { self.len = Some(len); self } + /// Set recipient count. + pub fn to_count(mut self, n: usize) -> Self { self.to_count = Some(n); self } + /// Set peer label. + pub fn peer(mut self, peer: &str) -> Self { self.peer = Some(peer.to_string()); self } + /// Set drop reason. + pub fn reason(mut self, reason: &str) -> Self { self.reason = Some(reason.to_string()); self } + /// Set presence action. + pub fn action(mut self, action: &str) -> Self { self.action = Some(action.to_string()); self } + /// Set participant count. + pub fn participants(mut self, n: usize) -> Self { self.participants = Some(n); self } +} + +/// Handle for emitting events. Cheap to clone. +#[derive(Clone)] +pub struct EventLog { + tx: mpsc::UnboundedSender<Event>, +} + +impl EventLog { + /// Emit an event (non-blocking; the channel is unbounded, so the event is dropped only if the writer task has exited). + pub fn emit(&self, event: Event) { + let _ = self.tx.send(event); + } +} + +/// No-op event log for when `--event-log` is not set. +/// Currently has no methods; `EventLogger::Noop` is the no-op variant that callers use. +#[derive(Clone)] +pub struct NoopEventLog; + +/// Unified event log handle — either real or no-op. +#[derive(Clone)] +pub enum EventLogger { + Active(EventLog), + Noop, +} + +impl EventLogger { + pub fn emit(&self, event: Event) { + if let EventLogger::Active(log) = self { + log.emit(event); + } + } + + pub fn is_active(&self) -> bool { + matches!(self, EventLogger::Active(_)) + } +} + +/// Start the event log writer. Returns an `EventLogger` handle. +pub fn start_event_log(path: Option<PathBuf>) -> EventLogger { + match path { + Some(path) => { + let (tx, rx) = mpsc::unbounded_channel(); + tokio::spawn(writer_task(path, rx)); + info!("event log enabled"); + EventLogger::Active(EventLog { tx }) + } + None => EventLogger::Noop, + } +} + +/// Background task that writes events to a JSONL file. 
+async fn writer_task(path: PathBuf, mut rx: mpsc::UnboundedReceiver<Event>) { + use tokio::io::AsyncWriteExt; + + let file = match tokio::fs::File::create(&path).await { + Ok(f) => f, + Err(e) => { + error!("failed to create event log {}: {e}", path.display()); + return; + } + }; + let mut writer = tokio::io::BufWriter::new(file); + let mut count: u64 = 0; + + while let Some(event) = rx.recv().await { + match serde_json::to_string(&event) { + Ok(json) => { + if writer.write_all(json.as_bytes()).await.is_err() { break; } + if writer.write_all(b"\n").await.is_err() { break; } + count += 1; + // Flush every 100 events + if count % 100 == 0 { + let _ = writer.flush().await; + } + } + Err(e) => { + error!("event log serialize error: {e}"); + } + } + } + + let _ = writer.flush().await; + info!(events = count, "event log closed"); +} diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 6dbaf31..07c1733 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -19,6 +19,7 @@ use wzp_proto::{MediaTransport, SignalMessage}; use wzp_transport::QuinnTransport; use crate::config::{PeerConfig, TrustedConfig}; +use crate::event_log::{Event, EventLogger}; use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager}; /// Compute 8-byte room hash for federation datagram tagging. @@ -34,41 +35,42 @@ fn normalize_fp(fp: &str) -> String { fp.replace(':', "").to_lowercase() } -/// Sliding-window dedup filter for federation datagrams. -/// Tracks recently seen (room_hash, seq) pairs to discard duplicates -/// arriving via multiple federation paths (e.g., A↔B↔C and A↔C). +/// Time-based dedup filter for federation datagrams. +/// Tracks recently seen packets and expires entries older than 2 seconds. +/// This prevents duplicate delivery when the same packet arrives via +/// multiple federation paths, while allowing new senders that happen to +/// reuse the same seq numbers. 
struct Deduplicator { - /// Ring buffer of recent packet fingerprints (room_hash XOR'd with seq). - seen: HashSet<u64>, - /// Ordered list for eviction. - order: std::collections::VecDeque<u64>, - capacity: usize, + /// Recently seen packet keys with insertion time. + entries: HashMap<u64, Instant>, + /// Expiry duration. + ttl: Duration, } impl Deduplicator { - fn new(capacity: usize) -> Self { + fn new(_capacity: usize) -> Self { Self { - seen: HashSet::with_capacity(capacity), - order: std::collections::VecDeque::with_capacity(capacity), - capacity, + entries: HashMap::with_capacity(512), + ttl: Duration::from_secs(2), } } - /// Returns true if this packet is a duplicate (already seen). - /// The source_fp_hash distinguishes packets from different senders - /// that share the same room and seq number. - fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16, source_fp_hash: u64) -> bool { - let key = u64::from_be_bytes(*room_hash) ^ (seq as u64) ^ source_fp_hash; - if self.seen.contains(&key) { - return true; + /// Returns true if this packet is a duplicate (already seen within TTL). + fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16, extra: u64) -> bool { + let key = u64::from_be_bytes(*room_hash) ^ (seq as u64) ^ extra; + let now = Instant::now(); + + // Opportunistic cleanup: purge expired entries whenever the map holds more than 256 keys + if self.entries.len() > 256 { + self.entries.retain(|_, ts| now.duration_since(*ts) < self.ttl); } - if self.order.len() >= self.capacity { - if let Some(old) = self.order.pop_front() { - self.seen.remove(&old); + + if let Some(ts) = self.entries.get(&key) { + if now.duration_since(*ts) < self.ttl { + return true; // seen recently — duplicate } } - self.seen.insert(key); - self.order.push_back(key); + self.entries.insert(key, now); false } } @@ -143,6 +145,8 @@ pub struct FederationManager { /// Per-room seq counter for federation media delivered to local clients. /// Ensures clients see monotonically increasing seq regardless of federation sender. 
local_delivery_seq: std::sync::atomic::AtomicU16, + /// JSONL event log for protocol analysis. + event_log: EventLogger, /// Per-room rate limiters for inbound federation media. rate_limiters: Mutex>, } @@ -156,6 +160,7 @@ impl FederationManager { endpoint: quinn::Endpoint, local_tls_fp: String, metrics: Arc, + event_log: EventLogger, ) -> Self { Self { peers, @@ -168,6 +173,7 @@ impl FederationManager { peer_links: Arc::new(Mutex::new(HashMap::new())), dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), local_delivery_seq: std::sync::atomic::AtomicU16::new(0), + event_log, rate_limiters: Mutex::new(HashMap::new()), } } @@ -854,9 +860,19 @@ async fn handle_datagram( let pkt = match wzp_proto::MediaPacket::from_bytes(media_bytes.clone()) { Some(pkt) => pkt, - None => return, + None => { + fm.event_log.emit(Event::new("federation_ingress_malformed").len(data.len())); + return; + } }; + // Event log: federation ingress + let peer_label = { + let links = fm.peer_links.lock().await; + links.get(source_peer_fp).map(|l| l.label.clone()).unwrap_or_default() + }; + fm.event_log.emit(Event::new("federation_ingress").packet(&pkt).peer(&peer_label)); + // Count inbound federation packet + update last_seen fm.metrics.federation_packets_forwarded .with_label_values(&[source_peer_fp, "in"]).inc(); @@ -867,18 +883,20 @@ async fn handle_datagram( } } - // Dedup: drop packets we've already seen (multi-path duplicates) - // Include source peer fingerprint so different senders with same seq aren't confused - let source_fp_hash = { + // Dedup: drop packets we've already seen (multi-path duplicates). + // Key uses a hash of the actual payload bytes — unique per Opus frame, + // so different senders with the same seq/timestamp never collide. 
+ let payload_hash = { let mut h = 0u64; - for (i, b) in source_peer_fp.bytes().enumerate().take(8) { + for (i, &b) in media_bytes.iter().take(16).enumerate() { h ^= (b as u64) << ((i % 8) * 8); } h }; { let mut dedup = fm.dedup.lock().await; - if dedup.is_dup(&rh, pkt.header.seq, source_fp_hash) { + if dedup.is_dup(&rh, pkt.header.seq, payload_hash) { + fm.event_log.emit(Event::new("dedup_drop").seq(pkt.header.seq).peer(&peer_label)); return; } } @@ -898,7 +916,10 @@ async fn handle_datagram( let room_name = match room_name { Some(r) => r, - None => return, // not a known room + None => { + fm.event_log.emit(Event::new("room_not_found").seq(pkt.header.seq).peer(&peer_label)); + return; + } }; // Rate limit per room @@ -907,32 +928,29 @@ async fn handle_datagram( let limiter = limiters.entry(room_name.clone()) .or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS)); if !limiter.allow() { + fm.event_log.emit(Event::new("rate_limit_drop").room(&room_name).seq(pkt.header.seq)); return; } } - // Deliver to all local participants with rewritten seq/fec - // so the client sees a monotonic stream regardless of which federation sender + // Deliver to all local participants — forward the raw bytes as-is. + // The original sender's MediaPacket is preserved exactly (no re-serialization). 
let locals = { let mgr = fm.room_mgr.lock().await; mgr.local_senders(&room_name) }; - if !locals.is_empty() { - let new_seq = fm.local_delivery_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let mut local_pkt = pkt.clone(); - local_pkt.header.seq = new_seq; - // Rewrite FEC block/symbol to match new seq so decoder doesn't see stale blocks - let frames_per_block = 5u16; // matches default FEC config - local_pkt.header.fec_block = (new_seq / frames_per_block) as u8; - local_pkt.header.fec_symbol = (new_seq % frames_per_block) as u8; - local_pkt.header.is_repair = false; // federation packets are source-only for local delivery - for sender in &locals { - match sender { - room::ParticipantSender::Quic(t) => { let _ = t.send_media(&local_pkt).await; } - room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&local_pkt.payload).await; } + for sender in &locals { + match sender { + room::ParticipantSender::Quic(t) => { + if let Err(e) = t.send_raw_datagram(&media_bytes) { + fm.event_log.emit(Event::new("local_deliver_error").room(&room_name).seq(pkt.header.seq).reason(&e.to_string())); + warn!("federation local delivery error: {e}"); + } } + room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; } } } + fm.event_log.emit(Event::new("local_deliver").room(&room_name).seq(pkt.header.seq).to_count(locals.len())); // Multi-hop: forward to ALL other connected peers (not the source) // Don't filter by active_rooms — the receiving peer decides whether to deliver diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index 48e7688..b4ebc54 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -9,6 +9,7 @@ pub mod auth; pub mod config; +pub mod event_log; pub mod federation; pub mod handshake; pub mod metrics; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index eaff258..b0ba35a 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ 
-135,6 +135,12 @@ fn parse_args() -> CliResult { args.get(i).expect("--debug-tap requires a room name (or '*' for all)").to_string(), ); } + "--event-log" => { + i += 1; + config.event_log = Some( + args.get(i).expect("--event-log requires a file path").to_string(), + ); + } "--version" | "-V" => { println!("wzp-relay {}", env!("WZP_BUILD_HASH")); std::process::exit(0); @@ -387,6 +393,11 @@ async fn main() -> anyhow::Result<()> { // Room manager (room mode only) let room_mgr = Arc::new(Mutex::new(RoomManager::new())); + // Event log for protocol analysis + let event_log = wzp_relay::event_log::start_event_log( + config.event_log.as_ref().map(std::path::PathBuf::from) + ); + // Federation manager let global_room_set: std::collections::HashSet = config.global_rooms.iter() .map(|g| g.name.clone()) @@ -401,6 +412,7 @@ async fn main() -> anyhow::Result<()> { endpoint.clone(), tls_fp.clone(), metrics.clone(), + event_log.clone(), )); let fm_run = fm.clone(); tokio::spawn(async move { fm_run.run().await }); diff --git a/crates/wzp-transport/src/quic.rs b/crates/wzp-transport/src/quic.rs index 580d118..caf7a02 100644 --- a/crates/wzp-transport/src/quic.rs +++ b/crates/wzp-transport/src/quic.rs @@ -143,7 +143,7 @@ impl MediaTransport for QuinnTransport { } }; - match datagram::deserialize_media(data) { + match datagram::deserialize_media(data.clone()) { Some(packet) => { // Record receive observation { @@ -156,8 +156,10 @@ impl MediaTransport for QuinnTransport { Ok(Some(packet)) } None => { - tracing::warn!("received malformed media datagram"); - Ok(None) + tracing::warn!(len = data.len(), "skipping malformed media datagram, continuing"); + // Don't return Ok(None) — that signals connection closed. + // Recurse to read the next datagram instead. + Box::pin(self.recv_media()).await } } }