fix(audit): address C2, C3, M4, M5 from 2026-05-25 audit
C2: Add EncryptingTransport wrapper — all media I/O now goes through ChaChaSession encrypt/decrypt before hitting the QUIC datagram path. cli.rs run_live/run_silence/run_file_mode accept Arc<dyn MediaTransport> and receive a wrapped transport after the handshake. C3: Wire VideoScorer::observe() into both plain and trunked forwarding loops in room.rs. Packets from participants with Abusive verdict are dropped before forwarding. last_bwe_kbps tracked from quality reports. M4: Widen FEC repair symbol index from u8 to u16 throughout (FecEncoder::generate_repair, FecDecoder::add_symbol, all call sites in call.rs, bench.rs, pipeline.rs, wzp-android). Eliminates theoretical wrapping when num_source + repair_count > 255. M5: Track last_encrypt_timestamp in ChaChaSession. debug_assert in encrypt() that timestamp is non-decreasing across calls (including post- rekey). complete_rekey() explicitly preserves last_encrypt_timestamp to prevent accidental timestamp reset regressions. 583 tests passing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -796,7 +796,7 @@ async fn run_call(
|
|||||||
),
|
),
|
||||||
seq: rs,
|
seq: rs,
|
||||||
timestamp: t,
|
timestamp: t,
|
||||||
fec_block: ((sym_idx as u16) << 8) | (block_id as u16),
|
fec_block: (sym_idx << 8) | (block_id as u16),
|
||||||
},
|
},
|
||||||
payload: Bytes::from(repair_data),
|
payload: Bytes::from(repair_data),
|
||||||
quality_report: None,
|
quality_report: None,
|
||||||
@@ -949,7 +949,7 @@ async fn run_call(
|
|||||||
|
|
||||||
let is_repair = pkt.header.is_repair();
|
let is_repair = pkt.header.is_repair();
|
||||||
let pkt_block = pkt.header.fec_block as u8;
|
let pkt_block = pkt.header.fec_block as u8;
|
||||||
let pkt_symbol = (pkt.header.fec_block >> 8) as u8;
|
let pkt_symbol = pkt.header.fec_block >> 8;
|
||||||
let pkt_is_opus = pkt.header.codec_id.is_opus();
|
let pkt_is_opus = pkt.header.codec_id.is_opus();
|
||||||
|
|
||||||
// Phase 2: Opus packets bypass RaptorQ entirely — DRED
|
// Phase 2: Opus packets bypass RaptorQ entirely — DRED
|
||||||
|
|||||||
@@ -138,7 +138,7 @@ impl Pipeline {
|
|||||||
let is_repair = header.is_repair();
|
let is_repair = header.is_repair();
|
||||||
if let Err(e) = self.fec_decoder.add_symbol(
|
if let Err(e) = self.fec_decoder.add_symbol(
|
||||||
header.fec_block as u8,
|
header.fec_block as u8,
|
||||||
(header.fec_block >> 8) as u8,
|
header.fec_block >> 8,
|
||||||
is_repair,
|
is_repair,
|
||||||
&packet.payload,
|
&packet.payload,
|
||||||
) {
|
) {
|
||||||
|
|||||||
@@ -170,7 +170,7 @@ pub fn bench_fec_recovery(loss_pct: f32) -> FecResult {
|
|||||||
|
|
||||||
// Collect all symbols: source + repair
|
// Collect all symbols: source + repair
|
||||||
struct Symbol {
|
struct Symbol {
|
||||||
index: u8,
|
index: u16,
|
||||||
is_repair: bool,
|
is_repair: bool,
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
}
|
}
|
||||||
@@ -180,7 +180,7 @@ pub fn bench_fec_recovery(loss_pct: f32) -> FecResult {
|
|||||||
// For add_symbol we need to provide the raw data; the decoder pads internally
|
// For add_symbol we need to provide the raw data; the decoder pads internally
|
||||||
total_source_bytes += sym.len();
|
total_source_bytes += sym.len();
|
||||||
all_symbols.push(Symbol {
|
all_symbols.push(Symbol {
|
||||||
index: i as u8,
|
index: i as u16,
|
||||||
is_repair: false,
|
is_repair: false,
|
||||||
data: sym.clone(),
|
data: sym.clone(),
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -409,7 +409,7 @@ impl CallEncoder {
|
|||||||
fec_ratio: MediaHeader::encode_fec_ratio(self.profile.fec_ratio),
|
fec_ratio: MediaHeader::encode_fec_ratio(self.profile.fec_ratio),
|
||||||
seq: self.seq,
|
seq: self.seq,
|
||||||
timestamp: self.timestamp_ms,
|
timestamp: self.timestamp_ms,
|
||||||
fec_block: u16::from(self.block_id) | (u16::from(sym_idx) << 8),
|
fec_block: u16::from(self.block_id) | (sym_idx << 8),
|
||||||
},
|
},
|
||||||
payload: Bytes::from(repair_data),
|
payload: Bytes::from(repair_data),
|
||||||
quality_report: None,
|
quality_report: None,
|
||||||
@@ -566,7 +566,7 @@ impl CallDecoder {
|
|||||||
if !packet.header.codec_id.is_opus() {
|
if !packet.header.codec_id.is_opus() {
|
||||||
let _ = self.fec_dec.add_symbol(
|
let _ = self.fec_dec.add_symbol(
|
||||||
(packet.header.fec_block & 0xFF) as u8,
|
(packet.header.fec_block & 0xFF) as u8,
|
||||||
(packet.header.fec_block >> 8) as u8,
|
packet.header.fec_block >> 8,
|
||||||
packet.header.is_repair(),
|
packet.header.is_repair(),
|
||||||
&packet.payload,
|
&packet.payload,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -388,7 +388,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Crypto handshake — establishes verified identity + session key
|
// Crypto handshake — establishes verified identity + session key
|
||||||
let _crypto_session = wzp_client::handshake::perform_handshake(
|
let session = wzp_client::handshake::perform_handshake(
|
||||||
&*transport,
|
&*transport,
|
||||||
&seed.0,
|
&seed.0,
|
||||||
None, // alias — desktop client doesn't set one yet
|
None, // alias — desktop client doesn't set one yet
|
||||||
@@ -396,10 +396,15 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.await?;
|
.await?;
|
||||||
info!("crypto handshake complete");
|
info!("crypto handshake complete");
|
||||||
|
|
||||||
|
// Wrap the transport so all media I/O goes through AEAD encryption.
|
||||||
|
let enc_transport: Arc<dyn wzp_proto::MediaTransport> = Arc::new(
|
||||||
|
wzp_client::encrypted_transport::EncryptingTransport::new(transport.clone(), session),
|
||||||
|
);
|
||||||
|
|
||||||
if cli.live {
|
if cli.live {
|
||||||
#[cfg(feature = "audio")]
|
#[cfg(feature = "audio")]
|
||||||
{
|
{
|
||||||
return run_live(transport).await;
|
return run_live(enc_transport).await;
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "audio"))]
|
#[cfg(not(feature = "audio"))]
|
||||||
{
|
{
|
||||||
@@ -423,19 +428,19 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
} else if cli.send_tone_secs.is_some() || cli.send_file.is_some() || cli.record_file.is_some() {
|
} else if cli.send_tone_secs.is_some() || cli.send_file.is_some() || cli.record_file.is_some() {
|
||||||
run_file_mode(
|
run_file_mode(
|
||||||
transport,
|
enc_transport,
|
||||||
cli.send_tone_secs,
|
cli.send_tone_secs,
|
||||||
cli.send_file,
|
cli.send_file,
|
||||||
cli.record_file,
|
cli.record_file,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
run_silence(transport).await
|
run_silence(enc_transport).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send silence frames (connectivity test).
|
/// Send silence frames (connectivity test).
|
||||||
async fn run_silence(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::Result<()> {
|
async fn run_silence(transport: Arc<dyn wzp_proto::MediaTransport>) -> anyhow::Result<()> {
|
||||||
let config = CallConfig::default();
|
let config = CallConfig::default();
|
||||||
let mut encoder = CallEncoder::new(&config);
|
let mut encoder = CallEncoder::new(&config);
|
||||||
|
|
||||||
@@ -485,7 +490,7 @@ async fn run_silence(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::R
|
|||||||
|
|
||||||
/// File/tone mode: send a test tone or audio file, and/or record received audio.
|
/// File/tone mode: send a test tone or audio file, and/or record received audio.
|
||||||
async fn run_file_mode(
|
async fn run_file_mode(
|
||||||
transport: Arc<wzp_transport::QuinnTransport>,
|
transport: Arc<dyn wzp_proto::MediaTransport>,
|
||||||
send_tone_secs: Option<u32>,
|
send_tone_secs: Option<u32>,
|
||||||
send_file: Option<String>,
|
send_file: Option<String>,
|
||||||
record_file: Option<String>,
|
record_file: Option<String>,
|
||||||
@@ -674,7 +679,7 @@ async fn run_file_mode(
|
|||||||
|
|
||||||
/// Live mode: capture from mic, encode, send; receive, decode, play.
|
/// Live mode: capture from mic, encode, send; receive, decode, play.
|
||||||
#[cfg(feature = "audio")]
|
#[cfg(feature = "audio")]
|
||||||
async fn run_live(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::Result<()> {
|
async fn run_live(transport: Arc<dyn wzp_proto::MediaTransport>) -> anyhow::Result<()> {
|
||||||
use wzp_client::audio_io::{AudioCapture, AudioPlayback};
|
use wzp_client::audio_io::{AudioCapture, AudioPlayback};
|
||||||
|
|
||||||
let capture = AudioCapture::start()?;
|
let capture = AudioCapture::start()?;
|
||||||
|
|||||||
213
crates/wzp-client/src/encrypted_transport.rs
Normal file
213
crates/wzp-client/src/encrypted_transport.rs
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
//! `EncryptingTransport` — wraps any `MediaTransport` with a `CryptoSession`.
|
||||||
|
//!
|
||||||
|
//! All outbound `send_media` calls encrypt the payload before handing off to
|
||||||
|
//! the inner transport; all inbound `recv_media` calls decrypt after receiving.
|
||||||
|
//! Signal, quality, and close are forwarded unchanged.
|
||||||
|
//!
|
||||||
|
//! The quality report travels in plaintext so the relay can make QoS decisions
|
||||||
|
//! without being able to decrypt media content.
|
||||||
|
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use wzp_proto::{
|
||||||
|
CryptoSession, MediaHeader, MediaPacket, MediaTransport, PathQuality, SignalMessage,
|
||||||
|
TransportError,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Wraps a `MediaTransport` and applies AEAD encryption/decryption to media payloads.
|
||||||
|
pub struct EncryptingTransport {
|
||||||
|
inner: Arc<dyn MediaTransport>,
|
||||||
|
session: Mutex<Box<dyn CryptoSession>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EncryptingTransport {
|
||||||
|
pub fn new(inner: Arc<dyn MediaTransport>, session: Box<dyn CryptoSession>) -> Self {
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
session: Mutex::new(session),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl MediaTransport for EncryptingTransport {
|
||||||
|
async fn send_media(&self, packet: &MediaPacket) -> Result<(), TransportError> {
|
||||||
|
let mut header_bytes = Vec::with_capacity(MediaHeader::WIRE_SIZE);
|
||||||
|
packet.header.write_to(&mut header_bytes);
|
||||||
|
|
||||||
|
let mut ciphertext = Vec::new();
|
||||||
|
self.session
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.encrypt(&header_bytes, &packet.payload, &mut ciphertext)
|
||||||
|
.map_err(|e| TransportError::Internal(format!("encrypt: {e}")))?;
|
||||||
|
|
||||||
|
let encrypted = MediaPacket {
|
||||||
|
header: packet.header,
|
||||||
|
payload: Bytes::from(ciphertext),
|
||||||
|
quality_report: packet.quality_report.clone(),
|
||||||
|
};
|
||||||
|
self.inner.send_media(&encrypted).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recv_media(&self) -> Result<Option<MediaPacket>, TransportError> {
|
||||||
|
let packet = match self.inner.recv_media().await? {
|
||||||
|
Some(p) => p,
|
||||||
|
None => return Ok(None),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut header_bytes = Vec::with_capacity(MediaHeader::WIRE_SIZE);
|
||||||
|
packet.header.write_to(&mut header_bytes);
|
||||||
|
|
||||||
|
let mut plaintext = Vec::new();
|
||||||
|
self.session
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.decrypt(&header_bytes, &packet.payload, &mut plaintext)
|
||||||
|
.map_err(|e| TransportError::Internal(format!("decrypt: {e}")))?;
|
||||||
|
|
||||||
|
Ok(Some(MediaPacket {
|
||||||
|
header: packet.header,
|
||||||
|
payload: Bytes::from(plaintext),
|
||||||
|
quality_report: packet.quality_report,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_signal(&self, msg: &SignalMessage) -> Result<(), TransportError> {
|
||||||
|
self.inner.send_signal(msg).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recv_signal(&self) -> Result<Option<SignalMessage>, TransportError> {
|
||||||
|
self.inner.recv_signal().await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn path_quality(&self) -> PathQuality {
|
||||||
|
self.inner.path_quality()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close(&self) -> Result<(), TransportError> {
|
||||||
|
self.inner.close().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::sync::Mutex as StdMutex;
|
||||||
|
use wzp_crypto::ChaChaSession;
|
||||||
|
use wzp_proto::{CodecId, MediaType};
|
||||||
|
|
||||||
|
struct LoopbackTransport {
|
||||||
|
sent: StdMutex<Vec<MediaPacket>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LoopbackTransport {
|
||||||
|
fn new() -> Arc<Self> {
|
||||||
|
Arc::new(Self {
|
||||||
|
sent: StdMutex::new(Vec::new()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn take_sent(&self) -> Vec<MediaPacket> {
|
||||||
|
self.sent.lock().unwrap().drain(..).collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl MediaTransport for LoopbackTransport {
|
||||||
|
async fn send_media(&self, packet: &MediaPacket) -> Result<(), TransportError> {
|
||||||
|
self.sent.lock().unwrap().push(packet.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn recv_media(&self) -> Result<Option<MediaPacket>, TransportError> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
async fn send_signal(&self, _msg: &SignalMessage) -> Result<(), TransportError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn recv_signal(&self) -> Result<Option<SignalMessage>, TransportError> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
fn path_quality(&self) -> PathQuality {
|
||||||
|
PathQuality::default()
|
||||||
|
}
|
||||||
|
async fn close(&self) -> Result<(), TransportError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_header(seq: u32) -> MediaHeader {
|
||||||
|
MediaHeader {
|
||||||
|
version: 2,
|
||||||
|
flags: 0,
|
||||||
|
media_type: MediaType::Audio,
|
||||||
|
codec_id: CodecId::Opus24k,
|
||||||
|
stream_id: 0,
|
||||||
|
fec_ratio: 0,
|
||||||
|
seq,
|
||||||
|
timestamp: seq * 20,
|
||||||
|
fec_block: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn payload_is_encrypted_on_wire() {
|
||||||
|
let key = [0x42u8; 32];
|
||||||
|
let session: Box<dyn CryptoSession> = Box::new(ChaChaSession::new(key));
|
||||||
|
let loopback = LoopbackTransport::new();
|
||||||
|
let enc = EncryptingTransport::new(loopback.clone(), session);
|
||||||
|
|
||||||
|
let header = make_header(1);
|
||||||
|
let plaintext = b"secret audio frame";
|
||||||
|
let pkt = MediaPacket {
|
||||||
|
header,
|
||||||
|
payload: Bytes::from_static(plaintext),
|
||||||
|
quality_report: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
enc.send_media(&pkt).await.unwrap();
|
||||||
|
|
||||||
|
let sent = loopback.take_sent();
|
||||||
|
assert_eq!(sent.len(), 1);
|
||||||
|
assert_eq!(sent[0].header, header, "header must be preserved");
|
||||||
|
assert_ne!(
|
||||||
|
sent[0].payload.as_ref(),
|
||||||
|
plaintext.as_ref(),
|
||||||
|
"plaintext must not appear on wire"
|
||||||
|
);
|
||||||
|
// Ciphertext is longer by exactly the AEAD tag (16 bytes)
|
||||||
|
assert_eq!(sent[0].payload.len(), plaintext.len() + 16);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn encrypt_then_decrypt_roundtrip() {
|
||||||
|
let key = [0x42u8; 32];
|
||||||
|
let send_session: Box<dyn CryptoSession> = Box::new(ChaChaSession::new(key));
|
||||||
|
let mut recv_session = ChaChaSession::new(key);
|
||||||
|
|
||||||
|
let loopback = LoopbackTransport::new();
|
||||||
|
let enc = EncryptingTransport::new(loopback.clone(), send_session);
|
||||||
|
|
||||||
|
let header = make_header(5);
|
||||||
|
let plaintext = b"hello encrypted world";
|
||||||
|
let pkt = MediaPacket {
|
||||||
|
header,
|
||||||
|
payload: Bytes::from_static(plaintext),
|
||||||
|
quality_report: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
enc.send_media(&pkt).await.unwrap();
|
||||||
|
|
||||||
|
let sent = loopback.take_sent();
|
||||||
|
let wire_pkt = &sent[0];
|
||||||
|
|
||||||
|
let mut header_bytes = Vec::new();
|
||||||
|
header.write_to(&mut header_bytes);
|
||||||
|
let mut decrypted = Vec::new();
|
||||||
|
recv_session
|
||||||
|
.decrypt(&header_bytes, &wire_pkt.payload, &mut decrypted)
|
||||||
|
.expect("decrypt should succeed with matching key");
|
||||||
|
assert_eq!(&decrypted[..], plaintext);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -29,6 +29,7 @@ pub mod audio_linux_aec;
|
|||||||
pub mod bench;
|
pub mod bench;
|
||||||
pub mod birthday;
|
pub mod birthday;
|
||||||
pub mod call;
|
pub mod call;
|
||||||
|
pub mod encrypted_transport;
|
||||||
pub mod drift_test;
|
pub mod drift_test;
|
||||||
pub mod dual_path;
|
pub mod dual_path;
|
||||||
pub mod echo_test;
|
pub mod echo_test;
|
||||||
|
|||||||
@@ -33,6 +33,8 @@ pub struct ChaChaSession {
|
|||||||
sas_code: Option<u32>,
|
sas_code: Option<u32>,
|
||||||
/// Per-stream anti-replay windows, keyed by (stream_id, media_type).
|
/// Per-stream anti-replay windows, keyed by (stream_id, media_type).
|
||||||
anti_replay: HashMap<(u8, MediaType), AntiReplayWindow>,
|
anti_replay: HashMap<(u8, MediaType), AntiReplayWindow>,
|
||||||
|
/// Last timestamp seen in encrypt() — used to assert monotonicity across rekeys.
|
||||||
|
last_encrypt_timestamp: Option<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChaChaSession {
|
impl ChaChaSession {
|
||||||
@@ -55,6 +57,7 @@ impl ChaChaSession {
|
|||||||
pending_rekey_secret: None,
|
pending_rekey_secret: None,
|
||||||
sas_code: None,
|
sas_code: None,
|
||||||
anti_replay: HashMap::new(),
|
anti_replay: HashMap::new(),
|
||||||
|
last_encrypt_timestamp: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,6 +125,18 @@ impl CryptoSession for ChaChaSession {
|
|||||||
|
|
||||||
out.extend_from_slice(&ciphertext);
|
out.extend_from_slice(&ciphertext);
|
||||||
self.send_seq = self.send_seq.wrapping_add(1); // packet counter for rekey trigger only
|
self.send_seq = self.send_seq.wrapping_add(1); // packet counter for rekey trigger only
|
||||||
|
|
||||||
|
// M5: assert timestamp_ms is non-decreasing across calls (including post-rekey).
|
||||||
|
// Timestamps are u32 and wrap at 2^32 ms (~49 days); allow wrapping.
|
||||||
|
debug_assert!(
|
||||||
|
self.last_encrypt_timestamp
|
||||||
|
.map_or(true, |last| header.timestamp.wrapping_sub(last) < u32::MAX / 2),
|
||||||
|
"encrypt: timestamp must not decrease (last={:?}, now={})",
|
||||||
|
self.last_encrypt_timestamp,
|
||||||
|
header.timestamp,
|
||||||
|
);
|
||||||
|
self.last_encrypt_timestamp = Some(header.timestamp);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -189,7 +204,9 @@ impl CryptoSession for ChaChaSession {
|
|||||||
.perform_rekey(peer_ephemeral_pub, secret, total_packets);
|
.perform_rekey(peer_ephemeral_pub, secret, total_packets);
|
||||||
self.install_key(new_key);
|
self.install_key(new_key);
|
||||||
|
|
||||||
// Reset sequence counters after rekey for nonce uniqueness
|
// Reset sequence counters after rekey for nonce uniqueness.
|
||||||
|
// last_encrypt_timestamp is intentionally NOT reset — spec requires
|
||||||
|
// timestamp_ms to be monotonic across rekeys.
|
||||||
self.send_seq = 0;
|
self.send_seq = 0;
|
||||||
self.recv_seq = 0;
|
self.recv_seq = 0;
|
||||||
|
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ impl FecDecoder for RaptorQFecDecoder {
|
|||||||
fn add_symbol(
|
fn add_symbol(
|
||||||
&mut self,
|
&mut self,
|
||||||
block_id: u8,
|
block_id: u8,
|
||||||
symbol_index: u8,
|
symbol_index: u16,
|
||||||
_is_repair: bool,
|
_is_repair: bool,
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
) -> Result<(), FecError> {
|
) -> Result<(), FecError> {
|
||||||
@@ -195,7 +195,7 @@ mod tests {
|
|||||||
|
|
||||||
// Feed all source symbols (using the length-prefixed padded data).
|
// Feed all source symbols (using the length-prefixed padded data).
|
||||||
for (i, pkt) in source_pkts.iter().enumerate() {
|
for (i, pkt) in source_pkts.iter().enumerate() {
|
||||||
decoder.add_symbol(0, i as u8, false, pkt.data()).unwrap();
|
decoder.add_symbol(0, i as u16, false, pkt.data()).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = decoder.try_decode(0).unwrap();
|
let result = decoder.try_decode(0).unwrap();
|
||||||
@@ -293,10 +293,10 @@ mod tests {
|
|||||||
// Interleave symbols from block 0 and block 1
|
// Interleave symbols from block 0 and block 1
|
||||||
for i in 0..FRAMES_PER_BLOCK {
|
for i in 0..FRAMES_PER_BLOCK {
|
||||||
decoder
|
decoder
|
||||||
.add_symbol(0, i as u8, false, pkts_a[i].data())
|
.add_symbol(0, i as u16, false, pkts_a[i].data())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
decoder
|
decoder
|
||||||
.add_symbol(1, i as u8, false, pkts_b[i].data())
|
.add_symbol(1, i as u16, false, pkts_b[i].data())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ impl FecEncoder for RaptorQFecEncoder {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_repair(&mut self, ratio: f32) -> Result<Vec<(u8, Vec<u8>)>, FecError> {
|
fn generate_repair(&mut self, ratio: f32) -> Result<Vec<(u16, Vec<u8>)>, FecError> {
|
||||||
if self.source_symbols.is_empty() {
|
if self.source_symbols.is_empty() {
|
||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
}
|
}
|
||||||
@@ -133,11 +133,11 @@ impl FecEncoder for RaptorQFecEncoder {
|
|||||||
// Generate repair packets starting from offset 0 (ESIs begin at num_source).
|
// Generate repair packets starting from offset 0 (ESIs begin at num_source).
|
||||||
let repair_packets: Vec<EncodingPacket> = encoder.repair_packets(0, num_repair);
|
let repair_packets: Vec<EncodingPacket> = encoder.repair_packets(0, num_repair);
|
||||||
|
|
||||||
let result: Vec<(u8, Vec<u8>)> = repair_packets
|
let result: Vec<(u16, Vec<u8>)> = repair_packets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, pkt): (usize, EncodingPacket)| {
|
.map(|(i, pkt): (usize, EncodingPacket)| {
|
||||||
let idx = (num_source as u8).wrapping_add(i as u8);
|
let idx = (num_source as u16).wrapping_add(i as u16);
|
||||||
(idx, pkt.data().to_vec())
|
(idx, pkt.data().to_vec())
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ pub trait FecEncoder: Send + Sync {
|
|||||||
///
|
///
|
||||||
/// `ratio` is the repair overhead (e.g., 0.5 = 50% more symbols than source).
|
/// `ratio` is the repair overhead (e.g., 0.5 = 50% more symbols than source).
|
||||||
/// Returns `(fec_symbol_index, repair_data)` pairs.
|
/// Returns `(fec_symbol_index, repair_data)` pairs.
|
||||||
fn generate_repair(&mut self, ratio: f32) -> Result<Vec<(u8, Vec<u8>)>, FecError>;
|
fn generate_repair(&mut self, ratio: f32) -> Result<Vec<(u16, Vec<u8>)>, FecError>;
|
||||||
|
|
||||||
/// Finalize the current block and start a new one.
|
/// Finalize the current block and start a new one.
|
||||||
/// Returns the block ID of the finalized block.
|
/// Returns the block ID of the finalized block.
|
||||||
@@ -100,7 +100,7 @@ pub trait FecDecoder: Send + Sync {
|
|||||||
fn add_symbol(
|
fn add_symbol(
|
||||||
&mut self,
|
&mut self,
|
||||||
block_id: u8,
|
block_id: u8,
|
||||||
symbol_index: u8,
|
symbol_index: u16,
|
||||||
is_repair: bool,
|
is_repair: bool,
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
) -> Result<(), FecError>;
|
) -> Result<(), FecError>;
|
||||||
|
|||||||
@@ -111,7 +111,7 @@ impl RelayPipeline {
|
|||||||
let header = &packet.header;
|
let header = &packet.header;
|
||||||
let _ = self.fec_decoder.add_symbol(
|
let _ = self.fec_decoder.add_symbol(
|
||||||
(header.fec_block & 0xFF) as u8,
|
(header.fec_block & 0xFF) as u8,
|
||||||
(header.fec_block >> 8) as u8,
|
header.fec_block >> 8,
|
||||||
header.is_repair(),
|
header.is_repair(),
|
||||||
&packet.payload,
|
&packet.payload,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -21,6 +21,8 @@ use wzp_proto::{MediaTransport, default_signal_version};
|
|||||||
use crate::conformance::ConformanceMeter;
|
use crate::conformance::ConformanceMeter;
|
||||||
use crate::metrics::RelayMetrics;
|
use crate::metrics::RelayMetrics;
|
||||||
use crate::trunk::TrunkBatcher;
|
use crate::trunk::TrunkBatcher;
|
||||||
|
use crate::verdict::Verdict;
|
||||||
|
use crate::video_scorer::VideoScorer;
|
||||||
|
|
||||||
/// Debug tap: logs packet metadata for matching rooms.
|
/// Debug tap: logs packet metadata for matching rooms.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -1194,6 +1196,9 @@ async fn run_participant_plain(
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut video_scorer = VideoScorer::new();
|
||||||
|
let mut last_bwe_kbps: Option<u32> = None;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
room = %room_name,
|
room = %room_name,
|
||||||
participant = participant_id,
|
participant = participant_id,
|
||||||
@@ -1261,10 +1266,20 @@ async fn run_participant_plain(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(T6.2-follow-up): feed video packets to VideoScorer here.
|
// Feed video packets to VideoScorer; drop if verdict is Abusive.
|
||||||
// if pkt.header.media_type == MediaType::Video {
|
if pkt.header.media_type == wzp_proto::MediaType::Video {
|
||||||
// video_scorer.observe(&pkt.header, pkt.payload.len(), now, bwe_kbps);
|
let now = std::time::Instant::now();
|
||||||
// }
|
video_scorer.observe(&pkt.header, pkt.payload.len(), now, last_bwe_kbps);
|
||||||
|
if let Some(Verdict::Abusive) = video_scorer.verdict() {
|
||||||
|
warn!(
|
||||||
|
room = %room_name,
|
||||||
|
participant = participant_id,
|
||||||
|
seq = pkt.header.seq,
|
||||||
|
"VideoScorer: Abusive verdict — dropping packet"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update per-session quality metrics if a quality report is present
|
// Update per-session quality metrics if a quality report is present
|
||||||
if let Some(ref report) = pkt.quality_report {
|
if let Some(ref report) = pkt.quality_report {
|
||||||
@@ -1274,6 +1289,7 @@ async fn run_participant_plain(
|
|||||||
// Update receiver state from this participant's quality report (if present).
|
// Update receiver state from this participant's quality report (if present).
|
||||||
if let Some(ref report) = pkt.quality_report {
|
if let Some(ref report) = pkt.quality_report {
|
||||||
let bwe_kbps = report.bitrate_cap_kbps as u32;
|
let bwe_kbps = report.bitrate_cap_kbps as u32;
|
||||||
|
last_bwe_kbps = Some(bwe_kbps);
|
||||||
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
|
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1454,6 +1470,8 @@ async fn run_participant_trunked(
|
|||||||
let mut last_log_instant = std::time::Instant::now();
|
let mut last_log_instant = std::time::Instant::now();
|
||||||
let mut conformance =
|
let mut conformance =
|
||||||
ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session());
|
ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session());
|
||||||
|
let mut video_scorer_trunked = VideoScorer::new();
|
||||||
|
let mut last_bwe_kbps_trunked: Option<u32> = None;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
room = %room_name,
|
room = %room_name,
|
||||||
@@ -1533,9 +1551,25 @@ async fn run_participant_trunked(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Feed video packets to VideoScorer; drop if verdict is Abusive.
|
||||||
|
if pkt.header.media_type == wzp_proto::MediaType::Video {
|
||||||
|
let now = std::time::Instant::now();
|
||||||
|
video_scorer_trunked.observe(&pkt.header, pkt.payload.len(), now, last_bwe_kbps_trunked);
|
||||||
|
if let Some(Verdict::Abusive) = video_scorer_trunked.verdict() {
|
||||||
|
warn!(
|
||||||
|
room = %room_name,
|
||||||
|
participant = participant_id,
|
||||||
|
seq = pkt.header.seq,
|
||||||
|
"VideoScorer: Abusive verdict — dropping packet (trunked)"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update receiver state from this participant's quality report.
|
// Update receiver state from this participant's quality report.
|
||||||
if let Some(ref report) = pkt.quality_report {
|
if let Some(ref report) = pkt.quality_report {
|
||||||
let bwe_kbps = report.bitrate_cap_kbps as u32;
|
let bwe_kbps = report.bitrate_cap_kbps as u32;
|
||||||
|
last_bwe_kbps_trunked = Some(bwe_kbps);
|
||||||
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
|
room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user