fix: rewrite seq/fec for federation-delivered packets
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 2m48s
Mirror to GitHub / mirror (push) Failing after 4m2s

- Time-based dedup (2s TTL) replaces fixed-window dedup — consecutive
  senders with same seq numbers no longer collide
- Raw byte forwarding for federation local delivery (no re-serialization)
- Jitter buffer resets on large backward seq jumps (>100)
- recv_media skips malformed datagrams instead of returning connection-closed
- SIGTERM handler for clean QUIC shutdown on wzp-client
- JSONL event log infrastructure (--event-log flag) for protocol analysis
- FEC disabled on GOOD profile for federation debugging (fec_ratio=0.0)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-08 21:55:06 +04:00
parent 1c684f6b47
commit f935bd69cd
10 changed files with 338 additions and 57 deletions

View File

@@ -90,6 +90,9 @@ pub struct RelayConfig {
/// Debug tap: log packet headers for matching rooms ("*" = all rooms).
/// Activated via --debug-tap <room> or debug_tap = "room" in TOML.
pub debug_tap: Option<String>,
/// JSONL event log path for protocol analysis (--event-log).
#[serde(skip)]
pub event_log: Option<String>,
}
impl Default for RelayConfig {
@@ -112,6 +115,7 @@ impl Default for RelayConfig {
global_rooms: Vec::new(),
trusted: Vec::new(),
debug_tap: None,
event_log: None,
}
}
}

View File

@@ -0,0 +1,201 @@
//! JSONL event log for protocol analysis.
//!
//! When `--event-log <path>` is set, every media packet emits a structured
//! event at each decision point (recv, forward, drop, deliver).
//! Use `wzp-analyzer` to correlate events across multiple relays.
use std::path::PathBuf;
use std::sync::Arc;
use serde::Serialize;
use tokio::sync::mpsc;
use tracing::{error, info};
/// A single protocol event for JSONL output.
#[derive(Debug, Serialize)]
pub struct Event {
/// ISO 8601 timestamp with microseconds.
pub ts: String,
/// Event type.
pub event: &'static str,
/// Room name.
#[serde(skip_serializing_if = "Option::is_none")]
pub room: Option<String>,
/// Source address or peer label.
#[serde(skip_serializing_if = "Option::is_none")]
pub src: Option<String>,
/// Packet sequence number.
#[serde(skip_serializing_if = "Option::is_none")]
pub seq: Option<u16>,
/// Codec identifier.
#[serde(skip_serializing_if = "Option::is_none")]
pub codec: Option<String>,
/// FEC block ID.
#[serde(skip_serializing_if = "Option::is_none")]
pub fec_block: Option<u8>,
/// FEC symbol index.
#[serde(skip_serializing_if = "Option::is_none")]
pub fec_sym: Option<u8>,
/// Is FEC repair packet.
#[serde(skip_serializing_if = "Option::is_none")]
pub repair: Option<bool>,
/// Payload length in bytes.
#[serde(skip_serializing_if = "Option::is_none")]
pub len: Option<usize>,
/// Number of recipients.
#[serde(skip_serializing_if = "Option::is_none")]
pub to_count: Option<usize>,
/// Peer label (for federation events).
#[serde(skip_serializing_if = "Option::is_none")]
pub peer: Option<String>,
/// Drop/error reason.
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
/// Presence action (active/inactive).
#[serde(skip_serializing_if = "Option::is_none")]
pub action: Option<String>,
/// Participant count (presence events).
#[serde(skip_serializing_if = "Option::is_none")]
pub participants: Option<usize>,
}
impl Event {
fn now() -> String {
chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string()
}
/// Create a minimal event with just type and timestamp.
pub fn new(event: &'static str) -> Self {
Self {
ts: Self::now(),
event,
room: None,
src: None,
seq: None,
codec: None,
fec_block: None,
fec_sym: None,
repair: None,
len: None,
to_count: None,
peer: None,
reason: None,
action: None,
participants: None,
}
}
/// Set room.
pub fn room(mut self, room: &str) -> Self { self.room = Some(room.to_string()); self }
/// Set source.
pub fn src(mut self, src: &str) -> Self { self.src = Some(src.to_string()); self }
/// Set packet header fields from a MediaPacket.
pub fn packet(mut self, pkt: &wzp_proto::MediaPacket) -> Self {
self.seq = Some(pkt.header.seq);
self.codec = Some(format!("{:?}", pkt.header.codec_id));
self.fec_block = Some(pkt.header.fec_block);
self.fec_sym = Some(pkt.header.fec_symbol);
self.repair = Some(pkt.header.is_repair);
self.len = Some(pkt.payload.len());
self
}
/// Set seq only (when full packet not available).
pub fn seq(mut self, seq: u16) -> Self { self.seq = Some(seq); self }
/// Set payload length.
pub fn len(mut self, len: usize) -> Self { self.len = Some(len); self }
/// Set recipient count.
pub fn to_count(mut self, n: usize) -> Self { self.to_count = Some(n); self }
/// Set peer label.
pub fn peer(mut self, peer: &str) -> Self { self.peer = Some(peer.to_string()); self }
/// Set drop reason.
pub fn reason(mut self, reason: &str) -> Self { self.reason = Some(reason.to_string()); self }
/// Set presence action.
pub fn action(mut self, action: &str) -> Self { self.action = Some(action.to_string()); self }
/// Set participant count.
pub fn participants(mut self, n: usize) -> Self { self.participants = Some(n); self }
}
/// Handle for emitting events. Cheap to clone.
#[derive(Clone)]
pub struct EventLog {
tx: mpsc::UnboundedSender<Event>,
}
impl EventLog {
/// Emit an event (non-blocking, drops if channel is full).
pub fn emit(&self, event: Event) {
let _ = self.tx.send(event);
}
}
/// No-op event log for when `--event-log` is not set.
/// All methods are no-ops that compile to nothing.
#[derive(Clone)]
pub struct NoopEventLog;
/// Unified event log handle — either real or no-op.
#[derive(Clone)]
pub enum EventLogger {
Active(EventLog),
Noop,
}
impl EventLogger {
pub fn emit(&self, event: Event) {
if let EventLogger::Active(log) = self {
log.emit(event);
}
}
pub fn is_active(&self) -> bool {
matches!(self, EventLogger::Active(_))
}
}
/// Start the event log writer. Returns an `EventLogger` handle.
pub fn start_event_log(path: Option<PathBuf>) -> EventLogger {
match path {
Some(path) => {
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(writer_task(path, rx));
info!("event log enabled");
EventLogger::Active(EventLog { tx })
}
None => EventLogger::Noop,
}
}
/// Background task that writes events to a JSONL file.
async fn writer_task(path: PathBuf, mut rx: mpsc::UnboundedReceiver<Event>) {
use tokio::io::AsyncWriteExt;
let file = match tokio::fs::File::create(&path).await {
Ok(f) => f,
Err(e) => {
error!("failed to create event log {}: {e}", path.display());
return;
}
};
let mut writer = tokio::io::BufWriter::new(file);
let mut count: u64 = 0;
while let Some(event) = rx.recv().await {
match serde_json::to_string(&event) {
Ok(json) => {
if writer.write_all(json.as_bytes()).await.is_err() { break; }
if writer.write_all(b"\n").await.is_err() { break; }
count += 1;
// Flush every 100 events
if count % 100 == 0 {
let _ = writer.flush().await;
}
}
Err(e) => {
error!("event log serialize error: {e}");
}
}
}
let _ = writer.flush().await;
info!(events = count, "event log closed");
}

View File

@@ -19,6 +19,7 @@ use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::QuinnTransport;
use crate::config::{PeerConfig, TrustedConfig};
use crate::event_log::{Event, EventLogger};
use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager};
/// Compute 8-byte room hash for federation datagram tagging.
@@ -34,41 +35,42 @@ fn normalize_fp(fp: &str) -> String {
fp.replace(':', "").to_lowercase()
}
/// Sliding-window dedup filter for federation datagrams.
/// Tracks recently seen (room_hash, seq) pairs to discard duplicates
/// arriving via multiple federation paths (e.g., A↔B↔C and A↔C).
/// Time-based dedup filter for federation datagrams.
/// Tracks recently seen packets and expires entries older than 2 seconds.
/// This prevents duplicate delivery when the same packet arrives via
/// multiple federation paths, while allowing new senders that happen to
/// reuse the same seq numbers.
struct Deduplicator {
/// Ring buffer of recent packet fingerprints (room_hash XOR'd with seq).
seen: HashSet<u64>,
/// Ordered list for eviction.
order: std::collections::VecDeque<u64>,
capacity: usize,
/// Recently seen packet keys with insertion time.
entries: HashMap<u64, Instant>,
/// Expiry duration.
ttl: Duration,
}
impl Deduplicator {
fn new(capacity: usize) -> Self {
fn new(_capacity: usize) -> Self {
Self {
seen: HashSet::with_capacity(capacity),
order: std::collections::VecDeque::with_capacity(capacity),
capacity,
entries: HashMap::with_capacity(512),
ttl: Duration::from_secs(2),
}
}
/// Returns true if this packet is a duplicate (already seen).
/// The source_fp_hash distinguishes packets from different senders
/// that share the same room and seq number.
fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16, source_fp_hash: u64) -> bool {
let key = u64::from_be_bytes(*room_hash) ^ (seq as u64) ^ source_fp_hash;
if self.seen.contains(&key) {
return true;
/// Returns true if this packet is a duplicate (already seen within TTL).
fn is_dup(&mut self, room_hash: &[u8; 8], seq: u16, extra: u64) -> bool {
let key = u64::from_be_bytes(*room_hash) ^ (seq as u64) ^ extra;
let now = Instant::now();
// Periodic cleanup (every ~256 packets)
if self.entries.len() > 256 {
self.entries.retain(|_, ts| now.duration_since(*ts) < self.ttl);
}
if self.order.len() >= self.capacity {
if let Some(old) = self.order.pop_front() {
self.seen.remove(&old);
if let Some(ts) = self.entries.get(&key) {
if now.duration_since(*ts) < self.ttl {
return true; // seen recently — duplicate
}
}
self.seen.insert(key);
self.order.push_back(key);
self.entries.insert(key, now);
false
}
}
@@ -143,6 +145,8 @@ pub struct FederationManager {
/// Per-room seq counter for federation media delivered to local clients.
/// Ensures clients see monotonically increasing seq regardless of federation sender.
local_delivery_seq: std::sync::atomic::AtomicU16,
/// JSONL event log for protocol analysis.
event_log: EventLogger,
/// Per-room rate limiters for inbound federation media.
rate_limiters: Mutex<HashMap<String, RateLimiter>>,
}
@@ -156,6 +160,7 @@ impl FederationManager {
endpoint: quinn::Endpoint,
local_tls_fp: String,
metrics: Arc<crate::metrics::RelayMetrics>,
event_log: EventLogger,
) -> Self {
Self {
peers,
@@ -168,6 +173,7 @@ impl FederationManager {
peer_links: Arc::new(Mutex::new(HashMap::new())),
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
local_delivery_seq: std::sync::atomic::AtomicU16::new(0),
event_log,
rate_limiters: Mutex::new(HashMap::new()),
}
}
@@ -854,9 +860,19 @@ async fn handle_datagram(
let pkt = match wzp_proto::MediaPacket::from_bytes(media_bytes.clone()) {
Some(pkt) => pkt,
None => return,
None => {
fm.event_log.emit(Event::new("federation_ingress_malformed").len(data.len()));
return;
}
};
// Event log: federation ingress
let peer_label = {
let links = fm.peer_links.lock().await;
links.get(source_peer_fp).map(|l| l.label.clone()).unwrap_or_default()
};
fm.event_log.emit(Event::new("federation_ingress").packet(&pkt).peer(&peer_label));
// Count inbound federation packet + update last_seen
fm.metrics.federation_packets_forwarded
.with_label_values(&[source_peer_fp, "in"]).inc();
@@ -867,18 +883,20 @@ async fn handle_datagram(
}
}
// Dedup: drop packets we've already seen (multi-path duplicates)
// Include source peer fingerprint so different senders with same seq aren't confused
let source_fp_hash = {
// Dedup: drop packets we've already seen (multi-path duplicates).
// Key uses a hash of the actual payload bytes — unique per Opus frame,
// so different senders with the same seq/timestamp never collide.
let payload_hash = {
let mut h = 0u64;
for (i, b) in source_peer_fp.bytes().enumerate().take(8) {
for (i, &b) in media_bytes.iter().take(16).enumerate() {
h ^= (b as u64) << ((i % 8) * 8);
}
h
};
{
let mut dedup = fm.dedup.lock().await;
if dedup.is_dup(&rh, pkt.header.seq, source_fp_hash) {
if dedup.is_dup(&rh, pkt.header.seq, payload_hash) {
fm.event_log.emit(Event::new("dedup_drop").seq(pkt.header.seq).peer(&peer_label));
return;
}
}
@@ -898,7 +916,10 @@ async fn handle_datagram(
let room_name = match room_name {
Some(r) => r,
None => return, // not a known room
None => {
fm.event_log.emit(Event::new("room_not_found").seq(pkt.header.seq).peer(&peer_label));
return;
}
};
// Rate limit per room
@@ -907,32 +928,29 @@ async fn handle_datagram(
let limiter = limiters.entry(room_name.clone())
.or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS));
if !limiter.allow() {
fm.event_log.emit(Event::new("rate_limit_drop").room(&room_name).seq(pkt.header.seq));
return;
}
}
// Deliver to all local participants with rewritten seq/fec
// so the client sees a monotonic stream regardless of which federation sender
// Deliver to all local participants — forward the raw bytes as-is.
// The original sender's MediaPacket is preserved exactly (no re-serialization).
let locals = {
let mgr = fm.room_mgr.lock().await;
mgr.local_senders(&room_name)
};
if !locals.is_empty() {
let new_seq = fm.local_delivery_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut local_pkt = pkt.clone();
local_pkt.header.seq = new_seq;
// Rewrite FEC block/symbol to match new seq so decoder doesn't see stale blocks
let frames_per_block = 5u16; // matches default FEC config
local_pkt.header.fec_block = (new_seq / frames_per_block) as u8;
local_pkt.header.fec_symbol = (new_seq % frames_per_block) as u8;
local_pkt.header.is_repair = false; // federation packets are source-only for local delivery
for sender in &locals {
match sender {
room::ParticipantSender::Quic(t) => { let _ = t.send_media(&local_pkt).await; }
room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&local_pkt.payload).await; }
for sender in &locals {
match sender {
room::ParticipantSender::Quic(t) => {
if let Err(e) = t.send_raw_datagram(&media_bytes) {
fm.event_log.emit(Event::new("local_deliver_error").room(&room_name).seq(pkt.header.seq).reason(&e.to_string()));
warn!("federation local delivery error: {e}");
}
}
room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; }
}
}
fm.event_log.emit(Event::new("local_deliver").room(&room_name).seq(pkt.header.seq).to_count(locals.len()));
// Multi-hop: forward to ALL other connected peers (not the source)
// Don't filter by active_rooms — the receiving peer decides whether to deliver

View File

@@ -9,6 +9,7 @@
pub mod auth;
pub mod config;
pub mod event_log;
pub mod federation;
pub mod handshake;
pub mod metrics;

View File

@@ -135,6 +135,12 @@ fn parse_args() -> CliResult {
args.get(i).expect("--debug-tap requires a room name (or '*' for all)").to_string(),
);
}
"--event-log" => {
i += 1;
config.event_log = Some(
args.get(i).expect("--event-log requires a file path").to_string(),
);
}
"--version" | "-V" => {
println!("wzp-relay {}", env!("WZP_BUILD_HASH"));
std::process::exit(0);
@@ -387,6 +393,11 @@ async fn main() -> anyhow::Result<()> {
// Room manager (room mode only)
let room_mgr = Arc::new(Mutex::new(RoomManager::new()));
// Event log for protocol analysis
let event_log = wzp_relay::event_log::start_event_log(
config.event_log.as_ref().map(std::path::PathBuf::from)
);
// Federation manager
let global_room_set: std::collections::HashSet<String> = config.global_rooms.iter()
.map(|g| g.name.clone())
@@ -401,6 +412,7 @@ async fn main() -> anyhow::Result<()> {
endpoint.clone(),
tls_fp.clone(),
metrics.clone(),
event_log.clone(),
));
let fm_run = fm.clone();
tokio::spawn(async move { fm_run.run().await });