From 52a6f5e048f515583c8903697969791565342dfd Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 25 May 2026 06:20:05 +0400 Subject: [PATCH] fix(audit): address C2, C3, M4, M5 from 2026-05-25 audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit C2: Add EncryptingTransport wrapper — all media I/O now goes through ChaChaSession encrypt/decrypt before hitting the QUIC datagram path. cli.rs run_live/run_silence/run_file_mode accept Arc and receive a wrapped transport after the handshake. C3: Wire VideoScorer::observe() into both plain and trunked forwarding loops in room.rs. Packets from participants with Abusive verdict are dropped before forwarding. last_bwe_kbps tracked from quality reports. M4: Widen FEC repair symbol index from u8 to u16 throughout (FecEncoder::generate_repair, FecDecoder::add_symbol, all call sites in call.rs, bench.rs, pipeline.rs, wzp-android). Eliminates theoretical wrapping when num_source + repair_count > 255. M5: Track last_encrypt_timestamp in ChaChaSession. debug_assert in encrypt() that timestamp is non-decreasing across calls (including post- rekey). complete_rekey() explicitly preserves last_encrypt_timestamp to prevent accidental timestamp reset regressions. 583 tests passing. Co-Authored-By: Claude Sonnet 4.6 --- crates/wzp-android/src/engine.rs | 4 +- crates/wzp-android/src/pipeline.rs | 2 +- crates/wzp-client/src/bench.rs | 4 +- crates/wzp-client/src/call.rs | 4 +- crates/wzp-client/src/cli.rs | 19 +- crates/wzp-client/src/encrypted_transport.rs | 213 +++++++++++++++++++ crates/wzp-client/src/lib.rs | 1 + crates/wzp-crypto/src/session.rs | 19 +- crates/wzp-fec/src/decoder.rs | 8 +- crates/wzp-fec/src/encoder.rs | 6 +- crates/wzp-proto/src/traits.rs | 4 +- crates/wzp-relay/src/pipeline.rs | 2 +- crates/wzp-relay/src/room.rs | 42 +++- 13 files changed, 299 insertions(+), 29 deletions(-) create mode 100644 crates/wzp-client/src/encrypted_transport.rs diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 5e4a078..eb7a166 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -796,7 +796,7 @@ async fn run_call( ), seq: rs, timestamp: t, - fec_block: ((sym_idx as u16) << 8) | (block_id as u16), + fec_block: (sym_idx << 8) | (block_id as u16), }, payload: Bytes::from(repair_data), quality_report: None, @@ -949,7 +949,7 @@ async fn run_call( let is_repair = pkt.header.is_repair(); let pkt_block = pkt.header.fec_block as u8; - let pkt_symbol = (pkt.header.fec_block >> 8) as u8; + let pkt_symbol = pkt.header.fec_block >> 8; let pkt_is_opus = pkt.header.codec_id.is_opus(); // Phase 2: Opus packets bypass RaptorQ entirely — DRED diff --git a/crates/wzp-android/src/pipeline.rs b/crates/wzp-android/src/pipeline.rs index 9beb59b..42c2ae8 100644 --- a/crates/wzp-android/src/pipeline.rs +++ b/crates/wzp-android/src/pipeline.rs @@ -138,7 +138,7 @@ impl Pipeline { let is_repair = header.is_repair(); if let Err(e) = self.fec_decoder.add_symbol( header.fec_block as u8, - (header.fec_block >> 8) as u8, + header.fec_block >> 8, is_repair, &packet.payload, ) { diff --git a/crates/wzp-client/src/bench.rs b/crates/wzp-client/src/bench.rs index f58aac0..74c3475 100644 --- a/crates/wzp-client/src/bench.rs +++ b/crates/wzp-client/src/bench.rs @@ -170,7 +170,7 @@ pub fn bench_fec_recovery(loss_pct: f32) -> FecResult { // Collect all symbols: source + repair struct Symbol { - index: u8, + index: u16, is_repair: bool, data: Vec, } @@ -180,7 +180,7 @@ pub fn bench_fec_recovery(loss_pct: f32) -> FecResult { // For add_symbol we need to provide the raw data; the decoder pads internally total_source_bytes += sym.len(); all_symbols.push(Symbol { - index: i as u8, + index: i as u16, is_repair: false, data: sym.clone(), }); diff --git a/crates/wzp-client/src/call.rs b/crates/wzp-client/src/call.rs index 458c88f..1dd9d22 100644 --- a/crates/wzp-client/src/call.rs +++ b/crates/wzp-client/src/call.rs @@ -409,7 +409,7 @@ impl CallEncoder { fec_ratio: MediaHeader::encode_fec_ratio(self.profile.fec_ratio), seq: self.seq, timestamp: self.timestamp_ms, - fec_block: u16::from(self.block_id) | (u16::from(sym_idx) << 8), + fec_block: u16::from(self.block_id) | (sym_idx << 8), }, payload: Bytes::from(repair_data), quality_report: None, @@ -566,7 +566,7 @@ impl CallDecoder { if !packet.header.codec_id.is_opus() { let _ = self.fec_dec.add_symbol( (packet.header.fec_block & 0xFF) as u8, - (packet.header.fec_block >> 8) as u8, + packet.header.fec_block >> 8, packet.header.is_repair(), &packet.payload, ); diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index 573605b..76b2cde 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -388,7 +388,7 @@ async fn main() -> anyhow::Result<()> { } // Crypto handshake — establishes verified identity + session key - let _crypto_session = wzp_client::handshake::perform_handshake( + let session = wzp_client::handshake::perform_handshake( &*transport, &seed.0, None, // alias — desktop client doesn't set one yet @@ -396,10 +396,15 @@ async fn main() -> anyhow::Result<()> { .await?; info!("crypto handshake complete"); + // Wrap the transport so all media I/O goes through AEAD encryption. + let enc_transport: Arc = Arc::new( + wzp_client::encrypted_transport::EncryptingTransport::new(transport.clone(), session), + ); + if cli.live { #[cfg(feature = "audio")] { - return run_live(transport).await; + return run_live(enc_transport).await; } #[cfg(not(feature = "audio"))] { @@ -423,19 +428,19 @@ async fn main() -> anyhow::Result<()> { Ok(()) } else if cli.send_tone_secs.is_some() || cli.send_file.is_some() || cli.record_file.is_some() { run_file_mode( - transport, + enc_transport, cli.send_tone_secs, cli.send_file, cli.record_file, ) .await } else { - run_silence(transport).await + run_silence(enc_transport).await } } /// Send silence frames (connectivity test). -async fn run_silence(transport: Arc) -> anyhow::Result<()> { +async fn run_silence(transport: Arc) -> anyhow::Result<()> { let config = CallConfig::default(); let mut encoder = CallEncoder::new(&config); @@ -485,7 +490,7 @@ async fn run_silence(transport: Arc) -> anyhow::R /// File/tone mode: send a test tone or audio file, and/or record received audio. async fn run_file_mode( - transport: Arc, + transport: Arc, send_tone_secs: Option, send_file: Option, record_file: Option, @@ -674,7 +679,7 @@ async fn run_file_mode( /// Live mode: capture from mic, encode, send; receive, decode, play. #[cfg(feature = "audio")] -async fn run_live(transport: Arc) -> anyhow::Result<()> { +async fn run_live(transport: Arc) -> anyhow::Result<()> { use wzp_client::audio_io::{AudioCapture, AudioPlayback}; let capture = AudioCapture::start()?; diff --git a/crates/wzp-client/src/encrypted_transport.rs b/crates/wzp-client/src/encrypted_transport.rs new file mode 100644 index 0000000..23e8171 --- /dev/null +++ b/crates/wzp-client/src/encrypted_transport.rs @@ -0,0 +1,213 @@ +//! `EncryptingTransport` — wraps any `MediaTransport` with a `CryptoSession`. +//! +//! All outbound `send_media` calls encrypt the payload before handing off to +//! the inner transport; all inbound `recv_media` calls decrypt after receiving. +//! Signal, quality, and close are forwarded unchanged. +//! +//! The quality report travels in plaintext so the relay can make QoS decisions +//! without being able to decrypt media content. + +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use bytes::Bytes; +use wzp_proto::{ + CryptoSession, MediaHeader, MediaPacket, MediaTransport, PathQuality, SignalMessage, + TransportError, +}; + +/// Wraps a `MediaTransport` and applies AEAD encryption/decryption to media payloads. +pub struct EncryptingTransport { + inner: Arc, + session: Mutex>, +} + +impl EncryptingTransport { + pub fn new(inner: Arc, session: Box) -> Self { + Self { + inner, + session: Mutex::new(session), + } + } +} + +#[async_trait] +impl MediaTransport for EncryptingTransport { + async fn send_media(&self, packet: &MediaPacket) -> Result<(), TransportError> { + let mut header_bytes = Vec::with_capacity(MediaHeader::WIRE_SIZE); + packet.header.write_to(&mut header_bytes); + + let mut ciphertext = Vec::new(); + self.session + .lock() + .unwrap() + .encrypt(&header_bytes, &packet.payload, &mut ciphertext) + .map_err(|e| TransportError::Internal(format!("encrypt: {e}")))?; + + let encrypted = MediaPacket { + header: packet.header, + payload: Bytes::from(ciphertext), + quality_report: packet.quality_report.clone(), + }; + self.inner.send_media(&encrypted).await + } + + async fn recv_media(&self) -> Result, TransportError> { + let packet = match self.inner.recv_media().await? { + Some(p) => p, + None => return Ok(None), + }; + + let mut header_bytes = Vec::with_capacity(MediaHeader::WIRE_SIZE); + packet.header.write_to(&mut header_bytes); + + let mut plaintext = Vec::new(); + self.session + .lock() + .unwrap() + .decrypt(&header_bytes, &packet.payload, &mut plaintext) + .map_err(|e| TransportError::Internal(format!("decrypt: {e}")))?; + + Ok(Some(MediaPacket { + header: packet.header, + payload: Bytes::from(plaintext), + quality_report: packet.quality_report, + })) + } + + async fn send_signal(&self, msg: &SignalMessage) -> Result<(), TransportError> { + self.inner.send_signal(msg).await + } + + async fn recv_signal(&self) -> Result, TransportError> { + self.inner.recv_signal().await + } + + fn path_quality(&self) -> PathQuality { + self.inner.path_quality() + } + + async fn close(&self) -> Result<(), TransportError> { + self.inner.close().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex as StdMutex; + use wzp_crypto::ChaChaSession; + use wzp_proto::{CodecId, MediaType}; + + struct LoopbackTransport { + sent: StdMutex>, + } + + impl LoopbackTransport { + fn new() -> Arc { + Arc::new(Self { + sent: StdMutex::new(Vec::new()), + }) + } + fn take_sent(&self) -> Vec { + self.sent.lock().unwrap().drain(..).collect() + } + } + + #[async_trait] + impl MediaTransport for LoopbackTransport { + async fn send_media(&self, packet: &MediaPacket) -> Result<(), TransportError> { + self.sent.lock().unwrap().push(packet.clone()); + Ok(()) + } + async fn recv_media(&self) -> Result, TransportError> { + Ok(None) + } + async fn send_signal(&self, _msg: &SignalMessage) -> Result<(), TransportError> { + Ok(()) + } + async fn recv_signal(&self) -> Result, TransportError> { + Ok(None) + } + fn path_quality(&self) -> PathQuality { + PathQuality::default() + } + async fn close(&self) -> Result<(), TransportError> { + Ok(()) + } + } + + fn make_header(seq: u32) -> MediaHeader { + MediaHeader { + version: 2, + flags: 0, + media_type: MediaType::Audio, + codec_id: CodecId::Opus24k, + stream_id: 0, + fec_ratio: 0, + seq, + timestamp: seq * 20, + fec_block: 0, + } + } + + #[tokio::test] + async fn payload_is_encrypted_on_wire() { + let key = [0x42u8; 32]; + let session: Box = Box::new(ChaChaSession::new(key)); + let loopback = LoopbackTransport::new(); + let enc = EncryptingTransport::new(loopback.clone(), session); + + let header = make_header(1); + let plaintext = b"secret audio frame"; + let pkt = MediaPacket { + header, + payload: Bytes::from_static(plaintext), + quality_report: None, + }; + + enc.send_media(&pkt).await.unwrap(); + + let sent = loopback.take_sent(); + assert_eq!(sent.len(), 1); + assert_eq!(sent[0].header, header, "header must be preserved"); + assert_ne!( + sent[0].payload.as_ref(), + plaintext.as_ref(), + "plaintext must not appear on wire" + ); + // Ciphertext is longer by exactly the AEAD tag (16 bytes) + assert_eq!(sent[0].payload.len(), plaintext.len() + 16); + } + + #[tokio::test] + async fn encrypt_then_decrypt_roundtrip() { + let key = [0x42u8; 32]; + let send_session: Box = Box::new(ChaChaSession::new(key)); + let mut recv_session = ChaChaSession::new(key); + + let loopback = LoopbackTransport::new(); + let enc = EncryptingTransport::new(loopback.clone(), send_session); + + let header = make_header(5); + let plaintext = b"hello encrypted world"; + let pkt = MediaPacket { + header, + payload: Bytes::from_static(plaintext), + quality_report: None, + }; + + enc.send_media(&pkt).await.unwrap(); + + let sent = loopback.take_sent(); + let wire_pkt = &sent[0]; + + let mut header_bytes = Vec::new(); + header.write_to(&mut header_bytes); + let mut decrypted = Vec::new(); + recv_session + .decrypt(&header_bytes, &wire_pkt.payload, &mut decrypted) + .expect("decrypt should succeed with matching key"); + assert_eq!(&decrypted[..], plaintext); + } +} diff --git a/crates/wzp-client/src/lib.rs b/crates/wzp-client/src/lib.rs index 93477ac..1527bce 100644 --- a/crates/wzp-client/src/lib.rs +++ b/crates/wzp-client/src/lib.rs @@ -29,6 +29,7 @@ pub mod audio_linux_aec; pub mod bench; pub mod birthday; pub mod call; +pub mod encrypted_transport; pub mod drift_test; pub mod dual_path; pub mod echo_test; diff --git a/crates/wzp-crypto/src/session.rs b/crates/wzp-crypto/src/session.rs index d9e8243..fee84a3 100644 --- a/crates/wzp-crypto/src/session.rs +++ b/crates/wzp-crypto/src/session.rs @@ -33,6 +33,8 @@ pub struct ChaChaSession { sas_code: Option, /// Per-stream anti-replay windows, keyed by (stream_id, media_type). anti_replay: HashMap<(u8, MediaType), AntiReplayWindow>, + /// Last timestamp seen in encrypt() — used to assert monotonicity across rekeys. + last_encrypt_timestamp: Option, } impl ChaChaSession { @@ -55,6 +57,7 @@ impl ChaChaSession { pending_rekey_secret: None, sas_code: None, anti_replay: HashMap::new(), + last_encrypt_timestamp: None, } } @@ -122,6 +125,18 @@ impl CryptoSession for ChaChaSession { out.extend_from_slice(&ciphertext); self.send_seq = self.send_seq.wrapping_add(1); // packet counter for rekey trigger only + + // M5: assert timestamp_ms is non-decreasing across calls (including post-rekey). + // Timestamps are u32 and wrap at 2^32 ms (~49 days); allow wrapping. + debug_assert!( + self.last_encrypt_timestamp + .map_or(true, |last| header.timestamp.wrapping_sub(last) < u32::MAX / 2), + "encrypt: timestamp must not decrease (last={:?}, now={})", + self.last_encrypt_timestamp, + header.timestamp, + ); + self.last_encrypt_timestamp = Some(header.timestamp); + Ok(()) } @@ -189,7 +204,9 @@ impl CryptoSession for ChaChaSession { .perform_rekey(peer_ephemeral_pub, secret, total_packets); self.install_key(new_key); - // Reset sequence counters after rekey for nonce uniqueness + // Reset sequence counters after rekey for nonce uniqueness. + // last_encrypt_timestamp is intentionally NOT reset — spec requires + // timestamp_ms to be monotonic across rekeys. self.send_seq = 0; self.recv_seq = 0; diff --git a/crates/wzp-fec/src/decoder.rs b/crates/wzp-fec/src/decoder.rs index 8bc2650..4c0785a 100644 --- a/crates/wzp-fec/src/decoder.rs +++ b/crates/wzp-fec/src/decoder.rs @@ -73,7 +73,7 @@ impl FecDecoder for RaptorQFecDecoder { fn add_symbol( &mut self, block_id: u8, - symbol_index: u8, + symbol_index: u16, _is_repair: bool, data: &[u8], ) -> Result<(), FecError> { @@ -195,7 +195,7 @@ mod tests { // Feed all source symbols (using the length-prefixed padded data). for (i, pkt) in source_pkts.iter().enumerate() { - decoder.add_symbol(0, i as u8, false, pkt.data()).unwrap(); + decoder.add_symbol(0, i as u16, false, pkt.data()).unwrap(); } let result = decoder.try_decode(0).unwrap(); @@ -293,10 +293,10 @@ mod tests { // Interleave symbols from block 0 and block 1 for i in 0..FRAMES_PER_BLOCK { decoder - .add_symbol(0, i as u8, false, pkts_a[i].data()) + .add_symbol(0, i as u16, false, pkts_a[i].data()) .unwrap(); decoder - .add_symbol(1, i as u8, false, pkts_b[i].data()) + .add_symbol(1, i as u16, false, pkts_b[i].data()) .unwrap(); } diff --git a/crates/wzp-fec/src/encoder.rs b/crates/wzp-fec/src/encoder.rs index 450ea45..9f2d5b4 100644 --- a/crates/wzp-fec/src/encoder.rs +++ b/crates/wzp-fec/src/encoder.rs @@ -108,7 +108,7 @@ impl FecEncoder for RaptorQFecEncoder { Ok(()) } - fn generate_repair(&mut self, ratio: f32) -> Result)>, FecError> { + fn generate_repair(&mut self, ratio: f32) -> Result)>, FecError> { if self.source_symbols.is_empty() { return Ok(vec![]); } @@ -133,11 +133,11 @@ impl FecEncoder for RaptorQFecEncoder { // Generate repair packets starting from offset 0 (ESIs begin at num_source). let repair_packets: Vec = encoder.repair_packets(0, num_repair); - let result: Vec<(u8, Vec)> = repair_packets + let result: Vec<(u16, Vec)> = repair_packets .into_iter() .enumerate() .map(|(i, pkt): (usize, EncodingPacket)| { - let idx = (num_source as u8).wrapping_add(i as u8); + let idx = (num_source as u16).wrapping_add(i as u16); (idx, pkt.data().to_vec()) }) .collect(); diff --git a/crates/wzp-proto/src/traits.rs b/crates/wzp-proto/src/traits.rs index bacdd5a..ee2ef5b 100644 --- a/crates/wzp-proto/src/traits.rs +++ b/crates/wzp-proto/src/traits.rs @@ -81,7 +81,7 @@ pub trait FecEncoder: Send + Sync { /// /// `ratio` is the repair overhead (e.g., 0.5 = 50% more symbols than source). /// Returns `(fec_symbol_index, repair_data)` pairs. - fn generate_repair(&mut self, ratio: f32) -> Result)>, FecError>; + fn generate_repair(&mut self, ratio: f32) -> Result)>, FecError>; /// Finalize the current block and start a new one. /// Returns the block ID of the finalized block. @@ -100,7 +100,7 @@ pub trait FecDecoder: Send + Sync { fn add_symbol( &mut self, block_id: u8, - symbol_index: u8, + symbol_index: u16, is_repair: bool, data: &[u8], ) -> Result<(), FecError>; diff --git a/crates/wzp-relay/src/pipeline.rs b/crates/wzp-relay/src/pipeline.rs index 6cb5fa3..9670ee3 100644 --- a/crates/wzp-relay/src/pipeline.rs +++ b/crates/wzp-relay/src/pipeline.rs @@ -111,7 +111,7 @@ impl RelayPipeline { let header = &packet.header; let _ = self.fec_decoder.add_symbol( (header.fec_block & 0xFF) as u8, - (header.fec_block >> 8) as u8, + header.fec_block >> 8, header.is_repair(), &packet.payload, ); diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index b375d1c..4afcb55 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -21,6 +21,8 @@ use wzp_proto::{MediaTransport, default_signal_version}; use crate::conformance::ConformanceMeter; use crate::metrics::RelayMetrics; use crate::trunk::TrunkBatcher; +use crate::verdict::Verdict; +use crate::video_scorer::VideoScorer; /// Debug tap: logs packet metadata for matching rooms. #[derive(Clone)] @@ -1194,6 +1196,9 @@ async fn run_participant_plain( None }; + let mut video_scorer = VideoScorer::new(); + let mut last_bwe_kbps: Option = None; + info!( room = %room_name, participant = participant_id, @@ -1261,10 +1266,20 @@ async fn run_participant_plain( ); } - // TODO(T6.2-follow-up): feed video packets to VideoScorer here. - // if pkt.header.media_type == MediaType::Video { - // video_scorer.observe(&pkt.header, pkt.payload.len(), now, bwe_kbps); - // } + // Feed video packets to VideoScorer; drop if verdict is Abusive. + if pkt.header.media_type == wzp_proto::MediaType::Video { + let now = std::time::Instant::now(); + video_scorer.observe(&pkt.header, pkt.payload.len(), now, last_bwe_kbps); + if let Some(Verdict::Abusive) = video_scorer.verdict() { + warn!( + room = %room_name, + participant = participant_id, + seq = pkt.header.seq, + "VideoScorer: Abusive verdict — dropping packet" + ); + continue; + } + } // Update per-session quality metrics if a quality report is present if let Some(ref report) = pkt.quality_report { @@ -1274,6 +1289,7 @@ async fn run_participant_plain( // Update receiver state from this participant's quality report (if present). if let Some(ref report) = pkt.quality_report { let bwe_kbps = report.bitrate_cap_kbps as u32; + last_bwe_kbps = Some(bwe_kbps); room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct); } @@ -1454,6 +1470,8 @@ async fn run_participant_trunked( let mut last_log_instant = std::time::Instant::now(); let mut conformance = ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session()); + let mut video_scorer_trunked = VideoScorer::new(); + let mut last_bwe_kbps_trunked: Option = None; info!( room = %room_name, @@ -1533,9 +1551,25 @@ async fn run_participant_trunked( ); } + // Feed video packets to VideoScorer; drop if verdict is Abusive. + if pkt.header.media_type == wzp_proto::MediaType::Video { + let now = std::time::Instant::now(); + video_scorer_trunked.observe(&pkt.header, pkt.payload.len(), now, last_bwe_kbps_trunked); + if let Some(Verdict::Abusive) = video_scorer_trunked.verdict() { + warn!( + room = %room_name, + participant = participant_id, + seq = pkt.header.seq, + "VideoScorer: Abusive verdict — dropping packet (trunked)" + ); + continue; + } + } + // Update receiver state from this participant's quality report. if let Some(ref report) = pkt.quality_report { let bwe_kbps = report.bitrate_cap_kbps as u32; + last_bwe_kbps_trunked = Some(bwe_kbps); room_mgr.update_receiver_state(&room_name, participant_id, bwe_kbps, report.loss_pct); }