diff --git a/crates/wzp-fec/src/decoder.rs b/crates/wzp-fec/src/decoder.rs index 65b772a..b11841f 100644 --- a/crates/wzp-fec/src/decoder.rs +++ b/crates/wzp-fec/src/decoder.rs @@ -1,6 +1,7 @@ //! RaptorQ FEC decoder — reassembles source blocks from received source and repair symbols. use std::collections::HashMap; +use std::time::Instant; use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder}; use wzp_proto::error::FecError; @@ -9,6 +10,9 @@ use wzp_proto::FecDecoder; /// Length prefix size (u16 little-endian), must match encoder. const LEN_PREFIX: usize = 2; +/// Decoded blocks older than this are eligible for reuse by a new sender. +const BLOCK_STALE_SECS: u64 = 2; + /// State for one in-flight block being decoded. struct BlockState { /// Number of source symbols expected. @@ -21,6 +25,8 @@ struct BlockState { decoded: bool, /// Cached decoded result. result: Option>>, + /// When this block was last decoded (for staleness check). + decoded_at: Option, } /// RaptorQ-based FEC decoder that handles multiple concurrent blocks. @@ -58,6 +64,7 @@ impl RaptorQFecDecoder { symbol_size: self.symbol_size, decoded: false, result: None, + decoded_at: None, }) } } @@ -74,8 +81,20 @@ impl FecDecoder for RaptorQFecDecoder { let block = self.get_or_create_block(block_id); if block.decoded { - // Already decoded, ignore additional symbols. - return Ok(()); + // If the block was decoded recently, skip (normal duplicate). + // If it's stale (>2s), a new sender is reusing this block_id — reset it. + if let Some(at) = block.decoded_at { + if at.elapsed().as_secs() >= BLOCK_STALE_SECS { + block.decoded = false; + block.result = None; + block.decoded_at = None; + block.packets.clear(); + } else { + return Ok(()); + } + } else { + return Ok(()); + } } // Data should already be at symbol_size (length-prefixed and padded by the encoder). @@ -132,6 +151,7 @@ impl FecDecoder for RaptorQFecDecoder { let block = self.blocks.get_mut(&block_id).unwrap(); block.decoded = true; + block.decoded_at = Some(Instant::now()); block.result = Some(frames.clone()); Ok(Some(frames)) }