feat: IAX2-inspired protocol improvements — trunking, mini-frames, silence suppression, call control (P2-T6/T7/T8/T9)

WZP-P2-T6: Trunking
- TrunkFrame/TrunkEntry: pack N session packets into one datagram
- Wire format: [count:u16][session_id:2][len:u16][payload]...
- TrunkBatcher: batches by count (10) or bytes (1200), flushes on limit
- 5 tests: encode/decode roundtrip, empty frame, batcher fill/flush, byte limit

WZP-P2-T7: Mini-frames
- MiniHeader: 4-byte delta header (timestamp_delta + payload_len)
- FRAME_TYPE_FULL (0x00) / FRAME_TYPE_MINI (0x01) discriminator
- MiniFrameContext: expands mini-headers to full by tracking baseline
- Saves 8 bytes per packet (5 vs 13 bytes with type prefix)
- 5 tests: encode/decode, wire size, context expand, no baseline, size comparison

WZP-P2-T8: Silence suppression
- SilenceDetector: RMS-based detection with hangover (5 frames = 100ms)
- ComfortNoise: low-level random noise generator
- CodecId::ComfortNoise variant for CN packets
- CallEncoder: suppresses silent frames, sends 1-byte CN every 200ms
- CallDecoder: generates comfort noise on CN packets
- ~50% bandwidth savings in typical conversations
- 6 tests: silence/speech detection, hangover, CN generation, RMS math, suppression

WZP-P2-T9: Call control signals
- SignalMessage: Hold, Unhold, Mute, Unmute, Transfer, TransferAck
- CallSignalType mapping in featherchat.rs for all new variants
- 4 serde roundtrip tests + signal type mapping tests

255 tests passing across all crates.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-28 14:13:05 +04:00
parent a64b79d953
commit 34cd1017c1
14 changed files with 1077 additions and 2 deletions

2
Cargo.lock generated
View File

@@ -3820,6 +3820,7 @@ version = "0.1.0"
dependencies = [
"audiopus",
"codec2",
"rand 0.8.5",
"tracing",
"wzp-proto",
]
@@ -3863,6 +3864,7 @@ dependencies = [
"async-trait",
"bytes",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tracing",

View File

@@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
use bytes::Bytes;
use tracing::{debug, info, warn};
use wzp_codec::{ComfortNoise, SilenceDetector};
use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder};
use wzp_proto::jitter::{JitterBuffer, PlayoutResult};
use wzp_proto::packet::{MediaHeader, MediaPacket};
@@ -15,7 +16,7 @@ use wzp_proto::traits::{
AudioDecoder, AudioEncoder, FecDecoder, FecEncoder,
};
use wzp_proto::packet::QualityReport;
use wzp_proto::QualityProfile;
use wzp_proto::{CodecId, QualityProfile};
/// Configuration for a call session.
pub struct CallConfig {
@@ -27,6 +28,14 @@ pub struct CallConfig {
pub jitter_max: usize,
/// Jitter buffer min depth before playout.
pub jitter_min: usize,
/// Enable silence suppression (default: true).
pub suppression_enabled: bool,
/// RMS threshold for silence detection (default: 100.0 for i16 PCM).
pub silence_threshold_rms: f64,
/// Hangover frames before suppression begins (default: 5 = 100ms at 20ms frames).
pub silence_hangover_frames: u32,
/// Comfort noise amplitude (default: 50).
pub comfort_noise_level: i16,
}
impl Default for CallConfig {
@@ -36,6 +45,10 @@ impl Default for CallConfig {
jitter_target: 10,
jitter_max: 250,
jitter_min: 3, // 60ms — low latency start, still smooths jitter
suppression_enabled: true,
silence_threshold_rms: 100.0,
silence_hangover_frames: 5,
comfort_noise_level: 50,
}
}
}
@@ -58,6 +71,7 @@ impl CallConfig {
jitter_target,
jitter_max,
jitter_min,
..Default::default()
}
}
}
@@ -179,6 +193,18 @@ pub struct CallEncoder {
frame_in_block: u8,
/// Timestamp counter (ms).
timestamp_ms: u32,
/// Silence detector for suppression.
silence_detector: SilenceDetector,
/// Comfort noise generator for CN packets.
comfort_noise: ComfortNoise,
/// Whether silence suppression is enabled.
suppression_enabled: bool,
/// Total frames suppressed (telemetry).
frames_suppressed: u64,
/// Frames since last CN packet was sent.
cn_counter: u32,
/// Comfort noise amplitude level (stored for CN packet payload).
cn_level: i16,
}
impl CallEncoder {
@@ -191,6 +217,15 @@ impl CallEncoder {
block_id: 0,
frame_in_block: 0,
timestamp_ms: 0,
silence_detector: SilenceDetector::new(
config.silence_threshold_rms,
config.silence_hangover_frames,
),
comfort_noise: ComfortNoise::new(config.comfort_noise_level),
suppression_enabled: config.suppression_enabled,
frames_suppressed: 0,
cn_counter: 0,
cn_level: config.comfort_noise_level,
}
}
@@ -199,6 +234,45 @@ impl CallEncoder {
/// Input: 48kHz mono PCM, frame size depends on profile (960 for 20ms, 1920 for 40ms).
/// Output: one or more MediaPackets to send.
pub fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<MediaPacket>, anyhow::Error> {
// Silence suppression: skip encoding silent frames, periodically send CN.
if self.suppression_enabled && self.silence_detector.is_silent(pcm) {
self.frames_suppressed += 1;
self.cn_counter += 1;
// Advance timestamp even for suppressed frames.
self.timestamp_ms = self
.timestamp_ms
.wrapping_add(self.profile.frame_duration_ms as u32);
// Every 10 frames (~200ms), send a comfort noise packet.
if self.cn_counter % 10 == 0 {
let cn_pkt = MediaPacket {
header: MediaHeader {
version: 0,
is_repair: false,
codec_id: CodecId::ComfortNoise,
has_quality_report: false,
fec_ratio_encoded: 0,
seq: self.seq,
timestamp: self.timestamp_ms,
fec_block: self.block_id,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
},
payload: Bytes::from(vec![self.cn_level as u8]),
quality_report: None,
};
self.seq = self.seq.wrapping_add(1);
return Ok(vec![cn_pkt]);
}
return Ok(vec![]);
}
// Not silent — reset CN counter and proceed with normal encoding.
self.cn_counter = 0;
// Encode audio
let mut encoded = vec![0u8; self.audio_enc.max_frame_bytes()];
let enc_len = self.audio_enc.encode(pcm, &mut encoded)?;
@@ -290,6 +364,10 @@ pub struct CallDecoder {
pub quality: AdaptiveQualityController,
/// Current profile.
profile: QualityProfile,
/// Comfort noise generator for filling silent gaps.
comfort_noise: ComfortNoise,
/// Whether the last decoded frame was comfort noise.
last_was_cn: bool,
}
impl CallDecoder {
@@ -300,6 +378,8 @@ impl CallDecoder {
jitter: JitterBuffer::new(config.jitter_target, config.jitter_max, config.jitter_min),
quality: AdaptiveQualityController::new(),
profile: config.profile,
comfort_noise: ComfortNoise::new(50),
last_was_cn: false,
}
}
@@ -325,6 +405,15 @@ impl CallDecoder {
pub fn decode_next(&mut self, pcm: &mut [i16]) -> Option<usize> {
match self.jitter.pop() {
PlayoutResult::Packet(pkt) => {
// Comfort noise packet: generate CN instead of decoding audio.
if pkt.header.codec_id == CodecId::ComfortNoise {
self.comfort_noise.generate(pcm);
self.last_was_cn = true;
self.jitter.record_decode();
return Some(pcm.len());
}
self.last_was_cn = false;
let result = match self.audio_dec.decode(&pkt.payload, pcm) {
Ok(n) => Some(n),
Err(e) => {
@@ -715,4 +804,49 @@ mod tests {
let logged = telemetry.maybe_log(&stats);
assert!(!logged, "should not log before interval elapses");
}
#[test]
fn silence_suppression_skips_silent_frames() {
let config = CallConfig {
suppression_enabled: true,
silence_threshold_rms: 100.0,
silence_hangover_frames: 5,
comfort_noise_level: 50,
..Default::default()
};
let mut enc = CallEncoder::new(&config);
let silence = vec![0i16; 960];
let mut total_packets = 0;
let mut cn_packets = 0;
for _ in 0..20 {
let packets = enc.encode_frame(&silence).unwrap();
for p in &packets {
if p.header.codec_id == CodecId::ComfortNoise {
cn_packets += 1;
// CN payload should be a single byte with the noise level.
assert_eq!(p.payload.len(), 1);
}
}
total_packets += packets.len();
}
// First 5 frames are hangover (not suppressed) => 5 normal source packets
// (plus potential repair packets from FEC block completion).
// Remaining 15 frames are suppressed; CN every 10 frames => 1 CN packet
// (cn_counter hits 10 on the 10th suppressed frame).
assert!(
total_packets < 20,
"suppression should reduce packet count, got {total_packets}"
);
assert!(
cn_packets >= 1,
"should have at least one CN packet, got {cn_packets}"
);
assert!(
enc.frames_suppressed > 0,
"frames_suppressed should be > 0"
);
}
}

View File

@@ -26,6 +26,11 @@ pub enum CallSignalType {
Reject,
Ringing,
Busy,
Hold,
Unhold,
Mute,
Unmute,
Transfer,
}
/// A CallSignal as sent through featherChat's WireMessage.
@@ -96,6 +101,12 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::QualityUpdate { .. } => CallSignalType::Offer, // reuse
SignalMessage::Ping { .. } | SignalMessage::Pong { .. } => CallSignalType::Offer,
SignalMessage::AuthToken { .. } => CallSignalType::Offer,
SignalMessage::Hold => CallSignalType::Hold,
SignalMessage::Unhold => CallSignalType::Unhold,
SignalMessage::Mute => CallSignalType::Mute,
SignalMessage::Unmute => CallSignalType::Unmute,
SignalMessage::Transfer { .. } => CallSignalType::Transfer,
SignalMessage::TransferAck => CallSignalType::Offer, // reuse
}
}
@@ -134,5 +145,16 @@ mod tests {
reason: wzp_proto::HangupReason::Normal,
};
assert!(matches!(signal_to_call_type(&hangup), CallSignalType::Hangup));
assert!(matches!(signal_to_call_type(&SignalMessage::Hold), CallSignalType::Hold));
assert!(matches!(signal_to_call_type(&SignalMessage::Unhold), CallSignalType::Unhold));
assert!(matches!(signal_to_call_type(&SignalMessage::Mute), CallSignalType::Mute));
assert!(matches!(signal_to_call_type(&SignalMessage::Unmute), CallSignalType::Unmute));
let transfer = SignalMessage::Transfer {
target_fingerprint: "abc".to_string(),
relay_addr: None,
};
assert!(matches!(signal_to_call_type(&transfer), CallSignalType::Transfer));
}
}

View File

@@ -89,6 +89,7 @@ pub fn run_local_sweep(config: &SweepConfig) -> Vec<SweepResult> {
jitter_target: target,
jitter_max: max,
jitter_min: target.min(3).max(1),
..Default::default()
};
let mut encoder = CallEncoder::new(&call_cfg);

View File

@@ -16,4 +16,7 @@ audiopus = { workspace = true }
# Pure-Rust Codec2 implementation
codec2 = { workspace = true }
# RNG for comfort noise generation
rand = { workspace = true }
[dev-dependencies]

View File

@@ -15,8 +15,10 @@ pub mod codec2_enc;
pub mod opus_dec;
pub mod opus_enc;
pub mod resample;
pub mod silence;
pub use adaptive::{AdaptiveDecoder, AdaptiveEncoder};
pub use silence::{ComfortNoise, SilenceDetector};
pub use wzp_proto::{AudioDecoder, AudioEncoder, CodecId, QualityProfile};
/// Create an adaptive encoder starting at the given quality profile.

View File

@@ -0,0 +1,191 @@
//! Silence suppression and comfort noise generation.
//!
//! During silent periods (~50% of a typical call), full encoded frames waste
//! bandwidth. [`SilenceDetector`] detects silent audio based on RMS energy,
//! and [`ComfortNoise`] generates low-level background noise to fill gaps on
//! the decoder side.
use rand::Rng;
/// Detects silence in PCM audio using RMS energy with a hangover period.
///
/// The hangover prevents clipping the onset of speech: after silence is first
/// detected, the detector continues reporting "not silent" for `hangover_frames`
/// additional frames before transitioning to suppression.
pub struct SilenceDetector {
/// RMS threshold below which audio is considered silent (for i16 samples).
threshold_rms: f64,
/// Number of frames to keep sending after silence starts (prevents speech clipping).
hangover_frames: u32,
/// Count of consecutive frames whose RMS is below the threshold.
silent_frames: u32,
/// Whether suppression is currently active.
is_suppressing: bool,
}
impl SilenceDetector {
/// Create a new silence detector.
///
/// * `threshold_rms` — RMS energy below which a frame is silent (default: 100.0 for i16).
/// * `hangover_frames` — frames to keep sending after silence onset (default: 5 = 100ms at 20ms frames).
pub fn new(threshold_rms: f64, hangover_frames: u32) -> Self {
Self {
threshold_rms,
hangover_frames,
silent_frames: 0,
is_suppressing: false,
}
}
/// Compute the RMS (root mean square) energy of a PCM buffer.
pub fn rms(pcm: &[i16]) -> f64 {
if pcm.is_empty() {
return 0.0;
}
let sum_sq: f64 = pcm.iter().map(|&s| (s as f64) * (s as f64)).sum();
(sum_sq / pcm.len() as f64).sqrt()
}
/// Returns `true` if the frame should be suppressed (i.e. is silence past
/// the hangover period).
///
/// Call once per frame. The detector tracks consecutive silent frames
/// internally and only reports suppression after the hangover expires.
pub fn is_silent(&mut self, pcm: &[i16]) -> bool {
let energy = Self::rms(pcm);
if energy < self.threshold_rms {
self.silent_frames = self.silent_frames.saturating_add(1);
if self.silent_frames > self.hangover_frames {
self.is_suppressing = true;
}
} else {
// Speech detected — reset.
self.silent_frames = 0;
self.is_suppressing = false;
}
self.is_suppressing
}
/// Whether the detector is currently in the suppressing state.
pub fn suppressing(&self) -> bool {
self.is_suppressing
}
}
/// Generates low-level comfort noise to fill silent periods.
///
/// When the decoder receives a comfort-noise descriptor (or detects a gap
/// caused by silence suppression), it uses this to produce a natural-sounding
/// background hiss instead of dead silence.
pub struct ComfortNoise {
/// Peak amplitude of the generated noise (default: 50).
level: i16,
}
impl ComfortNoise {
/// Create a comfort noise generator with the given amplitude level.
pub fn new(level: i16) -> Self {
Self { level }
}
/// Fill `pcm` with low-level random noise in the range `[-level, level]`.
pub fn generate(&self, pcm: &mut [i16]) {
let mut rng = rand::thread_rng();
for sample in pcm.iter_mut() {
*sample = rng.gen_range(-self.level..=self.level);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn silence_detector_detects_silence() {
let mut det = SilenceDetector::new(100.0, 5);
let silence = vec![0i16; 960];
// First 5 frames are hangover — should NOT suppress yet.
for _ in 0..5 {
assert!(!det.is_silent(&silence));
}
// Frame 6 onward: past hangover, should suppress.
assert!(det.is_silent(&silence));
assert!(det.is_silent(&silence));
}
#[test]
fn silence_detector_detects_speech() {
let mut det = SilenceDetector::new(100.0, 5);
// Generate a 1kHz sine wave at decent amplitude.
let pcm: Vec<i16> = (0..960)
.map(|i| {
let t = i as f64 / 48000.0;
(10000.0 * (2.0 * std::f64::consts::PI * 1000.0 * t).sin()) as i16
})
.collect();
// Should never report silent.
for _ in 0..20 {
assert!(!det.is_silent(&pcm));
}
}
#[test]
fn silence_detector_hangover() {
let mut det = SilenceDetector::new(100.0, 3);
let silence = vec![0i16; 960];
let speech: Vec<i16> = (0..960)
.map(|i| {
let t = i as f64 / 48000.0;
(5000.0 * (2.0 * std::f64::consts::PI * 440.0 * t).sin()) as i16
})
.collect();
// Feed silence past hangover to enter suppression.
for _ in 0..4 {
det.is_silent(&silence);
}
assert!(det.is_silent(&silence), "should be suppressing after hangover");
// Speech arrives — should immediately stop suppressing.
assert!(!det.is_silent(&speech));
assert!(!det.is_silent(&speech));
}
#[test]
fn comfort_noise_generates_nonzero() {
let cn = ComfortNoise::new(50);
let mut pcm = vec![0i16; 960];
cn.generate(&mut pcm);
// At least some samples should be non-zero.
assert!(pcm.iter().any(|&s| s != 0), "CN output should not be all zeros");
// All samples should be within [-50, 50].
assert!(pcm.iter().all(|&s| s.abs() <= 50), "CN samples out of range");
}
#[test]
fn rms_calculation() {
// All zeros → RMS 0.
assert_eq!(SilenceDetector::rms(&[0i16; 100]), 0.0);
// Constant value: RMS of [v, v, v, ...] = |v|.
let pcm = vec![100i16; 100];
let rms = SilenceDetector::rms(&pcm);
assert!((rms - 100.0).abs() < 0.01, "RMS of constant 100 should be 100, got {rms}");
// Known pattern: [3, 4] → sqrt((9+16)/2) = sqrt(12.5) ≈ 3.5355
let rms2 = SilenceDetector::rms(&[3, 4]);
assert!((rms2 - 3.5355).abs() < 0.01, "RMS of [3,4] should be ~3.5355, got {rms2}");
// Empty buffer → 0.
assert_eq!(SilenceDetector::rms(&[]), 0.0);
}
}

View File

@@ -19,3 +19,4 @@ tracing = "0.1"
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
serde_json = "1"

View File

@@ -16,6 +16,8 @@ pub enum CodecId {
Codec2_3200 = 3,
/// Codec2 at 1200bps (catastrophic conditions)
Codec2_1200 = 4,
/// Comfort noise descriptor (silence suppression)
ComfortNoise = 5,
}
impl CodecId {
@@ -27,6 +29,7 @@ impl CodecId {
Self::Opus6k => 6_000,
Self::Codec2_3200 => 3_200,
Self::Codec2_1200 => 1_200,
Self::ComfortNoise => 0,
}
}
@@ -38,6 +41,7 @@ impl CodecId {
Self::Opus6k => 40,
Self::Codec2_3200 => 20,
Self::Codec2_1200 => 40,
Self::ComfortNoise => 20,
}
}
@@ -46,6 +50,7 @@ impl CodecId {
match self {
Self::Opus24k | Self::Opus16k | Self::Opus6k => 48_000,
Self::Codec2_3200 | Self::Codec2_1200 => 8_000,
Self::ComfortNoise => 48_000,
}
}
@@ -57,6 +62,7 @@ impl CodecId {
2 => Some(Self::Opus6k),
3 => Some(Self::Codec2_3200),
4 => Some(Self::Codec2_1200),
5 => Some(Self::ComfortNoise),
_ => None,
}
}

View File

@@ -23,7 +23,10 @@ pub mod traits;
// Re-export key types at crate root for convenience.
pub use codec_id::{CodecId, QualityProfile};
pub use error::*;
pub use packet::{HangupReason, MediaHeader, MediaPacket, QualityReport, SignalMessage};
pub use packet::{
HangupReason, MediaHeader, MediaPacket, MiniFrameContext, MiniHeader, QualityReport,
SignalMessage, TrunkEntry, TrunkFrame, FRAME_TYPE_FULL, FRAME_TYPE_MINI,
};
pub use quality::{AdaptiveQualityController, Tier};
pub use session::{Session, SessionEvent, SessionState};
pub use traits::*;

View File

@@ -241,6 +241,184 @@ impl MediaPacket {
}
}
// ---------------------------------------------------------------------------
// Trunking — multiplex multiple session packets into one QUIC datagram
// ---------------------------------------------------------------------------
/// A single entry inside a [`TrunkFrame`].
#[derive(Clone, Debug)]
pub struct TrunkEntry {
/// 2-byte session identifier (up to 65 536 sessions).
pub session_id: [u8; 2],
/// Encoded MediaPacket payload (already compressed).
pub payload: Bytes,
}
impl TrunkEntry {
/// Per-entry wire overhead: 2 (session_id) + 2 (len).
pub const OVERHEAD: usize = 4;
}
/// A trunked frame carrying multiple session packets in one datagram.
///
/// Wire format:
/// ```text
/// [count:u16] [entry1] [entry2] ...
/// ```
/// Each entry:
/// ```text
/// [session_id:2] [len:u16] [payload:len]
/// ```
#[derive(Clone, Debug)]
pub struct TrunkFrame {
pub packets: Vec<TrunkEntry>,
}
impl TrunkFrame {
/// Create an empty trunk frame.
pub fn new() -> Self {
Self {
packets: Vec::new(),
}
}
/// Append a session packet to the frame.
pub fn push(&mut self, session_id: [u8; 2], payload: Bytes) {
self.packets.push(TrunkEntry {
session_id,
payload,
});
}
/// Number of entries in the frame.
pub fn len(&self) -> usize {
self.packets.len()
}
/// Whether the frame is empty.
pub fn is_empty(&self) -> bool {
self.packets.is_empty()
}
/// Total wire size of the encoded frame.
pub fn wire_size(&self) -> usize {
// 2 bytes for count + each entry
2 + self
.packets
.iter()
.map(|e| TrunkEntry::OVERHEAD + e.payload.len())
.sum::<usize>()
}
/// Encode to wire bytes.
pub fn encode(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.wire_size());
buf.put_u16(self.packets.len() as u16);
for entry in &self.packets {
buf.put_slice(&entry.session_id);
buf.put_u16(entry.payload.len() as u16);
buf.put(entry.payload.clone());
}
buf.freeze()
}
/// Decode from wire bytes. Returns `None` on malformed input.
pub fn decode(buf: &[u8]) -> Option<Self> {
if buf.len() < 2 {
return None;
}
let mut cursor = &buf[..];
let count = cursor.get_u16() as usize;
let mut packets = Vec::with_capacity(count);
for _ in 0..count {
if cursor.remaining() < TrunkEntry::OVERHEAD {
return None;
}
let mut session_id = [0u8; 2];
session_id[0] = cursor.get_u8();
session_id[1] = cursor.get_u8();
let len = cursor.get_u16() as usize;
if cursor.remaining() < len {
return None;
}
let payload = Bytes::copy_from_slice(&cursor[..len]);
cursor.advance(len);
packets.push(TrunkEntry {
session_id,
payload,
});
}
Some(Self { packets })
}
}
// ---------------------------------------------------------------------------
// Mini-frames — compact header for steady-state media packets
// ---------------------------------------------------------------------------
/// Frame type tag: full MediaHeader follows.
pub const FRAME_TYPE_FULL: u8 = 0x00;
/// Frame type tag: MiniHeader follows (requires prior baseline).
pub const FRAME_TYPE_MINI: u8 = 0x01;
/// Compact 4-byte header used after a full MediaHeader baseline has been
/// established. Only the timestamp delta and payload length are transmitted;
/// all other fields are inherited from the last full header.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MiniHeader {
/// Milliseconds elapsed since the last header's timestamp.
pub timestamp_delta_ms: u16,
/// Length of the payload that follows this header.
pub payload_len: u16,
}
impl MiniHeader {
/// Header size in bytes on the wire.
pub const WIRE_SIZE: usize = 4;
/// Serialize to a 4-byte buffer.
pub fn write_to(&self, buf: &mut impl BufMut) {
buf.put_u16(self.timestamp_delta_ms);
buf.put_u16(self.payload_len);
}
/// Deserialize from a buffer. Returns `None` if insufficient data.
pub fn read_from(buf: &mut impl Buf) -> Option<Self> {
if buf.remaining() < Self::WIRE_SIZE {
return None;
}
Some(Self {
timestamp_delta_ms: buf.get_u16(),
payload_len: buf.get_u16(),
})
}
}
/// Stateful context that expands [`MiniHeader`]s back into full
/// [`MediaHeader`]s by tracking the last baseline header.
#[derive(Clone, Debug, Default)]
pub struct MiniFrameContext {
last_header: Option<MediaHeader>,
}
impl MiniFrameContext {
/// Record a full header as the new baseline for subsequent mini-frames.
pub fn update(&mut self, header: &MediaHeader) {
self.last_header = Some(*header);
}
/// Expand a mini-header into a full [`MediaHeader`] using the stored
/// baseline. Returns `None` if no baseline has been set yet.
pub fn expand(&mut self, mini: &MiniHeader) -> Option<MediaHeader> {
let base = self.last_header.as_ref()?;
let mut expanded = *base;
expanded.seq = base.seq.wrapping_add(1);
expanded.timestamp = base.timestamp.wrapping_add(mini.timestamp_delta_ms as u32);
self.last_header = Some(expanded);
Some(expanded)
}
}
/// Signaling messages sent over the reliable QUIC stream.
///
/// Compatible with Warzone messenger's identity model:
@@ -301,6 +479,23 @@ pub enum SignalMessage {
/// featherChat bearer token for relay authentication.
/// Sent as the first signal message when --auth-url is configured.
AuthToken { token: String },
/// Put the call on hold (stop sending media, keep session alive).
Hold,
/// Resume a held call.
Unhold,
/// Mute request from the remote side (server-initiated mute, like IAX2 QUELCH).
Mute,
/// Unmute request from the remote side (like IAX2 UNQUELCH).
Unmute,
/// Transfer the call to another peer.
Transfer {
target_fingerprint: String,
/// Optional relay address for the transfer target.
relay_addr: Option<String>,
},
/// Acknowledge a transfer request.
TransferAck,
}
/// Reasons for ending a call.
@@ -414,6 +609,78 @@ mod tests {
assert_eq!(packet.quality_report, decoded.quality_report);
}
#[test]
fn hold_unhold_serialize() {
let hold = SignalMessage::Hold;
let json = serde_json::to_string(&hold).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(decoded, SignalMessage::Hold));
let unhold = SignalMessage::Unhold;
let json = serde_json::to_string(&unhold).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(decoded, SignalMessage::Unhold));
}
#[test]
fn mute_unmute_serialize() {
let mute = SignalMessage::Mute;
let json = serde_json::to_string(&mute).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(decoded, SignalMessage::Mute));
let unmute = SignalMessage::Unmute;
let json = serde_json::to_string(&unmute).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(decoded, SignalMessage::Unmute));
}
#[test]
fn transfer_serialize() {
let transfer = SignalMessage::Transfer {
target_fingerprint: "abc123".to_string(),
relay_addr: Some("relay.example.com:4433".to_string()),
};
let json = serde_json::to_string(&transfer).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::Transfer {
target_fingerprint,
relay_addr,
} => {
assert_eq!(target_fingerprint, "abc123");
assert_eq!(relay_addr.unwrap(), "relay.example.com:4433");
}
_ => panic!("expected Transfer variant"),
}
// Also test with relay_addr = None
let transfer_no_relay = SignalMessage::Transfer {
target_fingerprint: "def456".to_string(),
relay_addr: None,
};
let json = serde_json::to_string(&transfer_no_relay).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::Transfer {
target_fingerprint,
relay_addr,
} => {
assert_eq!(target_fingerprint, "def456");
assert!(relay_addr.is_none());
}
_ => panic!("expected Transfer variant"),
}
}
#[test]
fn transfer_ack_serialize() {
let ack = SignalMessage::TransferAck;
let json = serde_json::to_string(&ack).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(decoded, SignalMessage::TransferAck));
}
#[test]
fn fec_ratio_encode_decode() {
let ratio = 0.5;
@@ -425,4 +692,150 @@ mod tests {
let encoded_max = MediaHeader::encode_fec_ratio(ratio_max);
assert_eq!(encoded_max, 127);
}
// ---------------------------------------------------------------
// TrunkFrame tests
// ---------------------------------------------------------------
#[test]
fn trunk_frame_encode_decode() {
let mut frame = TrunkFrame::new();
frame.push([0, 1], Bytes::from_static(b"hello"));
frame.push([0, 2], Bytes::from_static(b"world!"));
frame.push([1, 0], Bytes::from_static(b"x"));
assert_eq!(frame.len(), 3);
let encoded = frame.encode();
let decoded = TrunkFrame::decode(&encoded).expect("decode failed");
assert_eq!(decoded.len(), 3);
assert_eq!(decoded.packets[0].session_id, [0, 1]);
assert_eq!(decoded.packets[0].payload, Bytes::from_static(b"hello"));
assert_eq!(decoded.packets[1].session_id, [0, 2]);
assert_eq!(decoded.packets[1].payload, Bytes::from_static(b"world!"));
assert_eq!(decoded.packets[2].session_id, [1, 0]);
assert_eq!(decoded.packets[2].payload, Bytes::from_static(b"x"));
}
#[test]
fn trunk_frame_empty() {
let frame = TrunkFrame::new();
assert!(frame.is_empty());
assert_eq!(frame.len(), 0);
let encoded = frame.encode();
// Just the 2-byte count header with value 0.
assert_eq!(encoded.len(), 2);
assert_eq!(&encoded[..], &[0, 0]);
let decoded = TrunkFrame::decode(&encoded).unwrap();
assert!(decoded.is_empty());
}
#[test]
fn trunk_entry_wire_size() {
// Each entry overhead must be exactly 4 bytes (2 session_id + 2 len).
assert_eq!(TrunkEntry::OVERHEAD, 4);
// Verify empirically: one entry with a 10-byte payload should produce
// 2 (count) + 4 (overhead) + 10 (payload) = 16 bytes total.
let mut frame = TrunkFrame::new();
frame.push([0xAB, 0xCD], Bytes::from(vec![0u8; 10]));
let encoded = frame.encode();
assert_eq!(encoded.len(), 2 + 4 + 10);
}
// ---------------------------------------------------------------
// MiniHeader / MiniFrameContext tests
// ---------------------------------------------------------------
#[test]
fn mini_header_encode_decode() {
let mini = MiniHeader {
timestamp_delta_ms: 20,
payload_len: 160,
};
let mut buf = BytesMut::new();
mini.write_to(&mut buf);
let mut cursor = &buf[..];
let decoded = MiniHeader::read_from(&mut cursor).unwrap();
assert_eq!(mini, decoded);
}
#[test]
fn mini_header_wire_size() {
let mini = MiniHeader {
timestamp_delta_ms: 0xFFFF,
payload_len: 0xFFFF,
};
let mut buf = BytesMut::new();
mini.write_to(&mut buf);
assert_eq!(buf.len(), 4);
assert_eq!(MiniHeader::WIRE_SIZE, 4);
}
#[test]
fn mini_frame_context_expand() {
let baseline = MediaHeader {
version: 0,
is_repair: false,
codec_id: CodecId::Opus24k,
has_quality_report: false,
fec_ratio_encoded: 10,
seq: 100,
timestamp: 1000,
fec_block: 5,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
};
let mut ctx = MiniFrameContext::default();
ctx.update(&baseline);
// First expansion
let mini1 = MiniHeader {
timestamp_delta_ms: 20,
payload_len: 80,
};
let h1 = ctx.expand(&mini1).unwrap();
assert_eq!(h1.seq, 101);
assert_eq!(h1.timestamp, 1020);
assert_eq!(h1.codec_id, CodecId::Opus24k);
assert_eq!(h1.fec_block, 5);
// Second expansion — builds on expanded h1
let mini2 = MiniHeader {
timestamp_delta_ms: 20,
payload_len: 80,
};
let h2 = ctx.expand(&mini2).unwrap();
assert_eq!(h2.seq, 102);
assert_eq!(h2.timestamp, 1040);
}
#[test]
fn mini_frame_context_no_baseline() {
let mut ctx = MiniFrameContext::default();
let mini = MiniHeader {
timestamp_delta_ms: 20,
payload_len: 80,
};
assert!(ctx.expand(&mini).is_none());
}
#[test]
fn full_vs_mini_size_comparison() {
// Full frame on wire: 1 byte type tag + 12 byte MediaHeader = 13
let full_size = 1 + MediaHeader::WIRE_SIZE;
assert_eq!(full_size, 13);
// Mini frame on wire: 1 byte type tag + 4 byte MiniHeader = 5
let mini_size = 1 + MiniHeader::WIRE_SIZE;
assert_eq!(mini_size, 5);
// Verify the constants match expectations
assert_eq!(FRAME_TYPE_FULL, 0x00);
assert_eq!(FRAME_TYPE_MINI, 0x01);
}
}

View File

@@ -15,8 +15,10 @@ pub mod pipeline;
pub mod probe;
pub mod room;
pub mod session_mgr;
pub mod trunk;
pub use config::RelayConfig;
pub use handshake::accept_handshake;
pub use pipeline::{PipelineConfig, PipelineStats, RelayPipeline};
pub use session_mgr::{RelaySession, SessionId, SessionInfo, SessionManager, SessionState};
pub use trunk::TrunkBatcher;

View File

@@ -0,0 +1,152 @@
//! Trunk batching — accumulates media packets from multiple sessions into
//! [`TrunkFrame`]s that fit inside a single QUIC datagram.
use std::time::Duration;
use bytes::Bytes;
use wzp_proto::packet::{TrunkEntry, TrunkFrame};
/// Batches individual session packets into [`TrunkFrame`]s.
///
/// A trunk frame is flushed when any of the following thresholds are hit:
/// - `max_entries` — maximum number of packets per trunk.
/// - `max_bytes` — maximum total wire size (should fit one UDP datagram).
///
/// The caller is responsible for timer-based flushing using [`flush_interval`]
/// and calling [`flush`] when the interval expires.
pub struct TrunkBatcher {
pending: TrunkFrame,
/// Current accumulated wire size of the pending frame.
pending_bytes: usize,
/// Maximum packets per trunk (default 10).
pub max_entries: usize,
/// Maximum total wire bytes per trunk (default 1200, fits in one UDP datagram).
pub max_bytes: usize,
/// Maximum wait before flushing (default 5 ms). Used by the caller for timer scheduling.
pub flush_interval: Duration,
}
impl TrunkBatcher {
/// Header size: the 2-byte count prefix present in every TrunkFrame.
const FRAME_HEADER: usize = 2;
pub fn new() -> Self {
Self {
pending: TrunkFrame::new(),
pending_bytes: Self::FRAME_HEADER,
max_entries: 10,
max_bytes: 1200,
flush_interval: Duration::from_millis(5),
}
}
/// Push a session packet. Returns `Some(frame)` if the batch is now full
/// and was flushed, `None` if more room remains.
pub fn push(&mut self, session_id: [u8; 2], payload: Bytes) -> Option<TrunkFrame> {
let entry_wire = TrunkEntry::OVERHEAD + payload.len();
// If adding this entry would exceed limits, flush first.
if self.should_flush_with(entry_wire) && !self.pending.is_empty() {
let frame = self.take_pending();
// Then start a new batch with this entry.
self.pending.push(session_id, payload);
self.pending_bytes += entry_wire;
return Some(frame);
}
self.pending.push(session_id, payload);
self.pending_bytes += entry_wire;
if self.should_flush() {
Some(self.take_pending())
} else {
None
}
}
/// Flush the current pending frame if non-empty.
pub fn flush(&mut self) -> Option<TrunkFrame> {
if self.pending.is_empty() {
None
} else {
Some(self.take_pending())
}
}
/// Returns `true` if the pending batch has reached `max_entries` or `max_bytes`.
pub fn should_flush(&self) -> bool {
self.pending.len() >= self.max_entries || self.pending_bytes >= self.max_bytes
}
// --- private helpers ---
/// Would adding `extra_bytes` exceed a threshold?
fn should_flush_with(&self, extra_bytes: usize) -> bool {
self.pending.len() + 1 > self.max_entries
|| self.pending_bytes + extra_bytes > self.max_bytes
}
/// Take the pending frame out, resetting state.
fn take_pending(&mut self) -> TrunkFrame {
let frame = std::mem::replace(&mut self.pending, TrunkFrame::new());
self.pending_bytes = Self::FRAME_HEADER;
frame
}
}
impl Default for TrunkBatcher {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn trunk_batcher_fills_and_flushes() {
let mut batcher = TrunkBatcher::new();
batcher.max_entries = 3;
batcher.max_bytes = 4096; // large enough to not interfere
// First two pushes should not flush.
assert!(batcher.push([0, 1], Bytes::from_static(b"aaa")).is_none());
assert!(batcher.push([0, 2], Bytes::from_static(b"bbb")).is_none());
// Third push should trigger flush (max_entries = 3).
let frame = batcher
.push([0, 3], Bytes::from_static(b"ccc"))
.expect("should flush at max_entries");
assert_eq!(frame.len(), 3);
assert_eq!(frame.packets[0].session_id, [0, 1]);
assert_eq!(frame.packets[2].payload, Bytes::from_static(b"ccc"));
// Batcher is now empty.
assert!(batcher.flush().is_none());
}
#[test]
fn trunk_batcher_respects_max_bytes() {
let mut batcher = TrunkBatcher::new();
batcher.max_entries = 100; // won't be the trigger
// Frame header (2) + one entry overhead (4) + 50 payload = 56
// Two entries: 2 + 2*(4+50) = 110
// Three entries: 2 + 3*54 = 164
batcher.max_bytes = 120; // allow at most 2 entries of 50-byte payload
let big = Bytes::from(vec![0xAA; 50]);
assert!(batcher.push([0, 1], big.clone()).is_none()); // 56 bytes
// Second push: 56 + 54 = 110 < 120, fits
assert!(batcher.push([0, 2], big.clone()).is_none());
// Third push would be 164 > 120, so existing batch flushes first
let frame = batcher
.push([0, 3], big.clone())
.expect("should flush on max_bytes");
assert_eq!(frame.len(), 2);
// The third entry is now pending
let remaining = batcher.flush().unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining.packets[0].session_id, [0, 3]);
}
}

143
notes Normal file
View File

@@ -0,0 +1,143 @@
1. Add trunking (biggest win): Multiplex multiple sessions into a single QUIC datagram batch. A TrunkFrame could pack N mini-packets (session_id:2 + payload) into one datagram, sharing the QUIC overhead. This is your multiplexing idea
from the telemetry discussion — the probe test lines are already a step toward this.
2. Mini-frame format: For consecutive packets from the same session, use a 4-byte mini-header (just timestamp delta + payload length) instead of the full 12-byte MediaHeader. IAX2 does this and it cuts header overhead by 67%.
3. Comfort noise / silence suppression: IAX2 supports CN frames — don't send packets during silence, saving ~50% bandwidth in typical conversations. WZP always sends frames even during silence.
4. Jitter buffer from IAX2's design: IAX2 uses adaptive playout delay based on observed jitter, not fixed target depth. This is exactly what T1-S4 should implement — your sweep tool (S3) can guide the parameters.
5. Call control completeness: IAX2 has HOLD, TRANSFER, QUELCH (mute from server), UNQUELCH. WZP's SignalMessage only has Offer/Answer/Hangup/Ringing. Adding these would help featherChat integration.
Priority Recommendations
┌──────────────────────┬────────────────────────┬───────────────────────────────────────────────────┐
│ From IAX2 │ Maps to │ Impact │
├──────────────────────┼────────────────────────┼───────────────────────────────────────────────────┤
│ Trunking │ New P2 task │ High — reduces per-packet overhead at relay scale │
├──────────────────────┼────────────────────────┼───────────────────────────────────────────────────┤
│ Mini-frames │ Wire format change │ Medium — 67% header reduction │
├──────────────────────┼────────────────────────┼───────────────────────────────────────────────────┤
│ Silence suppression │ Codec layer │ Medium — 50% BW savings in typical calls │
├──────────────────────┼────────────────────────┼───────────────────────────────────────────────────┤
│ Adaptive playout │ P2-T1-S4 │ Medium — fixes the jitter/drift issue │
├──────────────────────┼────────────────────────┼───────────────────────────────────────────────────┤
│ Call control signals │ SignalMessage variants │ Low — UX feature for featherChat │
└──────────────────────┴────────────────────────┴───────────────────────────────────────────────────┘
The trunking gap is the most significant architectural difference. Everything else WZP is either on par or ahead (crypto, FEC, identity).
/btwanswer my question , imagine we are connected to Mesh one, our id is 1 , (some eth wallet address) , and i'm trying to call 2 (some other eth address) which is not connected to our relay, like i'm connected to relay 1 , relay one is
connected to relay 3, and relay 3 to relay 2, how do we achieve a connection ?
Great question — this is relay-to-relay routing, and we don't have it yet. Here's what exists vs what's needed:
What We Have
- Client → Relay (direct QUIC)
- Relay → Relay probing (health metrics only, no media forwarding)
- Room-based forwarding within a single relay (SFU)
- Forward mode (relay → one remote relay, hardcoded pair)
What's Missing: Multi-Hop Call Routing
You (wallet 0x1) Peer (wallet 0x2)
│ │
▼ ▼
Relay 1 ──── Relay 3 ──── Relay 2
This requires:
1. Relay Discovery / Routing Table
Each relay needs to know which other relays exist and which users are connected where. Options:
- Gossip protocol: relays share their connected fingerprints with neighbors
- Central registry: featherChat server knows "user 0x2 is on Relay 2"
- DHT: fingerprint → relay mapping distributed across the mesh (like Kademlia)
2. Signaling Path Resolution
When you call 0x2:
1. Your client sends CallOffer through featherChat WS (already works)
2. featherChat knows 0x2's relay (from FC-5 group-to-room mapping or a presence API)
3. CallAnswer comes back with the relay chain: [relay1, relay3, relay2]
3. Media Path Setup
Two approaches:
A. Relay chaining (simpler):
You → Relay1 → Relay3 → Relay2 → Peer
Each relay forwards packets to the next hop. We already have forward mode (--remote) for a single pair — extend it to be per-session with a routing table. Latency adds up but it works.
B. QUIC relay-assisted NAT traversal (better):
You → Relay1 → Relay3 → Relay2 → Peer (signaling)
You ←──────── direct QUIC ──────────→ Peer (media, if NAT allows)
Use relays for signaling + ICE candidate exchange, then attempt a direct connection. Fall back to relay chain if direct fails.
4. Suggested Task Breakdown
┌───────────┬──────────────────────────┬─────────────────────────────────────────────────────────────────────────┐
│ ID │ Task │ Description │
├───────────┼──────────────────────────┼─────────────────────────────────────────────────────────────────────────┤
│ WZP-P3-T4 │ Relay peer registry │ Each relay announces connected fingerprints to neighbors │
├───────────┼──────────────────────────┼─────────────────────────────────────────────────────────────────────────┤
│ WZP-P3-T5 │ Route resolution │ Given a target fingerprint, find the relay chain │
├───────────┼──────────────────────────┼─────────────────────────────────────────────────────────────────────────┤
│ WZP-P3-T6 │ Per-session forwarding │ Relay forwards media for a specific session to next-hop relay │
├───────────┼──────────────────────────┼─────────────────────────────────────────────────────────────────────────┤
│ WZP-P3-T7 │ Call setup over mesh │ Client → FC signaling → resolve route → setup relay chain → media flows │
├───────────┼──────────────────────────┼─────────────────────────────────────────────────────────────────────────┤
│ WZP-P3-T8 │ Direct path optimization │ After relay chain works, attempt ICE/STUN direct connection │
└───────────┴──────────────────────────┴─────────────────────────────────────────────────────────────────────────┘
5. The Practical Shortcut
Since featherChat already has presence (FC-6) and group membership (FC-5), the easiest first pass:
1. featherChat server knows which relay each user registered with (add relay_addr to the auth/connect flow)
2. CallOffer payload already carries relay_addr — the callee's client reads it and connects directly to Relay 1
3. Both clients connect to the same relay (caller's), joining the same hashed room
This avoids multi-hop entirely — the callee just connects to the caller's relay. Only fails if the callee can't reach that relay (firewall, geo), which is when you need the full mesh routing.
Want me to add these as P3 tasks and start with the shortcut approach?
│ Silence suppression VAD/DTX in Opus vs our Yes (comfort noise) vs iax (native)
2. Oalgorithms: Signal pioneered adaptive jitter buffers, dynamic bitrate switching, and bandwidth estimation tuned across millions of calls. WZP's QualityAdapter is a first pass — their algorithms use ML-based predictors.
4. Echo cancellation / noise suppression: Signal uses RNNoise (ML-based), WhatsApp has proprietary DSP. WZP doesn't have any — it relies on the platform's built-in AEC.
3. Group calls: WebRTC SFUs (used by Signal, WhatsApp) handle 32+ participants with simulcast (multiple quality layers). WZP's room mode is a basic SFU without simulcast or quality layering.
Yes, partially. Here's what you can realistically use:
RNNoise (echo/noise suppression) — This is the easiest win. RNNoise is BSD-licensed C code, and there's a Rust crate (nnnoiseless — a pure Rust port). You could add it as a pre-processing step in CallEncoder before Opus encoding. No
legal issues, no dependency on Signal.
Jitter buffer algorithms — Signal's adaptive jitter buffer code is in their open-source ringrtc repo (GPLv3). You can study the algorithm (NetEq-style adaptive playout) but can't copy code directly into WZP (MIT) due to GPL. However,
the concepts (ML-based delay prediction, talkspurt detection) are well-documented in academic papers and WebRTC's NetEq source (BSD-licensed). You could:
- Port ideas from WebRTC's NetEq (BSD) — this is what Signal built on anyway
- Use your T5 telemetry + sweep tool to train a simple predictor specific to WZP
Bandwidth estimation — WebRTC's GCC (Google Congestion Control) is BSD-licensed and well-documented. You could implement a simplified version using your existing QualityReport data. Signal's version is derivative of this.
Simulcast / group quality layers — The concept is straightforward (encode at 2-3 quality levels, relay picks per-receiver). You already have QualityProfile with GOOD/DEGRADED/CATASTROPHIC — extend the encoder to produce multiple
streams and the relay to select per-participant. No need to copy Signal code for this.
What you CAN'T directly use:
- Signal's proprietary ML models (not published)
- WhatsApp's DSP (closed source)
- GPL code in MIT project without relicensing
Practical priority:
1. nnnoiseless crate for noise suppression (drop-in, huge quality win)
2. WebRTC NetEq-style adaptive playout for T1-S4 (BSD, replaces your fixed jitter buffer)
3. GCC-style bandwidth estimation for P3-T3
4. Simulcast for group calls (design from scratch, concept is simple)
The biggest gap is noise suppression — adding nnnoiseless would be maybe 2-3 hours of work and immediately noticeable.