fix: AudioRing cursor desync + capture thread use-after-free
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m56s

AudioRing (reader-detects-lap architecture):
- Writer NEVER touches read_pos — fixes SPSC invariant violation
- Reader self-corrects when lapped (snaps read_pos forward)
- Power-of-2 capacity (16384 = 341ms) with bitmask indexing
- Added overflow_count and underrun_count diagnostics
- Wired ring health into engine stats and periodic logging

Capture thread use-after-free (drain latch):
- Added CountDownLatch(2) to AudioPipeline
- Audio threads count down after exiting their loops
- teardown() awaits latch (200ms timeout) before destroy()
- Guarantees no in-flight JNI calls when native handle is freed
- stopAudio() no longer nulls pipeline (teardown handles it)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-06 13:28:34 +04:00
parent 6597b5bd86
commit 4af7c5f94c
5 changed files with 117 additions and 42 deletions

View File

@@ -19,6 +19,8 @@ import java.io.FileOutputStream
import java.io.OutputStreamWriter
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.math.pow
import kotlin.math.sqrt
@@ -58,6 +60,9 @@ class AudioPipeline(private val context: Context) {
var debugRecording: Boolean = true
private var captureThread: Thread? = null
private var playoutThread: Thread? = null
/** Latch counted down by each audio thread after exiting its loop.
* stop() does NOT wait on this — teardown waits via awaitDrain(). */
private var drainLatch: CountDownLatch? = null
private val debugDir: File by lazy {
File(context.cacheDir, "wzp_debug").also { it.mkdirs() }
@@ -66,9 +71,11 @@ class AudioPipeline(private val context: Context) {
fun start(engine: WzpEngine) {
if (running) return
running = true
drainLatch = CountDownLatch(2) // one for capture, one for playout
captureThread = Thread({
runCapture(engine)
drainLatch?.countDown() // signal: capture loop exited, no more JNI calls
// Park thread forever — exiting triggers a libcrypto TLS destructor
// crash (SIGSEGV in OPENSSL_free) on Android when a JNI-calling thread exits.
parkThread()
@@ -80,6 +87,7 @@ class AudioPipeline(private val context: Context) {
playoutThread = Thread({
runPlayout(engine)
drainLatch?.countDown() // signal: playout loop exited
parkThread()
}, "wzp-playout").apply {
isDaemon = true
@@ -92,10 +100,20 @@ class AudioPipeline(private val context: Context) {
fun stop() {
running = false
// Don't join threads are parked as daemons to avoid native TLS crash
// Don't join threads — they are parked as daemons to avoid native TLS crash.
// Don't null thread refs or drainLatch — teardown() needs awaitDrain().
Log.i(TAG, "audio pipeline stopped (running=false)")
}
/** Block until both audio threads have exited their loops (max 200ms).
* After this returns, no more JNI calls to the engine will be made. */
fun awaitDrain(): Boolean {
val ok = drainLatch?.await(200, TimeUnit.MILLISECONDS) ?: true
if (!ok) Log.w(TAG, "awaitDrain: audio threads did not drain in 200ms")
captureThread = null
playoutThread = null
Log.i(TAG, "audio pipeline stopped")
drainLatch = null
return ok
}
private fun applyGain(pcm: ShortArray, count: Int, db: Float) {

View File

@@ -254,8 +254,17 @@ class CallViewModel : ViewModel(), WzpCallback {
Log.i(TAG, "teardown: stopping audio, stopService=$stopService")
val hadCall = audioStarted
CallService.onStopFromNotification = null
stopAudio()
stopAudio() // sets running=false (non-blocking)
stopStatsPolling()
// Wait for audio threads to exit their loops before destroying the engine.
// This guarantees no in-flight JNI calls to writeAudio/readAudio.
val drained = audioPipeline?.awaitDrain() ?: true
if (!drained) {
Log.w(TAG, "teardown: audio threads did not drain in time")
}
audioPipeline = null
Log.i(TAG, "teardown: stopping engine")
try { engine?.stopCall() } catch (e: Exception) { Log.w(TAG, "stopCall err: $e") }
try { engine?.destroy() } catch (e: Exception) { Log.w(TAG, "destroy err: $e") }
@@ -399,8 +408,7 @@ class CallViewModel : ViewModel(), WzpCallback {
private fun stopAudio() {
if (!audioStarted) return
audioPipeline?.stop()
audioPipeline = null
audioPipeline?.stop() // sets running=false; DON'T null — teardown needs awaitDrain()
audioRouteManager?.unregister()
audioRouteManager?.setSpeaker(false)
_isSpeaker.value = false

View File

@@ -1,91 +1,128 @@
//! Lock-free SPSC ring buffers for audio PCM transfer between
//! Kotlin AudioRecord/AudioTrack threads and the Rust engine.
//! Lock-free SPSC ring buffer — "Reader-Detects-Lap" architecture.
//!
//! These use a simple spin-free design: the producer writes and advances
//! a write cursor, the consumer reads and advances a read cursor.
//! Both cursors are atomic so no mutex is needed.
//! SPSC invariant: the producer ONLY writes `write_pos`, the consumer
//! ONLY writes `read_pos`. Neither thread touches the other's cursor.
//!
//! On overflow (writer laps the reader), the writer simply overwrites
//! old buffer data. The reader detects the lap via `available() >
//! RING_CAPACITY` and snaps its own `read_pos` forward.
//!
//! Capacity is a power of 2 for bitmask indexing (no modulo).
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Ring buffer capacity in i16 samples.
/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono.
const RING_CAPACITY: usize = 960 * 10;
/// Ring buffer capacity — power of 2 for bitmask indexing.
/// 16384 samples = 341.3ms at 48kHz mono. 70% more headroom
/// than the previous 9600 (200ms) for surviving Android GC pauses.
const RING_CAPACITY: usize = 16384; // 2^14
const RING_MASK: usize = RING_CAPACITY - 1;
/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples.
pub struct AudioRing {
buf: Box<[i16; RING_CAPACITY]>,
/// Monotonically increasing write cursor. ONLY written by producer.
write_pos: AtomicUsize,
/// Monotonically increasing read cursor. ONLY written by consumer.
read_pos: AtomicUsize,
/// Incremented by reader when it detects it was lapped (overflow).
overflow_count: AtomicU64,
/// Incremented by reader when ring is empty (underrun).
underrun_count: AtomicU64,
}
// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads.
// The atomics ensure visibility. The buffer itself is never accessed
// from the same index by both threads simultaneously because the
// producer only writes to positions between write_pos and read_pos,
// and the consumer only reads from positions between read_pos and write_pos.
// SAFETY: AudioRing is SPSC — one thread writes (producer), one reads (consumer).
// The producer only writes write_pos. The consumer only writes read_pos.
// Neither thread writes the other's cursor. Buffer indices are derived from
// the owning thread's cursor, ensuring no concurrent access to the same index.
unsafe impl Send for AudioRing {}
unsafe impl Sync for AudioRing {}
impl AudioRing {
pub fn new() -> Self {
debug_assert!(RING_CAPACITY.is_power_of_two());
Self {
buf: Box::new([0i16; RING_CAPACITY]),
write_pos: AtomicUsize::new(0),
read_pos: AtomicUsize::new(0),
overflow_count: AtomicU64::new(0),
underrun_count: AtomicU64::new(0),
}
}
/// Number of samples available to read.
/// Number of samples available to read (clamped to capacity).
pub fn available(&self) -> usize {
let w = self.write_pos.load(Ordering::Acquire);
let r = self.read_pos.load(Ordering::Acquire);
w.wrapping_sub(r)
let r = self.read_pos.load(Ordering::Relaxed);
w.wrapping_sub(r).min(RING_CAPACITY)
}
/// Number of samples that can be written without overwriting.
/// Number of samples that can be written without overwriting unread data.
pub fn free_space(&self) -> usize {
RING_CAPACITY - self.available()
RING_CAPACITY.saturating_sub(self.available())
}
/// Write samples into the ring. Returns number of samples written.
/// Drops oldest samples if the ring is full.
///
/// If the ring is full, old data is silently overwritten. The reader
/// will detect the lap and self-correct. The writer NEVER touches
/// `read_pos` — this is the key invariant that prevents cursor desync.
pub fn write(&self, samples: &[i16]) -> usize {
let w = self.write_pos.load(Ordering::Relaxed);
let count = samples.len().min(RING_CAPACITY);
let w = self.write_pos.load(Ordering::Relaxed);
for i in 0..count {
let idx = (w + i) % RING_CAPACITY;
// SAFETY: We're the only writer, and the reader won't read
// past read_pos which we haven't advanced past yet.
unsafe {
let ptr = self.buf.as_ptr() as *mut i16;
*ptr.add(idx) = samples[i];
*ptr.add((w + i) & RING_MASK) = samples[i];
}
}
self.write_pos.store(w.wrapping_add(count), Ordering::Release);
// If we overwrote unread data, advance read_pos
if self.available() > RING_CAPACITY {
let new_read = self.write_pos.load(Ordering::Relaxed).wrapping_sub(RING_CAPACITY);
self.read_pos.store(new_read, Ordering::Release);
}
count
}
/// Read samples from the ring into `out`. Returns number of samples read.
///
/// If the writer has lapped the reader (overflow), `read_pos` is snapped
/// forward to the oldest valid data. This is safe because only the
/// reader thread writes `read_pos`.
pub fn read(&self, out: &mut [i16]) -> usize {
let avail = self.available();
let count = out.len().min(avail);
let w = self.write_pos.load(Ordering::Acquire);
let mut r = self.read_pos.load(Ordering::Relaxed);
let mut avail = w.wrapping_sub(r);
// Lap detection: writer has overwritten our unread data.
// Snap read_pos forward to oldest valid data in the buffer.
if avail > RING_CAPACITY {
r = w.wrapping_sub(RING_CAPACITY);
avail = RING_CAPACITY;
self.overflow_count.fetch_add(1, Ordering::Relaxed);
}
let count = out.len().min(avail);
if count == 0 {
if w == r {
self.underrun_count.fetch_add(1, Ordering::Relaxed);
}
return 0;
}
let r = self.read_pos.load(Ordering::Relaxed);
for i in 0..count {
let idx = (r + i) % RING_CAPACITY;
out[i] = unsafe { *self.buf.as_ptr().add(idx) };
out[i] = unsafe { *self.buf.as_ptr().add((r + i) & RING_MASK) };
}
self.read_pos.store(r.wrapping_add(count), Ordering::Release);
count
}
/// Number of overflow events (reader was lapped by writer).
pub fn overflow_count(&self) -> u64 {
self.overflow_count.load(Ordering::Relaxed)
}
/// Number of underrun events (reader found empty buffer).
pub fn underrun_count(&self) -> u64 {
self.underrun_count.load(Ordering::Relaxed)
}
}

View File

@@ -183,6 +183,9 @@ impl WzpEngine {
stats.duration_secs = start.elapsed().as_secs_f64();
}
stats.audio_level = self.state.audio_level_rms.load(Ordering::Relaxed);
stats.playout_overflows = self.state.playout_ring.overflow_count();
stats.playout_underruns = self.state.playout_ring.underrun_count();
stats.capture_overflows = self.state.capture_ring.overflow_count();
stats
}
@@ -476,6 +479,7 @@ async fn run_call(
frames_dropped,
send_errors,
ring_avail = state.capture_ring.available(),
capture_overflows = state.capture_ring.overflow_count(),
"send stats"
);
last_stats_log = Instant::now();
@@ -578,6 +582,8 @@ async fn run_call(
recv_errors,
max_recv_gap_ms,
playout_avail = state.playout_ring.available(),
playout_overflows = state.playout_ring.overflow_count(),
playout_underruns = state.playout_ring.underrun_count(),
"recv stats"
);
max_recv_gap_ms = 0;

View File

@@ -51,6 +51,12 @@ pub struct CallStats {
pub underruns: u64,
/// Frames recovered by FEC.
pub fec_recovered: u64,
/// Playout ring overflow count (reader was lapped by writer).
pub playout_overflows: u64,
/// Playout ring underrun count (reader found empty buffer).
pub playout_underruns: u64,
/// Capture ring overflow count.
pub capture_overflows: u64,
/// Current mic audio level (RMS of i16 samples, 0-32767).
pub audio_level: u32,
/// Number of participants in the room (from last RoomUpdate).