From 1c684f6b4742473d27edf98c867e3ae2ee163427 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Wed, 8 Apr 2026 15:48:55 +0400 Subject: [PATCH] fix: rewrite seq/fec for federation-delivered packets Federation media from different senders had conflicting seq numbers, FEC block IDs, and Opus decoder state. The relay now assigns fresh monotonic seq/fec_block/fec_symbol to all federation-delivered packets, ensuring clients see a clean continuous stream regardless of sender changes. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-relay/src/federation.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 1c83727..6dbaf31 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -140,6 +140,9 @@ pub struct FederationManager { peer_links: Arc>>, /// Dedup filter for incoming federation datagrams. dedup: Mutex, + /// Per-room seq counter for federation media delivered to local clients. + /// Ensures clients see monotonically increasing seq regardless of federation sender. + local_delivery_seq: std::sync::atomic::AtomicU16, /// Per-room rate limiters for inbound federation media. rate_limiters: Mutex>, } @@ -164,6 +167,7 @@ impl FederationManager { metrics, peer_links: Arc::new(Mutex::new(HashMap::new())), dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), + local_delivery_seq: std::sync::atomic::AtomicU16::new(0), rate_limiters: Mutex::new(HashMap::new()), } } @@ -907,15 +911,26 @@ async fn handle_datagram( } } - // Deliver to all local participants + // Deliver to all local participants with rewritten seq/fec + // so the client sees a monotonic stream regardless of which federation sender let locals = { let mgr = fm.room_mgr.lock().await; mgr.local_senders(&room_name) }; - for sender in &locals { - match sender { - room::ParticipantSender::Quic(t) => { let _ = t.send_media(&pkt).await; } - room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; } + if !locals.is_empty() { + let new_seq = fm.local_delivery_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let mut local_pkt = pkt.clone(); + local_pkt.header.seq = new_seq; + // Rewrite FEC block/symbol to match new seq so decoder doesn't see stale blocks + let frames_per_block = 5u16; // matches default FEC config + local_pkt.header.fec_block = (new_seq / frames_per_block) as u8; + local_pkt.header.fec_symbol = (new_seq % frames_per_block) as u8; + local_pkt.header.is_repair = false; // federation packets are source-only for local delivery + for sender in &locals { + match sender { + room::ParticipantSender::Quic(t) => { let _ = t.send_media(&local_pkt).await; } + room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&local_pkt.payload).await; } + } } }