fix: AudioRing cursor desync + capture thread use-after-free
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m56s
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m56s
AudioRing (reader-detects-lap architecture): - Writer NEVER touches read_pos — fixes SPSC invariant violation - Reader self-corrects when lapped (snaps read_pos forward) - Power-of-2 capacity (16384 = 341ms) with bitmask indexing - Added overflow_count and underrun_count diagnostics - Wired ring health into engine stats and periodic logging Capture thread use-after-free (drain latch): - Added CountDownLatch(2) to AudioPipeline - Audio threads count down after exiting their loops - teardown() awaits latch (200ms timeout) before destroy() - Guarantees no in-flight JNI calls when native handle is freed - stopAudio() no longer nulls pipeline (teardown handles it) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,6 +19,8 @@ import java.io.FileOutputStream
|
|||||||
import java.io.OutputStreamWriter
|
import java.io.OutputStreamWriter
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.ByteOrder
|
import java.nio.ByteOrder
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
import kotlin.math.pow
|
import kotlin.math.pow
|
||||||
import kotlin.math.sqrt
|
import kotlin.math.sqrt
|
||||||
|
|
||||||
@@ -58,6 +60,9 @@ class AudioPipeline(private val context: Context) {
|
|||||||
var debugRecording: Boolean = true
|
var debugRecording: Boolean = true
|
||||||
private var captureThread: Thread? = null
|
private var captureThread: Thread? = null
|
||||||
private var playoutThread: Thread? = null
|
private var playoutThread: Thread? = null
|
||||||
|
/** Latch counted down by each audio thread after exiting its loop.
|
||||||
|
* stop() does NOT wait on this — teardown waits via awaitDrain(). */
|
||||||
|
private var drainLatch: CountDownLatch? = null
|
||||||
|
|
||||||
private val debugDir: File by lazy {
|
private val debugDir: File by lazy {
|
||||||
File(context.cacheDir, "wzp_debug").also { it.mkdirs() }
|
File(context.cacheDir, "wzp_debug").also { it.mkdirs() }
|
||||||
@@ -66,9 +71,11 @@ class AudioPipeline(private val context: Context) {
|
|||||||
fun start(engine: WzpEngine) {
|
fun start(engine: WzpEngine) {
|
||||||
if (running) return
|
if (running) return
|
||||||
running = true
|
running = true
|
||||||
|
drainLatch = CountDownLatch(2) // one for capture, one for playout
|
||||||
|
|
||||||
captureThread = Thread({
|
captureThread = Thread({
|
||||||
runCapture(engine)
|
runCapture(engine)
|
||||||
|
drainLatch?.countDown() // signal: capture loop exited, no more JNI calls
|
||||||
// Park thread forever — exiting triggers a libcrypto TLS destructor
|
// Park thread forever — exiting triggers a libcrypto TLS destructor
|
||||||
// crash (SIGSEGV in OPENSSL_free) on Android when a JNI-calling thread exits.
|
// crash (SIGSEGV in OPENSSL_free) on Android when a JNI-calling thread exits.
|
||||||
parkThread()
|
parkThread()
|
||||||
@@ -80,6 +87,7 @@ class AudioPipeline(private val context: Context) {
|
|||||||
|
|
||||||
playoutThread = Thread({
|
playoutThread = Thread({
|
||||||
runPlayout(engine)
|
runPlayout(engine)
|
||||||
|
drainLatch?.countDown() // signal: playout loop exited
|
||||||
parkThread()
|
parkThread()
|
||||||
}, "wzp-playout").apply {
|
}, "wzp-playout").apply {
|
||||||
isDaemon = true
|
isDaemon = true
|
||||||
@@ -92,10 +100,20 @@ class AudioPipeline(private val context: Context) {
|
|||||||
|
|
||||||
fun stop() {
|
fun stop() {
|
||||||
running = false
|
running = false
|
||||||
// Don't join — threads are parked as daemons to avoid native TLS crash
|
// Don't join threads — they are parked as daemons to avoid native TLS crash.
|
||||||
|
// Don't null thread refs or drainLatch — teardown() needs awaitDrain().
|
||||||
|
Log.i(TAG, "audio pipeline stopped (running=false)")
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Block until both audio threads have exited their loops (max 200ms).
|
||||||
|
* After this returns, no more JNI calls to the engine will be made. */
|
||||||
|
fun awaitDrain(): Boolean {
|
||||||
|
val ok = drainLatch?.await(200, TimeUnit.MILLISECONDS) ?: true
|
||||||
|
if (!ok) Log.w(TAG, "awaitDrain: audio threads did not drain in 200ms")
|
||||||
captureThread = null
|
captureThread = null
|
||||||
playoutThread = null
|
playoutThread = null
|
||||||
Log.i(TAG, "audio pipeline stopped")
|
drainLatch = null
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun applyGain(pcm: ShortArray, count: Int, db: Float) {
|
private fun applyGain(pcm: ShortArray, count: Int, db: Float) {
|
||||||
|
|||||||
@@ -254,8 +254,17 @@ class CallViewModel : ViewModel(), WzpCallback {
|
|||||||
Log.i(TAG, "teardown: stopping audio, stopService=$stopService")
|
Log.i(TAG, "teardown: stopping audio, stopService=$stopService")
|
||||||
val hadCall = audioStarted
|
val hadCall = audioStarted
|
||||||
CallService.onStopFromNotification = null
|
CallService.onStopFromNotification = null
|
||||||
stopAudio()
|
stopAudio() // sets running=false (non-blocking)
|
||||||
stopStatsPolling()
|
stopStatsPolling()
|
||||||
|
|
||||||
|
// Wait for audio threads to exit their loops before destroying the engine.
|
||||||
|
// This guarantees no in-flight JNI calls to writeAudio/readAudio.
|
||||||
|
val drained = audioPipeline?.awaitDrain() ?: true
|
||||||
|
if (!drained) {
|
||||||
|
Log.w(TAG, "teardown: audio threads did not drain in time")
|
||||||
|
}
|
||||||
|
audioPipeline = null
|
||||||
|
|
||||||
Log.i(TAG, "teardown: stopping engine")
|
Log.i(TAG, "teardown: stopping engine")
|
||||||
try { engine?.stopCall() } catch (e: Exception) { Log.w(TAG, "stopCall err: $e") }
|
try { engine?.stopCall() } catch (e: Exception) { Log.w(TAG, "stopCall err: $e") }
|
||||||
try { engine?.destroy() } catch (e: Exception) { Log.w(TAG, "destroy err: $e") }
|
try { engine?.destroy() } catch (e: Exception) { Log.w(TAG, "destroy err: $e") }
|
||||||
@@ -399,8 +408,7 @@ class CallViewModel : ViewModel(), WzpCallback {
|
|||||||
|
|
||||||
private fun stopAudio() {
|
private fun stopAudio() {
|
||||||
if (!audioStarted) return
|
if (!audioStarted) return
|
||||||
audioPipeline?.stop()
|
audioPipeline?.stop() // sets running=false; DON'T null — teardown needs awaitDrain()
|
||||||
audioPipeline = null
|
|
||||||
audioRouteManager?.unregister()
|
audioRouteManager?.unregister()
|
||||||
audioRouteManager?.setSpeaker(false)
|
audioRouteManager?.setSpeaker(false)
|
||||||
_isSpeaker.value = false
|
_isSpeaker.value = false
|
||||||
|
|||||||
@@ -1,91 +1,128 @@
|
|||||||
//! Lock-free SPSC ring buffers for audio PCM transfer between
|
//! Lock-free SPSC ring buffer — "Reader-Detects-Lap" architecture.
|
||||||
//! Kotlin AudioRecord/AudioTrack threads and the Rust engine.
|
|
||||||
//!
|
//!
|
||||||
//! These use a simple spin-free design: the producer writes and advances
|
//! SPSC invariant: the producer ONLY writes `write_pos`, the consumer
|
||||||
//! a write cursor, the consumer reads and advances a read cursor.
|
//! ONLY writes `read_pos`. Neither thread touches the other's cursor.
|
||||||
//! Both cursors are atomic so no mutex is needed.
|
//!
|
||||||
|
//! On overflow (writer laps the reader), the writer simply overwrites
|
||||||
|
//! old buffer data. The reader detects the lap via `available() >
|
||||||
|
//! RING_CAPACITY` and snaps its own `read_pos` forward.
|
||||||
|
//!
|
||||||
|
//! Capacity is a power of 2 for bitmask indexing (no modulo).
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||||
|
|
||||||
/// Ring buffer capacity in i16 samples.
|
/// Ring buffer capacity — power of 2 for bitmask indexing.
|
||||||
/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono.
|
/// 16384 samples = 341.3ms at 48kHz mono. 70% more headroom
|
||||||
const RING_CAPACITY: usize = 960 * 10;
|
/// than the previous 9600 (200ms) for surviving Android GC pauses.
|
||||||
|
const RING_CAPACITY: usize = 16384; // 2^14
|
||||||
|
const RING_MASK: usize = RING_CAPACITY - 1;
|
||||||
|
|
||||||
/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples.
|
/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples.
|
||||||
pub struct AudioRing {
|
pub struct AudioRing {
|
||||||
buf: Box<[i16; RING_CAPACITY]>,
|
buf: Box<[i16; RING_CAPACITY]>,
|
||||||
|
/// Monotonically increasing write cursor. ONLY written by producer.
|
||||||
write_pos: AtomicUsize,
|
write_pos: AtomicUsize,
|
||||||
|
/// Monotonically increasing read cursor. ONLY written by consumer.
|
||||||
read_pos: AtomicUsize,
|
read_pos: AtomicUsize,
|
||||||
|
/// Incremented by reader when it detects it was lapped (overflow).
|
||||||
|
overflow_count: AtomicU64,
|
||||||
|
/// Incremented by reader when ring is empty (underrun).
|
||||||
|
underrun_count: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads.
|
// SAFETY: AudioRing is SPSC — one thread writes (producer), one reads (consumer).
|
||||||
// The atomics ensure visibility. The buffer itself is never accessed
|
// The producer only writes write_pos. The consumer only writes read_pos.
|
||||||
// from the same index by both threads simultaneously because the
|
// Neither thread writes the other's cursor. Buffer indices are derived from
|
||||||
// producer only writes to positions between write_pos and read_pos,
|
// the owning thread's cursor, ensuring no concurrent access to the same index.
|
||||||
// and the consumer only reads from positions between read_pos and write_pos.
|
|
||||||
unsafe impl Send for AudioRing {}
|
unsafe impl Send for AudioRing {}
|
||||||
unsafe impl Sync for AudioRing {}
|
unsafe impl Sync for AudioRing {}
|
||||||
|
|
||||||
impl AudioRing {
|
impl AudioRing {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
debug_assert!(RING_CAPACITY.is_power_of_two());
|
||||||
Self {
|
Self {
|
||||||
buf: Box::new([0i16; RING_CAPACITY]),
|
buf: Box::new([0i16; RING_CAPACITY]),
|
||||||
write_pos: AtomicUsize::new(0),
|
write_pos: AtomicUsize::new(0),
|
||||||
read_pos: AtomicUsize::new(0),
|
read_pos: AtomicUsize::new(0),
|
||||||
|
overflow_count: AtomicU64::new(0),
|
||||||
|
underrun_count: AtomicU64::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of samples available to read.
|
/// Number of samples available to read (clamped to capacity).
|
||||||
pub fn available(&self) -> usize {
|
pub fn available(&self) -> usize {
|
||||||
let w = self.write_pos.load(Ordering::Acquire);
|
let w = self.write_pos.load(Ordering::Acquire);
|
||||||
let r = self.read_pos.load(Ordering::Acquire);
|
let r = self.read_pos.load(Ordering::Relaxed);
|
||||||
w.wrapping_sub(r)
|
w.wrapping_sub(r).min(RING_CAPACITY)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of samples that can be written without overwriting.
|
/// Number of samples that can be written without overwriting unread data.
|
||||||
pub fn free_space(&self) -> usize {
|
pub fn free_space(&self) -> usize {
|
||||||
RING_CAPACITY - self.available()
|
RING_CAPACITY.saturating_sub(self.available())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write samples into the ring. Returns number of samples written.
|
/// Write samples into the ring. Returns number of samples written.
|
||||||
/// Drops oldest samples if the ring is full.
|
///
|
||||||
|
/// If the ring is full, old data is silently overwritten. The reader
|
||||||
|
/// will detect the lap and self-correct. The writer NEVER touches
|
||||||
|
/// `read_pos` — this is the key invariant that prevents cursor desync.
|
||||||
pub fn write(&self, samples: &[i16]) -> usize {
|
pub fn write(&self, samples: &[i16]) -> usize {
|
||||||
let w = self.write_pos.load(Ordering::Relaxed);
|
|
||||||
let count = samples.len().min(RING_CAPACITY);
|
let count = samples.len().min(RING_CAPACITY);
|
||||||
|
let w = self.write_pos.load(Ordering::Relaxed);
|
||||||
|
|
||||||
for i in 0..count {
|
for i in 0..count {
|
||||||
let idx = (w + i) % RING_CAPACITY;
|
|
||||||
// SAFETY: We're the only writer, and the reader won't read
|
|
||||||
// past read_pos which we haven't advanced past yet.
|
|
||||||
unsafe {
|
unsafe {
|
||||||
let ptr = self.buf.as_ptr() as *mut i16;
|
let ptr = self.buf.as_ptr() as *mut i16;
|
||||||
*ptr.add(idx) = samples[i];
|
*ptr.add((w + i) & RING_MASK) = samples[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.write_pos.store(w.wrapping_add(count), Ordering::Release);
|
self.write_pos.store(w.wrapping_add(count), Ordering::Release);
|
||||||
|
|
||||||
// If we overwrote unread data, advance read_pos
|
|
||||||
if self.available() > RING_CAPACITY {
|
|
||||||
let new_read = self.write_pos.load(Ordering::Relaxed).wrapping_sub(RING_CAPACITY);
|
|
||||||
self.read_pos.store(new_read, Ordering::Release);
|
|
||||||
}
|
|
||||||
|
|
||||||
count
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read samples from the ring into `out`. Returns number of samples read.
|
/// Read samples from the ring into `out`. Returns number of samples read.
|
||||||
|
///
|
||||||
|
/// If the writer has lapped the reader (overflow), `read_pos` is snapped
|
||||||
|
/// forward to the oldest valid data. This is safe because only the
|
||||||
|
/// reader thread writes `read_pos`.
|
||||||
pub fn read(&self, out: &mut [i16]) -> usize {
|
pub fn read(&self, out: &mut [i16]) -> usize {
|
||||||
let avail = self.available();
|
let w = self.write_pos.load(Ordering::Acquire);
|
||||||
let count = out.len().min(avail);
|
let mut r = self.read_pos.load(Ordering::Relaxed);
|
||||||
|
|
||||||
|
let mut avail = w.wrapping_sub(r);
|
||||||
|
|
||||||
|
// Lap detection: writer has overwritten our unread data.
|
||||||
|
// Snap read_pos forward to oldest valid data in the buffer.
|
||||||
|
if avail > RING_CAPACITY {
|
||||||
|
r = w.wrapping_sub(RING_CAPACITY);
|
||||||
|
avail = RING_CAPACITY;
|
||||||
|
self.overflow_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = out.len().min(avail);
|
||||||
|
if count == 0 {
|
||||||
|
if w == r {
|
||||||
|
self.underrun_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
let r = self.read_pos.load(Ordering::Relaxed);
|
|
||||||
for i in 0..count {
|
for i in 0..count {
|
||||||
let idx = (r + i) % RING_CAPACITY;
|
out[i] = unsafe { *self.buf.as_ptr().add((r + i) & RING_MASK) };
|
||||||
out[i] = unsafe { *self.buf.as_ptr().add(idx) };
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.read_pos.store(r.wrapping_add(count), Ordering::Release);
|
self.read_pos.store(r.wrapping_add(count), Ordering::Release);
|
||||||
count
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Number of overflow events (reader was lapped by writer).
|
||||||
|
pub fn overflow_count(&self) -> u64 {
|
||||||
|
self.overflow_count.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Number of underrun events (reader found empty buffer).
|
||||||
|
pub fn underrun_count(&self) -> u64 {
|
||||||
|
self.underrun_count.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -183,6 +183,9 @@ impl WzpEngine {
|
|||||||
stats.duration_secs = start.elapsed().as_secs_f64();
|
stats.duration_secs = start.elapsed().as_secs_f64();
|
||||||
}
|
}
|
||||||
stats.audio_level = self.state.audio_level_rms.load(Ordering::Relaxed);
|
stats.audio_level = self.state.audio_level_rms.load(Ordering::Relaxed);
|
||||||
|
stats.playout_overflows = self.state.playout_ring.overflow_count();
|
||||||
|
stats.playout_underruns = self.state.playout_ring.underrun_count();
|
||||||
|
stats.capture_overflows = self.state.capture_ring.overflow_count();
|
||||||
stats
|
stats
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -476,6 +479,7 @@ async fn run_call(
|
|||||||
frames_dropped,
|
frames_dropped,
|
||||||
send_errors,
|
send_errors,
|
||||||
ring_avail = state.capture_ring.available(),
|
ring_avail = state.capture_ring.available(),
|
||||||
|
capture_overflows = state.capture_ring.overflow_count(),
|
||||||
"send stats"
|
"send stats"
|
||||||
);
|
);
|
||||||
last_stats_log = Instant::now();
|
last_stats_log = Instant::now();
|
||||||
@@ -578,6 +582,8 @@ async fn run_call(
|
|||||||
recv_errors,
|
recv_errors,
|
||||||
max_recv_gap_ms,
|
max_recv_gap_ms,
|
||||||
playout_avail = state.playout_ring.available(),
|
playout_avail = state.playout_ring.available(),
|
||||||
|
playout_overflows = state.playout_ring.overflow_count(),
|
||||||
|
playout_underruns = state.playout_ring.underrun_count(),
|
||||||
"recv stats"
|
"recv stats"
|
||||||
);
|
);
|
||||||
max_recv_gap_ms = 0;
|
max_recv_gap_ms = 0;
|
||||||
|
|||||||
@@ -51,6 +51,12 @@ pub struct CallStats {
|
|||||||
pub underruns: u64,
|
pub underruns: u64,
|
||||||
/// Frames recovered by FEC.
|
/// Frames recovered by FEC.
|
||||||
pub fec_recovered: u64,
|
pub fec_recovered: u64,
|
||||||
|
/// Playout ring overflow count (reader was lapped by writer).
|
||||||
|
pub playout_overflows: u64,
|
||||||
|
/// Playout ring underrun count (reader found empty buffer).
|
||||||
|
pub playout_underruns: u64,
|
||||||
|
/// Capture ring overflow count.
|
||||||
|
pub capture_overflows: u64,
|
||||||
/// Current mic audio level (RMS of i16 samples, 0-32767).
|
/// Current mic audio level (RMS of i16 samples, 0-32767).
|
||||||
pub audio_level: u32,
|
pub audio_level: u32,
|
||||||
/// Number of participants in the room (from last RoomUpdate).
|
/// Number of participants in the room (from last RoomUpdate).
|
||||||
|
|||||||
Reference in New Issue
Block a user