feat: direct playout mode, AEC far-end, audio processing switches
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m28s

- Add --android/--direct-playout: bypass jitter buffer, decode on recv
  (matches Android engine architecture)
- Wire AEC far-end reference from decoded playout to encoder
- Add --no-aec, --no-agc, --no-fec, --no-silence, --no-denoise switches
- Fix BufferSize::Fixed(960) → Default for macOS CoreAudio compat
- Optimize wzp-codec, wzp-fec, audiopus, nnnoiseless in debug profile
- Add capture callback size diagnostic logging

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-06 09:48:34 +04:00
parent ba29d8354f
commit cfb48df1ef
3 changed files with 229 additions and 50 deletions

View File

@@ -53,3 +53,18 @@ wzp-fec = { path = "crates/wzp-fec" }
wzp-crypto = { path = "crates/wzp-crypto" } wzp-crypto = { path = "crates/wzp-crypto" }
wzp-transport = { path = "crates/wzp-transport" } wzp-transport = { path = "crates/wzp-transport" }
wzp-client = { path = "crates/wzp-client" } wzp-client = { path = "crates/wzp-client" }
# Optimize heavy compute deps even in debug builds —
# real-time audio needs < 20ms per frame, impossible unoptimized.
[profile.dev.package.nnnoiseless]
opt-level = 3
[profile.dev.package.audiopus_sys]
opt-level = 3
[profile.dev.package.audiopus]
opt-level = 3
[profile.dev.package.raptorq]
opt-level = 3
[profile.dev.package.wzp-codec]
opt-level = 3
[profile.dev.package.wzp-fec]
opt-level = 3

View File

@@ -56,7 +56,7 @@ impl AudioCapture {
let config = StreamConfig { let config = StreamConfig {
channels: 1, channels: 1,
sample_rate: SampleRate(48_000), sample_rate: SampleRate(48_000),
buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32), buffer_size: cpal::BufferSize::Default,
}; };
let use_f32 = !supports_i16_input(&device)?; let use_f32 = !supports_i16_input(&device)?;
@@ -65,17 +65,21 @@ impl AudioCapture {
warn!("input stream error: {e}"); warn!("input stream error: {e}");
}; };
let logged_cb_size = Arc::new(AtomicBool::new(false));
let stream = if use_f32 { let stream = if use_f32 {
let ring = ring_cb.clone(); let ring = ring_cb.clone();
let running = running_clone.clone(); let running = running_clone.clone();
let logged = logged_cb_size.clone();
device.build_input_stream( device.build_input_stream(
&config, &config,
move |data: &[f32], _: &cpal::InputCallbackInfo| { move |data: &[f32], _: &cpal::InputCallbackInfo| {
if !running.load(Ordering::Relaxed) { if !running.load(Ordering::Relaxed) {
return; return;
} }
// Batch convert f32 → i16, then write entire slice to ring. if !logged.swap(true, Ordering::Relaxed) {
// Stack alloc for typical callback sizes (≤ 960 samples). eprintln!("[audio] capture callback: {} f32 samples", data.len());
}
let mut tmp = [0i16; FRAME_SAMPLES]; let mut tmp = [0i16; FRAME_SAMPLES];
for chunk in data.chunks(FRAME_SAMPLES) { for chunk in data.chunks(FRAME_SAMPLES) {
let n = chunk.len(); let n = chunk.len();
@@ -91,12 +95,16 @@ impl AudioCapture {
} else { } else {
let ring = ring_cb.clone(); let ring = ring_cb.clone();
let running = running_clone.clone(); let running = running_clone.clone();
let logged = logged_cb_size.clone();
device.build_input_stream( device.build_input_stream(
&config, &config,
move |data: &[i16], _: &cpal::InputCallbackInfo| { move |data: &[i16], _: &cpal::InputCallbackInfo| {
if !running.load(Ordering::Relaxed) { if !running.load(Ordering::Relaxed) {
return; return;
} }
if !logged.swap(true, Ordering::Relaxed) {
eprintln!("[audio] capture callback: {} i16 samples", data.len());
}
ring.write(data); ring.write(data);
}, },
err_cb, err_cb,
@@ -183,7 +191,7 @@ impl AudioPlayback {
let config = StreamConfig { let config = StreamConfig {
channels: 1, channels: 1,
sample_rate: SampleRate(48_000), sample_rate: SampleRate(48_000),
buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32), buffer_size: cpal::BufferSize::Default,
}; };
let use_f32 = !supports_i16_output(&device)?; let use_f32 = !supports_i16_output(&device)?;

View File

@@ -14,7 +14,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tracing::{error, info}; use tracing::{error, info, warn};
use wzp_client::call::{CallConfig, CallDecoder, CallEncoder}; use wzp_client::call::{CallConfig, CallDecoder, CallEncoder};
use wzp_proto::MediaTransport; use wzp_proto::MediaTransport;
@@ -47,6 +47,12 @@ struct CliArgs {
room: Option<String>, room: Option<String>,
raw_room: bool, raw_room: bool,
alias: Option<String>, alias: Option<String>,
no_denoise: bool,
no_aec: bool,
no_agc: bool,
no_fec: bool,
no_silence: bool,
direct_playout: bool,
token: Option<String>, token: Option<String>,
_metrics_file: Option<String>, _metrics_file: Option<String>,
} }
@@ -118,6 +124,12 @@ fn parse_args() -> CliArgs {
let mut room = None; let mut room = None;
let mut raw_room = false; let mut raw_room = false;
let mut alias = None; let mut alias = None;
let mut no_denoise = false;
let mut no_aec = false;
let mut no_agc = false;
let mut no_fec = false;
let mut no_silence = false;
let mut direct_playout = false;
let mut token = None; let mut token = None;
let mut metrics_file = None; let mut metrics_file = None;
let mut relay_str = None; let mut relay_str = None;
@@ -163,6 +175,12 @@ fn parse_args() -> CliArgs {
room = Some(args.get(i).expect("--room requires a name").to_string()); room = Some(args.get(i).expect("--room requires a name").to_string());
} }
"--raw-room" => raw_room = true, "--raw-room" => raw_room = true,
"--no-denoise" => no_denoise = true,
"--no-aec" => no_aec = true,
"--no-agc" => no_agc = true,
"--no-fec" => no_fec = true,
"--no-silence" => no_silence = true,
"--direct-playout" | "--android" => direct_playout = true,
"--alias" => { "--alias" => {
i += 1; i += 1;
alias = Some(args.get(i).expect("--alias requires a name").to_string()); alias = Some(args.get(i).expect("--alias requires a name").to_string());
@@ -222,6 +240,13 @@ fn parse_args() -> CliArgs {
eprintln!(" --room <name> Room name (hashed for privacy before sending)"); eprintln!(" --room <name> Room name (hashed for privacy before sending)");
eprintln!(" --raw-room Send room name as-is (no hash, for Android compat)"); eprintln!(" --raw-room Send room name as-is (no hash, for Android compat)");
eprintln!(" --alias <name> Display name shown to other participants"); eprintln!(" --alias <name> Display name shown to other participants");
eprintln!(" --no-denoise Disable RNNoise noise suppression");
eprintln!(" --no-aec Disable acoustic echo cancellation");
eprintln!(" --no-agc Disable automatic gain control");
eprintln!(" --no-fec Disable forward error correction");
eprintln!(" --no-silence Disable silence suppression");
eprintln!(" --direct-playout Bypass jitter buffer (decode on recv, like Android)");
eprintln!(" --android Alias for --no-denoise --no-aec --no-silence --direct-playout");
eprintln!(" --token <token> featherChat bearer token for relay auth"); eprintln!(" --token <token> featherChat bearer token for relay auth");
eprintln!(" --metrics-file <path> Write JSONL telemetry to file (1 line/sec)"); eprintln!(" --metrics-file <path> Write JSONL telemetry to file (1 line/sec)");
eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)"); eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)");
@@ -261,6 +286,12 @@ fn parse_args() -> CliArgs {
room, room,
raw_room, raw_room,
alias, alias,
no_denoise,
no_aec,
no_agc,
no_fec,
no_silence,
direct_playout,
token, token,
_metrics_file: metrics_file, _metrics_file: metrics_file,
} }
@@ -342,7 +373,15 @@ async fn main() -> anyhow::Result<()> {
if cli.live { if cli.live {
#[cfg(feature = "audio")] #[cfg(feature = "audio")]
{ {
return run_live(transport).await; let audio_opts = AudioOpts {
no_denoise: cli.no_denoise || cli.direct_playout,
no_aec: cli.no_aec,
no_agc: cli.no_agc,
no_fec: cli.no_fec,
no_silence: cli.no_silence || cli.direct_playout,
direct_playout: cli.direct_playout,
};
return run_live(transport, audio_opts).await;
} }
#[cfg(not(feature = "audio"))] #[cfg(not(feature = "audio"))]
{ {
@@ -603,13 +642,24 @@ async fn run_file_mode(
/// QUIC → recv task → jitter buffer → decode tick (20ms) → AudioRing → CPAL playback callback /// QUIC → recv task → jitter buffer → decode tick (20ms) → AudioRing → CPAL playback callback
/// ///
/// All lock-free: CPAL callbacks use atomic ring buffers, no Mutex on the audio path. /// All lock-free: CPAL callbacks use atomic ring buffers, no Mutex on the audio path.
struct AudioOpts {
no_denoise: bool,
no_aec: bool,
no_agc: bool,
no_fec: bool,
no_silence: bool,
direct_playout: bool,
}
#[cfg(feature = "audio")] #[cfg(feature = "audio")]
async fn run_live( async fn run_live(
transport: Arc<wzp_transport::QuinnTransport>, transport: Arc<wzp_transport::QuinnTransport>,
opts: AudioOpts,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
use std::sync::Arc as StdArc; use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use wzp_client::audio_io::{AudioCapture, AudioPlayback}; use wzp_client::audio_io::{AudioCapture, AudioPlayback};
use wzp_client::audio_ring::AudioRing;
use wzp_client::call::JitterTelemetry; use wzp_client::call::JitterTelemetry;
let capture = AudioCapture::start()?; let capture = AudioCapture::start()?;
@@ -619,6 +669,10 @@ async fn run_live(
let capture_ring = capture.ring().clone(); let capture_ring = capture.ring().clone();
let playout_ring = playback.ring().clone(); let playout_ring = playback.ring().clone();
// Far-end reference ring: recv task writes decoded audio here,
// send task reads it to feed the AEC echo canceller.
let farend_ring = StdArc::new(AudioRing::new());
let running = StdArc::new(AtomicBool::new(true)); let running = StdArc::new(AtomicBool::new(true));
// --- Signal handler: set running=false on first Ctrl+C, force-quit on second --- // --- Signal handler: set running=false on first Ctrl+C, force-quit on second ---
@@ -634,15 +688,40 @@ async fn run_live(
std::process::exit(1); std::process::exit(1);
}); });
let config = CallConfig::default(); let config = CallConfig {
noise_suppression: !opts.no_denoise,
suppression_enabled: !opts.no_silence,
..CallConfig::default()
};
{
let mut flags = Vec::new();
if opts.no_denoise { flags.push("denoise"); }
if opts.no_aec { flags.push("aec"); }
if opts.no_agc { flags.push("agc"); }
if opts.no_fec { flags.push("fec"); }
if opts.no_silence { flags.push("silence"); }
if opts.direct_playout { flags.push("jitter-buffer (direct playout)"); }
if !flags.is_empty() {
info!(disabled = %flags.join(", "), "audio processing overrides");
}
}
// --- Send task: poll capture ring → encode → send via async --- // --- Send task: poll capture ring → encode → send via async ---
let send_transport = transport.clone(); let send_transport = transport.clone();
let send_running = running.clone(); let send_running = running.clone();
let no_aec = opts.no_aec;
let no_agc = opts.no_agc;
let _no_fec = opts.no_fec;
let send_farend = farend_ring.clone();
let send_task = async move { let send_task = async move {
let mut encoder = CallEncoder::new(&config); let mut encoder = CallEncoder::new(&config);
if no_aec { encoder.set_aec_enabled(false); }
if no_agc { encoder.set_agc_enabled(false); }
let mut capture_buf = vec![0i16; FRAME_SAMPLES]; let mut capture_buf = vec![0i16; FRAME_SAMPLES];
let mut farend_buf = vec![0i16; FRAME_SAMPLES];
let mut frames_sent: u64 = 0; let mut frames_sent: u64 = 0;
let mut polls: u64 = 0;
let mut last_diag = std::time::Instant::now();
loop { loop {
if !send_running.load(Ordering::Relaxed) { if !send_running.load(Ordering::Relaxed) {
@@ -652,6 +731,12 @@ async fn run_live(
let avail = capture_ring.available(); let avail = capture_ring.available();
if avail < FRAME_SAMPLES { if avail < FRAME_SAMPLES {
tokio::time::sleep(std::time::Duration::from_millis(5)).await; tokio::time::sleep(std::time::Duration::from_millis(5)).await;
polls += 1;
// Diagnostic every 2 seconds
if last_diag.elapsed().as_secs() >= 2 {
info!(avail, polls, frames_sent, "send: ring starved (avail < {FRAME_SAMPLES})");
last_diag = std::time::Instant::now();
}
continue; continue;
} }
@@ -660,6 +745,16 @@ async fn run_live(
continue; continue;
} }
// Feed AEC far-end reference: what was played through the speaker.
// Must be called BEFORE encode_frame processes the mic signal.
if !no_aec {
while send_farend.available() >= FRAME_SAMPLES {
send_farend.read(&mut farend_buf);
encoder.feed_aec_farend(&farend_buf);
}
}
let t0 = std::time::Instant::now();
let packets = match encoder.encode_frame(&capture_buf) { let packets = match encoder.encode_frame(&capture_buf) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
@@ -667,6 +762,7 @@ async fn run_live(
continue; continue;
} }
}; };
let encode_us = t0.elapsed().as_micros();
for pkt in &packets { for pkt in &packets {
if let Err(e) = send_transport.send_media(pkt).await { if let Err(e) = send_transport.send_media(pkt).await {
@@ -676,58 +772,121 @@ async fn run_live(
} }
frames_sent += 1; frames_sent += 1;
if frames_sent == 1 || frames_sent % 500 == 0 { if frames_sent <= 5 || frames_sent % 500 == 0 {
info!(frames_sent, "send progress"); info!(frames_sent, encode_us, pkts = packets.len(), "send progress");
} }
} }
}; };
// --- Recv task: receive packets → ingest into jitter buffer --- // --- Recv + playout ---
// Uses timeout so it can check the running flag and exit on Ctrl+C.
let recv_transport = transport.clone(); let recv_transport = transport.clone();
let recv_running = running.clone(); let recv_running = running.clone();
let config = CallConfig::default(); let direct_playout = opts.direct_playout;
let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config)));
let decoder_recv = decoder.clone();
let recv_task = async move { // Direct playout: decode on recv, write straight to playout ring (like Android).
let mut packets_received: u64 = 0; // Jitter buffer mode: ingest into jitter buffer, decode on 20ms tick.
loop { let recv_task = {
if !recv_running.load(Ordering::Relaxed) { let playout_ring = playout_ring.clone();
break; let farend_ring = farend_ring.clone();
} let config = CallConfig::default();
// Timeout so we can check running flag periodically let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config)));
let result = tokio::time::timeout( let decoder_recv = decoder.clone();
std::time::Duration::from_millis(100),
recv_transport.recv_media(), async move {
) let mut packets_received: u64 = 0;
.await; let mut timeouts: u64 = 0;
match result { // For direct playout: raw Opus decoder + AGC
Ok(Ok(Some(pkt))) => { let mut opus_dec = if direct_playout {
let mut dec = decoder_recv.lock().await; Some(wzp_codec::create_decoder(wzp_proto::QualityProfile::GOOD))
dec.ingest(pkt); } else {
packets_received += 1; None
if packets_received == 1 || packets_received % 500 == 0 { };
info!(packets_received, depth = dec.stats().current_depth, "recv progress"); let mut playout_agc = wzp_codec::AutoGainControl::new();
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
loop {
if !recv_running.load(Ordering::Relaxed) {
break;
}
let result = tokio::time::timeout(
std::time::Duration::from_millis(100),
recv_transport.recv_media(),
)
.await;
match result {
Ok(Ok(Some(pkt))) => {
packets_received += 1;
if direct_playout {
// Android path: decode immediately, AGC, write to ring
if !pkt.header.is_repair {
if let Some(ref mut dec) = opus_dec {
match dec.decode(&pkt.payload, &mut pcm_buf) {
Ok(n) => {
if !no_agc {
playout_agc.process_frame(&mut pcm_buf[..n]);
}
playout_ring.write(&pcm_buf[..n]);
// Feed far-end ring for AEC
farend_ring.write(&pcm_buf[..n]);
}
Err(e) => {
if let Ok(n) = dec.decode_lost(&mut pcm_buf) {
playout_ring.write(&pcm_buf[..n]);
}
if packets_received < 10 {
warn!("decode error: {e}");
}
}
}
}
}
} else {
// Jitter buffer path
let mut dec = decoder_recv.lock().await;
dec.ingest(pkt);
}
if packets_received == 1 || packets_received % 500 == 0 {
info!(packets_received, direct_playout, "recv progress");
}
timeouts = 0;
}
Ok(Ok(None)) => {
info!("connection closed");
break;
}
Ok(Err(e)) => {
error!("recv error: {e}");
break;
}
Err(_) => {
timeouts += 1;
if timeouts == 50 {
info!("recv: no media packets received in 5s");
}
} }
} }
Ok(Ok(None)) => {
info!("connection closed");
break;
}
Ok(Err(e)) => {
error!("recv error: {e}");
break;
}
Err(_) => {} // timeout — loop and check running flag
} }
} }
}; };
// --- Playout tick: decode from jitter buffer at steady 20ms intervals --- // Playout tick — only used when NOT in direct playout mode
let playout_running = running.clone(); let playout_running = running.clone();
let decoder_playout = decoder.clone();
let playout_task = async move { let playout_task = async move {
if direct_playout {
// Direct playout handles everything in recv_task — just park here
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if !playout_running.load(Ordering::Relaxed) {
break;
}
}
return;
}
let config = CallConfig::default();
let mut decoder = CallDecoder::new(&config);
let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
let mut interval = tokio::time::interval(std::time::Duration::from_millis(20)); let mut interval = tokio::time::interval(std::time::Duration::from_millis(20));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -738,19 +897,16 @@ async fn run_live(
break; break;
} }
let mut dec = decoder_playout.lock().await;
// Drain ready frames from jitter buffer into playout ring.
let mut decoded_this_tick = 0; let mut decoded_this_tick = 0;
while let Some(n) = dec.decode_next(&mut pcm_buf) { while let Some(n) = decoder.decode_next(&mut pcm_buf) {
playout_ring.write(&pcm_buf[..n]); playout_ring.write(&pcm_buf[..n]);
decoded_this_tick += 1; decoded_this_tick += 1;
if decoded_this_tick >= 2 { if decoded_this_tick >= 2 {
break; // Don't drain too aggressively in one tick break;
} }
} }
telemetry.maybe_log(dec.stats()); telemetry.maybe_log(decoder.stats());
} }
}; };