From 860c90394d39370e61d6d4de23e75cf7c443cdc5 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 6 Apr 2026 09:04:51 +0400 Subject: [PATCH] feat: rewrite desktop audio I/O with lock-free ring buffers - Replace Mutex-based CPAL callbacks with atomic SPSC ring buffers - Proper async send/recv loops (no block_on), 20ms playout tick - Add signal task for RoomUpdate presence display - Add --alias, --raw-room flags and key persistence (~/.wzp/identity) - Add SetAlias signal variant and relay-side handling - Graceful Ctrl+C shutdown with force-quit on second press Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/audio_io.rs | 159 ++++++-------- crates/wzp-client/src/audio_ring.rs | 89 ++++++++ crates/wzp-client/src/cli.rs | 309 ++++++++++++++++++++++----- crates/wzp-client/src/featherchat.rs | 1 + crates/wzp-client/src/lib.rs | 2 + crates/wzp-proto/src/packet.rs | 5 + crates/wzp-relay/src/room.rs | 188 +++++++++++----- 7 files changed, 552 insertions(+), 201 deletions(-) create mode 100644 crates/wzp-client/src/audio_ring.rs diff --git a/crates/wzp-client/src/audio_io.rs b/crates/wzp-client/src/audio_io.rs index 665cf0c..f713e66 100644 --- a/crates/wzp-client/src/audio_io.rs +++ b/crates/wzp-client/src/audio_io.rs @@ -3,12 +3,10 @@ //! Both structs use 48 kHz, mono, i16 format to match the WarzonePhone codec //! pipeline. Frames are 960 samples (20 ms at 48 kHz). //! -//! The cpal `Stream` type is not `Send`, so each struct spawns a dedicated OS -//! thread that owns the stream. The public API exposes only `Send + Sync` -//! channel handles. +//! Audio callbacks are **lock-free**: they read/write directly to an `AudioRing` +//! (atomic SPSC ring buffer). No Mutex, no channel, no allocation on the hot path. use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; use std::sync::Arc; use anyhow::{anyhow, Context}; @@ -16,6 +14,8 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig}; use tracing::{info, warn}; +use crate::audio_ring::AudioRing; + /// Number of samples per 20 ms frame at 48 kHz mono. pub const FRAME_SAMPLES: usize = 960; @@ -23,22 +23,24 @@ pub const FRAME_SAMPLES: usize = 960; // AudioCapture // --------------------------------------------------------------------------- -/// Captures microphone input and yields 960-sample PCM frames. +/// Captures microphone input via CPAL and writes PCM into a lock-free ring buffer. /// /// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`. pub struct AudioCapture { - rx: mpsc::Receiver>, + ring: Arc, running: Arc, } impl AudioCapture { /// Create and start capturing from the default input device at 48 kHz mono. pub fn start() -> Result { - let (tx, rx) = mpsc::sync_channel::>(64); + let ring = Arc::new(AudioRing::new()); let running = Arc::new(AtomicBool::new(true)); - let running_clone = running.clone(); - let (init_tx, init_rx) = mpsc::sync_channel::>(1); + let (init_tx, init_rx) = std::sync::mpsc::sync_channel::>(1); + + let ring_cb = ring.clone(); + let running_clone = running.clone(); std::thread::Builder::new() .name("wzp-audio-capture".into()) @@ -54,21 +56,17 @@ impl AudioCapture { let config = StreamConfig { channels: 1, sample_rate: SampleRate(48_000), - buffer_size: cpal::BufferSize::Default, + buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32), }; let use_f32 = !supports_i16_input(&device)?; - let buf = Arc::new(std::sync::Mutex::new( - Vec::::with_capacity(FRAME_SAMPLES), - )); let err_cb = |e: cpal::StreamError| { warn!("input stream error: {e}"); }; let stream = if use_f32 { - let buf = buf.clone(); - let tx = tx.clone(); + let ring = ring_cb.clone(); let running = running_clone.clone(); device.build_input_stream( &config, @@ -76,21 +74,22 @@ impl AudioCapture { if !running.load(Ordering::Relaxed) { return; } - let mut lock = buf.lock().unwrap(); - for &s in data { - lock.push(f32_to_i16(s)); - if lock.len() == FRAME_SAMPLES { - let frame = lock.drain(..).collect(); - let _ = tx.try_send(frame); + // Batch convert f32 → i16, then write entire slice to ring. + // Stack alloc for typical callback sizes (≤ 960 samples). + let mut tmp = [0i16; FRAME_SAMPLES]; + for chunk in data.chunks(FRAME_SAMPLES) { + let n = chunk.len(); + for i in 0..n { + tmp[i] = f32_to_i16(chunk[i]); } + ring.write(&tmp[..n]); } }, err_cb, None, )? } else { - let buf = buf.clone(); - let tx = tx.clone(); + let ring = ring_cb.clone(); let running = running_clone.clone(); device.build_input_stream( &config, @@ -98,14 +97,7 @@ impl AudioCapture { if !running.load(Ordering::Relaxed) { return; } - let mut lock = buf.lock().unwrap(); - for &s in data { - lock.push(s); - if lock.len() == FRAME_SAMPLES { - let frame = lock.drain(..).collect(); - let _ = tx.try_send(frame); - } - } + ring.write(data); }, err_cb, None, @@ -114,7 +106,6 @@ impl AudioCapture { stream.play().context("failed to start input stream")?; - // Signal success to the caller before parking. let _ = init_tx.send(Ok(())); // Keep stream alive until stopped. @@ -135,15 +126,12 @@ impl AudioCapture { .map_err(|_| anyhow!("capture thread exited before signaling"))? .map_err(|e| anyhow!("{e}"))?; - Ok(Self { rx, running }) + Ok(Self { ring, running }) } - /// Read the next frame of 960 PCM samples (blocking until available). - /// - /// Returns `None` when the stream has been stopped or the channel is - /// disconnected. - pub fn read_frame(&self) -> Option> { - self.rx.recv().ok() + /// Get a reference to the capture ring buffer for direct polling. + pub fn ring(&self) -> &Arc { + &self.ring } /// Stop capturing. @@ -152,26 +140,34 @@ impl AudioCapture { } } +impl Drop for AudioCapture { + fn drop(&mut self) { + self.stop(); + } +} + // --------------------------------------------------------------------------- // AudioPlayback // --------------------------------------------------------------------------- -/// Plays PCM frames through the default output device at 48 kHz mono. +/// Plays PCM through the default output device, reading from a lock-free ring buffer. /// /// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`. pub struct AudioPlayback { - tx: mpsc::SyncSender>, + ring: Arc, running: Arc, } impl AudioPlayback { /// Create and start playback on the default output device at 48 kHz mono. pub fn start() -> Result { - let (tx, rx) = mpsc::sync_channel::>(64); + let ring = Arc::new(AudioRing::new()); let running = Arc::new(AtomicBool::new(true)); - let running_clone = running.clone(); - let (init_tx, init_rx) = mpsc::sync_channel::>(1); + let (init_tx, init_rx) = std::sync::mpsc::sync_channel::>(1); + + let ring_cb = ring.clone(); + let running_clone = running.clone(); std::thread::Builder::new() .name("wzp-audio-playback".into()) @@ -187,67 +183,45 @@ impl AudioPlayback { let config = StreamConfig { channels: 1, sample_rate: SampleRate(48_000), - buffer_size: cpal::BufferSize::Default, + buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32), }; let use_f32 = !supports_i16_output(&device)?; - // Shared ring of samples the cpal callback drains from. - let ring = Arc::new(std::sync::Mutex::new( - std::collections::VecDeque::::with_capacity(FRAME_SAMPLES * 8), - )); - - // Background drainer: moves frames from the mpsc channel into the ring. - { - let ring = ring.clone(); - let running = running_clone.clone(); - std::thread::Builder::new() - .name("wzp-playback-drain".into()) - .spawn(move || { - while running.load(Ordering::Relaxed) { - match rx.recv_timeout(std::time::Duration::from_millis(100)) { - Ok(frame) => { - let mut lock = ring.lock().unwrap(); - lock.extend(frame); - while lock.len() > FRAME_SAMPLES * 16 { - lock.pop_front(); - } - } - Err(mpsc::RecvTimeoutError::Timeout) => {} - Err(mpsc::RecvTimeoutError::Disconnected) => break, - } - } - })?; - } - let err_cb = |e: cpal::StreamError| { warn!("output stream error: {e}"); }; let stream = if use_f32 { - let ring = ring.clone(); + let ring = ring_cb.clone(); device.build_output_stream( &config, move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - let mut lock = ring.lock().unwrap(); - for sample in data.iter_mut() { - *sample = match lock.pop_front() { - Some(s) => i16_to_f32(s), - None => 0.0, - }; + let mut tmp = [0i16; FRAME_SAMPLES]; + for chunk in data.chunks_mut(FRAME_SAMPLES) { + let n = chunk.len(); + let read = ring.read(&mut tmp[..n]); + for i in 0..read { + chunk[i] = i16_to_f32(tmp[i]); + } + // Fill remainder with silence if ring underran + for i in read..n { + chunk[i] = 0.0; + } } }, err_cb, None, )? } else { - let ring = ring.clone(); + let ring = ring_cb.clone(); device.build_output_stream( &config, move |data: &mut [i16], _: &cpal::OutputCallbackInfo| { - let mut lock = ring.lock().unwrap(); - for sample in data.iter_mut() { - *sample = lock.pop_front().unwrap_or(0); + let read = ring.read(data); + // Fill remainder with silence if ring underran + for sample in &mut data[read..] { + *sample = 0; } }, err_cb, @@ -257,7 +231,6 @@ impl AudioPlayback { stream.play().context("failed to start output stream")?; - // Signal success to the caller before parking. let _ = init_tx.send(Ok(())); // Keep stream alive until stopped. @@ -278,12 +251,12 @@ impl AudioPlayback { .map_err(|_| anyhow!("playback thread exited before signaling"))? .map_err(|e| anyhow!("{e}"))?; - Ok(Self { tx, running }) + Ok(Self { ring, running }) } - /// Write a frame of PCM samples for playback. - pub fn write_frame(&self, pcm: &[i16]) { - let _ = self.tx.try_send(pcm.to_vec()); + /// Get a reference to the playout ring buffer for direct writing. + pub fn ring(&self) -> &Arc { + &self.ring } /// Stop playback. @@ -292,11 +265,16 @@ impl AudioPlayback { } } +impl Drop for AudioPlayback { + fn drop(&mut self) { + self.stop(); + } +} + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -/// Check if the input device supports i16 at 48 kHz mono. fn supports_i16_input(device: &cpal::Device) -> Result { let supported = device .supported_input_configs() @@ -313,7 +291,6 @@ fn supports_i16_input(device: &cpal::Device) -> Result { Ok(false) } -/// Check if the output device supports i16 at 48 kHz mono. fn supports_i16_output(device: &cpal::Device) -> Result { let supported = device .supported_output_configs() diff --git a/crates/wzp-client/src/audio_ring.rs b/crates/wzp-client/src/audio_ring.rs new file mode 100644 index 0000000..fac1479 --- /dev/null +++ b/crates/wzp-client/src/audio_ring.rs @@ -0,0 +1,89 @@ +//! Lock-free SPSC ring buffer for audio PCM transfer between +//! CPAL audio callbacks and the Rust engine. +//! +//! Identical design to wzp-android's audio_ring: the producer writes and +//! advances a write cursor, the consumer reads and advances a read cursor. +//! Both cursors are atomic — no mutex, no blocking on the audio thread. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Ring buffer capacity in i16 samples. +/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono. +const RING_CAPACITY: usize = 960 * 10; + +/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples. +pub struct AudioRing { + buf: Box<[i16; RING_CAPACITY]>, + write_pos: AtomicUsize, + read_pos: AtomicUsize, +} + +// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads. +// The atomics ensure visibility. The buffer itself is never accessed +// from the same index by both threads simultaneously because the +// producer only writes to positions between write_pos and read_pos, +// and the consumer only reads from positions between read_pos and write_pos. +unsafe impl Send for AudioRing {} +unsafe impl Sync for AudioRing {} + +impl AudioRing { + pub fn new() -> Self { + Self { + buf: Box::new([0i16; RING_CAPACITY]), + write_pos: AtomicUsize::new(0), + read_pos: AtomicUsize::new(0), + } + } + + /// Number of samples available to read. + pub fn available(&self) -> usize { + let w = self.write_pos.load(Ordering::Acquire); + let r = self.read_pos.load(Ordering::Acquire); + w.wrapping_sub(r) + } + + /// Write samples into the ring. Returns number of samples written. + /// Drops oldest samples if the ring is full. + pub fn write(&self, samples: &[i16]) -> usize { + let w = self.write_pos.load(Ordering::Relaxed); + let count = samples.len().min(RING_CAPACITY); + + for i in 0..count { + let idx = (w + i) % RING_CAPACITY; + unsafe { + let ptr = self.buf.as_ptr() as *mut i16; + *ptr.add(idx) = samples[i]; + } + } + + self.write_pos + .store(w.wrapping_add(count), Ordering::Release); + + // If we overwrote unread data, advance read_pos + if self.available() > RING_CAPACITY { + let new_read = self + .write_pos + .load(Ordering::Relaxed) + .wrapping_sub(RING_CAPACITY); + self.read_pos.store(new_read, Ordering::Release); + } + + count + } + + /// Read samples from the ring into `out`. Returns number of samples read. + pub fn read(&self, out: &mut [i16]) -> usize { + let avail = self.available(); + let count = out.len().min(avail); + + let r = self.read_pos.load(Ordering::Relaxed); + for i in 0..count { + let idx = (r + i) % RING_CAPACITY; + out[i] = unsafe { *self.buf.as_ptr().add(idx) }; + } + + self.read_pos + .store(r.wrapping_add(count), Ordering::Release); + count + } +} diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index 6ab3451..7ddefb0 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -45,12 +45,22 @@ struct CliArgs { seed_hex: Option, mnemonic: Option, room: Option, + raw_room: bool, + alias: Option, token: Option, _metrics_file: Option, } +/// Default identity file path: ~/.wzp/identity +fn default_identity_path() -> std::path::PathBuf { + let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + std::path::PathBuf::from(home).join(".wzp").join("identity") +} + impl CliArgs { - /// Resolve the identity seed from --seed, --mnemonic, or generate a new one. + /// Resolve the identity seed from --seed, --mnemonic, or persistent file. + /// + /// Priority: --seed > --mnemonic > ~/.wzp/identity > generate + save. pub fn resolve_seed(&self) -> wzp_crypto::Seed { if let Some(ref hex_str) = self.seed_hex { let seed = wzp_crypto::Seed::from_hex(hex_str).expect("invalid --seed hex"); @@ -65,10 +75,30 @@ impl CliArgs { info!(fingerprint = %fp, "identity from --mnemonic"); seed } else { + let path = default_identity_path(); + // Try loading existing identity + if path.exists() { + if let Ok(hex_str) = std::fs::read_to_string(&path) { + let hex_str = hex_str.trim(); + if let Ok(seed) = wzp_crypto::Seed::from_hex(hex_str) { + let id = seed.derive_identity(); + let fp = id.public_identity().fingerprint; + info!(fingerprint = %fp, path = %path.display(), "loaded persistent identity"); + return seed; + } + } + } + // Generate new and save let seed = wzp_crypto::Seed::generate(); let id = seed.derive_identity(); let fp = id.public_identity().fingerprint; - info!(fingerprint = %fp, "generated ephemeral identity"); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).ok(); + } + // Encode seed as hex manually (avoid dep on `hex` crate in binary) + let hex_str: String = seed.0.iter().map(|b| format!("{b:02x}")).collect(); + std::fs::write(&path, hex_str).ok(); + info!(fingerprint = %fp, path = %path.display(), "generated and saved new identity"); seed } } @@ -86,6 +116,8 @@ fn parse_args() -> CliArgs { let mut seed_hex = None; let mut mnemonic = None; let mut room = None; + let mut raw_room = false; + let mut alias = None; let mut token = None; let mut metrics_file = None; let mut relay_str = None; @@ -130,6 +162,11 @@ fn parse_args() -> CliArgs { i += 1; room = Some(args.get(i).expect("--room requires a name").to_string()); } + "--raw-room" => raw_room = true, + "--alias" => { + i += 1; + alias = Some(args.get(i).expect("--alias requires a name").to_string()); + } "--token" => { i += 1; token = Some(args.get(i).expect("--token requires a value").to_string()); @@ -183,10 +220,13 @@ fn parse_args() -> CliArgs { eprintln!(" --seed Identity seed (64 hex chars, featherChat compatible)"); eprintln!(" --mnemonic Identity seed as BIP39 mnemonic (24 words)"); eprintln!(" --room Room name (hashed for privacy before sending)"); + eprintln!(" --raw-room Send room name as-is (no hash, for Android compat)"); + eprintln!(" --alias Display name shown to other participants"); eprintln!(" --token featherChat bearer token for relay auth"); eprintln!(" --metrics-file Write JSONL telemetry to file (1 line/sec)"); eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)"); eprintln!(); + eprintln!("Identity is auto-saved to ~/.wzp/identity on first run."); eprintln!("Default relay: 127.0.0.1:4433"); std::process::exit(0); } @@ -219,6 +259,8 @@ fn parse_args() -> CliArgs { seed_hex, mnemonic, room, + raw_room, + alias, token, _metrics_file: metrics_file, } @@ -250,8 +292,14 @@ async fn main() -> anyhow::Result<()> { "WarzonePhone client" ); - // Hash room name for SNI privacy (or "default" if none specified) + // Compute SNI from room name. + // --raw-room sends the name as-is (for Android compat — Android doesn't hash). + // Default behaviour hashes for privacy. let sni = match &cli.room { + Some(name) if cli.raw_room => { + info!(room = %name, "using raw room name as SNI (no hash)"); + name.clone() + } Some(name) => { let hashed = wzp_crypto::hash_room_name(name); info!(room = %name, hashed = %hashed, "room name hashed for SNI"); @@ -293,7 +341,7 @@ async fn main() -> anyhow::Result<()> { if cli.live { #[cfg(feature = "audio")] { - return run_live(transport).await; + return run_live(transport, cli.alias).await; } #[cfg(not(feature = "audio"))] { @@ -548,78 +596,233 @@ async fn run_file_mode( } /// Live mode: capture from mic, encode, send; receive, decode, play. +/// +/// Architecture (mirrors wzp-android/engine.rs): +/// CPAL capture callback → AudioRing → send task (5ms poll) → QUIC +/// QUIC → recv task → jitter buffer → decode tick (20ms) → AudioRing → CPAL playback callback +/// +/// All lock-free: CPAL callbacks use atomic ring buffers, no Mutex on the audio path. #[cfg(feature = "audio")] -async fn run_live(transport: Arc) -> anyhow::Result<()> { +async fn run_live( + transport: Arc, + alias: Option, +) -> anyhow::Result<()> { + use std::sync::Arc as StdArc; + use std::sync::atomic::{AtomicBool, Ordering}; use wzp_client::audio_io::{AudioCapture, AudioPlayback}; + use wzp_client::call::JitterTelemetry; + + // Send alias to relay so other participants can see our display name + if let Some(ref name) = alias { + let msg = wzp_proto::SignalMessage::SetAlias { alias: name.clone() }; + transport.send_signal(&msg).await?; + info!(alias = %name, "alias sent to relay"); + } let capture = AudioCapture::start()?; let playback = AudioPlayback::start()?; - info!("Audio I/O started — press Ctrl+C to stop"); + info!("audio I/O started (lock-free ring buffers) — press Ctrl+C to stop"); + let capture_ring = capture.ring().clone(); + let playout_ring = playback.ring().clone(); + + let running = StdArc::new(AtomicBool::new(true)); + + // --- Signal handler: set running=false on first Ctrl+C, force-quit on second --- + let signal_running = running.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + eprintln!(); // newline after ^C + info!("Ctrl+C received, shutting down..."); + signal_running.store(false, Ordering::SeqCst); + + tokio::signal::ctrl_c().await.ok(); + eprintln!("\nForce quit"); + std::process::exit(1); + }); + + let config = CallConfig::default(); + + // --- Send task: poll capture ring → encode → send via async --- let send_transport = transport.clone(); - let rt_handle = tokio::runtime::Handle::current(); - let send_handle = std::thread::Builder::new() - .name("wzp-send-loop".into()) - .spawn(move || { - let config = CallConfig::default(); - let mut encoder = CallEncoder::new(&config); - loop { - let frame = match capture.read_frame() { - Some(f) => f, - None => break, - }; - let packets = match encoder.encode_frame(&frame) { - Ok(p) => p, - Err(e) => { - error!("encode error: {e}"); - continue; - } - }; - for pkt in &packets { - if let Err(e) = rt_handle.block_on(send_transport.send_media(pkt)) { - error!("send error: {e}"); - return; - } + let send_running = running.clone(); + let send_task = async move { + let mut encoder = CallEncoder::new(&config); + let mut capture_buf = vec![0i16; FRAME_SAMPLES]; + let mut frames_sent: u64 = 0; + + loop { + if !send_running.load(Ordering::Relaxed) { + break; + } + + let avail = capture_ring.available(); + if avail < FRAME_SAMPLES { + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + continue; + } + + let read = capture_ring.read(&mut capture_buf); + if read < FRAME_SAMPLES { + continue; + } + + let packets = match encoder.encode_frame(&capture_buf) { + Ok(p) => p, + Err(e) => { + error!("encode error: {e}"); + continue; + } + }; + + for pkt in &packets { + if let Err(e) = send_transport.send_media(pkt).await { + error!("send error: {e}"); + return; } } - })?; + frames_sent += 1; + if frames_sent == 1 || frames_sent % 500 == 0 { + info!(frames_sent, "send progress"); + } + } + }; + + // --- Recv task: receive packets → ingest into jitter buffer --- + // Uses timeout so it can check the running flag and exit on Ctrl+C. let recv_transport = transport.clone(); - let recv_handle = tokio::spawn(async move { - let config = CallConfig::default(); - let mut decoder = CallDecoder::new(&config); - let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; + let recv_running = running.clone(); + let config = CallConfig::default(); + let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config))); + let decoder_recv = decoder.clone(); + + let recv_task = async move { + let mut packets_received: u64 = 0; loop { - match recv_transport.recv_media().await { - Ok(Some(pkt)) => { - let is_repair = pkt.header.is_repair; - decoder.ingest(pkt); - // Only decode for source packets (1 source = 1 audio frame). - // Repair packets feed the FEC decoder but don't produce audio. - if !is_repair { - if let Some(_n) = decoder.decode_next(&mut pcm_buf) { - playback.write_frame(&pcm_buf); - } + if !recv_running.load(Ordering::Relaxed) { + break; + } + // Timeout so we can check running flag periodically + let result = tokio::time::timeout( + std::time::Duration::from_millis(100), + recv_transport.recv_media(), + ) + .await; + match result { + Ok(Ok(Some(pkt))) => { + let mut dec = decoder_recv.lock().await; + dec.ingest(pkt); + packets_received += 1; + if packets_received == 1 || packets_received % 500 == 0 { + info!(packets_received, depth = dec.stats().current_depth, "recv progress"); } } - Ok(None) => { + Ok(Ok(None)) => { info!("connection closed"); break; } - Err(e) => { + Ok(Err(e)) => { error!("recv error: {e}"); break; } + Err(_) => {} // timeout — loop and check running flag } } - }); + }; - tokio::signal::ctrl_c().await?; - info!("Shutting down..."); + // --- Playout tick: decode from jitter buffer at steady 20ms intervals --- + let playout_running = running.clone(); + let decoder_playout = decoder.clone(); + let playout_task = async move { + let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; + let mut interval = tokio::time::interval(std::time::Duration::from_millis(20)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut telemetry = JitterTelemetry::new(5); + loop { + interval.tick().await; + if !playout_running.load(Ordering::Relaxed) { + break; + } - recv_handle.abort(); - drop(send_handle); - transport.close().await?; - info!("done"); + let mut dec = decoder_playout.lock().await; + + // Drain ready frames from jitter buffer into playout ring. + let mut decoded_this_tick = 0; + while let Some(n) = dec.decode_next(&mut pcm_buf) { + playout_ring.write(&pcm_buf[..n]); + decoded_this_tick += 1; + if decoded_this_tick >= 2 { + break; // Don't drain too aggressively in one tick + } + } + + telemetry.maybe_log(dec.stats()); + } + }; + + // --- Signal task: listen for RoomUpdate and display presence --- + let signal_transport = transport.clone(); + let signal_running = running.clone(); + let signal_task = async move { + loop { + if !signal_running.load(Ordering::Relaxed) { + break; + } + let result = tokio::time::timeout( + std::time::Duration::from_millis(200), + signal_transport.recv_signal(), + ) + .await; + match result { + Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate { count, participants }))) => { + info!(count, "room update"); + for p in &participants { + let name = p + .alias + .as_deref() + .unwrap_or("(no alias)"); + let fp = if p.fingerprint.is_empty() { + "(no fingerprint)" + } else { + &p.fingerprint + }; + info!(" participant: {name} [{fp}]"); + } + } + Ok(Ok(Some(msg))) => { + info!("signal: {:?}", std::mem::discriminant(&msg)); + } + Ok(Ok(None)) => { + info!("signal stream closed"); + break; + } + Ok(Err(e)) => { + error!("signal recv error: {e}"); + break; + } + Err(_) => {} // timeout — loop and check running flag + } + } + }; + + // --- Run all tasks, exit when any finishes (or running flag cleared by Ctrl+C) --- + tokio::select! { + _ = send_task => info!("send task ended"), + _ = recv_task => info!("recv task ended"), + _ = playout_task => info!("playout task ended"), + _ = signal_task => info!("signal task ended"), + } + + running.store(false, Ordering::SeqCst); + capture.stop(); + playback.stop(); + + // Give transport 2s to close gracefully, then bail + match tokio::time::timeout(std::time::Duration::from_secs(2), transport.close()).await { + Ok(Ok(())) => info!("done"), + Ok(Err(e)) => info!("close error (non-fatal): {e}"), + Err(_) => info!("close timed out, exiting anyway"), + } Ok(()) } diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index 677e388..91a202e 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -110,6 +110,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse + SignalMessage::SetAlias { .. } => CallSignalType::Offer, // reuse } } diff --git a/crates/wzp-client/src/lib.rs b/crates/wzp-client/src/lib.rs index 8afe631..fece291 100644 --- a/crates/wzp-client/src/lib.rs +++ b/crates/wzp-client/src/lib.rs @@ -8,6 +8,8 @@ #[cfg(feature = "audio")] pub mod audio_io; +#[cfg(feature = "audio")] +pub mod audio_ring; pub mod bench; pub mod call; pub mod drift_test; diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index 807efab..f696b13 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -653,6 +653,11 @@ pub enum SignalMessage { /// List of participants currently in the room. participants: Vec, }, + + /// Set or update the client's display name. + /// Sent by client after joining; relay updates the participant entry and + /// re-broadcasts a RoomUpdate to all participants. + SetAlias { alias: String }, } /// A participant entry in a RoomUpdate message. diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 616538f..73b7891 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -141,6 +141,17 @@ impl Room { self.participants.iter().map(|p| p.sender.clone()).collect() } + /// Update a participant's alias. Returns true if the participant was found. + fn set_alias(&mut self, id: ParticipantId, alias: String) -> bool { + if let Some(p) = self.participants.iter_mut().find(|p| p.id == id) { + info!(participant = id, %alias, "alias updated"); + p.alias = Some(alias); + true + } else { + false + } + } + fn is_empty(&self) -> bool { self.participants.is_empty() } @@ -255,6 +266,26 @@ impl RoomManager { } } + /// Update a participant's alias and return a RoomUpdate + senders for broadcasting. + pub fn set_alias( + &mut self, + room_name: &str, + participant_id: ParticipantId, + alias: String, + ) -> Option<(wzp_proto::SignalMessage, Vec)> { + if let Some(room) = self.rooms.get_mut(room_name) { + if room.set_alias(participant_id, alias) { + let update = wzp_proto::SignalMessage::RoomUpdate { + count: room.len() as u32, + participants: room.participant_list(), + }; + let senders = room.all_senders(); + return Some((update, senders)); + } + } + None + } + /// Get senders for all OTHER participants in a room. pub fn others( &self, @@ -374,68 +405,111 @@ async fn run_participant_plain( session_id: &str, ) { let addr = transport.connection().remote_address(); - let mut packets_forwarded = 0u64; - loop { - let pkt = match transport.recv_media().await { - Ok(Some(pkt)) => pkt, - Ok(None) => { - info!(%addr, participant = participant_id, "disconnected"); - break; - } - Err(e) => { - let msg = e.to_string(); - if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") { - info!(%addr, participant = participant_id, "connection closed: {e}"); - } else { - error!(%addr, participant = participant_id, "recv error: {e}"); + // Media forwarding task + let media_room_mgr = room_mgr.clone(); + let media_room_name = room_name.clone(); + let media_transport = transport.clone(); + let media_metrics = metrics.clone(); + let media_session_id = session_id.to_string(); + let media_task = async move { + let mut packets_forwarded = 0u64; + loop { + let pkt = match media_transport.recv_media().await { + Ok(Some(pkt)) => pkt, + Ok(None) => { + info!(%addr, participant = participant_id, "disconnected"); + break; } - break; - } - }; - - // Update per-session quality metrics if a quality report is present - if let Some(ref report) = pkt.quality_report { - metrics.update_session_quality(session_id, report); - } - - // Get current list of other participants - let others = { - let mgr = room_mgr.lock().await; - mgr.others(&room_name, participant_id) - }; - - // Forward to all others - let pkt_bytes = pkt.payload.len() as u64; - for other in &others { - match other { - ParticipantSender::Quic(t) => { - let _ = t.send_media(&pkt).await; + Err(e) => { + let msg = e.to_string(); + if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") { + info!(%addr, participant = participant_id, "connection closed: {e}"); + } else { + error!(%addr, participant = participant_id, "recv error: {e}"); + } + break; } - ParticipantSender::WebSocket(_) => { - // WS clients receive raw payload bytes - let _ = other.send_raw(&pkt.payload).await; - } - } - } - - let fan_out = others.len() as u64; - metrics.packets_forwarded.inc_by(fan_out); - metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); - packets_forwarded += 1; - if packets_forwarded % 500 == 0 { - let room_size = { - let mgr = room_mgr.lock().await; - mgr.room_size(&room_name) }; - info!( - room = %room_name, - participant = participant_id, - forwarded = packets_forwarded, - room_size, - "participant stats" - ); + + if let Some(ref report) = pkt.quality_report { + media_metrics.update_session_quality(&media_session_id, report); + } + + let others = { + let mgr = media_room_mgr.lock().await; + mgr.others(&media_room_name, participant_id) + }; + + let pkt_bytes = pkt.payload.len() as u64; + for other in &others { + match other { + ParticipantSender::Quic(t) => { + let _ = t.send_media(&pkt).await; + } + ParticipantSender::WebSocket(_) => { + let _ = other.send_raw(&pkt.payload).await; + } + } + } + + let fan_out = others.len() as u64; + media_metrics.packets_forwarded.inc_by(fan_out); + media_metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); + packets_forwarded += 1; + if packets_forwarded % 500 == 0 { + let room_size = { + let mgr = media_room_mgr.lock().await; + mgr.room_size(&media_room_name) + }; + info!( + room = %media_room_name, + participant = participant_id, + forwarded = packets_forwarded, + room_size, + "participant stats" + ); + } } + }; + + // Signal handling task — processes SetAlias and other in-call signals + let signal_room_mgr = room_mgr.clone(); + let signal_room_name = room_name.clone(); + let signal_transport = transport.clone(); + let signal_task = async move { + loop { + match signal_transport.recv_signal().await { + Ok(Some(wzp_proto::SignalMessage::SetAlias { alias })) => { + info!(%addr, participant = participant_id, %alias, "SetAlias received"); + let mut mgr = signal_room_mgr.lock().await; + if let Some((update, senders)) = + mgr.set_alias(&signal_room_name, participant_id, alias) + { + drop(mgr); + broadcast_signal(&senders, &update).await; + } + } + Ok(Some(wzp_proto::SignalMessage::Hangup { .. })) => { + info!(%addr, participant = participant_id, "hangup received"); + break; + } + Ok(Some(msg)) => { + info!(%addr, participant = participant_id, "signal: {:?}", std::mem::discriminant(&msg)); + } + Ok(None) => break, + Err(e) => { + warn!(%addr, participant = participant_id, "signal recv error: {e}"); + break; + } + } + } + }; + + // Run both in parallel — exit when either finishes (disconnection) + tokio::select! { + _ = media_task => {} + _ = signal_task => {} } // Clean up — leave room and broadcast update to remaining participants