fix: FEC decoder resets stale blocks — fixes consecutive federation connects
Some checks failed
Mirror to GitHub / mirror (push) Failing after 36s
Build Release Binaries / build-amd64 (push) Failing after 2m0s

When a new sender reused the same block_id values as a previous sender,
the FEC decoder silently dropped all of the new data because those blocks
were still marked as "already decoded". Now an already-decoded block is
automatically reset when new data arrives for it 2 or more seconds after it
was decoded.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-08 15:26:00 +04:00
parent 8b79cdc6fc
commit c3bd657224

View File

@@ -1,6 +1,7 @@
//! RaptorQ FEC decoder — reassembles source blocks from received source and repair symbols. //! RaptorQ FEC decoder — reassembles source blocks from received source and repair symbols.
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Instant;
use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder}; use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder};
use wzp_proto::error::FecError; use wzp_proto::error::FecError;
@@ -9,6 +10,9 @@ use wzp_proto::FecDecoder;
/// Length prefix size (u16 little-endian), must match encoder. /// Length prefix size (u16 little-endian), must match encoder.
const LEN_PREFIX: usize = 2; const LEN_PREFIX: usize = 2;
/// Decoded blocks older than this are eligible for reuse by a new sender.
const BLOCK_STALE_SECS: u64 = 2;
/// State for one in-flight block being decoded. /// State for one in-flight block being decoded.
struct BlockState { struct BlockState {
/// Number of source symbols expected. /// Number of source symbols expected.
@@ -21,6 +25,8 @@ struct BlockState {
decoded: bool, decoded: bool,
/// Cached decoded result. /// Cached decoded result.
result: Option<Vec<Vec<u8>>>, result: Option<Vec<Vec<u8>>>,
/// When this block was last decoded (for staleness check).
decoded_at: Option<Instant>,
} }
/// RaptorQ-based FEC decoder that handles multiple concurrent blocks. /// RaptorQ-based FEC decoder that handles multiple concurrent blocks.
@@ -58,6 +64,7 @@ impl RaptorQFecDecoder {
symbol_size: self.symbol_size, symbol_size: self.symbol_size,
decoded: false, decoded: false,
result: None, result: None,
decoded_at: None,
}) })
} }
} }
@@ -74,8 +81,20 @@ impl FecDecoder for RaptorQFecDecoder {
let block = self.get_or_create_block(block_id); let block = self.get_or_create_block(block_id);
if block.decoded { if block.decoded {
// Already decoded, ignore additional symbols. // If the block was decoded recently, skip (normal duplicate).
return Ok(()); // If it's stale (>= 2s since decode), a new sender is reusing this block_id — reset it.
if let Some(at) = block.decoded_at {
if at.elapsed().as_secs() >= BLOCK_STALE_SECS {
block.decoded = false;
block.result = None;
block.decoded_at = None;
block.packets.clear();
} else {
return Ok(());
}
} else {
return Ok(());
}
} }
// Data should already be at symbol_size (length-prefixed and padded by the encoder). // Data should already be at symbol_size (length-prefixed and padded by the encoder).
@@ -132,6 +151,7 @@ impl FecDecoder for RaptorQFecDecoder {
let block = self.blocks.get_mut(&block_id).unwrap(); let block = self.blocks.get_mut(&block_id).unwrap();
block.decoded = true; block.decoded = true;
block.decoded_at = Some(Instant::now());
block.result = Some(frames.clone()); block.result = Some(frames.clone());
Ok(Some(frames)) Ok(Some(frames))
} }