phase 3(android): wire CallEngine::start to wzp-native audio FFI
Some checks failed
Mirror to GitHub / mirror (push) Failing after 39s
Build Release Binaries / build-amd64 (push) Failing after 3m57s

Replaces the Android-side CallEngine::start() stub with a real implementation
that mirrors the desktop start() body but routes all PCM through the
standalone wzp-native cdylib loaded at startup via libloading instead of
using CPAL.

- desktop/src-tauri/src/wzp_native.rs: new module with a static
  OnceLock<libloading::Library> + cached raw fn pointers for every symbol
  we need (version, hello, audio_start/stop, read_capture, write_playout,
  is_running, capture/playout_latency_ms). init() resolves everything once
  at startup; accessors return default values if init() never ran.

- desktop/src-tauri/src/lib.rs: drop the inline dlopen smoke test, add
  `mod wzp_native;` behind target_os="android", and invoke
  wzp_native::init() from the Tauri setup() callback so the library is
  loaded + all symbols cached before any CallEngine can touch audio.

- desktop/src-tauri/src/engine.rs: the Android #[cfg] branch of
  CallEngine::start() now does the full QUIC handshake + signal loop +
  Opus send/recv tasks, calling wzp_native::audio_start() /
  audio_read_capture() / audio_write_playout() instead of the desktop
  CPAL rings. SyncWrapper now holds a placeholder Box<()> on Android
  because the audio backend lives in a process-global singleton inside
  libwzp_native.so rather than being owned per-engine.

Next step: build #39 on the remote docker builder and smoke-test on
Pixel 6 that the Connect button in the UI successfully brings up Oboe
and streams audio through the dlopen boundary.
This commit is contained in:
Siavash Sameni
2026-04-09 18:42:27 +04:00
parent c769a476a2
commit fdbe502524
4 changed files with 3413 additions and 129 deletions

3067
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -14,23 +14,17 @@ use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
// tracing is used heavily inside the desktop CallEngine::start body but never
// from the 6-line Android stub — keep it gated to avoid an unused_imports
// warning on Android.
#[cfg(not(target_os = "android"))]
use tracing::{error, info};
// CPAL audio I/O is only available on desktop (wzp-client's `audio` feature).
#[cfg(not(target_os = "android"))]
use wzp_client::audio_io::{AudioCapture, AudioPlayback};
// call / CallEncoder / CallConfig are platform-independent (pure Rust codec
// plumbing) but we only use them from the non-Android CallEngine::start body.
#[cfg(not(target_os = "android"))]
// Codec + handshake pipelines are platform-independent Rust (no CPAL
// dependency) so they're available from wzp-client on both desktop and
// Android (where wzp-client is pulled in with default-features=false).
use wzp_client::call::{CallConfig, CallEncoder};
// wzp_proto types are platform-independent and referenced from
// resolve_quality() which is compiled on every platform.
use wzp_proto::{CodecId, MediaTransport, QualityProfile};
const FRAME_SAMPLES_40MS: usize = 1920;
@@ -100,23 +94,273 @@ pub struct CallEngine {
}
impl CallEngine {
/// Android stub — the real audio pipeline depends on wzp_client's
/// CPAL-backed audio_io module, which isn't available here. Returns an
/// error so the `connect` Tauri command fails cleanly. We'll replace
/// this in a later step with an Oboe-backed implementation.
/// Android engine path — uses the standalone `wzp-native` cdylib
/// (loaded at startup via `crate::wzp_native::init()`) for Oboe-backed
/// capture and playout instead of CPAL. Mirrors the desktop send/recv
/// task structure otherwise.
#[cfg(target_os = "android")]
pub async fn start<F>(
_relay: String,
_room: String,
_alias: String,
relay: String,
room: String,
alias: String,
_os_aec: bool,
_quality: String,
_event_cb: F,
quality: String,
event_cb: F,
) -> Result<Self, anyhow::Error>
where
F: Fn(&str, &str) + Send + Sync + 'static,
{
Err(anyhow::anyhow!("audio engine not yet wired on Android (step C)"))
let _ = rustls::crypto::ring::default_provider().install_default();
let relay_addr: SocketAddr = relay.parse()?;
// Identity via shared helper (uses Tauri path().app_data_dir()).
let seed = crate::load_or_create_seed()
.map_err(|e| anyhow::anyhow!("identity: {e}"))?;
let fp = seed.derive_identity().public_identity().fingerprint;
let fingerprint = fp.to_string();
info!(%fp, "identity loaded");
// QUIC transport + handshake.
let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = wzp_transport::create_endpoint(bind_addr, None)?;
let client_config = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, relay_addr, &room, client_config).await?;
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
let _session = wzp_client::handshake::perform_handshake(
&*transport,
&seed.0,
Some(&alias),
)
.await?;
info!("connected to relay, handshake complete");
event_cb("connected", &format!("joined room {room}"));
// Oboe audio via the wzp-native cdylib that was dlopen'd at
// startup. `wzp_native::audio_start()` brings up the capture +
// playout streams; send/recv tasks below pull/push PCM through
// the extern "C" bridge rings.
if !crate::wzp_native::is_loaded() {
return Err(anyhow::anyhow!(
"wzp-native not loaded — dlopen failed at startup"
));
}
if let Err(code) = crate::wzp_native::audio_start() {
return Err(anyhow::anyhow!("wzp_native_audio_start failed: code {code}"));
}
info!("wzp-native audio started");
let running = Arc::new(AtomicBool::new(true));
let mic_muted = Arc::new(AtomicBool::new(false));
let spk_muted = Arc::new(AtomicBool::new(false));
let participants: Arc<Mutex<Vec<ParticipantInfo>>> = Arc::new(Mutex::new(vec![]));
let frames_sent = Arc::new(AtomicU64::new(0));
let frames_received = Arc::new(AtomicU64::new(0));
let audio_level = Arc::new(AtomicU32::new(0));
let tx_codec = Arc::new(Mutex::new(String::new()));
let rx_codec = Arc::new(Mutex::new(String::new()));
// Send task — drain Oboe capture ring, Opus-encode, push to transport.
let send_t = transport.clone();
let send_r = running.clone();
let send_mic = mic_muted.clone();
let send_fs = frames_sent.clone();
let send_level = audio_level.clone();
let send_drops = Arc::new(AtomicU64::new(0));
let send_quality = quality.clone();
let send_tx_codec = tx_codec.clone();
tokio::spawn(async move {
let profile = resolve_quality(&send_quality);
let config = match profile {
Some(p) => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::from_profile(p)
},
None => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::default()
},
};
let frame_samples = (config.profile.frame_duration_ms as usize) * 48;
info!(codec = ?config.profile.codec, frame_samples, "send task starting (android/oboe)");
*send_tx_codec.lock().await = format!("{:?}", config.profile.codec);
let mut encoder = CallEncoder::new(&config);
encoder.set_aec_enabled(false);
let mut buf = vec![0i16; frame_samples];
loop {
if !send_r.load(Ordering::Relaxed) {
break;
}
// wzp-native doesn't expose `available()`, so we just try
// to read a full frame and sleep briefly if the ring is
// short. Oboe's capture callback fills at a steady rate
// so in steady state this spins once per frame.
let read = crate::wzp_native::audio_read_capture(&mut buf);
if read < frame_samples {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
continue;
}
// RMS for UI meter
let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum();
let rms = (sum_sq / buf.len() as f64).sqrt() as u32;
send_level.store(rms, Ordering::Relaxed);
if send_mic.load(Ordering::Relaxed) {
buf.fill(0);
}
match encoder.encode_frame(&buf) {
Ok(pkts) => {
for pkt in &pkts {
if let Err(e) = send_t.send_media(pkt).await {
send_drops.fetch_add(1, Ordering::Relaxed);
if send_drops.load(Ordering::Relaxed) <= 3 {
tracing::warn!("send_media error (dropping packet): {e}");
}
}
}
send_fs.fetch_add(1, Ordering::Relaxed);
}
Err(e) => error!("encode: {e}"),
}
}
});
// Recv task — decode incoming packets, push PCM into Oboe playout.
let recv_t = transport.clone();
let recv_r = running.clone();
let recv_spk = spk_muted.clone();
let recv_fr = frames_received.clone();
let recv_rx_codec = rx_codec.clone();
tokio::spawn(async move {
let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD);
let mut decoder = wzp_codec::create_decoder(initial_profile);
let mut current_codec = initial_profile.codec;
let mut agc = wzp_codec::AutoGainControl::new();
let mut pcm = vec![0i16; FRAME_SAMPLES_40MS];
loop {
if !recv_r.load(Ordering::Relaxed) {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(100),
recv_t.recv_media(),
)
.await
{
Ok(Ok(Some(pkt))) => {
if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
{
let mut rx = recv_rx_codec.lock().await;
let codec_name = format!("{:?}", pkt.header.codec_id);
if *rx != codec_name { *rx = codec_name; }
}
if pkt.header.codec_id != current_codec {
let new_profile = match pkt.header.codec_id {
CodecId::Opus24k => QualityProfile::GOOD,
CodecId::Opus6k => QualityProfile::DEGRADED,
CodecId::Opus32k => QualityProfile::STUDIO_32K,
CodecId::Opus48k => QualityProfile::STUDIO_48K,
CodecId::Opus64k => QualityProfile::STUDIO_64K,
CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC,
CodecId::Codec2_3200 => QualityProfile {
codec: CodecId::Codec2_3200,
fec_ratio: 0.5, frame_duration_ms: 20, frames_per_block: 5,
},
other => QualityProfile { codec: other, ..QualityProfile::GOOD },
};
info!(from = ?current_codec, to = ?pkt.header.codec_id, "recv: switching decoder");
let _ = decoder.set_profile(new_profile);
current_codec = pkt.header.codec_id;
}
if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) {
agc.process_frame(&mut pcm[..n]);
if !recv_spk.load(Ordering::Relaxed) {
crate::wzp_native::audio_write_playout(&pcm[..n]);
}
}
}
recv_fr.fetch_add(1, Ordering::Relaxed);
}
Ok(Ok(None)) => break,
Ok(Err(e)) => {
let msg = e.to_string();
if msg.contains("closed") || msg.contains("reset") {
error!("recv fatal: {e}");
break;
}
}
Err(_) => {}
}
}
});
// Signal task (presence — same shape as desktop).
let sig_t = transport.clone();
let sig_r = running.clone();
let sig_p = participants.clone();
let event_cb = Arc::new(event_cb);
let sig_cb = event_cb.clone();
tokio::spawn(async move {
loop {
if !sig_r.load(Ordering::Relaxed) {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(200),
sig_t.recv_signal(),
)
.await
{
Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate {
participants: parts,
..
}))) => {
let mut seen = std::collections::HashSet::new();
let unique: Vec<ParticipantInfo> = parts
.into_iter()
.filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone())))
.map(|p| ParticipantInfo {
fingerprint: p.fingerprint,
alias: p.alias,
relay_label: p.relay_label,
})
.collect();
let count = unique.len();
*sig_p.lock().await = unique;
sig_cb("room-update", &format!("{count} participants"));
}
Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break,
Ok(Err(_)) => break,
Err(_) => {}
}
}
});
Ok(Self {
running,
mic_muted,
spk_muted,
participants,
frames_sent,
frames_received,
audio_level,
transport,
start_time: Instant::now(),
fingerprint,
tx_codec,
rx_codec,
// No CPAL / VPIO handle to keep alive on Android — wzp_native
// is a static dlopen'd library, the audio streams live inside
// the standalone cdylib's process-global singleton.
_audio_handle: SyncWrapper(Box::new(())),
})
}
#[cfg(not(target_os = "android"))]

View File

@@ -7,10 +7,16 @@
)]
// Call engine — now compiled on every platform. On desktop it runs the real
// CPAL/VPIO audio pipeline; on Android `CallEngine::start()` currently returns
// an error stub (see engine.rs — that's step C of the Oboe integration).
// CPAL/VPIO audio pipeline; on Android the engine calls into the standalone
// wzp-native cdylib (via the wzp_native module) for Oboe-backed audio.
mod engine;
// Android runtime binding to libwzp_native.so (Oboe audio backend, built as
// a standalone cdylib with cargo-ndk to avoid the Tauri staticlib symbol
// leak — see docs/incident-tauri-android-init-tcb.md).
#[cfg(target_os = "android")]
mod wzp_native;
// CallEngine is only referenced from the non-Android connect/disconnect/etc
// commands; the Android stubs return errors directly.
#[cfg(not(target_os = "android"))]
@@ -536,40 +542,24 @@ pub fn run() {
tracing::info!("app data dir: {data_dir:?}");
let _ = APP_DATA_DIR.set(data_dir);
// Phase 1 smoke test of the separate-cdylib approach to the
// __init_tcb crash (see docs/incident-tauri-android-init-tcb.md):
// dlopen the sibling libwzp_native.so that gradle dropped into
// our jniLibs directory and call its exported wzp_native_version()
// + wzp_native_hello() functions. If this logs
// "wzp-native dlopen OK: version=42 msg=...",
// we've validated the whole cdylib-split pipeline and can move
// to Phase 2 (port the Oboe bridge into wzp-native).
// Load the standalone wzp-native cdylib (Oboe audio bridge) and
// cache its exported function pointers. The library handle is
// kept alive in a 'static OnceLock for the lifetime of the
// process, so CallEngine::start() can invoke its audio FFI
// from anywhere. See src/wzp_native.rs and the incident report
// in docs/incident-tauri-android-init-tcb.md.
#[cfg(target_os = "android")]
{
match unsafe { libloading::Library::new("libwzp_native.so") } {
Ok(lib) => {
unsafe {
match lib.get::<unsafe extern "C" fn() -> i32>(b"wzp_native_version") {
Ok(version_fn) => {
let v = version_fn();
let mut buf = [0u8; 64];
let msg = match lib.get::<unsafe extern "C" fn(*mut u8, usize) -> usize>(b"wzp_native_hello") {
Ok(hello_fn) => {
let n = hello_fn(buf.as_mut_ptr(), buf.len());
String::from_utf8_lossy(&buf[..n]).into_owned()
}
Err(e) => format!("<no hello: {e}>"),
};
tracing::info!("wzp-native dlopen OK: version={v} msg=\"{msg}\"");
match wzp_native::init() {
Ok(()) => {
tracing::info!(
"wzp-native loaded: version={} msg=\"{}\"",
wzp_native::version(),
wzp_native::hello()
);
}
Err(e) => {
tracing::warn!("wzp-native loaded but dlsym(wzp_native_version) failed: {e}");
}
}
}
}
Err(e) => {
tracing::warn!("wzp-native dlopen failed: {e}");
tracing::warn!("wzp-native init failed: {e}");
}
}
}

View File

@@ -0,0 +1,139 @@
//! Runtime binding to the standalone `wzp-native` cdylib.
//!
//! See `docs/incident-tauri-android-init-tcb.md` and the top of
//! `crates/wzp-native/src/lib.rs` for the full story on why this split
//! exists. Short version: Tauri's desktop cdylib cannot have any C++
//! compiled into it (via cc::Build) without landing in rust-lang/rust#104707's
//! staticlib symbol leak, which makes bionic's private `pthread_create`
//! symbols bind locally and SIGSEGV in `__init_tcb+4` at launch. So all
//! the Oboe + audio code lives in a standalone `wzp-native` .so built
//! with `cargo-ndk`, and we dlopen it here at runtime.
//!
//! The Library handle lives in a `'static` `OnceLock` for the lifetime of
//! the process; all function pointers cached below borrow from it safely.
#![cfg(target_os = "android")]
use std::sync::OnceLock;
// ─── Library handle (kept alive forever) ─────────────────────────────────
static LIB: OnceLock<libloading::Library> = OnceLock::new();
// Cached function pointers, resolved once at init(). Each is a raw
// `extern "C"` fn pointer with effectively `'static` lifetime because
// LIB is a OnceLock that never drops.
static VERSION: OnceLock<unsafe extern "C" fn() -> i32> = OnceLock::new();
static HELLO: OnceLock<unsafe extern "C" fn(*mut u8, usize) -> usize> = OnceLock::new();
static AUDIO_START: OnceLock<unsafe extern "C" fn() -> i32> = OnceLock::new();
static AUDIO_STOP: OnceLock<unsafe extern "C" fn()> = OnceLock::new();
static AUDIO_READ_CAPTURE: OnceLock<unsafe extern "C" fn(*mut i16, usize) -> usize> = OnceLock::new();
static AUDIO_WRITE_PLAYOUT: OnceLock<unsafe extern "C" fn(*const i16, usize) -> usize> = OnceLock::new();
static AUDIO_IS_RUNNING: OnceLock<unsafe extern "C" fn() -> i32> = OnceLock::new();
static AUDIO_CAPTURE_LATENCY: OnceLock<unsafe extern "C" fn() -> f32> = OnceLock::new();
static AUDIO_PLAYOUT_LATENCY: OnceLock<unsafe extern "C" fn() -> f32> = OnceLock::new();
/// Load `libwzp_native.so` and resolve every exported function we use.
/// Call this once at app startup (from the Tauri `setup()` callback).
/// Subsequent calls are no-ops.
pub fn init() -> Result<(), String> {
if LIB.get().is_some() {
return Ok(());
}
// Open the sibling cdylib. The Android dynamic linker searches
// /data/app/<pkg>/lib/arm64/ which gradle populates from jniLibs.
let lib = unsafe { libloading::Library::new("libwzp_native.so") }
.map_err(|e| format!("dlopen libwzp_native.so: {e}"))?;
// Stash the Library into the OnceLock first so all Symbol lookups
// below borrow from the 'static reference rather than a local.
LIB.set(lib).map_err(|_| "wzp_native::LIB already set")?;
let lib_ref: &'static libloading::Library = LIB.get().unwrap();
unsafe {
macro_rules! resolve {
($cell:expr, $ty:ty, $name:expr) => {{
let sym: libloading::Symbol<$ty> = lib_ref.get($name)
.map_err(|e| format!("dlsym {}: {e}", core::str::from_utf8($name).unwrap_or("?")))?;
// Dereference the Symbol to extract the raw fn pointer;
// it stays valid because lib_ref is 'static.
$cell.set(*sym).map_err(|_| format!("{} already set", core::str::from_utf8($name).unwrap_or("?")))?;
}};
}
resolve!(VERSION, unsafe extern "C" fn() -> i32, b"wzp_native_version");
resolve!(HELLO, unsafe extern "C" fn(*mut u8, usize) -> usize, b"wzp_native_hello");
resolve!(AUDIO_START, unsafe extern "C" fn() -> i32, b"wzp_native_audio_start");
resolve!(AUDIO_STOP, unsafe extern "C" fn(), b"wzp_native_audio_stop");
resolve!(AUDIO_READ_CAPTURE, unsafe extern "C" fn(*mut i16, usize) -> usize, b"wzp_native_audio_read_capture");
resolve!(AUDIO_WRITE_PLAYOUT, unsafe extern "C" fn(*const i16, usize) -> usize, b"wzp_native_audio_write_playout");
resolve!(AUDIO_IS_RUNNING, unsafe extern "C" fn() -> i32, b"wzp_native_audio_is_running");
resolve!(AUDIO_CAPTURE_LATENCY, unsafe extern "C" fn() -> f32, b"wzp_native_audio_capture_latency_ms");
resolve!(AUDIO_PLAYOUT_LATENCY, unsafe extern "C" fn() -> f32, b"wzp_native_audio_playout_latency_ms");
}
Ok(())
}
/// Is `init()` done and all symbols cached?
pub fn is_loaded() -> bool {
AUDIO_START.get().is_some()
}
// ─── Smoke-test accessors ────────────────────────────────────────────────
pub fn version() -> i32 {
VERSION.get().map(|f| unsafe { f() }).unwrap_or(-1)
}
pub fn hello() -> String {
let Some(f) = HELLO.get() else { return String::new(); };
let mut buf = [0u8; 64];
let n = unsafe { f(buf.as_mut_ptr(), buf.len()) };
String::from_utf8_lossy(&buf[..n]).into_owned()
}
// ─── Audio accessors ─────────────────────────────────────────────────────
/// Start the Oboe capture + playout streams. Returns `Err(code)` on
/// failure. Idempotent on the wzp-native side.
pub fn audio_start() -> Result<(), i32> {
let f = AUDIO_START.get().ok_or(-100_i32)?;
let ret = unsafe { f() };
if ret == 0 { Ok(()) } else { Err(ret) }
}
/// Stop both streams. Safe to call even if not running.
pub fn audio_stop() {
if let Some(f) = AUDIO_STOP.get() {
unsafe { f() };
}
}
/// Read captured i16 PCM into `out`. Returns bytes actually copied.
pub fn audio_read_capture(out: &mut [i16]) -> usize {
let Some(f) = AUDIO_READ_CAPTURE.get() else { return 0; };
unsafe { f(out.as_mut_ptr(), out.len()) }
}
/// Write i16 PCM into the playout ring. Returns samples enqueued.
pub fn audio_write_playout(input: &[i16]) -> usize {
let Some(f) = AUDIO_WRITE_PLAYOUT.get() else { return 0; };
unsafe { f(input.as_ptr(), input.len()) }
}
#[allow(dead_code)]
pub fn audio_is_running() -> bool {
AUDIO_IS_RUNNING.get().map(|f| unsafe { f() } != 0).unwrap_or(false)
}
#[allow(dead_code)]
pub fn audio_capture_latency_ms() -> f32 {
AUDIO_CAPTURE_LATENCY.get().map(|f| unsafe { f() }).unwrap_or(0.0)
}
#[allow(dead_code)]
pub fn audio_playout_latency_ms() -> f32 {
AUDIO_PLAYOUT_LATENCY.get().map(|f| unsafe { f() }).unwrap_or(0.0)
}