fix: keep audio handles alive for call duration, fix Send+Sync
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m39s

The VPIO/CPAL audio handles were dropped at the end of start(),
killing the audio unit immediately. Audio I/O stopped working
after the first frame.

- Store audio handle in CallEngine via SyncWrapper
- Drop MutexGuard before returning from status() (Send future)
- Audio streams now live for the entire call duration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-06 12:00:16 +04:00
parent 9b733010ab
commit 21f5b24cbf

View File

@@ -15,6 +15,12 @@ use wzp_proto::MediaTransport;
const FRAME_SAMPLES: usize = 960; const FRAME_SAMPLES: usize = 960;
/// Wrapper to make non-Sync audio handles safe to store in shared state.
/// The audio handle is only accessed from the thread that created it (drop),
/// never shared across threads — Sync is safe.
struct SyncWrapper(Box<dyn std::any::Any + Send>);
unsafe impl Sync for SyncWrapper {}
pub struct ParticipantInfo { pub struct ParticipantInfo {
pub fingerprint: String, pub fingerprint: String,
pub alias: Option<String>, pub alias: Option<String>,
@@ -42,6 +48,9 @@ pub struct CallEngine {
transport: Arc<wzp_transport::QuinnTransport>, transport: Arc<wzp_transport::QuinnTransport>,
start_time: Instant, start_time: Instant,
fingerprint: String, fingerprint: String,
/// Keep audio handles alive for the duration of the call.
/// Wrapped in SyncWrapper because AudioUnit isn't Sync.
_audio_handle: SyncWrapper,
} }
impl CallEngine { impl CallEngine {
@@ -108,54 +117,45 @@ impl CallEngine {
info!("connected to relay, handshake complete"); info!("connected to relay, handshake complete");
event_cb("connected", &format!("joined room {room}")); event_cb("connected", &format!("joined room {room}"));
// Audio I/O — VPIO (OS AEC) on macOS, plain CPAL otherwise // Audio I/O — VPIO (OS AEC) on macOS, plain CPAL otherwise.
#[cfg(target_os = "macos")] // The audio handle must be stored in CallEngine to keep streams alive.
let _vpio_handle; let (capture_ring, playout_ring, audio_handle): (_, _, Box<dyn std::any::Any + Send>) =
let (capture_ring, playout_ring) = if _os_aec { if _os_aec {
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
{ {
// Try VPIO; fall back to CPAL if it fails match wzp_client::audio_vpio::VpioAudio::start() {
match wzp_client::audio_vpio::VpioAudio::start() { Ok(v) => {
Ok(v) => { let cr = v.capture_ring().clone();
let cr = v.capture_ring().clone(); let pr = v.playout_ring().clone();
let pr = v.playout_ring().clone(); info!("using VoiceProcessingIO (OS AEC)");
_vpio_handle = Some(v); (cr, pr, Box::new(v))
info!("using VoiceProcessingIO (OS AEC)"); }
(cr, pr) Err(e) => {
} info!("VPIO failed ({e}), falling back to CPAL");
Err(e) => { let capture = AudioCapture::start()?;
info!("VPIO failed ({e}), falling back to CPAL"); let playback = AudioPlayback::start()?;
_vpio_handle = None; let cr = capture.ring().clone();
let capture = AudioCapture::start()?; let pr = playback.ring().clone();
let playback = AudioPlayback::start()?; (cr, pr, Box::new((capture, playback)))
let cr = capture.ring().clone(); }
let pr = playback.ring().clone();
std::mem::forget(capture);
std::mem::forget(playback);
(cr, pr)
} }
} }
} #[cfg(not(target_os = "macos"))]
#[cfg(not(target_os = "macos"))] {
{ info!("OS AEC not available on this platform, using CPAL");
info!("OS AEC not available on this platform, using CPAL"); let capture = AudioCapture::start()?;
let playback = AudioPlayback::start()?;
let cr = capture.ring().clone();
let pr = playback.ring().clone();
(cr, pr, Box::new((capture, playback)))
}
} else {
let capture = AudioCapture::start()?; let capture = AudioCapture::start()?;
let playback = AudioPlayback::start()?; let playback = AudioPlayback::start()?;
let cr = capture.ring().clone(); let cr = capture.ring().clone();
let pr = playback.ring().clone(); let pr = playback.ring().clone();
std::mem::forget(capture); (cr, pr, Box::new((capture, playback)))
std::mem::forget(playback); };
(cr, pr)
}
} else {
let capture = AudioCapture::start()?;
let playback = AudioPlayback::start()?;
let cr = capture.ring().clone();
let pr = playback.ring().clone();
std::mem::forget(capture);
std::mem::forget(playback);
(cr, pr)
};
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let mic_muted = Arc::new(AtomicBool::new(false)); let mic_muted = Arc::new(AtomicBool::new(false));
@@ -318,6 +318,7 @@ impl CallEngine {
transport, transport,
start_time: Instant::now(), start_time: Instant::now(),
fingerprint, fingerprint,
_audio_handle: SyncWrapper(audio_handle),
}) })
} }
@@ -334,17 +335,20 @@ impl CallEngine {
} }
pub async fn status(&self) -> EngineStatus { pub async fn status(&self) -> EngineStatus {
let parts = self.participants.lock().await; let participants = {
EngineStatus { let parts = self.participants.lock().await;
mic_muted: self.mic_muted.load(Ordering::Relaxed), parts
spk_muted: self.spk_muted.load(Ordering::Relaxed),
participants: parts
.iter() .iter()
.map(|p| ParticipantInfo { .map(|p| ParticipantInfo {
fingerprint: p.fingerprint.clone(), fingerprint: p.fingerprint.clone(),
alias: p.alias.clone(), alias: p.alias.clone(),
}) })
.collect(), .collect()
}; // lock dropped here
EngineStatus {
mic_muted: self.mic_muted.load(Ordering::Relaxed),
spk_muted: self.spk_muted.load(Ordering::Relaxed),
participants,
frames_sent: self.frames_sent.load(Ordering::Relaxed), frames_sent: self.frames_sent.load(Ordering::Relaxed),
frames_received: self.frames_received.load(Ordering::Relaxed), frames_received: self.frames_received.load(Ordering::Relaxed),
audio_level: self.audio_level.load(Ordering::Relaxed), audio_level: self.audio_level.load(Ordering::Relaxed),