diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 1c83727..6dbaf31 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -140,6 +140,9 @@ pub struct FederationManager { peer_links: Arc>>, /// Dedup filter for incoming federation datagrams. dedup: Mutex, + /// Per-room seq counter for federation media delivered to local clients. + /// Ensures clients see monotonically increasing seq regardless of federation sender. + local_delivery_seq: std::sync::atomic::AtomicU16, /// Per-room rate limiters for inbound federation media. rate_limiters: Mutex>, } @@ -164,6 +167,7 @@ impl FederationManager { metrics, peer_links: Arc::new(Mutex::new(HashMap::new())), dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), + local_delivery_seq: std::sync::atomic::AtomicU16::new(0), rate_limiters: Mutex::new(HashMap::new()), } } @@ -907,15 +911,26 @@ async fn handle_datagram( } } - // Deliver to all local participants + // Deliver to all local participants with rewritten seq/fec + // so the client sees a monotonic stream regardless of which federation sender let locals = { let mgr = fm.room_mgr.lock().await; mgr.local_senders(&room_name) }; - for sender in &locals { - match sender { - room::ParticipantSender::Quic(t) => { let _ = t.send_media(&pkt).await; } - room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; } + if !locals.is_empty() { + let new_seq = fm.local_delivery_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let mut local_pkt = pkt.clone(); + local_pkt.header.seq = new_seq; + // Rewrite FEC block/symbol to match new seq so decoder doesn't see stale blocks + let frames_per_block = 5u16; // matches default FEC config + local_pkt.header.fec_block = (new_seq / frames_per_block) as u8; + local_pkt.header.fec_symbol = (new_seq % frames_per_block) as u8; + local_pkt.header.is_repair = false; // federation packets are source-only for local delivery + for sender in &locals { + match sender { + room::ParticipantSender::Quic(t) => { let _ = t.send_media(&local_pkt).await; } + room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&local_pkt.payload).await; } + } } }