From 4af7c5f94ccc1de04b14eaf9bdfabe983abc9855 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 6 Apr 2026 13:28:34 +0400 Subject: [PATCH] fix: AudioRing cursor desync + capture thread use-after-free MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AudioRing (reader-detects-lap architecture): - Writer NEVER touches read_pos — fixes SPSC invariant violation - Reader self-corrects when lapped (snaps read_pos forward) - Power-of-2 capacity (16384 = 341ms) with bitmask indexing - Added overflow_count and underrun_count diagnostics - Wired ring health into engine stats and periodic logging Capture thread use-after-free (drain latch): - Added CountDownLatch(2) to AudioPipeline - Audio threads count down after exiting their loops - teardown() awaits latch (200ms timeout) before destroy() - Guarantees no in-flight JNI calls when native handle is freed - stopAudio() no longer nulls pipeline (teardown handles it) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/java/com/wzp/audio/AudioPipeline.kt | 22 +++- .../java/com/wzp/ui/call/CallViewModel.kt | 14 ++- crates/wzp-android/src/audio_ring.rs | 111 ++++++++++++------ crates/wzp-android/src/engine.rs | 6 + crates/wzp-android/src/stats.rs | 6 + 5 files changed, 117 insertions(+), 42 deletions(-) diff --git a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt index 7126f66..9223e50 100644 --- a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt +++ b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt @@ -19,6 +19,8 @@ import java.io.FileOutputStream import java.io.OutputStreamWriter import java.nio.ByteBuffer import java.nio.ByteOrder +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import kotlin.math.pow import kotlin.math.sqrt @@ -58,6 +60,9 @@ class AudioPipeline(private val context: Context) { var debugRecording: Boolean = true private var captureThread: Thread? = null private var playoutThread: Thread? = null + /** Latch counted down by each audio thread after exiting its loop. + * stop() does NOT wait on this — teardown waits via awaitDrain(). */ + private var drainLatch: CountDownLatch? = null private val debugDir: File by lazy { File(context.cacheDir, "wzp_debug").also { it.mkdirs() } @@ -66,9 +71,11 @@ class AudioPipeline(private val context: Context) { fun start(engine: WzpEngine) { if (running) return running = true + drainLatch = CountDownLatch(2) // one for capture, one for playout captureThread = Thread({ runCapture(engine) + drainLatch?.countDown() // signal: capture loop exited, no more JNI calls // Park thread forever — exiting triggers a libcrypto TLS destructor // crash (SIGSEGV in OPENSSL_free) on Android when a JNI-calling thread exits. parkThread() @@ -80,6 +87,7 @@ class AudioPipeline(private val context: Context) { playoutThread = Thread({ runPlayout(engine) + drainLatch?.countDown() // signal: playout loop exited parkThread() }, "wzp-playout").apply { isDaemon = true @@ -92,10 +100,20 @@ class AudioPipeline(private val context: Context) { fun stop() { running = false - // Don't join — threads are parked as daemons to avoid native TLS crash + // Don't join threads — they are parked as daemons to avoid native TLS crash. + // Don't null thread refs or drainLatch — teardown() needs awaitDrain(). + Log.i(TAG, "audio pipeline stopped (running=false)") + } + + /** Block until both audio threads have exited their loops (max 200ms). + * After this returns, no more JNI calls to the engine will be made. */ + fun awaitDrain(): Boolean { + val ok = drainLatch?.await(200, TimeUnit.MILLISECONDS) ?: true + if (!ok) Log.w(TAG, "awaitDrain: audio threads did not drain in 200ms") captureThread = null playoutThread = null - Log.i(TAG, "audio pipeline stopped") + drainLatch = null + return ok } private fun applyGain(pcm: ShortArray, count: Int, db: Float) { diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt index 30bd7e4..9cf1534 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt @@ -254,8 +254,17 @@ class CallViewModel : ViewModel(), WzpCallback { Log.i(TAG, "teardown: stopping audio, stopService=$stopService") val hadCall = audioStarted CallService.onStopFromNotification = null - stopAudio() + stopAudio() // sets running=false (non-blocking) stopStatsPolling() + + // Wait for audio threads to exit their loops before destroying the engine. + // This guarantees no in-flight JNI calls to writeAudio/readAudio. + val drained = audioPipeline?.awaitDrain() ?: true + if (!drained) { + Log.w(TAG, "teardown: audio threads did not drain in time") + } + audioPipeline = null + Log.i(TAG, "teardown: stopping engine") try { engine?.stopCall() } catch (e: Exception) { Log.w(TAG, "stopCall err: $e") } try { engine?.destroy() } catch (e: Exception) { Log.w(TAG, "destroy err: $e") } @@ -399,8 +408,7 @@ class CallViewModel : ViewModel(), WzpCallback { private fun stopAudio() { if (!audioStarted) return - audioPipeline?.stop() - audioPipeline = null + audioPipeline?.stop() // sets running=false; DON'T null — teardown needs awaitDrain() audioRouteManager?.unregister() audioRouteManager?.setSpeaker(false) _isSpeaker.value = false diff --git a/crates/wzp-android/src/audio_ring.rs b/crates/wzp-android/src/audio_ring.rs index fbb881c..897fe8f 100644 --- a/crates/wzp-android/src/audio_ring.rs +++ b/crates/wzp-android/src/audio_ring.rs @@ -1,91 +1,128 @@ -//! Lock-free SPSC ring buffers for audio PCM transfer between -//! Kotlin AudioRecord/AudioTrack threads and the Rust engine. +//! Lock-free SPSC ring buffer — "Reader-Detects-Lap" architecture. //! -//! These use a simple spin-free design: the producer writes and advances -//! a write cursor, the consumer reads and advances a read cursor. -//! Both cursors are atomic so no mutex is needed. +//! SPSC invariant: the producer ONLY writes `write_pos`, the consumer +//! ONLY writes `read_pos`. Neither thread touches the other's cursor. +//! +//! On overflow (writer laps the reader), the writer simply overwrites +//! old buffer data. The reader detects the lap via `available() > +//! RING_CAPACITY` and snaps its own `read_pos` forward. +//! +//! Capacity is a power of 2 for bitmask indexing (no modulo). -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -/// Ring buffer capacity in i16 samples. -/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono. -const RING_CAPACITY: usize = 960 * 10; +/// Ring buffer capacity — power of 2 for bitmask indexing. +/// 16384 samples = 341.3ms at 48kHz mono. 70% more headroom +/// than the previous 9600 (200ms) for surviving Android GC pauses. +const RING_CAPACITY: usize = 16384; // 2^14 +const RING_MASK: usize = RING_CAPACITY - 1; /// Lock-free single-producer single-consumer ring buffer for i16 PCM samples. pub struct AudioRing { buf: Box<[i16; RING_CAPACITY]>, + /// Monotonically increasing write cursor. ONLY written by producer. write_pos: AtomicUsize, + /// Monotonically increasing read cursor. ONLY written by consumer. read_pos: AtomicUsize, + /// Incremented by reader when it detects it was lapped (overflow). + overflow_count: AtomicU64, + /// Incremented by reader when ring is empty (underrun). + underrun_count: AtomicU64, } -// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads. -// The atomics ensure visibility. The buffer itself is never accessed -// from the same index by both threads simultaneously because the -// producer only writes to positions between write_pos and read_pos, -// and the consumer only reads from positions between read_pos and write_pos. +// SAFETY: AudioRing is SPSC — one thread writes (producer), one reads (consumer). +// The producer only writes write_pos. The consumer only writes read_pos. +// Neither thread writes the other's cursor. Buffer indices are derived from +// the owning thread's cursor, ensuring no concurrent access to the same index. unsafe impl Send for AudioRing {} unsafe impl Sync for AudioRing {} impl AudioRing { pub fn new() -> Self { + debug_assert!(RING_CAPACITY.is_power_of_two()); Self { buf: Box::new([0i16; RING_CAPACITY]), write_pos: AtomicUsize::new(0), read_pos: AtomicUsize::new(0), + overflow_count: AtomicU64::new(0), + underrun_count: AtomicU64::new(0), } } - /// Number of samples available to read. + /// Number of samples available to read (clamped to capacity). pub fn available(&self) -> usize { let w = self.write_pos.load(Ordering::Acquire); - let r = self.read_pos.load(Ordering::Acquire); - w.wrapping_sub(r) + let r = self.read_pos.load(Ordering::Relaxed); + w.wrapping_sub(r).min(RING_CAPACITY) } - /// Number of samples that can be written without overwriting. + /// Number of samples that can be written without overwriting unread data. pub fn free_space(&self) -> usize { - RING_CAPACITY - self.available() + RING_CAPACITY.saturating_sub(self.available()) } /// Write samples into the ring. Returns number of samples written. - /// Drops oldest samples if the ring is full. + /// + /// If the ring is full, old data is silently overwritten. The reader + /// will detect the lap and self-correct. The writer NEVER touches + /// `read_pos` — this is the key invariant that prevents cursor desync. pub fn write(&self, samples: &[i16]) -> usize { - let w = self.write_pos.load(Ordering::Relaxed); let count = samples.len().min(RING_CAPACITY); + let w = self.write_pos.load(Ordering::Relaxed); for i in 0..count { - let idx = (w + i) % RING_CAPACITY; - // SAFETY: We're the only writer, and the reader won't read - // past read_pos which we haven't advanced past yet. unsafe { let ptr = self.buf.as_ptr() as *mut i16; - *ptr.add(idx) = samples[i]; + *ptr.add((w + i) & RING_MASK) = samples[i]; } } self.write_pos.store(w.wrapping_add(count), Ordering::Release); - - // If we overwrote unread data, advance read_pos - if self.available() > RING_CAPACITY { - let new_read = self.write_pos.load(Ordering::Relaxed).wrapping_sub(RING_CAPACITY); - self.read_pos.store(new_read, Ordering::Release); - } - count } /// Read samples from the ring into `out`. Returns number of samples read. + /// + /// If the writer has lapped the reader (overflow), `read_pos` is snapped + /// forward to the oldest valid data. This is safe because only the + /// reader thread writes `read_pos`. pub fn read(&self, out: &mut [i16]) -> usize { - let avail = self.available(); - let count = out.len().min(avail); + let w = self.write_pos.load(Ordering::Acquire); + let mut r = self.read_pos.load(Ordering::Relaxed); + + let mut avail = w.wrapping_sub(r); + + // Lap detection: writer has overwritten our unread data. + // Snap read_pos forward to oldest valid data in the buffer. + if avail > RING_CAPACITY { + r = w.wrapping_sub(RING_CAPACITY); + avail = RING_CAPACITY; + self.overflow_count.fetch_add(1, Ordering::Relaxed); + } + + let count = out.len().min(avail); + if count == 0 { + if w == r { + self.underrun_count.fetch_add(1, Ordering::Relaxed); + } + return 0; + } - let r = self.read_pos.load(Ordering::Relaxed); for i in 0..count { - let idx = (r + i) % RING_CAPACITY; - out[i] = unsafe { *self.buf.as_ptr().add(idx) }; + out[i] = unsafe { *self.buf.as_ptr().add((r + i) & RING_MASK) }; } self.read_pos.store(r.wrapping_add(count), Ordering::Release); count } + + /// Number of overflow events (reader was lapped by writer). + pub fn overflow_count(&self) -> u64 { + self.overflow_count.load(Ordering::Relaxed) + } + + /// Number of underrun events (reader found empty buffer). + pub fn underrun_count(&self) -> u64 { + self.underrun_count.load(Ordering::Relaxed) + } } diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index ea20fb6..1041324 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -183,6 +183,9 @@ impl WzpEngine { stats.duration_secs = start.elapsed().as_secs_f64(); } stats.audio_level = self.state.audio_level_rms.load(Ordering::Relaxed); + stats.playout_overflows = self.state.playout_ring.overflow_count(); + stats.playout_underruns = self.state.playout_ring.underrun_count(); + stats.capture_overflows = self.state.capture_ring.overflow_count(); stats } @@ -476,6 +479,7 @@ async fn run_call( frames_dropped, send_errors, ring_avail = state.capture_ring.available(), + capture_overflows = state.capture_ring.overflow_count(), "send stats" ); last_stats_log = Instant::now(); @@ -578,6 +582,8 @@ async fn run_call( recv_errors, max_recv_gap_ms, playout_avail = state.playout_ring.available(), + playout_overflows = state.playout_ring.overflow_count(), + playout_underruns = state.playout_ring.underrun_count(), "recv stats" ); max_recv_gap_ms = 0; diff --git a/crates/wzp-android/src/stats.rs b/crates/wzp-android/src/stats.rs index 49ea3c7..bc4d496 100644 --- a/crates/wzp-android/src/stats.rs +++ b/crates/wzp-android/src/stats.rs @@ -51,6 +51,12 @@ pub struct CallStats { pub underruns: u64, /// Frames recovered by FEC. pub fec_recovered: u64, + /// Playout ring overflow count (reader was lapped by writer). + pub playout_overflows: u64, + /// Playout ring underrun count (reader found empty buffer). + pub playout_underruns: u64, + /// Capture ring overflow count. + pub capture_overflows: u64, /// Current mic audio level (RMS of i16 samples, 0-32767). pub audio_level: u32, /// Number of participants in the room (from last RoomUpdate).