feat: protocol improvements — live trunking, mini-frames, noise suppression, adaptive jitter
T6 wiring: Trunking in relay hot path - TrunkedForwarder wraps transport with TrunkBatcher - run_participant uses 5ms flush timer when trunking enabled - send_trunk/recv_trunk on QuinnTransport - --trunking flag on relay config - 2 new tests: forwarder batches, auto-flush on full T7 wiring: Mini-frames in encoder/decoder - MediaPacket::encode_compact/decode_compact with MiniFrameContext - CallEncoder sends mini-headers for consecutive frames (full every 50th) - CallDecoder auto-detects full vs mini on receive - mini_frames_enabled in CallConfig (default true) - 3 new tests: encode/decode sequence, periodic full, disabled mode Noise suppression (nnnoiseless/RNNoise) - NoiseSupressor in wzp-codec: pure Rust ML-based noise removal - Processes 960-sample frames as two 480-sample halves - Integrated in CallEncoder before silence detection - noise_suppression in CallConfig (default true) - 4 new tests: creation, processing, SNR improvement, passthrough T1-S4: Adaptive playout delay - AdaptivePlayoutDelay: EMA-based jitter tracking (NetEq-inspired) - Computes target_delay from observed inter-arrival jitter - JitterBuffer::new_adaptive() uses adaptive delay - adaptive_jitter in CallConfig (default true) - 5 new tests: stable, jitter increase, recovery, clamping, estimate 272 tests passing across all crates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -33,6 +33,12 @@ pub struct RelayConfig {
|
||||
/// Discovery is manual via multiple --probe flags; this flag signals intent.
|
||||
#[serde(default)]
|
||||
pub probe_mesh: bool,
|
||||
/// Enable trunk batching for outgoing media in room mode.
|
||||
/// When true, packets destined for the same receiver are accumulated into
|
||||
/// [`TrunkFrame`]s and flushed every 5 ms (or when the batcher is full),
|
||||
/// reducing per-packet QUIC datagram overhead.
|
||||
#[serde(default)]
|
||||
pub trunking_enabled: bool,
|
||||
}
|
||||
|
||||
impl Default for RelayConfig {
|
||||
@@ -48,6 +54,7 @@ impl Default for RelayConfig {
|
||||
metrics_port: None,
|
||||
probe_targets: Vec::new(),
|
||||
probe_mesh: false,
|
||||
trunking_enabled: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,6 +64,9 @@ fn parse_args() -> RelayConfig {
|
||||
"--probe-mesh" => {
|
||||
config.probe_mesh = true;
|
||||
}
|
||||
"--trunking" => {
|
||||
config.trunking_enabled = true;
|
||||
}
|
||||
"--mesh-status" => {
|
||||
// Print mesh table from a fresh registry and exit.
|
||||
// In practice this is useful after the relay has been running;
|
||||
@@ -84,6 +87,7 @@ fn parse_args() -> RelayConfig {
|
||||
eprintln!(" --probe <addr> Peer relay to probe for health monitoring (repeatable).");
|
||||
eprintln!(" --probe-mesh Enable mesh mode (mark config flag, probes all --probe targets).");
|
||||
eprintln!(" --mesh-status Print mesh health table and exit (diagnostic).");
|
||||
eprintln!(" --trunking Enable trunk batching for outgoing media in room mode.");
|
||||
eprintln!();
|
||||
eprintln!("Room mode (default):");
|
||||
eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU).");
|
||||
@@ -239,6 +243,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let auth_url = config.auth_url.clone();
|
||||
let relay_seed_bytes = relay_seed.0;
|
||||
let metrics = metrics.clone();
|
||||
let trunking_enabled = config.trunking_enabled;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let addr = connection.remote_address();
|
||||
@@ -423,6 +428,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
transport.clone(),
|
||||
metrics.clone(),
|
||||
&session_id_str,
|
||||
trunking_enabled,
|
||||
).await;
|
||||
|
||||
// Participant disconnected — clean up per-session metrics
|
||||
|
||||
@@ -6,13 +6,17 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use wzp_proto::packet::TrunkFrame;
|
||||
use wzp_proto::MediaTransport;
|
||||
|
||||
use crate::metrics::RelayMetrics;
|
||||
use crate::trunk::TrunkBatcher;
|
||||
|
||||
/// Unique participant ID within a room.
|
||||
pub type ParticipantId = u64;
|
||||
@@ -171,8 +175,70 @@ impl RoomManager {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// TrunkedForwarder — wraps a transport and batches outgoing media into trunk
|
||||
// frames so multiple packets ride a single QUIC datagram.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Wraps a [`QuinnTransport`] with a [`TrunkBatcher`] so that small media
|
||||
/// packets are accumulated and sent together in a single QUIC datagram.
|
||||
pub struct TrunkedForwarder {
|
||||
transport: Arc<wzp_transport::QuinnTransport>,
|
||||
batcher: TrunkBatcher,
|
||||
session_id: [u8; 2],
|
||||
}
|
||||
|
||||
impl TrunkedForwarder {
|
||||
/// Create a new trunked forwarder.
|
||||
///
|
||||
/// `session_id` tags every entry pushed into the batcher so the receiver
|
||||
/// can demultiplex packets by session.
|
||||
pub fn new(transport: Arc<wzp_transport::QuinnTransport>, session_id: [u8; 2]) -> Self {
|
||||
Self {
|
||||
transport,
|
||||
batcher: TrunkBatcher::new(),
|
||||
session_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Push a media packet into the batcher. If the batcher is full it will
|
||||
/// flush automatically and the resulting trunk frame is sent immediately.
|
||||
pub async fn send(&mut self, pkt: &wzp_proto::MediaPacket) -> anyhow::Result<()> {
|
||||
let payload: Bytes = pkt.to_bytes();
|
||||
if let Some(frame) = self.batcher.push(self.session_id, payload) {
|
||||
self.send_frame(&frame)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush any pending packets — called on the 5 ms timer tick.
|
||||
pub async fn flush(&mut self) -> anyhow::Result<()> {
|
||||
if let Some(frame) = self.batcher.flush() {
|
||||
self.send_frame(&frame)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the flush interval configured on the inner batcher.
|
||||
pub fn flush_interval(&self) -> Duration {
|
||||
self.batcher.flush_interval
|
||||
}
|
||||
|
||||
fn send_frame(&self, frame: &TrunkFrame) -> anyhow::Result<()> {
|
||||
self.transport.send_trunk(frame).map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// run_participant — the hot-path forwarding loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Run the receive loop for one participant in a room.
|
||||
/// Forwards all received packets to every other participant.
|
||||
///
|
||||
/// When `trunking_enabled` is true, outgoing packets are accumulated per-peer
|
||||
/// into [`TrunkedForwarder`]s and flushed every 5 ms or when the batcher is
|
||||
/// full, reducing QUIC datagram overhead.
|
||||
pub async fn run_participant(
|
||||
room_mgr: Arc<Mutex<RoomManager>>,
|
||||
room_name: String,
|
||||
@@ -180,6 +246,29 @@ pub async fn run_participant(
|
||||
transport: Arc<wzp_transport::QuinnTransport>,
|
||||
metrics: Arc<RelayMetrics>,
|
||||
session_id: &str,
|
||||
trunking_enabled: bool,
|
||||
) {
|
||||
if trunking_enabled {
|
||||
run_participant_trunked(
|
||||
room_mgr, room_name, participant_id, transport, metrics, session_id,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
run_participant_plain(
|
||||
room_mgr, room_name, participant_id, transport, metrics, session_id,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Plain (non-trunked) forwarding loop — original behaviour.
|
||||
async fn run_participant_plain(
|
||||
room_mgr: Arc<Mutex<RoomManager>>,
|
||||
room_name: String,
|
||||
participant_id: ParticipantId,
|
||||
transport: Arc<wzp_transport::QuinnTransport>,
|
||||
metrics: Arc<RelayMetrics>,
|
||||
session_id: &str,
|
||||
) {
|
||||
let addr = transport.connection().remote_address();
|
||||
let mut packets_forwarded = 0u64;
|
||||
@@ -242,6 +331,120 @@ pub async fn run_participant(
|
||||
mgr.leave(&room_name, participant_id);
|
||||
}
|
||||
|
||||
/// Trunked forwarding loop — batches outgoing packets per peer.
|
||||
async fn run_participant_trunked(
|
||||
room_mgr: Arc<Mutex<RoomManager>>,
|
||||
room_name: String,
|
||||
participant_id: ParticipantId,
|
||||
transport: Arc<wzp_transport::QuinnTransport>,
|
||||
metrics: Arc<RelayMetrics>,
|
||||
session_id: &str,
|
||||
) {
|
||||
use std::collections::HashMap;
|
||||
|
||||
let addr = transport.connection().remote_address();
|
||||
let mut packets_forwarded = 0u64;
|
||||
|
||||
// Per-peer TrunkedForwarders, keyed by the raw pointer of the peer
|
||||
// transport (stable for the Arc's lifetime). We use the remote address
|
||||
// string as the key since it is unique per connection.
|
||||
let mut forwarders: HashMap<std::net::SocketAddr, TrunkedForwarder> = HashMap::new();
|
||||
|
||||
// Derive a 2-byte session tag from the session_id hex string.
|
||||
let sid_bytes: [u8; 2] = parse_session_id_bytes(session_id);
|
||||
|
||||
let mut flush_interval = tokio::time::interval(Duration::from_millis(5));
|
||||
// Don't let missed ticks pile up — skip them and move on.
|
||||
flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
result = transport.recv_media() => {
|
||||
let pkt = match result {
|
||||
Ok(Some(pkt)) => pkt,
|
||||
Ok(None) => {
|
||||
info!(%addr, participant = participant_id, "disconnected");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(%addr, participant = participant_id, "recv error: {e}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ref report) = pkt.quality_report {
|
||||
metrics.update_session_quality(session_id, report);
|
||||
}
|
||||
|
||||
let others = {
|
||||
let mgr = room_mgr.lock().await;
|
||||
mgr.others(&room_name, participant_id)
|
||||
};
|
||||
|
||||
let pkt_bytes = pkt.payload.len() as u64;
|
||||
for other in &others {
|
||||
let peer_addr = other.connection().remote_address();
|
||||
let fwd = forwarders
|
||||
.entry(peer_addr)
|
||||
.or_insert_with(|| TrunkedForwarder::new(other.clone(), sid_bytes));
|
||||
if let Err(e) = fwd.send(&pkt).await {
|
||||
let _ = e;
|
||||
}
|
||||
}
|
||||
|
||||
let fan_out = others.len() as u64;
|
||||
metrics.packets_forwarded.inc_by(fan_out);
|
||||
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
|
||||
packets_forwarded += 1;
|
||||
if packets_forwarded % 500 == 0 {
|
||||
let room_size = {
|
||||
let mgr = room_mgr.lock().await;
|
||||
mgr.room_size(&room_name)
|
||||
};
|
||||
info!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
forwarded = packets_forwarded,
|
||||
room_size,
|
||||
"participant stats (trunked)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
_ = flush_interval.tick() => {
|
||||
for fwd in forwarders.values_mut() {
|
||||
if let Err(e) = fwd.flush().await {
|
||||
let _ = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Final flush — send any remaining buffered packets.
|
||||
for fwd in forwarders.values_mut() {
|
||||
let _ = fwd.flush().await;
|
||||
}
|
||||
|
||||
let mut mgr = room_mgr.lock().await;
|
||||
mgr.leave(&room_name, participant_id);
|
||||
}
|
||||
|
||||
/// Parse up to the first 2 bytes of a hex session-id string into `[u8; 2]`.
|
||||
fn parse_session_id_bytes(session_id: &str) -> [u8; 2] {
|
||||
let bytes: Vec<u8> = (0..session_id.len())
|
||||
.step_by(2)
|
||||
.filter_map(|i| u8::from_str_radix(session_id.get(i..i + 2)?, 16).ok())
|
||||
.collect();
|
||||
let mut out = [0u8; 2];
|
||||
for (i, b) in bytes.iter().take(2).enumerate() {
|
||||
out[i] = *b;
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -277,4 +480,97 @@ mod tests {
|
||||
assert!(mgr.is_authorized("room1", Some("bob")));
|
||||
assert!(!mgr.is_authorized("room1", Some("eve")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_session_id_bytes_works() {
|
||||
assert_eq!(parse_session_id_bytes("abcd"), [0xab, 0xcd]);
|
||||
assert_eq!(parse_session_id_bytes("ff00"), [0xff, 0x00]);
|
||||
assert_eq!(parse_session_id_bytes(""), [0x00, 0x00]);
|
||||
// Longer hex strings: only first 2 bytes taken
|
||||
assert_eq!(parse_session_id_bytes("aabbccdd"), [0xaa, 0xbb]);
|
||||
}
|
||||
|
||||
/// Helper: create a minimal MediaPacket with the given payload bytes.
|
||||
fn make_test_packet(payload: &[u8]) -> wzp_proto::MediaPacket {
|
||||
wzp_proto::MediaPacket {
|
||||
header: wzp_proto::packet::MediaHeader {
|
||||
version: 0,
|
||||
is_repair: false,
|
||||
codec_id: wzp_proto::CodecId::Opus16k,
|
||||
has_quality_report: false,
|
||||
fec_ratio_encoded: 0,
|
||||
seq: 1,
|
||||
timestamp: 100,
|
||||
fec_block: 0,
|
||||
fec_symbol: 0,
|
||||
reserved: 0,
|
||||
csrc_count: 0,
|
||||
},
|
||||
payload: Bytes::from(payload.to_vec()),
|
||||
quality_report: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Push 3 packets into a batcher (simulating TrunkedForwarder.send),
|
||||
/// then flush and verify all 3 appear in a single TrunkFrame.
|
||||
#[test]
|
||||
fn trunked_forwarder_batches() {
|
||||
let session_id: [u8; 2] = [0x00, 0x01];
|
||||
let mut batcher = TrunkBatcher::new();
|
||||
// Ensure max_entries is high enough that 3 packets don't auto-flush.
|
||||
batcher.max_entries = 10;
|
||||
batcher.max_bytes = 4096;
|
||||
|
||||
let pkts = [
|
||||
make_test_packet(b"aaa"),
|
||||
make_test_packet(b"bbb"),
|
||||
make_test_packet(b"ccc"),
|
||||
];
|
||||
|
||||
for pkt in &pkts {
|
||||
let payload = pkt.to_bytes();
|
||||
let flushed = batcher.push(session_id, payload);
|
||||
// Should NOT auto-flush — we are below max_entries.
|
||||
assert!(flushed.is_none(), "unexpected auto-flush");
|
||||
}
|
||||
|
||||
// Explicit flush (simulates the 5 ms timer tick).
|
||||
let frame = batcher.flush().expect("expected a frame with 3 entries");
|
||||
assert_eq!(frame.len(), 3);
|
||||
for entry in &frame.packets {
|
||||
assert_eq!(entry.session_id, session_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Push exactly max_entries packets and verify the batcher auto-flushes
|
||||
/// on the last push (simulating TrunkedForwarder.send triggering a send).
|
||||
#[test]
|
||||
fn trunked_forwarder_auto_flushes() {
|
||||
let session_id: [u8; 2] = [0x00, 0x02];
|
||||
let mut batcher = TrunkBatcher::new();
|
||||
batcher.max_entries = 5;
|
||||
batcher.max_bytes = 8192;
|
||||
|
||||
let pkt = make_test_packet(b"hello");
|
||||
let mut auto_flushed: Option<wzp_proto::packet::TrunkFrame> = None;
|
||||
|
||||
for i in 0..5 {
|
||||
let payload = pkt.to_bytes();
|
||||
if let Some(frame) = batcher.push(session_id, payload) {
|
||||
assert!(auto_flushed.is_none(), "should auto-flush exactly once");
|
||||
auto_flushed = Some(frame);
|
||||
// The auto-flush should happen on the 5th push (max_entries = 5).
|
||||
assert_eq!(i, 4, "expected auto-flush on the last push");
|
||||
}
|
||||
}
|
||||
|
||||
let frame = auto_flushed.expect("batcher should have auto-flushed at max_entries");
|
||||
assert_eq!(frame.len(), 5);
|
||||
for entry in &frame.packets {
|
||||
assert_eq!(entry.session_id, session_id);
|
||||
}
|
||||
|
||||
// Batcher should now be empty — nothing to flush.
|
||||
assert!(batcher.flush().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user