From c3bd65722420c1f65d7cdc09ca300f638db26071 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Wed, 8 Apr 2026 15:26:00 +0400 Subject: [PATCH] =?UTF-8?q?fix:=20FEC=20decoder=20resets=20stale=20blocks?= =?UTF-8?q?=20=E2=80=94=20fixes=20consecutive=20federation=20connects?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a new sender reuses the same block_id values as a previous sender, the FEC decoder was silently dropping all data because blocks were marked as "already decoded". Now blocks older than 2 seconds are automatically reset when new data arrives for them. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-fec/src/decoder.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/crates/wzp-fec/src/decoder.rs b/crates/wzp-fec/src/decoder.rs index 65b772a..b11841f 100644 --- a/crates/wzp-fec/src/decoder.rs +++ b/crates/wzp-fec/src/decoder.rs @@ -1,6 +1,7 @@ //! RaptorQ FEC decoder — reassembles source blocks from received source and repair symbols. use std::collections::HashMap; +use std::time::Instant; use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder}; use wzp_proto::error::FecError; @@ -9,6 +10,9 @@ use wzp_proto::FecDecoder; /// Length prefix size (u16 little-endian), must match encoder. const LEN_PREFIX: usize = 2; +/// Decoded blocks older than this are eligible for reuse by a new sender. +const BLOCK_STALE_SECS: u64 = 2; + /// State for one in-flight block being decoded. struct BlockState { /// Number of source symbols expected. @@ -21,6 +25,8 @@ struct BlockState { decoded: bool, /// Cached decoded result. result: Option>>, + /// When this block was last decoded (for staleness check). + decoded_at: Option, } /// RaptorQ-based FEC decoder that handles multiple concurrent blocks. @@ -58,6 +64,7 @@ impl RaptorQFecDecoder { symbol_size: self.symbol_size, decoded: false, result: None, + decoded_at: None, }) } } @@ -74,8 +81,20 @@ impl FecDecoder for RaptorQFecDecoder { let block = self.get_or_create_block(block_id); if block.decoded { - // Already decoded, ignore additional symbols. - return Ok(()); + // If the block was decoded recently, skip (normal duplicate). + // If it's stale (>2s), a new sender is reusing this block_id — reset it. + if let Some(at) = block.decoded_at { + if at.elapsed().as_secs() >= BLOCK_STALE_SECS { + block.decoded = false; + block.result = None; + block.decoded_at = None; + block.packets.clear(); + } else { + return Ok(()); + } + } else { + return Ok(()); + } } // Data should already be at symbol_size (length-prefixed and padded by the encoder). @@ -132,6 +151,7 @@ impl FecDecoder for RaptorQFecDecoder { let block = self.blocks.get_mut(&block_id).unwrap(); block.decoded = true; + block.decoded_at = Some(Instant::now()); block.result = Some(frames.clone()); Ok(Some(frames)) }