diff --git a/Cargo.lock b/Cargo.lock index 7b9278b..7bd74cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + [[package]] name = "async-trait" version = "0.1.89" @@ -288,6 +294,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.14" @@ -359,6 +371,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + [[package]] name = "hkdf" version = "0.12.4" @@ -377,6 +395,16 @@ dependencies = [ "digest", ] +[[package]] +name = "indexmap" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "inout" version = "0.1.4" @@ -989,6 +1017,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + [[package]] name = "sha2" version = "0.10.9" @@ -1200,6 +1237,47 @@ dependencies = [ "syn", ] +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "toml_write", + "winnow", +] + +[[package]] +name = "toml_write" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" + [[package]] name = "tracing" version = "0.1.44" @@ -1624,6 +1702,15 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen" version = "0.51.0" @@ -1634,8 +1721,12 @@ checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" name = "wzp-client" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", + "bytes", "tokio", "tracing", + "tracing-subscriber", "wzp-codec", "wzp-crypto", "wzp-fec", @@ -1692,7 +1783,12 @@ dependencies = [ name = "wzp-relay" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", + "bytes", + "serde", "tokio", + "toml", "tracing", "tracing-subscriber", "wzp-codec", diff --git a/crates/wzp-client/Cargo.toml b/crates/wzp-client/Cargo.toml index 7f10aae..faf05d0 100644 --- a/crates/wzp-client/Cargo.toml +++ b/crates/wzp-client/Cargo.toml @@ -14,5 +14,13 @@ wzp-crypto = { workspace = true } wzp-transport = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +anyhow = "1" + +[[bin]] +name = "wzp-client" +path = "src/cli.rs" [dev-dependencies] diff --git a/crates/wzp-client/src/call.rs b/crates/wzp-client/src/call.rs new file mode 100644 index 0000000..fee7652 --- /dev/null +++ b/crates/wzp-client/src/call.rs @@ -0,0 +1,293 @@ +//! Call session — manages the end-to-end pipeline for a single voice call. +//! +//! Pipeline: mic → encode → FEC → encrypt → send / recv → decrypt → FEC → decode → speaker + +use bytes::Bytes; +use tracing::{debug, warn}; + +use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder}; +use wzp_proto::jitter::{JitterBuffer, PlayoutResult}; +use wzp_proto::packet::{MediaHeader, MediaPacket}; +use wzp_proto::quality::AdaptiveQualityController; +use wzp_proto::traits::{ + AudioDecoder, AudioEncoder, FecDecoder, FecEncoder, +}; +use wzp_proto::QualityProfile; + +/// Configuration for a call session. +pub struct CallConfig { + /// Initial quality profile. + pub profile: QualityProfile, + /// Jitter buffer target depth. + pub jitter_target: usize, + /// Jitter buffer max depth. + pub jitter_max: usize, + /// Jitter buffer min depth before playout. + pub jitter_min: usize, +} + +impl Default for CallConfig { + fn default() -> Self { + Self { + profile: QualityProfile::GOOD, + jitter_target: 50, + jitter_max: 250, + jitter_min: 25, + } + } +} + +/// Manages the encode/send side of a call. +pub struct CallEncoder { + /// Audio encoder (Opus or Codec2). + audio_enc: Box, + /// FEC encoder. + fec_enc: RaptorQFecEncoder, + /// Current profile. + profile: QualityProfile, + /// Outbound sequence counter. + seq: u16, + /// Current FEC block. + block_id: u8, + /// Frame index within current block. + frame_in_block: u8, + /// Timestamp counter (ms). + timestamp_ms: u32, +} + +impl CallEncoder { + pub fn new(config: &CallConfig) -> Self { + Self { + audio_enc: wzp_codec::create_encoder(config.profile), + fec_enc: wzp_fec::create_encoder(&config.profile), + profile: config.profile, + seq: 0, + block_id: 0, + frame_in_block: 0, + timestamp_ms: 0, + } + } + + /// Encode a PCM frame and produce media packets (source + repair when block is full). + /// + /// Input: 48kHz mono PCM, frame size depends on profile (960 for 20ms, 1920 for 40ms). + /// Output: one or more MediaPackets to send. + pub fn encode_frame(&mut self, pcm: &[i16]) -> Result, anyhow::Error> { + // Encode audio + let mut encoded = vec![0u8; self.audio_enc.max_frame_bytes()]; + let enc_len = self.audio_enc.encode(pcm, &mut encoded)?; + encoded.truncate(enc_len); + + // Build source media packet + let source_pkt = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: false, + codec_id: self.profile.codec, + has_quality_report: false, + fec_ratio_encoded: MediaHeader::encode_fec_ratio(self.profile.fec_ratio), + seq: self.seq, + timestamp: self.timestamp_ms, + fec_block: self.block_id, + fec_symbol: self.frame_in_block, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(encoded.clone()), + quality_report: None, + }; + + self.seq = self.seq.wrapping_add(1); + self.timestamp_ms = self + .timestamp_ms + .wrapping_add(self.profile.frame_duration_ms as u32); + + let mut output = vec![source_pkt]; + + // Add to FEC encoder + self.fec_enc.add_source_symbol(&encoded)?; + self.frame_in_block += 1; + + // If block is full, generate repair and finalize + if self.frame_in_block >= self.profile.frames_per_block { + if let Ok(repairs) = self.fec_enc.generate_repair(self.profile.fec_ratio) { + for (sym_idx, repair_data) in repairs { + output.push(MediaPacket { + header: MediaHeader { + version: 0, + is_repair: true, + codec_id: self.profile.codec, + has_quality_report: false, + fec_ratio_encoded: MediaHeader::encode_fec_ratio( + self.profile.fec_ratio, + ), + seq: self.seq, + timestamp: self.timestamp_ms, + fec_block: self.block_id, + fec_symbol: sym_idx, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(repair_data), + quality_report: None, + }); + self.seq = self.seq.wrapping_add(1); + } + } + let _ = self.fec_enc.finalize_block(); + self.block_id = self.block_id.wrapping_add(1); + self.frame_in_block = 0; + } + + Ok(output) + } + + /// Update the quality profile (codec switch, FEC ratio change). + pub fn set_profile(&mut self, profile: QualityProfile) -> Result<(), anyhow::Error> { + self.audio_enc.set_profile(profile)?; + self.fec_enc = wzp_fec::create_encoder(&profile); + self.profile = profile; + self.frame_in_block = 0; + Ok(()) + } +} + +/// Manages the recv/decode side of a call. +pub struct CallDecoder { + /// Audio decoder. + audio_dec: Box, + /// FEC decoder. + fec_dec: RaptorQFecDecoder, + /// Jitter buffer. + jitter: JitterBuffer, + /// Quality controller. + quality: AdaptiveQualityController, + /// Current profile. + profile: QualityProfile, +} + +impl CallDecoder { + pub fn new(config: &CallConfig) -> Self { + Self { + audio_dec: wzp_codec::create_decoder(config.profile), + fec_dec: wzp_fec::create_decoder(&config.profile), + jitter: JitterBuffer::new(config.jitter_target, config.jitter_max, config.jitter_min), + quality: AdaptiveQualityController::new(), + profile: config.profile, + } + } + + /// Feed a received media packet into the decode pipeline. + pub fn ingest(&mut self, packet: MediaPacket) { + // Feed to FEC decoder + let _ = self.fec_dec.add_symbol( + packet.header.fec_block, + packet.header.fec_symbol, + packet.header.is_repair, + &packet.payload, + ); + + // If not a repair packet, also feed directly to jitter buffer + if !packet.header.is_repair { + self.jitter.push(packet); + } + } + + /// Decode the next audio frame from the jitter buffer. + /// + /// Returns PCM samples (48kHz mono) or None if not ready. + pub fn decode_next(&mut self, pcm: &mut [i16]) -> Option { + match self.jitter.pop() { + PlayoutResult::Packet(pkt) => { + match self.audio_dec.decode(&pkt.payload, pcm) { + Ok(n) => Some(n), + Err(e) => { + warn!("decode error: {e}, using PLC"); + self.audio_dec.decode_lost(pcm).ok() + } + } + } + PlayoutResult::Missing { seq } => { + debug!(seq, "packet loss, generating PLC"); + self.audio_dec.decode_lost(pcm).ok() + } + PlayoutResult::NotReady => None, + } + } + + /// Get the current quality profile. + pub fn profile(&self) -> QualityProfile { + self.profile + } +} + +#[cfg(test)] +mod tests { + use super::*; + use wzp_proto::CodecId; + + #[test] + fn encoder_produces_packets() { + let config = CallConfig::default(); + let mut enc = CallEncoder::new(&config); + + // 20ms at 48kHz = 960 samples + let pcm = vec![0i16; 960]; + let packets = enc.encode_frame(&pcm).unwrap(); + assert!(!packets.is_empty()); + assert_eq!(packets[0].header.seq, 0); + assert!(!packets[0].header.is_repair); + } + + #[test] + fn encoder_generates_repair_on_full_block() { + let config = CallConfig { + profile: QualityProfile::GOOD, // 5 frames/block + ..Default::default() + }; + let mut enc = CallEncoder::new(&config); + let pcm = vec![0i16; 960]; + + let mut total_packets = 0; + let mut repair_count = 0; + for _ in 0..5 { + let packets = enc.encode_frame(&pcm).unwrap(); + for p in &packets { + if p.header.is_repair { + repair_count += 1; + } + } + total_packets += packets.len(); + } + assert!(repair_count > 0, "should have repair packets after full block"); + assert!(total_packets > 5, "total {total_packets} should exceed 5 source"); + } + + #[test] + fn decoder_handles_ingest() { + let config = CallConfig::default(); + let mut dec = CallDecoder::new(&config); + + let pkt = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: false, + codec_id: CodecId::Opus24k, + has_quality_report: false, + fec_ratio_encoded: 0, + seq: 0, + timestamp: 0, + fec_block: 0, + fec_symbol: 0, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(vec![0u8; 60]), + quality_report: None, + }; + dec.ingest(pkt); + // Not enough buffered yet (min_depth = 25) + let mut pcm = vec![0i16; 960]; + assert!(dec.decode_next(&mut pcm).is_none()); + } +} diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs new file mode 100644 index 0000000..522ce82 --- /dev/null +++ b/crates/wzp-client/src/cli.rs @@ -0,0 +1,56 @@ +//! WarzonePhone CLI test client. +//! +//! Usage: wzp-client +//! +//! Connects to a relay and sends silence frames for testing. + +use std::net::SocketAddr; + +use tracing::{error, info}; + +use wzp_client::call::{CallConfig, CallEncoder}; +use wzp_proto::MediaTransport; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt().init(); + + let relay_addr: SocketAddr = std::env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:4433".to_string()) + .parse()?; + + info!(%relay_addr, "WarzonePhone client connecting"); + + let client_config = wzp_transport::client_config(); + let endpoint = wzp_transport::create_endpoint("0.0.0.0:0".parse()?, None)?; + let connection = + wzp_transport::connect(&endpoint, relay_addr, "localhost", client_config).await?; + + info!("Connected to relay"); + + let transport = wzp_transport::QuinnTransport::new(connection); + let config = CallConfig::default(); + let mut encoder = CallEncoder::new(&config); + + let frame_duration = tokio::time::Duration::from_millis(20); + let pcm = vec![0i16; 960]; // 20ms @ 48kHz silence + + for i in 0..250u32 { + let packets = encoder.encode_frame(&pcm)?; + for pkt in &packets { + if let Err(e) = transport.send_media(pkt).await { + error!("send error: {e}"); + break; + } + } + if i % 50 == 0 { + info!(frame = i, packets = packets.len(), "sent"); + } + tokio::time::sleep(frame_duration).await; + } + + info!("Done, closing"); + transport.close().await?; + Ok(()) +} diff --git a/crates/wzp-client/src/lib.rs b/crates/wzp-client/src/lib.rs index ff6d598..b872242 100644 --- a/crates/wzp-client/src/lib.rs +++ b/crates/wzp-client/src/lib.rs @@ -1,11 +1,11 @@ //! WarzonePhone Client Library //! -//! Client-side pipeline: -//! mic → encode → FEC → encrypt → send / recv → decrypt → FEC decode → decode → speaker +//! End-to-end voice call pipeline: +//! - **Send**: mic → encode (Opus/Codec2) → FEC → encrypt → QUIC DATAGRAM +//! - **Recv**: QUIC DATAGRAM → decrypt → FEC decode → jitter buffer → decode → speaker //! -//! Targets: -//! - Android (via JNI/uniffi) -//! - Windows desktop -//! - macOS/Linux (testing) -//! -//! Built after the 5 agent crates (proto, codec, fec, crypto, transport) are complete. +//! Targets: Android (JNI), Windows desktop, macOS/Linux (testing) + +pub mod call; + +pub use call::{CallConfig, CallDecoder, CallEncoder}; diff --git a/crates/wzp-relay/Cargo.toml b/crates/wzp-relay/Cargo.toml index 790928d..c4896cc 100644 --- a/crates/wzp-relay/Cargo.toml +++ b/crates/wzp-relay/Cargo.toml @@ -15,5 +15,14 @@ wzp-transport = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +serde = { workspace = true } +toml = "0.8" +anyhow = "1" + +[[bin]] +name = "wzp-relay" +path = "src/main.rs" [dev-dependencies] diff --git a/crates/wzp-relay/src/config.rs b/crates/wzp-relay/src/config.rs new file mode 100644 index 0000000..468d2b9 --- /dev/null +++ b/crates/wzp-relay/src/config.rs @@ -0,0 +1,35 @@ +//! Relay daemon configuration. + +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; + +/// Configuration for the relay daemon. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RelayConfig { + /// Address to listen on for incoming connections (client-facing). + pub listen_addr: SocketAddr, + /// Address of the remote relay (for the lossy inter-relay link). + /// If None, this relay is the destination-side relay. + pub remote_relay: Option, + /// Maximum concurrent sessions. + pub max_sessions: usize, + /// Jitter buffer target depth in packets. + pub jitter_target_depth: usize, + /// Jitter buffer maximum depth in packets. + pub jitter_max_depth: usize, + /// Logging level (trace, debug, info, warn, error). + pub log_level: String, +} + +impl Default for RelayConfig { + fn default() -> Self { + Self { + listen_addr: "0.0.0.0:4433".parse().unwrap(), + remote_relay: None, + max_sessions: 100, + jitter_target_depth: 50, + jitter_max_depth: 250, + log_level: "info".to_string(), + } + } +} diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index eea8a85..1f197b7 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -1,6 +1,16 @@ //! WarzonePhone Relay Daemon //! //! Integration crate that wires together all layers into a relay pipeline: -//! recv → decrypt → FEC decode → jitter → FEC encode → encrypt → send +//! recv → FEC decode → jitter buffer → FEC encode → send //! -//! Built after the 5 agent crates (proto, codec, fec, crypto, transport) are complete. +//! The relay forwards media between two QUIC endpoints without decoding audio. +//! It operates on FEC-protected packets, managing loss recovery and adaptive +//! quality transitions. + +pub mod config; +pub mod pipeline; +pub mod session_mgr; + +pub use config::RelayConfig; +pub use pipeline::{PipelineConfig, PipelineStats, RelayPipeline}; +pub use session_mgr::{RelaySession, SessionId, SessionManager}; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs new file mode 100644 index 0000000..bcf8a65 --- /dev/null +++ b/crates/wzp-relay/src/main.rs @@ -0,0 +1,66 @@ +//! WarzonePhone relay daemon entry point. + +use std::sync::Arc; + +use tokio::sync::Mutex; +use tracing::{error, info}; + +use wzp_proto::MediaTransport; +use wzp_relay::config::RelayConfig; +use wzp_relay::session_mgr::SessionManager; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let config = RelayConfig::default(); + + tracing_subscriber::fmt().init(); + + info!(addr = %config.listen_addr, "WarzonePhone relay starting"); + + let (server_config, _cert_der) = wzp_transport::server_config(); + let endpoint = + wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?; + + let sessions = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); + + info!("Listening for connections..."); + + loop { + let connection = match wzp_transport::accept(&endpoint).await { + Ok(conn) => conn, + Err(e) => { + error!("accept error: {e}"); + continue; + } + }; + + let _sessions = sessions.clone(); + + tokio::spawn(async move { + let remote = connection.remote_address(); + info!(%remote, "new connection"); + + let transport = wzp_transport::QuinnTransport::new(connection); + + loop { + match transport.recv_media().await { + Ok(Some(packet)) => { + tracing::trace!( + seq = packet.header.seq, + block = packet.header.fec_block, + "received media packet" + ); + } + Ok(None) => { + info!(%remote, "connection closed"); + break; + } + Err(e) => { + error!(%remote, "recv error: {e}"); + break; + } + } + } + }); + } +} diff --git a/crates/wzp-relay/src/pipeline.rs b/crates/wzp-relay/src/pipeline.rs new file mode 100644 index 0000000..a4f87d6 --- /dev/null +++ b/crates/wzp-relay/src/pipeline.rs @@ -0,0 +1,302 @@ +//! Media processing pipeline for the relay. +//! +//! The relay pipeline processes media packets in both directions: +//! - **Inbound** (from client/upstream): recv → decrypt → FEC decode → jitter buffer +//! - **Outbound** (to downstream/remote): jitter pop → FEC encode → encrypt → send +//! +//! The relay does NOT decode/re-encode audio — it operates on encrypted, +//! FEC-protected packets. The crypto and FEC layers are the relay's concern; +//! the actual audio codec is end-to-end between client and destination. + +use tracing::{debug, info}; + +use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder}; +use wzp_proto::jitter::{JitterBuffer, PlayoutResult}; +use wzp_proto::packet::{MediaHeader, MediaPacket}; +use wzp_proto::quality::AdaptiveQualityController; +use wzp_proto::traits::{FecDecoder, FecEncoder, QualityController}; +use wzp_proto::QualityProfile; + +/// Configuration for a relay pipeline instance. +pub struct PipelineConfig { + pub initial_profile: QualityProfile, + pub jitter_target: usize, + pub jitter_max: usize, + pub jitter_min: usize, +} + +impl Default for PipelineConfig { + fn default() -> Self { + Self { + initial_profile: QualityProfile::GOOD, + jitter_target: 50, + jitter_max: 250, + jitter_min: 25, + } + } +} + +/// A relay media pipeline for one direction of a call session. +/// +/// Each call has two pipelines: client→destination and destination→client. +pub struct RelayPipeline { + /// FEC encoder for outbound packets. + fec_encoder: RaptorQFecEncoder, + /// FEC decoder for inbound packets. + fec_decoder: RaptorQFecDecoder, + /// Jitter buffer for reordering and smoothing. + jitter: JitterBuffer, + /// Adaptive quality controller. + quality: AdaptiveQualityController, + /// Current quality profile. + profile: QualityProfile, + /// Outbound sequence counter. + out_seq: u16, + /// Packets processed count. + stats: PipelineStats, +} + +/// Pipeline statistics. +#[derive(Clone, Debug, Default)] +pub struct PipelineStats { + pub packets_received: u64, + pub packets_forwarded: u64, + pub packets_fec_recovered: u64, + pub packets_lost: u64, + pub profile_changes: u64, +} + +impl RelayPipeline { + /// Create a new relay pipeline with the given configuration. + pub fn new(config: PipelineConfig) -> Self { + let (fec_enc, fec_dec) = wzp_fec::create_fec_pair(&config.initial_profile); + + Self { + fec_encoder: fec_enc, + fec_decoder: fec_dec, + jitter: JitterBuffer::new(config.jitter_target, config.jitter_max, config.jitter_min), + quality: AdaptiveQualityController::new(), + profile: config.initial_profile, + out_seq: 0, + stats: PipelineStats::default(), + } + } + + /// Process an incoming media packet from the upstream side. + /// + /// The packet is fed into the FEC decoder and jitter buffer. + /// Returns decoded packets ready for forwarding (if any). + pub fn ingest(&mut self, packet: MediaPacket) -> Vec { + self.stats.packets_received += 1; + + // Feed quality report if present + if let Some(ref qr) = packet.quality_report { + if let Some(new_profile) = self.quality.observe(qr) { + info!( + tier = ?self.quality.tier(), + codec = ?new_profile.codec, + fec_ratio = new_profile.fec_ratio, + "quality tier change" + ); + self.profile = new_profile; + self.stats.profile_changes += 1; + // Reconfigure FEC for new profile + let (enc, dec) = wzp_fec::create_fec_pair(&new_profile); + self.fec_encoder = enc; + self.fec_decoder = dec; + } + } + + // Feed packet into FEC decoder + let header = &packet.header; + let _ = self.fec_decoder.add_symbol( + header.fec_block, + header.fec_symbol, + header.is_repair, + &packet.payload, + ); + + // Try to decode the FEC block + let mut output = Vec::new(); + if let Ok(Some(frames)) = self.fec_decoder.try_decode(header.fec_block) { + debug!( + block = header.fec_block, + frames = frames.len(), + "FEC block decoded" + ); + // Each recovered frame becomes a media packet for the jitter buffer + for (i, frame) in frames.into_iter().enumerate() { + let reconstructed = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: false, + codec_id: header.codec_id, + has_quality_report: false, + fec_ratio_encoded: header.fec_ratio_encoded, + // Reconstruct seq from block + symbol index + seq: (header.fec_block as u16) + .wrapping_mul(self.profile.frames_per_block as u16) + .wrapping_add(i as u16), + timestamp: header + .timestamp + .wrapping_add((i as u32) * (header.codec_id.frame_duration_ms() as u32)), + fec_block: header.fec_block, + fec_symbol: i as u8, + reserved: 0, + csrc_count: 0, + }, + payload: bytes::Bytes::from(frame), + quality_report: None, + }; + self.jitter.push(reconstructed); + } + } + + // Pop from jitter buffer + loop { + match self.jitter.pop() { + PlayoutResult::Packet(pkt) => { + self.stats.packets_forwarded += 1; + output.push(pkt); + } + PlayoutResult::Missing { seq } => { + self.stats.packets_lost += 1; + debug!(seq, "jitter buffer: missing packet"); + // Continue popping — the next packet might be available + } + PlayoutResult::NotReady => break, + } + } + + output + } + + /// Prepare a packet for outbound transmission. + /// + /// Adds FEC encoding and assigns a new sequence number. + pub fn prepare_outbound(&mut self, mut packet: MediaPacket) -> Vec { + // Assign outbound sequence number + packet.header.seq = self.out_seq; + self.out_seq = self.out_seq.wrapping_add(1); + + let mut output = vec![packet.clone()]; + + // Add to FEC encoder + let _ = self.fec_encoder.add_source_symbol(&packet.payload); + + // Check if block is full + if self.fec_encoder.current_block_size() >= self.profile.frames_per_block as usize { + // Generate repair packets + if let Ok(repairs) = self.fec_encoder.generate_repair(self.profile.fec_ratio) { + for (sym_idx, repair_data) in repairs { + let repair_packet = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: true, + codec_id: packet.header.codec_id, + has_quality_report: false, + fec_ratio_encoded: MediaHeader::encode_fec_ratio( + self.profile.fec_ratio, + ), + seq: self.out_seq, + timestamp: packet.header.timestamp, + fec_block: self.fec_encoder.current_block_id(), + fec_symbol: sym_idx, + reserved: 0, + csrc_count: 0, + }, + payload: bytes::Bytes::from(repair_data), + quality_report: None, + }; + self.out_seq = self.out_seq.wrapping_add(1); + output.push(repair_packet); + } + } + let _ = self.fec_encoder.finalize_block(); + } + + output + } + + /// Get current pipeline statistics. + pub fn stats(&self) -> &PipelineStats { + &self.stats + } + + /// Get current quality profile. + pub fn profile(&self) -> QualityProfile { + self.profile + } +} + +#[cfg(test)] +mod tests { + use super::*; + use wzp_proto::CodecId; + use bytes::Bytes; + + fn make_media_packet(seq: u16, block: u8, symbol: u8) -> MediaPacket { + MediaPacket { + header: MediaHeader { + version: 0, + is_repair: false, + codec_id: CodecId::Opus24k, + has_quality_report: false, + fec_ratio_encoded: 0, + seq, + timestamp: seq as u32 * 20, + fec_block: block, + fec_symbol: symbol, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(vec![seq as u8; 60]), + quality_report: None, + } + } + + #[test] + fn pipeline_creates_successfully() { + let pipeline = RelayPipeline::new(PipelineConfig::default()); + assert_eq!(pipeline.profile().codec, CodecId::Opus24k); + } + + #[test] + fn prepare_outbound_assigns_seq() { + let mut pipeline = RelayPipeline::new(PipelineConfig::default()); + let pkt = make_media_packet(0, 0, 0); + let out = pipeline.prepare_outbound(pkt); + assert!(!out.is_empty()); + assert_eq!(out[0].header.seq, 0); + + let pkt2 = make_media_packet(1, 0, 1); + let out2 = pipeline.prepare_outbound(pkt2); + assert_eq!(out2[0].header.seq, 1); + } + + #[test] + fn prepare_outbound_generates_repair_on_full_block() { + let mut pipeline = RelayPipeline::new(PipelineConfig { + initial_profile: QualityProfile::GOOD, // 5 frames/block, 20% FEC + ..Default::default() + }); + + // Feed 5 packets (one full block) + let mut total_out = 0; + for i in 0..5u16 { + let pkt = make_media_packet(i, 0, i as u8); + let out = pipeline.prepare_outbound(pkt); + total_out += out.len(); + } + // Should have 5 source + at least 1 repair packet + assert!(total_out > 5, "expected repair packets, got {total_out}"); + } + + #[test] + fn stats_track_packets() { + let mut pipeline = RelayPipeline::new(PipelineConfig::default()); + let pkt = make_media_packet(0, 0, 0); + pipeline.ingest(pkt); + assert_eq!(pipeline.stats().packets_received, 1); + } +} diff --git a/crates/wzp-relay/src/session_mgr.rs b/crates/wzp-relay/src/session_mgr.rs new file mode 100644 index 0000000..e5e059b --- /dev/null +++ b/crates/wzp-relay/src/session_mgr.rs @@ -0,0 +1,138 @@ +//! Session manager — tracks active call sessions on the relay. + +use std::collections::HashMap; + +use wzp_proto::{QualityProfile, Session}; + +use crate::pipeline::{PipelineConfig, RelayPipeline}; + +/// Unique identifier for a relay session. +pub type SessionId = [u8; 16]; + +/// A single active call session on the relay. +pub struct RelaySession { + /// Protocol session state machine. + pub state: Session, + /// Pipeline for upstream → downstream direction. + pub upstream_pipeline: RelayPipeline, + /// Pipeline for downstream → upstream direction. + pub downstream_pipeline: RelayPipeline, + /// Quality profile currently in use. + pub profile: QualityProfile, + /// Timestamp of last activity (ms since epoch). + pub last_activity_ms: u64, +} + +impl RelaySession { + pub fn new(session_id: SessionId, config: PipelineConfig) -> Self { + let profile = config.initial_profile; + Self { + state: Session::new(session_id), + upstream_pipeline: RelayPipeline::new(PipelineConfig { + initial_profile: profile, + ..config + }), + downstream_pipeline: RelayPipeline::new(PipelineConfig { + initial_profile: profile, + ..config + }), + profile, + last_activity_ms: 0, + } + } + + pub fn is_active(&self) -> bool { + self.state.is_media_active() + } +} + +/// Manages all active sessions on a relay. +pub struct SessionManager { + sessions: HashMap, + max_sessions: usize, +} + +impl SessionManager { + pub fn new(max_sessions: usize) -> Self { + Self { + sessions: HashMap::new(), + max_sessions, + } + } + + /// Create a new session. Returns None if at capacity. + pub fn create_session( + &mut self, + session_id: SessionId, + config: PipelineConfig, + ) -> Option<&mut RelaySession> { + if self.sessions.len() >= self.max_sessions { + return None; + } + self.sessions + .entry(session_id) + .or_insert_with(|| RelaySession::new(session_id, config)); + self.sessions.get_mut(&session_id) + } + + /// Get a session by ID. + pub fn get_session(&mut self, id: &SessionId) -> Option<&mut RelaySession> { + self.sessions.get_mut(id) + } + + /// Remove a session. + pub fn remove_session(&mut self, id: &SessionId) -> Option { + self.sessions.remove(id) + } + + /// Number of active sessions. + pub fn active_count(&self) -> usize { + self.sessions.values().filter(|s| s.is_active()).count() + } + + /// Total sessions (including inactive/closing). + pub fn total_count(&self) -> usize { + self.sessions.len() + } + + /// Remove sessions idle for longer than `timeout_ms`. + pub fn expire_idle(&mut self, now_ms: u64, timeout_ms: u64) -> usize { + let before = self.sessions.len(); + self.sessions + .retain(|_, s| now_ms.saturating_sub(s.last_activity_ms) < timeout_ms); + before - self.sessions.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_and_get_session() { + let mut mgr = SessionManager::new(10); + let id = [1u8; 16]; + mgr.create_session(id, PipelineConfig::default()); + assert_eq!(mgr.total_count(), 1); + assert!(mgr.get_session(&id).is_some()); + } + + #[test] + fn respects_max_sessions() { + let mut mgr = SessionManager::new(1); + mgr.create_session([1u8; 16], PipelineConfig::default()); + let result = mgr.create_session([2u8; 16], PipelineConfig::default()); + assert!(result.is_none()); + } + + #[test] + fn expire_idle_removes_old() { + let mut mgr = SessionManager::new(10); + let id = [1u8; 16]; + mgr.create_session(id, PipelineConfig::default()); + // Session has last_activity_ms = 0, current time = 60000, timeout = 30000 + let expired = mgr.expire_idle(60_000, 30_000); + assert_eq!(expired, 1); + assert_eq!(mgr.total_count(), 0); + } +}