From cfb48df1efc4fd5bfa7410d71b30ad34af9b0755 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 6 Apr 2026 09:48:34 +0400 Subject: [PATCH] feat: direct playout mode, AEC far-end, audio processing switches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add --android/--direct-playout: bypass jitter buffer, decode on recv (matches Android engine architecture); also implies --no-denoise and --no-silence - Wire AEC far-end reference from decoded playout to encoder (fed by the direct playout path only) - Add --no-aec, --no-agc, --no-fec, --no-silence, --no-denoise switches (--no-fec is parsed and logged but not yet applied to the encoder) - Fix BufferSize::Fixed(960) → Default for macOS CoreAudio compat - Optimize wzp-codec, wzp-fec, raptorq, audiopus, audiopus_sys, nnnoiseless in debug profile - Add capture callback size diagnostic logging Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.toml | 15 ++ crates/wzp-client/src/audio_io.rs | 16 +- crates/wzp-client/src/cli.rs | 248 ++++++++++++++++++++++++------ 3 files changed, 229 insertions(+), 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1daa196..31338d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,3 +53,18 @@ wzp-fec = { path = "crates/wzp-fec" } wzp-crypto = { path = "crates/wzp-crypto" } wzp-transport = { path = "crates/wzp-transport" } wzp-client = { path = "crates/wzp-client" } + +# Optimize heavy compute deps even in debug builds — +# real-time audio needs < 20ms per frame, impossible unoptimized. 
+[profile.dev.package.nnnoiseless] +opt-level = 3 +[profile.dev.package.audiopus_sys] +opt-level = 3 +[profile.dev.package.audiopus] +opt-level = 3 +[profile.dev.package.raptorq] +opt-level = 3 +[profile.dev.package.wzp-codec] +opt-level = 3 +[profile.dev.package.wzp-fec] +opt-level = 3 diff --git a/crates/wzp-client/src/audio_io.rs b/crates/wzp-client/src/audio_io.rs index f713e66..b787264 100644 --- a/crates/wzp-client/src/audio_io.rs +++ b/crates/wzp-client/src/audio_io.rs @@ -56,7 +56,7 @@ impl AudioCapture { let config = StreamConfig { channels: 1, sample_rate: SampleRate(48_000), - buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32), + buffer_size: cpal::BufferSize::Default, }; let use_f32 = !supports_i16_input(&device)?; @@ -65,17 +65,21 @@ impl AudioCapture { warn!("input stream error: {e}"); }; + let logged_cb_size = Arc::new(AtomicBool::new(false)); + let stream = if use_f32 { let ring = ring_cb.clone(); let running = running_clone.clone(); + let logged = logged_cb_size.clone(); device.build_input_stream( &config, move |data: &[f32], _: &cpal::InputCallbackInfo| { if !running.load(Ordering::Relaxed) { return; } - // Batch convert f32 → i16, then write entire slice to ring. - // Stack alloc for typical callback sizes (≤ 960 samples). 
+ if !logged.swap(true, Ordering::Relaxed) { + eprintln!("[audio] capture callback: {} f32 samples", data.len()); + } let mut tmp = [0i16; FRAME_SAMPLES]; for chunk in data.chunks(FRAME_SAMPLES) { let n = chunk.len(); @@ -91,12 +95,16 @@ impl AudioCapture { } else { let ring = ring_cb.clone(); let running = running_clone.clone(); + let logged = logged_cb_size.clone(); device.build_input_stream( &config, move |data: &[i16], _: &cpal::InputCallbackInfo| { if !running.load(Ordering::Relaxed) { return; } + if !logged.swap(true, Ordering::Relaxed) { + eprintln!("[audio] capture callback: {} i16 samples", data.len()); + } ring.write(data); }, err_cb, @@ -183,7 +191,7 @@ impl AudioPlayback { let config = StreamConfig { channels: 1, sample_rate: SampleRate(48_000), - buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32), + buffer_size: cpal::BufferSize::Default, }; let use_f32 = !supports_i16_output(&device)?; diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index 3e0c0d7..d7783db 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -14,7 +14,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use tracing::{error, info}; +use tracing::{error, info, warn}; use wzp_client::call::{CallConfig, CallDecoder, CallEncoder}; use wzp_proto::MediaTransport; @@ -47,6 +47,12 @@ struct CliArgs { room: Option, raw_room: bool, alias: Option, + no_denoise: bool, + no_aec: bool, + no_agc: bool, + no_fec: bool, + no_silence: bool, + direct_playout: bool, token: Option, _metrics_file: Option, } @@ -118,6 +124,12 @@ fn parse_args() -> CliArgs { let mut room = None; let mut raw_room = false; let mut alias = None; + let mut no_denoise = false; + let mut no_aec = false; + let mut no_agc = false; + let mut no_fec = false; + let mut no_silence = false; + let mut direct_playout = false; let mut token = None; let mut metrics_file = None; let mut relay_str = None; @@ -163,6 +175,12 @@ fn parse_args() -> CliArgs { room = 
Some(args.get(i).expect("--room requires a name").to_string()); } "--raw-room" => raw_room = true, + "--no-denoise" => no_denoise = true, + "--no-aec" => no_aec = true, + "--no-agc" => no_agc = true, + "--no-fec" => no_fec = true, + "--no-silence" => no_silence = true, + "--direct-playout" | "--android" => direct_playout = true, "--alias" => { i += 1; alias = Some(args.get(i).expect("--alias requires a name").to_string()); @@ -222,6 +240,13 @@ fn parse_args() -> CliArgs { eprintln!(" --room Room name (hashed for privacy before sending)"); eprintln!(" --raw-room Send room name as-is (no hash, for Android compat)"); eprintln!(" --alias Display name shown to other participants"); + eprintln!(" --no-denoise Disable RNNoise noise suppression"); + eprintln!(" --no-aec Disable acoustic echo cancellation"); + eprintln!(" --no-agc Disable automatic gain control"); + eprintln!(" --no-fec Disable forward error correction"); + eprintln!(" --no-silence Disable silence suppression"); + eprintln!(" --direct-playout Bypass jitter buffer (decode on recv, like Android)"); + eprintln!(" --android Alias for --no-denoise --no-aec --no-silence --direct-playout"); eprintln!(" --token featherChat bearer token for relay auth"); eprintln!(" --metrics-file Write JSONL telemetry to file (1 line/sec)"); eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)"); @@ -261,6 +286,12 @@ fn parse_args() -> CliArgs { room, raw_room, alias, + no_denoise, + no_aec, + no_agc, + no_fec, + no_silence, + direct_playout, token, _metrics_file: metrics_file, } @@ -342,7 +373,15 @@ async fn main() -> anyhow::Result<()> { if cli.live { #[cfg(feature = "audio")] { - return run_live(transport).await; + let audio_opts = AudioOpts { + no_denoise: cli.no_denoise || cli.direct_playout, + no_aec: cli.no_aec, + no_agc: cli.no_agc, + no_fec: cli.no_fec, + no_silence: cli.no_silence || cli.direct_playout, + direct_playout: cli.direct_playout, + }; + return run_live(transport, 
audio_opts).await; } #[cfg(not(feature = "audio"))] { @@ -603,13 +642,24 @@ async fn run_file_mode( /// QUIC → recv task → jitter buffer → decode tick (20ms) → AudioRing → CPAL playback callback /// /// All lock-free: CPAL callbacks use atomic ring buffers, no Mutex on the audio path. +struct AudioOpts { + no_denoise: bool, + no_aec: bool, + no_agc: bool, + no_fec: bool, + no_silence: bool, + direct_playout: bool, +} + #[cfg(feature = "audio")] async fn run_live( transport: Arc, + opts: AudioOpts, ) -> anyhow::Result<()> { use std::sync::Arc as StdArc; use std::sync::atomic::{AtomicBool, Ordering}; use wzp_client::audio_io::{AudioCapture, AudioPlayback}; + use wzp_client::audio_ring::AudioRing; use wzp_client::call::JitterTelemetry; let capture = AudioCapture::start()?; @@ -619,6 +669,10 @@ async fn run_live( let capture_ring = capture.ring().clone(); let playout_ring = playback.ring().clone(); + // Far-end reference ring: recv task writes decoded audio here, + // send task reads it to feed the AEC echo canceller. 
+ let farend_ring = StdArc::new(AudioRing::new()); + let running = StdArc::new(AtomicBool::new(true)); // --- Signal handler: set running=false on first Ctrl+C, force-quit on second --- @@ -634,15 +688,40 @@ async fn run_live( std::process::exit(1); }); - let config = CallConfig::default(); + let config = CallConfig { + noise_suppression: !opts.no_denoise, + suppression_enabled: !opts.no_silence, + ..CallConfig::default() + }; + { + let mut flags = Vec::new(); + if opts.no_denoise { flags.push("denoise"); } + if opts.no_aec { flags.push("aec"); } + if opts.no_agc { flags.push("agc"); } + if opts.no_fec { flags.push("fec"); } + if opts.no_silence { flags.push("silence"); } + if opts.direct_playout { flags.push("jitter-buffer (direct playout)"); } + if !flags.is_empty() { + info!(disabled = %flags.join(", "), "audio processing overrides"); + } + } // --- Send task: poll capture ring → encode → send via async --- let send_transport = transport.clone(); let send_running = running.clone(); + let no_aec = opts.no_aec; + let no_agc = opts.no_agc; + let _no_fec = opts.no_fec; + let send_farend = farend_ring.clone(); let send_task = async move { let mut encoder = CallEncoder::new(&config); + if no_aec { encoder.set_aec_enabled(false); } + if no_agc { encoder.set_agc_enabled(false); } let mut capture_buf = vec![0i16; FRAME_SAMPLES]; + let mut farend_buf = vec![0i16; FRAME_SAMPLES]; let mut frames_sent: u64 = 0; + let mut polls: u64 = 0; + let mut last_diag = std::time::Instant::now(); loop { if !send_running.load(Ordering::Relaxed) { @@ -652,6 +731,12 @@ async fn run_live( let avail = capture_ring.available(); if avail < FRAME_SAMPLES { tokio::time::sleep(std::time::Duration::from_millis(5)).await; + polls += 1; + // Diagnostic every 2 seconds + if last_diag.elapsed().as_secs() >= 2 { + info!(avail, polls, frames_sent, "send: ring starved (avail < {FRAME_SAMPLES})"); + last_diag = std::time::Instant::now(); + } continue; } @@ -660,6 +745,16 @@ async fn run_live( continue; } 
+ // Feed AEC far-end reference: what was played through the speaker. + // Must be called BEFORE encode_frame processes the mic signal. + if !no_aec { + while send_farend.available() >= FRAME_SAMPLES { + send_farend.read(&mut farend_buf); + encoder.feed_aec_farend(&farend_buf); + } + } + + let t0 = std::time::Instant::now(); let packets = match encoder.encode_frame(&capture_buf) { Ok(p) => p, Err(e) => { @@ -667,6 +762,7 @@ async fn run_live( continue; } }; + let encode_us = t0.elapsed().as_micros(); for pkt in &packets { if let Err(e) = send_transport.send_media(pkt).await { @@ -676,58 +772,121 @@ async fn run_live( } frames_sent += 1; - if frames_sent == 1 || frames_sent % 500 == 0 { - info!(frames_sent, "send progress"); + if frames_sent <= 5 || frames_sent % 500 == 0 { + info!(frames_sent, encode_us, pkts = packets.len(), "send progress"); } } }; - // --- Recv task: receive packets → ingest into jitter buffer --- - // Uses timeout so it can check the running flag and exit on Ctrl+C. + // --- Recv + playout --- let recv_transport = transport.clone(); let recv_running = running.clone(); - let config = CallConfig::default(); - let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config))); - let decoder_recv = decoder.clone(); + let direct_playout = opts.direct_playout; - let recv_task = async move { - let mut packets_received: u64 = 0; - loop { - if !recv_running.load(Ordering::Relaxed) { - break; - } - // Timeout so we can check running flag periodically - let result = tokio::time::timeout( - std::time::Duration::from_millis(100), - recv_transport.recv_media(), - ) - .await; - match result { - Ok(Ok(Some(pkt))) => { - let mut dec = decoder_recv.lock().await; - dec.ingest(pkt); - packets_received += 1; - if packets_received == 1 || packets_received % 500 == 0 { - info!(packets_received, depth = dec.stats().current_depth, "recv progress"); + // Direct playout: decode on recv, write straight to playout ring (like Android). 
+ // Jitter buffer mode: ingest into jitter buffer, decode on 20ms tick. + let recv_task = { + let playout_ring = playout_ring.clone(); + let farend_ring = farend_ring.clone(); + let config = CallConfig::default(); + let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config))); + let decoder_recv = decoder.clone(); + + async move { + let mut packets_received: u64 = 0; + let mut timeouts: u64 = 0; + // For direct playout: raw Opus decoder + AGC + let mut opus_dec = if direct_playout { + Some(wzp_codec::create_decoder(wzp_proto::QualityProfile::GOOD)) + } else { + None + }; + let mut playout_agc = wzp_codec::AutoGainControl::new(); + let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; + + loop { + if !recv_running.load(Ordering::Relaxed) { + break; + } + let result = tokio::time::timeout( + std::time::Duration::from_millis(100), + recv_transport.recv_media(), + ) + .await; + match result { + Ok(Ok(Some(pkt))) => { + packets_received += 1; + + if direct_playout { + // Android path: decode immediately, AGC, write to ring + if !pkt.header.is_repair { + if let Some(ref mut dec) = opus_dec { + match dec.decode(&pkt.payload, &mut pcm_buf) { + Ok(n) => { + if !no_agc { + playout_agc.process_frame(&mut pcm_buf[..n]); + } + playout_ring.write(&pcm_buf[..n]); + // Feed far-end ring for AEC + farend_ring.write(&pcm_buf[..n]); + } + Err(e) => { + if let Ok(n) = dec.decode_lost(&mut pcm_buf) { + playout_ring.write(&pcm_buf[..n]); + } + if packets_received < 10 { + warn!("decode error: {e}"); + } + } + } + } + } + } else { + // Jitter buffer path + let mut dec = decoder_recv.lock().await; + dec.ingest(pkt); + } + + if packets_received == 1 || packets_received % 500 == 0 { + info!(packets_received, direct_playout, "recv progress"); + } + timeouts = 0; + } + Ok(Ok(None)) => { + info!("connection closed"); + break; + } + Ok(Err(e)) => { + error!("recv error: {e}"); + break; + } + Err(_) => { + timeouts += 1; + if timeouts == 50 { + info!("recv: no media packets received in 
5s"); + } } } - Ok(Ok(None)) => { - info!("connection closed"); - break; - } - Ok(Err(e)) => { - error!("recv error: {e}"); - break; - } - Err(_) => {} // timeout — loop and check running flag } } }; - // --- Playout tick: decode from jitter buffer at steady 20ms intervals --- + // Playout tick — only used when NOT in direct playout mode let playout_running = running.clone(); - let decoder_playout = decoder.clone(); let playout_task = async move { + if direct_playout { + // Direct playout handles everything in recv_task — just park here + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if !playout_running.load(Ordering::Relaxed) { + break; + } + } + return; + } + + let config = CallConfig::default(); + let mut decoder = CallDecoder::new(&config); let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; let mut interval = tokio::time::interval(std::time::Duration::from_millis(20)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -738,19 +897,16 @@ async fn run_live( break; } - let mut dec = decoder_playout.lock().await; - - // Drain ready frames from jitter buffer into playout ring. let mut decoded_this_tick = 0; - while let Some(n) = dec.decode_next(&mut pcm_buf) { + while let Some(n) = decoder.decode_next(&mut pcm_buf) { playout_ring.write(&pcm_buf[..n]); decoded_this_tick += 1; if decoded_this_tick >= 2 { - break; // Don't drain too aggressively in one tick + break; } } - telemetry.maybe_log(dec.stats()); + telemetry.maybe_log(decoder.stats()); } };