T4.4: SignalMessage::Nack + PictureLossIndication; NACK sender/receiver state machines

This commit is contained in:
Siavash Sameni
2026-05-12 09:25:29 +04:00
parent e177e63843
commit 81042ac190
7 changed files with 695 additions and 11 deletions

View File

@@ -11,7 +11,7 @@
//! 5. Connects QUIC to relay for media
use serde::{Deserialize, Serialize};
use wzp_proto::packet::{SignalMessage, default_signal_version};
use wzp_proto::packet::SignalMessage;
/// featherChat CallSignal types (mirrors warzone-protocol::message::CallSignalType).
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -141,6 +141,9 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
| SignalMessage::QualityCapability { .. } => CallSignalType::Offer, // quality negotiation
SignalMessage::PresenceList { .. } => CallSignalType::Offer, // lobby presence
SignalMessage::QualityDirective { .. } => CallSignalType::Offer, // relay-initiated
SignalMessage::Nack { .. } | SignalMessage::PictureLossIndication { .. } => {
CallSignalType::Offer
} // relay-initiated (video loss recovery)
}
}
@@ -148,6 +151,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
mod tests {
use super::*;
use wzp_proto::QualityProfile;
use wzp_proto::default_signal_version;
#[test]
fn payload_roundtrip() {

View File

@@ -1183,6 +1183,29 @@ pub enum SignalMessage {
/// Receiver-side arrival time of the latest packet (microseconds since epoch).
recv_time_us: u64,
},
/// Negative acknowledgement — request retransmission of specific packets.
/// Sent by the receiver when it detects gaps and RTT is low enough
/// that retransmission will arrive before decode deadline.
Nack {
/// NACK format version (default 1).
#[serde(default = "default_signal_version")]
version: u8,
/// Which media stream has the gap.
stream_id: u8,
/// Missing sequence numbers.
seqs: Vec<u32>,
},
/// Picture Loss Indication — decoder can't proceed, needs a fresh keyframe.
/// Used instead of Nack when RTT is too high for retransmission to help.
PictureLossIndication {
/// PLI format version (default 1).
#[serde(default = "default_signal_version")]
version: u8,
/// Which media stream needs the keyframe.
stream_id: u8,
},
}
/// How the callee responds to a direct call.
@@ -2679,4 +2702,81 @@ mod tests {
_ => panic!("wrong variant"),
}
}
#[test]
fn nack_roundtrip() {
let original = SignalMessage::Nack {
version: 1,
stream_id: 7,
seqs: vec![42, 43, 44],
};
let json = serde_json::to_string(&original).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::Nack {
version,
stream_id,
seqs,
} => {
assert_eq!(version, 1);
assert_eq!(stream_id, 7);
assert_eq!(seqs, vec![42, 43, 44]);
}
_ => panic!("wrong variant"),
}
let bin = bincode::serialize(&original).unwrap();
let decoded: SignalMessage = bincode::deserialize(&bin).unwrap();
assert!(matches!(decoded, SignalMessage::Nack { .. }));
}
#[test]
fn nack_default_version() {
let json = r#"{"Nack": {"stream_id": 3, "seqs": [10, 11]}}"#;
let decoded: SignalMessage = serde_json::from_str(json).unwrap();
match decoded {
SignalMessage::Nack { version, .. } => {
assert_eq!(version, 1, "serde default makes omitted version 1");
}
_ => panic!("wrong variant"),
}
}
#[test]
fn picture_loss_indication_roundtrip() {
let original = SignalMessage::PictureLossIndication {
version: 1,
stream_id: 5,
};
let json = serde_json::to_string(&original).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::PictureLossIndication { version, stream_id } => {
assert_eq!(version, 1);
assert_eq!(stream_id, 5);
}
_ => panic!("wrong variant"),
}
let bin = bincode::serialize(&original).unwrap();
let decoded: SignalMessage = bincode::deserialize(&bin).unwrap();
assert!(matches!(
decoded,
SignalMessage::PictureLossIndication { .. }
));
}
#[test]
fn picture_loss_indication_default_version() {
let json = r#"{"PictureLossIndication": {"stream_id": 2}}"#;
let decoded: SignalMessage = serde_json::from_str(json).unwrap();
match decoded {
SignalMessage::PictureLossIndication { version, .. } => {
assert_eq!(version, 1, "serde default makes omitted version 1");
}
_ => panic!("wrong variant"),
}
}
}

View File

@@ -9,12 +9,14 @@ pub mod depacketizer;
pub mod encoder;
pub mod framer;
pub mod mediacodec;
pub mod nack;
pub mod videotoolbox;
pub use decoder::VideoDecoder;
pub use depacketizer::H264Depacketizer;
pub use encoder::{VideoEncoder, VideoError, VideoFrame};
pub use framer::{FramedPacket, H264Framer};
pub use nack::{CachedPacket, NackAction, NackReceiver, NackSender};
pub use videotoolbox::{VideoToolboxDecoder, VideoToolboxEncoder};
#[cfg(test)]

View File

@@ -0,0 +1,381 @@
//! NACK sender / receiver state machines for video packet-loss recovery.
//!
//! The sender side caches the last 500 ms of packets so it can retransmit on
//! request. The receiver side detects gaps and decides whether to NACK (low
//! RTT) or emit a Picture-Loss-Indication (high RTT).
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
/// A packet cached for potential retransmission.
#[derive(Clone, Debug, PartialEq)]
pub struct CachedPacket {
pub seq: u32,
pub data: Vec<u8>,
pub timestamp_ms: u64,
}
/// Action emitted by the receiver-side NACK state machine.
#[derive(Debug, Clone, PartialEq)]
pub enum NackAction {
/// Request retransmission of one or more packets.
Nack { seqs: Vec<u32> },
/// RTT is too high for NACK to help — request a keyframe instead.
PictureLossIndication,
}
/// Sender-side NACK handler.
///
/// Retains recently sent packets in a 500 ms ring buffer. On `Nack` the
/// sender looks up the requested sequence numbers and returns clones of the
/// cached payloads (if they are still in the buffer).
#[derive(Debug)]
pub struct NackSender {
buffer: Vec<(Instant, CachedPacket)>,
max_age: Duration,
}
impl NackSender {
pub const DEFAULT_MAX_AGE_MS: u64 = 500;
/// Create a new sender buffer.
pub fn new() -> Self {
Self {
buffer: Vec::with_capacity(1024),
max_age: Duration::from_millis(Self::DEFAULT_MAX_AGE_MS),
}
}
/// Record a packet that was just sent.
pub fn on_send(&mut self, packet: CachedPacket, now: Instant) {
self.buffer.push((now, packet));
}
/// Handle an incoming NACK — return any packets we still have.
pub fn on_nack(&mut self, seqs: &[u32], now: Instant) -> Vec<CachedPacket> {
self.evict(now);
let mut out = Vec::with_capacity(seqs.len());
for seq in seqs {
if let Some((_, pkt)) = self.buffer.iter().find(|(_, p)| p.seq == *seq) {
out.push(pkt.clone());
}
}
out
}
/// Periodic housekeeping — evict stale packets.
pub fn tick(&mut self, now: Instant) {
self.evict(now);
}
fn evict(&mut self, now: Instant) {
self.buffer
.retain(|(t, _)| now.duration_since(*t) <= self.max_age);
}
}
impl Default for NackSender {
fn default() -> Self {
Self::new()
}
}
/// Receiver-side NACK / PLI state machine.
///
/// Tracks received sequence numbers and emits [`NackAction`]s for gaps.
///
/// Rules (from PRD-video-v1):
/// * Wait at least `frame_interval` after a gap is noticed before acting.
/// * If `RTT < 2 * frame_interval` → emit `Nack`.
/// * Otherwise → emit `PictureLossIndication`.
/// * Backoff: max 1 Nack per sequence number per `2 * RTT`.
/// * Rate cap: max 50 NACKs / second.
#[derive(Debug)]
pub struct NackReceiver {
frame_interval: Duration,
rtt: Duration,
/// Missing seq → when first noticed.
missing: BTreeMap<u32, Instant>,
/// Seq → when last NACK sent.
last_nack: BTreeMap<u32, Instant>,
/// Next expected sequence number (contiguous from start).
next_expected: u32,
/// NACK rate cap window.
nacks_this_sec: u32,
sec_window: Instant,
max_nack_rate: u32,
}
impl NackReceiver {
pub const DEFAULT_MAX_NACK_RATE: u32 = 50;
/// Create a new receiver state machine.
///
/// * `frame_interval` — e.g. 33 ms for 30 fps.
/// * `rtt` — initial RTT estimate.
pub fn new(frame_interval: Duration, rtt: Duration) -> Self {
Self {
frame_interval,
rtt,
missing: BTreeMap::new(),
last_nack: BTreeMap::new(),
next_expected: 0,
nacks_this_sec: 0,
sec_window: Instant::now(),
max_nack_rate: Self::DEFAULT_MAX_NACK_RATE,
}
}
/// Update the RTT estimate (e.g. from transport feedback).
pub fn set_rtt(&mut self, rtt: Duration) {
self.rtt = rtt;
}
/// Record that a packet was received.
pub fn on_packet(&mut self, seq: u32, now: Instant) {
// Advance the rate window.
if now.duration_since(self.sec_window) >= Duration::from_secs(1) {
self.sec_window = now;
self.nacks_this_sec = 0;
}
let ahead = seq.wrapping_sub(self.next_expected);
if ahead == 0 {
// In-order packet, no gap.
self.next_expected = self.next_expected.wrapping_add(1);
self.missing.remove(&seq);
self.last_nack.remove(&seq);
} else if ahead < u32::MAX / 2 {
// seq >= next_expected (with wrap handling). There is a gap.
for offset in 0..ahead {
let missing_seq = self.next_expected.wrapping_add(offset);
self.missing.entry(missing_seq).or_insert(now);
}
self.next_expected = seq.wrapping_add(1);
self.missing.remove(&seq);
self.last_nack.remove(&seq);
} else {
// seq < next_expected — reordered or very late. Just remove from missing.
self.missing.remove(&seq);
self.last_nack.remove(&seq);
}
}
/// Periodic check — evaluate gaps and decide whether to NACK or PLI.
///
/// Call this at roughly `frame_interval` granularity (or on a timer).
pub fn tick(&mut self, now: Instant) -> Vec<NackAction> {
if now.duration_since(self.sec_window) >= Duration::from_secs(1) {
self.sec_window = now;
self.nacks_this_sec = 0;
}
let threshold = self.frame_interval;
let backoff = self.rtt.saturating_mul(2);
let mut nack_seqs = Vec::new();
for (&seq, &noticed_at) in &self.missing {
if now.duration_since(noticed_at) < threshold {
continue; // too fresh, packet may still arrive
}
if let Some(&last_nack_time) = self.last_nack.get(&seq) {
if now.duration_since(last_nack_time) < backoff {
continue; // still in backoff
}
}
nack_seqs.push(seq);
}
if nack_seqs.is_empty() {
return Vec::new();
}
// Decide NACK vs PLI based on RTT.
if self.rtt < self.frame_interval.saturating_mul(2) {
// Rate cap: clamp batch to remaining budget.
let budget = self.max_nack_rate.saturating_sub(self.nacks_this_sec) as usize;
if budget == 0 {
return vec![NackAction::PictureLossIndication];
}
nack_seqs.truncate(budget);
self.nacks_this_sec += nack_seqs.len() as u32;
for seq in &nack_seqs {
self.last_nack.insert(*seq, now);
}
vec![NackAction::Nack { seqs: nack_seqs }]
} else {
vec![NackAction::PictureLossIndication]
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}
#[test]
fn sender_caches_and_retransmits() {
let mut sender = NackSender::new();
let now = Instant::now();
sender.on_send(
CachedPacket {
seq: 10,
data: vec![1, 2, 3],
timestamp_ms: 100,
},
now,
);
sender.on_send(
CachedPacket {
seq: 11,
data: vec![4, 5, 6],
timestamp_ms: 133,
},
now,
);
let found = sender.on_nack(&[10, 11], now);
assert_eq!(found.len(), 2);
assert_eq!(found[0].seq, 10);
assert_eq!(found[1].seq, 11);
}
#[test]
fn sender_evicts_after_500ms() {
let mut sender = NackSender::new();
let now = Instant::now();
sender.on_send(
CachedPacket {
seq: 10,
data: vec![1],
timestamp_ms: 0,
},
now,
);
let later = now + Duration::from_millis(501);
let found = sender.on_nack(&[10], later);
assert!(found.is_empty(), "packet should be evicted after 500 ms");
}
#[test]
fn receiver_detects_gap_and_nacks() {
let mut recv = NackReceiver::new(ms(33), ms(20));
let now = Instant::now();
recv.on_packet(0, now);
recv.on_packet(2, now); // gap: 1 is missing
// Immediately tick — gap is too fresh.
let actions = recv.tick(now);
assert!(actions.is_empty());
// After frame_interval, should NACK.
let later = now + ms(40);
let actions = recv.tick(later);
assert_eq!(actions.len(), 1);
assert!(matches!(actions[0], NackAction::Nack { ref seqs } if seqs == &[1]));
}
#[test]
fn receiver_uses_pli_when_rtt_is_high() {
let mut recv = NackReceiver::new(ms(33), ms(100));
let now = Instant::now();
recv.on_packet(0, now);
recv.on_packet(2, now); // gap: 1 is missing
let later = now + ms(40);
let actions = recv.tick(later);
assert_eq!(actions.len(), 1);
assert_eq!(actions[0], NackAction::PictureLossIndication);
}
#[test]
fn receiver_backoff_respects_2x_rtt() {
let mut recv = NackReceiver::new(ms(33), ms(20));
let now = Instant::now();
recv.on_packet(0, now);
recv.on_packet(2, now); // gap: 1 is missing
let later = now + ms(40);
let actions = recv.tick(later);
assert!(matches!(actions[0], NackAction::Nack { .. }));
// Tick again immediately — should be in backoff.
let actions2 = recv.tick(later);
assert!(actions2.is_empty(), "should not re-nack within 2*RTT");
// After backoff expires, should NACK again.
let much_later = later + ms(50); // 2*RTT = 40ms
let actions3 = recv.tick(much_later);
assert!(matches!(actions3[0], NackAction::Nack { .. }));
}
#[test]
fn receiver_late_packet_fills_gap() {
let mut recv = NackReceiver::new(ms(33), ms(20));
let now = Instant::now();
recv.on_packet(0, now);
recv.on_packet(2, now); // gap: 1 is missing
let later = now + ms(40);
let actions = recv.tick(later);
assert!(matches!(actions[0], NackAction::Nack { .. }));
// Late arrival of packet 1
recv.on_packet(1, later);
let actions2 = recv.tick(later + ms(1));
assert!(
actions2.is_empty()
|| !matches!(actions2[0], NackAction::Nack { seqs: ref s } if s.contains(&1)),
"filled gap should not be nacked again"
);
}
#[test]
fn receiver_rate_cap_falls_back_to_pli() {
let mut recv = NackReceiver::new(ms(33), ms(20));
let now = Instant::now();
// Create many gaps.
recv.on_packet(0, now);
recv.on_packet(100, now); // gaps 1..99
let later = now + ms(40);
let actions = recv.tick(later);
// Either we got a Nack with <= max_nack_rate seqs, or we got PLI.
match actions.first() {
Some(NackAction::Nack { seqs }) => {
assert!(
seqs.len() as u32 <= NackReceiver::DEFAULT_MAX_NACK_RATE,
"rate cap exceeded"
);
}
Some(NackAction::PictureLossIndication) => {}
_ => panic!("expected an action"),
}
}
#[test]
fn receiver_wraparound_ok() {
let mut recv = NackReceiver::new(ms(33), ms(20));
let now = Instant::now();
recv.on_packet(u32::MAX, now);
recv.on_packet(1, now); // gap: 0 is missing (wrap)
let later = now + ms(40);
let actions = recv.tick(later);
assert!(matches!(actions[0], NackAction::Nack { ref seqs } if seqs == &[0]));
}
}