diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt index 9ac0b73..4793f97 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt @@ -56,12 +56,22 @@ class CallViewModel : ViewModel(), WzpCallback { engineInitialized = true } _callState.value = 1 // Connecting - val result = engine?.startCall(relayAddr, room) ?: -1 - if (result == 0) { - startStatsPolling() - } else { - _callState.value = 0 - _errorMessage.value = "Failed to start call (code $result)" + startStatsPolling() + + // startCall blocks (runs tokio on calling thread), so dispatch + // to a background coroutine. Using Dispatchers.IO which uses + // Java threads (not native pthread_create). + viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) { + try { + val result = engine?.startCall(relayAddr, room) ?: -1 + if (result != 0) { + _callState.value = 0 + _errorMessage.value = "Failed to start call (code $result)" + } + } catch (e: Exception) { + _callState.value = 0 + _errorMessage.value = "Engine error: ${e.message}" + } } } catch (e: Exception) { _callState.value = 0 diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 22d930e..cbe8b60 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -1,10 +1,9 @@ //! Engine orchestrator — manages the call lifecycle. //! -//! The engine owns: -//! - The Oboe audio backend (start/stop) -//! - A codec thread running the `Pipeline` -//! - A tokio runtime for async network I/O -//! - Command channel for control from the JNI/UI thread +//! IMPORTANT: On Android, pthread_create crashes in shared libraries due to +//! static bionic stubs in the Rust std prebuilt rlibs. ALL work must happen +//! on the JNI calling thread or via the tokio current_thread runtime. +//! No std::thread::spawn or tokio multi_thread allowed. use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering}; @@ -12,28 +11,21 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use bytes::Bytes; -use tracing::{error, info, warn}; +use tracing::{error, info}; use wzp_crypto::{KeyExchange, WarzoneKeyExchange}; use wzp_proto::{ CodecId, MediaHeader, MediaPacket, MediaTransport, QualityProfile, SignalMessage, }; -use crate::audio_android::{OboeBackend, FRAME_SAMPLES}; use crate::commands::EngineCommand; -use crate::pipeline::Pipeline; use crate::stats::{CallState, CallStats}; /// Configuration to start a call. pub struct CallStartConfig { - /// Initial quality profile. pub profile: QualityProfile, - /// Relay server address (host:port). pub relay_addr: String, - /// Room name (passed as SNI). pub room: String, - /// Authentication token for the relay. pub auth_token: Vec, - /// 32-byte identity seed for key derivation. pub identity_seed: [u8; 32], } @@ -49,23 +41,16 @@ impl Default for CallStartConfig { } } -/// Shared state between the engine owner and background threads. struct EngineState { running: AtomicBool, - connected: AtomicBool, muted: AtomicBool, - speaker: AtomicBool, - aec_enabled: AtomicBool, - agc_enabled: AtomicBool, stats: Mutex, command_tx: std::sync::mpsc::Sender, command_rx: Mutex>>, } -/// The WarzonePhone Android engine. pub struct WzpEngine { state: Arc, - codec_thread: Option>, tokio_runtime: Option, call_start: Option, } @@ -75,19 +60,13 @@ impl WzpEngine { let (tx, rx) = std::sync::mpsc::channel(); let state = Arc::new(EngineState { running: AtomicBool::new(false), - connected: AtomicBool::new(false), muted: AtomicBool::new(false), - speaker: AtomicBool::new(false), - aec_enabled: AtomicBool::new(true), - agc_enabled: AtomicBool::new(true), stats: Mutex::new(CallStats::default()), command_tx: tx, command_rx: Mutex::new(Some(rx)), }); - Self { state, - codec_thread: None, tokio_runtime: None, call_start: None, } @@ -106,347 +85,62 @@ impl WzpEngine { }; } - // Create tokio runtime — use current_thread to avoid pthread_create - // issues on Android (SEGV_ACCERR in __init_tcb with multi_thread). + // Create single-threaded tokio runtime — NO thread spawning. + // On Android, pthread_create crashes due to static bionic stubs. let runtime = tokio::runtime::Builder::new_current_thread() - .thread_name("wzp-net") .enable_all() .build()?; - // Channels between codec thread and network tasks - let (send_tx, mut send_rx) = tokio::sync::mpsc::channel::>(64); - let (recv_tx, recv_rx) = tokio::sync::mpsc::channel::(64); - - // Shared sequence counter for outgoing packets - let seq_counter = Arc::new(AtomicU16::new(0)); - let ts_counter = Arc::new(AtomicU32::new(0)); - - // Parse relay address let relay_addr: SocketAddr = config.relay_addr.parse().map_err(|e| { anyhow::anyhow!("invalid relay address '{}': {e}", config.relay_addr) })?; let room = config.room.clone(); let identity_seed = config.identity_seed; - let state_net = self.state.clone(); - let seq_c = seq_counter.clone(); - let ts_c = ts_counter.clone(); + let state = self.state.clone(); - // Spawn the combined network task (connect + handshake + send/recv) - runtime.spawn(async move { - // Install rustls crypto provider - let _ = rustls::crypto::ring::default_provider().install_default(); + self.state.running.store(true, Ordering::Release); + self.call_start = Some(Instant::now()); - // Create QUIC endpoint - let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = match wzp_transport::create_endpoint(bind_addr, None) { - Ok(ep) => ep, - Err(e) => { - error!("failed to create QUIC endpoint: {e}"); - return; - } - }; - - // Connect to relay with room as SNI - let sni = if room.is_empty() { "android" } else { &room }; - info!(%relay_addr, sni, "connecting to relay..."); - let client_cfg = wzp_transport::client_config(); - let conn = match wzp_transport::connect(&endpoint, relay_addr, sni, client_cfg).await { - Ok(c) => c, - Err(e) => { - error!("QUIC connect failed: {e}"); - return; - } - }; - info!("QUIC connected to relay"); - - let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); - - // Crypto handshake: send CallOffer, receive CallAnswer - let mut kx = WarzoneKeyExchange::from_identity_seed(&identity_seed); - let ephemeral_pub = kx.generate_ephemeral(); - let identity_pub = kx.identity_public_key(); - - // Sign (ephemeral_pub || "call-offer") - let mut sign_data = Vec::with_capacity(32 + 10); - sign_data.extend_from_slice(&ephemeral_pub); - sign_data.extend_from_slice(b"call-offer"); - let signature = kx.sign(&sign_data); - - let offer = SignalMessage::CallOffer { - identity_pub, - ephemeral_pub, - signature, - supported_profiles: vec![ - QualityProfile::GOOD, - QualityProfile::DEGRADED, - QualityProfile::CATASTROPHIC, - ], - }; - - if let Err(e) = transport.send_signal(&offer).await { - error!("failed to send CallOffer: {e}"); - return; + // Run the entire call on the current thread's tokio runtime. + // This blocks the JNI thread until the call ends, so Kotlin + // must call startCall from a background coroutine. + let state_clone = state.clone(); + runtime.block_on(async move { + if let Err(e) = run_call(relay_addr, &room, &identity_seed, state_clone).await { + error!("call failed: {e}"); } - info!("CallOffer sent, waiting for CallAnswer..."); - - // Receive CallAnswer - let answer = match transport.recv_signal().await { - Ok(Some(msg)) => msg, - Ok(None) => { - error!("connection closed before CallAnswer"); - return; - } - Err(e) => { - error!("failed to receive CallAnswer: {e}"); - return; - } - }; - - let (relay_ephemeral_pub, _chosen_profile) = match answer { - SignalMessage::CallAnswer { - ephemeral_pub, - chosen_profile, - .. - } => (ephemeral_pub, chosen_profile), - other => { - error!("expected CallAnswer, got {:?}", std::mem::discriminant(&other)); - return; - } - }; - - // Derive crypto session (not encrypting media yet for simplicity) - let _session = match kx.derive_session(&relay_ephemeral_pub) { - Ok(s) => s, - Err(e) => { - error!("session derivation failed: {e}"); - return; - } - }; - - info!("handshake complete, call active"); - state_net.connected.store(true, Ordering::Release); - { - let mut stats = state_net.stats.lock().unwrap(); - stats.state = CallState::Active; - } - - // Spawn recv task - let recv_transport = transport.clone(); - let recv_handle = tokio::spawn(async move { - loop { - match recv_transport.recv_media().await { - Ok(Some(pkt)) => { - if recv_tx.send(pkt).await.is_err() { - break; - } - } - Ok(None) => { - info!("relay disconnected (recv)"); - break; - } - Err(e) => { - error!("recv_media error: {e}"); - break; - } - } - } - }); - - // Send task runs in this task - while let Some(encoded) = send_rx.recv().await { - let seq = seq_c.fetch_add(1, Ordering::Relaxed); - let ts = ts_c.fetch_add(20, Ordering::Relaxed); - let packet = MediaPacket { - header: MediaHeader { - version: 0, - is_repair: false, - codec_id: CodecId::Opus24k, - has_quality_report: false, - fec_ratio_encoded: 0, - seq, - timestamp: ts, - fec_block: 0, - fec_symbol: 0, - reserved: 0, - csrc_count: 0, - }, - payload: Bytes::from(encoded), - quality_report: None, - }; - if let Err(e) = transport.send_media(&packet).await { - error!("send_media error: {e}"); - break; - } - } - - recv_handle.abort(); - transport.close().await.ok(); }); - // Take the command receiver - let command_rx = self - .state - .command_rx - .lock() - .unwrap() - .take() - .ok_or_else(|| anyhow::anyhow!("command receiver already taken"))?; + state.running.store(false, Ordering::Release); + { + let mut stats = state.stats.lock().unwrap(); + stats.state = CallState::Closed; + } - // Start the codec thread - let state = self.state.clone(); - let profile = config.profile; - let codec_thread = std::thread::Builder::new() - .name("wzp-codec".into()) - .spawn(move || { - crate::audio_android::pin_to_big_core(); - crate::audio_android::set_realtime_priority(); - - let mut audio = OboeBackend::new(); - if let Err(e) = audio.start() { - error!("failed to start audio: {e}"); - state.running.store(false, Ordering::Release); - return; - } - - let mut pipeline = match Pipeline::new(profile) { - Ok(p) => p, - Err(e) => { - error!("failed to create pipeline: {e}"); - audio.stop(); - state.running.store(false, Ordering::Release); - return; - } - }; - - state.running.store(true, Ordering::Release); - - let mut prev_aec = true; - let mut prev_agc = true; - let mut capture_buf = vec![0i16; FRAME_SAMPLES]; - let frame_duration = std::time::Duration::from_millis(20); - let mut recv_rx = recv_rx; - - while state.running.load(Ordering::Relaxed) { - let loop_start = Instant::now(); - - // Process commands - while let Ok(cmd) = command_rx.try_recv() { - match cmd { - EngineCommand::SetMute(m) => { - state.muted.store(m, Ordering::Relaxed); - } - EngineCommand::SetSpeaker(s) => { - state.speaker.store(s, Ordering::Relaxed); - } - EngineCommand::ForceProfile(p) => { - pipeline.force_profile(p); - } - EngineCommand::Stop => { - state.running.store(false, Ordering::Release); - break; - } - } - } - - // Sync AEC/AGC - let cur_aec = state.aec_enabled.load(Ordering::Relaxed); - if cur_aec != prev_aec { - pipeline.set_aec_enabled(cur_aec); - prev_aec = cur_aec; - } - let cur_agc = state.agc_enabled.load(Ordering::Relaxed); - if cur_agc != prev_agc { - pipeline.set_agc_enabled(cur_agc); - prev_agc = cur_agc; - } - - if !state.running.load(Ordering::Relaxed) { - break; - } - - // --- Capture → Encode → Send --- - let captured = audio.read_capture(&mut capture_buf); - if captured >= FRAME_SAMPLES { - let muted = state.muted.load(Ordering::Relaxed); - if let Some(encoded) = pipeline.encode_frame(&capture_buf, muted) { - let _ = send_tx.try_send(encoded); - } - } - - // --- Recv → Decode → Playout --- - while let Ok(pkt) = recv_rx.try_recv() { - pipeline.feed_packet(pkt); - } - - if let Some(pcm) = pipeline.decode_frame() { - audio.write_playout(&pcm); - } - - // --- Update stats --- - { - let pstats = pipeline.stats(); - let mut stats = state.stats.lock().unwrap(); - stats.frames_encoded = pstats.frames_encoded; - stats.frames_decoded = pstats.frames_decoded; - stats.underruns = pstats.underruns; - stats.jitter_buffer_depth = pstats.jitter_depth; - stats.quality_tier = pstats.quality_tier; - } - - let elapsed = loop_start.elapsed(); - if elapsed < frame_duration { - std::thread::sleep(frame_duration - elapsed); - } - } - - audio.stop(); - { - let mut stats = state.stats.lock().unwrap(); - stats.state = CallState::Closed; - } - })?; - - self.codec_thread = Some(codec_thread); self.tokio_runtime = Some(runtime); - self.call_start = Some(Instant::now()); Ok(()) } pub fn stop_call(&mut self) { - if !self.state.running.load(Ordering::Acquire) { - return; - } self.state.running.store(false, Ordering::Release); let _ = self.state.command_tx.send(EngineCommand::Stop); - - if let Some(handle) = self.codec_thread.take() { - let _ = handle.join(); - } if let Some(rt) = self.tokio_runtime.take() { - rt.shutdown_timeout(std::time::Duration::from_secs(2)); + rt.shutdown_background(); } self.call_start = None; } pub fn set_mute(&self, muted: bool) { - let _ = self.state.command_tx.send(EngineCommand::SetMute(muted)); + self.state.muted.store(muted, Ordering::Relaxed); } - pub fn set_speaker(&self, enabled: bool) { - let _ = self.state.command_tx.send(EngineCommand::SetSpeaker(enabled)); + pub fn set_speaker(&self, _enabled: bool) { + // TODO: route audio via AudioManager on Kotlin side } - pub fn set_aec_enabled(&self, enabled: bool) { - self.state.aec_enabled.store(enabled, Ordering::Relaxed); - } - - pub fn set_agc_enabled(&self, enabled: bool) { - self.state.agc_enabled.store(enabled, Ordering::Relaxed); - } - - pub fn force_profile(&self, profile: QualityProfile) { - let _ = self.state.command_tx.send(EngineCommand::ForceProfile(profile)); + pub fn force_profile(&self, _profile: QualityProfile) { + // TODO: wire to pipeline when codec thread is re-enabled } pub fn get_stats(&self) -> CallStats { @@ -471,3 +165,162 @@ impl Drop for WzpEngine { self.stop_call(); } } + +/// Run the full call lifecycle: connect, handshake, send/recv media. +/// All async, no thread spawning. +async fn run_call( + relay_addr: SocketAddr, + room: &str, + identity_seed: &[u8; 32], + state: Arc, +) -> Result<(), anyhow::Error> { + // Install rustls crypto provider + let _ = rustls::crypto::ring::default_provider().install_default(); + + // Create QUIC endpoint + let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let endpoint = wzp_transport::create_endpoint(bind_addr, None)?; + + // Connect to relay with room as SNI + let sni = if room.is_empty() { "android" } else { room }; + info!(%relay_addr, sni, "connecting to relay..."); + let client_cfg = wzp_transport::client_config(); + let conn = wzp_transport::connect(&endpoint, relay_addr, sni, client_cfg).await?; + info!("QUIC connected to relay"); + + let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); + + // Crypto handshake + let mut kx = WarzoneKeyExchange::from_identity_seed(identity_seed); + let ephemeral_pub = kx.generate_ephemeral(); + let identity_pub = kx.identity_public_key(); + + let mut sign_data = Vec::with_capacity(42); + sign_data.extend_from_slice(&ephemeral_pub); + sign_data.extend_from_slice(b"call-offer"); + let signature = kx.sign(&sign_data); + + let offer = SignalMessage::CallOffer { + identity_pub, + ephemeral_pub, + signature, + supported_profiles: vec![ + QualityProfile::GOOD, + QualityProfile::DEGRADED, + QualityProfile::CATASTROPHIC, + ], + }; + transport.send_signal(&offer).await?; + info!("CallOffer sent, waiting for CallAnswer..."); + + let answer = transport + .recv_signal() + .await? + .ok_or_else(|| anyhow::anyhow!("connection closed before CallAnswer"))?; + + let relay_ephemeral_pub = match answer { + SignalMessage::CallAnswer { ephemeral_pub, .. } => ephemeral_pub, + other => { + return Err(anyhow::anyhow!( + "expected CallAnswer, got {:?}", + std::mem::discriminant(&other) + )) + } + }; + + let _session = kx.derive_session(&relay_ephemeral_pub)?; + info!("handshake complete, call active"); + + { + let mut stats = state.stats.lock().unwrap(); + stats.state = CallState::Active; + } + + // Simple media loop: send silence, recv and count frames. + // No codec thread, no Oboe — just network I/O to verify connectivity. + // Audio pipeline will be added once native threading is resolved. + let seq = AtomicU16::new(0); + let ts = AtomicU32::new(0); + let transport_recv = transport.clone(); + + let send_task = async { + let silence = vec![0u8; 20]; // minimal opus silence frame + loop { + if !state.running.load(Ordering::Relaxed) { + break; + } + let s = seq.fetch_add(1, Ordering::Relaxed); + let t = ts.fetch_add(20, Ordering::Relaxed); + let packet = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: false, + codec_id: CodecId::Opus24k, + has_quality_report: false, + fec_ratio_encoded: 0, + seq: s, + timestamp: t, + fec_block: 0, + fec_symbol: 0, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(silence.clone()), + quality_report: None, + }; + if let Err(e) = transport.send_media(&packet).await { + error!("send error: {e}"); + break; + } + // 20ms frame interval + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + } + }; + + let recv_task = async { + let mut frames_decoded: u64 = 0; + loop { + if !state.running.load(Ordering::Relaxed) { + break; + } + match transport_recv.recv_media().await { + Ok(Some(_pkt)) => { + frames_decoded += 1; + let mut stats = state.stats.lock().unwrap(); + stats.frames_decoded = frames_decoded; + } + Ok(None) => { + info!("relay disconnected"); + break; + } + Err(e) => { + error!("recv error: {e}"); + break; + } + } + } + }; + + // Update encoded frame count in send task + let stats_task = async { + loop { + if !state.running.load(Ordering::Relaxed) { + break; + } + { + let mut stats = state.stats.lock().unwrap(); + stats.frames_encoded = seq.load(Ordering::Relaxed) as u64; + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + }; + + tokio::select! { + _ = send_task => {} + _ = recv_task => {} + _ = stats_task => {} + } + + transport.close().await.ok(); + Ok(()) +} diff --git a/wzp-release.apk b/wzp-release.apk index e638b28..cd9d690 100644 Binary files a/wzp-release.apk and b/wzp-release.apk differ