fix: rewrite seq/fec for federation-delivered packets
Federation media from different senders had conflicting seq numbers, FEC block IDs, and Opus decoder state. The relay now assigns fresh monotonic seq/fec_block/fec_symbol to all federation-delivered packets, ensuring clients see a clean continuous stream regardless of sender changes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -140,6 +140,9 @@ pub struct FederationManager {
|
|||||||
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
|
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
|
||||||
/// Dedup filter for incoming federation datagrams.
|
/// Dedup filter for incoming federation datagrams.
|
||||||
dedup: Mutex<Deduplicator>,
|
dedup: Mutex<Deduplicator>,
|
||||||
|
/// Per-room seq counter for federation media delivered to local clients.
|
||||||
|
/// Ensures clients see monotonically increasing seq regardless of federation sender.
|
||||||
|
local_delivery_seq: std::sync::atomic::AtomicU16,
|
||||||
/// Per-room rate limiters for inbound federation media.
|
/// Per-room rate limiters for inbound federation media.
|
||||||
rate_limiters: Mutex<HashMap<String, RateLimiter>>,
|
rate_limiters: Mutex<HashMap<String, RateLimiter>>,
|
||||||
}
|
}
|
||||||
@@ -164,6 +167,7 @@ impl FederationManager {
|
|||||||
metrics,
|
metrics,
|
||||||
peer_links: Arc::new(Mutex::new(HashMap::new())),
|
peer_links: Arc::new(Mutex::new(HashMap::new())),
|
||||||
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
|
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
|
||||||
|
local_delivery_seq: std::sync::atomic::AtomicU16::new(0),
|
||||||
rate_limiters: Mutex::new(HashMap::new()),
|
rate_limiters: Mutex::new(HashMap::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -907,15 +911,26 @@ async fn handle_datagram(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deliver to all local participants
|
// Deliver to all local participants with rewritten seq/fec
|
||||||
|
// so the client sees a monotonic stream regardless of which federation sender
|
||||||
let locals = {
|
let locals = {
|
||||||
let mgr = fm.room_mgr.lock().await;
|
let mgr = fm.room_mgr.lock().await;
|
||||||
mgr.local_senders(&room_name)
|
mgr.local_senders(&room_name)
|
||||||
};
|
};
|
||||||
|
if !locals.is_empty() {
|
||||||
|
let new_seq = fm.local_delivery_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
let mut local_pkt = pkt.clone();
|
||||||
|
local_pkt.header.seq = new_seq;
|
||||||
|
// Rewrite FEC block/symbol to match new seq so decoder doesn't see stale blocks
|
||||||
|
let frames_per_block = 5u16; // matches default FEC config
|
||||||
|
local_pkt.header.fec_block = (new_seq / frames_per_block) as u8;
|
||||||
|
local_pkt.header.fec_symbol = (new_seq % frames_per_block) as u8;
|
||||||
|
local_pkt.header.is_repair = false; // federation packets are source-only for local delivery
|
||||||
for sender in &locals {
|
for sender in &locals {
|
||||||
match sender {
|
match sender {
|
||||||
room::ParticipantSender::Quic(t) => { let _ = t.send_media(&pkt).await; }
|
room::ParticipantSender::Quic(t) => { let _ = t.send_media(&local_pkt).await; }
|
||||||
room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&pkt.payload).await; }
|
room::ParticipantSender::WebSocket(_) => { let _ = sender.send_raw(&local_pkt.payload).await; }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user