From 2263e898e587e33787dd314086fcd849a885a22a Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 6 Apr 2026 13:42:33 +0400 Subject: [PATCH] fix: port AudioRing reader-detects-lap fix to desktop client Same fix as Android (4af7c5f): writer never touches read_pos, reader self-corrects when lapped. Power-of-2 capacity (16384), bitmask indexing, overflow/underrun counters. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/audio_ring.rs | 105 ++++++++++++++++++---------- 1 file changed, 69 insertions(+), 36 deletions(-) diff --git a/crates/wzp-client/src/audio_ring.rs b/crates/wzp-client/src/audio_ring.rs index fac1479..e6eeae0 100644 --- a/crates/wzp-client/src/audio_ring.rs +++ b/crates/wzp-client/src/audio_ring.rs @@ -1,89 +1,122 @@ -//! Lock-free SPSC ring buffer for audio PCM transfer between -//! CPAL audio callbacks and the Rust engine. +//! Lock-free SPSC ring buffer — "Reader-Detects-Lap" architecture. //! -//! Identical design to wzp-android's audio_ring: the producer writes and -//! advances a write cursor, the consumer reads and advances a read cursor. -//! Both cursors are atomic — no mutex, no blocking on the audio thread. +//! SPSC invariant: the producer ONLY writes `write_pos`, the consumer +//! ONLY writes `read_pos`. Neither thread touches the other's cursor. +//! +//! On overflow (writer laps the reader), the writer simply overwrites +//! old buffer data. The reader detects the lap via `available() > +//! RING_CAPACITY` and snaps its own `read_pos` forward. +//! +//! Capacity is a power of 2 for bitmask indexing (no modulo). -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -/// Ring buffer capacity in i16 samples. -/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono. -const RING_CAPACITY: usize = 960 * 10; +/// Ring buffer capacity — power of 2 for bitmask indexing. +/// 16384 samples = 341.3ms at 48kHz mono. +const RING_CAPACITY: usize = 16384; // 2^14 +const RING_MASK: usize = RING_CAPACITY - 1; /// Lock-free single-producer single-consumer ring buffer for i16 PCM samples. pub struct AudioRing { buf: Box<[i16; RING_CAPACITY]>, + /// Monotonically increasing write cursor. ONLY written by producer. write_pos: AtomicUsize, + /// Monotonically increasing read cursor. ONLY written by consumer. read_pos: AtomicUsize, + /// Incremented by reader when it detects it was lapped (overflow). + overflow_count: AtomicU64, + /// Incremented by reader when ring is empty (underrun). + underrun_count: AtomicU64, } -// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads. -// The atomics ensure visibility. The buffer itself is never accessed -// from the same index by both threads simultaneously because the -// producer only writes to positions between write_pos and read_pos, -// and the consumer only reads from positions between read_pos and write_pos. +// SAFETY: AudioRing is SPSC — one thread writes (producer), one reads (consumer). +// The producer only writes write_pos. The consumer only writes read_pos. +// Neither thread writes the other's cursor. Buffer indices are derived from +// the owning thread's cursor, ensuring no concurrent access to the same index. unsafe impl Send for AudioRing {} unsafe impl Sync for AudioRing {} impl AudioRing { pub fn new() -> Self { + debug_assert!(RING_CAPACITY.is_power_of_two()); Self { buf: Box::new([0i16; RING_CAPACITY]), write_pos: AtomicUsize::new(0), read_pos: AtomicUsize::new(0), + overflow_count: AtomicU64::new(0), + underrun_count: AtomicU64::new(0), } } - /// Number of samples available to read. + /// Number of samples available to read (clamped to capacity). pub fn available(&self) -> usize { let w = self.write_pos.load(Ordering::Acquire); - let r = self.read_pos.load(Ordering::Acquire); - w.wrapping_sub(r) + let r = self.read_pos.load(Ordering::Relaxed); + w.wrapping_sub(r).min(RING_CAPACITY) } /// Write samples into the ring. Returns number of samples written. - /// Drops oldest samples if the ring is full. + /// + /// If the ring is full, old data is silently overwritten. The reader + /// will detect the lap and self-correct. The writer NEVER touches + /// `read_pos`. pub fn write(&self, samples: &[i16]) -> usize { - let w = self.write_pos.load(Ordering::Relaxed); let count = samples.len().min(RING_CAPACITY); + let w = self.write_pos.load(Ordering::Relaxed); for i in 0..count { - let idx = (w + i) % RING_CAPACITY; unsafe { let ptr = self.buf.as_ptr() as *mut i16; - *ptr.add(idx) = samples[i]; + *ptr.add((w + i) & RING_MASK) = samples[i]; } } self.write_pos .store(w.wrapping_add(count), Ordering::Release); - - // If we overwrote unread data, advance read_pos - if self.available() > RING_CAPACITY { - let new_read = self - .write_pos - .load(Ordering::Relaxed) - .wrapping_sub(RING_CAPACITY); - self.read_pos.store(new_read, Ordering::Release); - } - count } /// Read samples from the ring into `out`. Returns number of samples read. + /// + /// If the writer has lapped the reader (overflow), `read_pos` is snapped + /// forward to the oldest valid data. pub fn read(&self, out: &mut [i16]) -> usize { - let avail = self.available(); - let count = out.len().min(avail); + let w = self.write_pos.load(Ordering::Acquire); + let mut r = self.read_pos.load(Ordering::Relaxed); + + let mut avail = w.wrapping_sub(r); + + // Lap detection: writer has overwritten our unread data. + if avail > RING_CAPACITY { + r = w.wrapping_sub(RING_CAPACITY); + avail = RING_CAPACITY; + self.overflow_count.fetch_add(1, Ordering::Relaxed); + } + + let count = out.len().min(avail); + if count == 0 { + if w == r { + self.underrun_count.fetch_add(1, Ordering::Relaxed); + } + return 0; + } - let r = self.read_pos.load(Ordering::Relaxed); for i in 0..count { - let idx = (r + i) % RING_CAPACITY; - out[i] = unsafe { *self.buf.as_ptr().add(idx) }; + out[i] = unsafe { *self.buf.as_ptr().add((r + i) & RING_MASK) }; } self.read_pos .store(r.wrapping_add(count), Ordering::Release); count } + + /// Number of overflow events (reader was lapped by writer). + pub fn overflow_count(&self) -> u64 { + self.overflow_count.load(Ordering::Relaxed) + } + + /// Number of underrun events (reader found empty buffer). + pub fn underrun_count(&self) -> u64 { + self.underrun_count.load(Ordering::Relaxed) + } }