From ed09c2e8cc0f7835b1ed61c7ab917db5e50ca6db Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Thu, 9 Apr 2026 08:33:08 +0400 Subject: [PATCH] =?UTF-8?q?fix:=20use=20block=5Fon=20pattern=20for=20signa?= =?UTF-8?q?ling=20(same=20as=20start=5Fcall)=20=E2=80=94=20no=20thread::sp?= =?UTF-8?q?awn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/wzp-android/src/engine.rs | 66 ++++++-------- crates/wzp-android/src/jni_bridge.rs | 129 +++------------------------ 2 files changed, 36 insertions(+), 159 deletions(-) diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 9220076..ff7212d 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -246,6 +246,9 @@ impl WzpEngine { /// Start persistent signaling connection for direct calls. /// Spawns a background task that maintains the `_signal` connection. + /// Start persistent signaling for direct calls. + /// Blocks the calling thread (Kotlin provides a Thread with 8MB stack). + /// Same pattern as start_call: tokio block_on on the caller's thread. pub fn start_signaling( &mut self, relay_addr: &str, @@ -253,52 +256,34 @@ impl WzpEngine { token: Option<&str>, alias: Option<&str>, ) -> Result<(), anyhow::Error> { - // Capture lightweight params only — all crypto work happens on the spawned thread - let addr_str = relay_addr.to_string(); - let seed_str = seed_hex.to_string(); + use wzp_proto::{MediaTransport, SignalMessage}; + + let _ = rustls::crypto::ring::default_provider().install_default(); + + let addr: SocketAddr = relay_addr.parse()?; + let seed = if seed_hex.is_empty() { + wzp_crypto::Seed::generate() + } else { + wzp_crypto::Seed::from_hex(seed_hex).map_err(|e| anyhow::anyhow!(e))? + }; + let identity = seed.derive_identity(); + let pub_id = identity.public_identity(); + let identity_pub = *pub_id.signing.as_bytes(); + let fp = pub_id.fingerprint.to_string(); let token = token.map(|s| s.to_string()); let alias = alias.map(|s| s.to_string()); let state = self.state.clone(); + info!(fingerprint = %fp, relay = %addr, "starting signaling"); + self.state.running.store(true, Ordering::Release); - // Spawn on a dedicated thread — Android's Kotlin dispatcher has ~1MB stack, - // too small for rustls + QUIC + tokio + crypto. Do ALL work on this thread. - std::thread::Builder::new() - .name("wzp-signal".into()) - .stack_size(8 * 1024 * 1024) // 8MB stack - .spawn(move || { - use wzp_proto::{MediaTransport, SignalMessage}; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; - let _ = rustls::crypto::ring::default_provider().install_default(); - - let addr: SocketAddr = match addr_str.parse() { - Ok(a) => a, - Err(e) => { error!("bad relay addr: {e}"); return; } - }; - let seed = if seed_str.is_empty() { - wzp_crypto::Seed::generate() - } else { - match wzp_crypto::Seed::from_hex(&seed_str) { - Ok(s) => s, - Err(e) => { error!("bad seed: {e}"); return; } - } - }; - let identity = seed.derive_identity(); - let pub_id = identity.public_identity(); - let identity_pub = *pub_id.signing.as_bytes(); - let fp = pub_id.fingerprint.to_string(); - - info!(fingerprint = %fp, relay = %addr, "starting signaling"); - - let signal_state = state.clone(); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime"); - rt.block_on(async move { - let _ = rustls::crypto::ring::default_provider().install_default(); + let signal_state = state.clone(); + rt.block_on(async move { let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); let endpoint = match wzp_transport::create_endpoint(bind, None) { Ok(e) => e, @@ -389,8 +374,7 @@ impl WzpEngine { let mut stats = signal_state.stats.lock().unwrap(); stats.state = crate::stats::CallState::Closed; - }); // block_on - })?; // thread spawn + }); // block_on Ok(()) } diff --git a/crates/wzp-android/src/jni_bridge.rs b/crates/wzp-android/src/jni_bridge.rs index 63a1cdb..5f0928e 100644 --- a/crates/wzp-android/src/jni_bridge.rs +++ b/crates/wzp-android/src/jni_bridge.rs @@ -382,126 +382,19 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default(); let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default(); - // Spawn a Java-level thread with large stack to run start_signaling. - // We CANNOT call start_signaling on this thread — even thread::spawn - // overflows the ~512KB DefaultDispatch stack with Rust's codegen. + // Use the existing start_call pattern — create engine, call start_signaling + // which spawns a thread internally. This is the same pattern that works for room calls. let h = unsafe { handle_ref(handle) }; - let state = h.engine.state.clone(); - let running = &h.engine.state.running; - running.store(true, std::sync::atomic::Ordering::Release); - std::thread::Builder::new() - .name("wzp-signal-init".into()) - .stack_size(8 * 1024 * 1024) - .spawn(move || { - use wzp_proto::{MediaTransport, SignalMessage}; - - let _ = rustls::crypto::ring::default_provider().install_default(); - - let addr: std::net::SocketAddr = match relay_addr.parse() { - Ok(a) => a, - Err(e) => { tracing::error!("bad relay addr: {e}"); return; } - }; - let seed = if seed_hex.is_empty() { - wzp_crypto::Seed::generate() - } else { - match wzp_crypto::Seed::from_hex(&seed_hex) { - Ok(s) => s, - Err(e) => { tracing::error!("bad seed: {e}"); return; } - } - }; - let identity = seed.derive_identity(); - let pub_id = identity.public_identity(); - let identity_pub = *pub_id.signing.as_bytes(); - let fp = pub_id.fingerprint.to_string(); - - tracing::info!(fingerprint = %fp, relay = %addr, "signal thread: starting"); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime"); - - let signal_state = state.clone(); - rt.block_on(async move { - let bind: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = match wzp_transport::create_endpoint(bind, None) { - Ok(e) => e, - Err(e) => { tracing::error!("signal endpoint: {e}"); return; } - }; - let client_cfg = wzp_transport::client_config(); - let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await { - Ok(c) => c, - Err(e) => { tracing::error!("signal connect: {e}"); return; } - }; - let transport = std::sync::Arc::new(wzp_transport::QuinnTransport::new(conn)); - - if !token.is_empty() { - let _ = transport.send_signal(&SignalMessage::AuthToken { token: token.clone() }).await; - } - - let alias_val = if alias.is_empty() { None } else { Some(alias.clone()) }; - let _ = transport.send_signal(&SignalMessage::RegisterPresence { - identity_pub, - signature: vec![], - alias: alias_val, - }).await; - - match transport.recv_signal().await { - Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => { - tracing::info!(fingerprint = %fp, "signal: registered"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Registered; - } - other => { - tracing::error!("signal registration failed: {other:?}"); - return; - } - } - - loop { - if !signal_state.running.load(std::sync::atomic::Ordering::Relaxed) { break; } - match transport.recv_signal().await { - Ok(Some(SignalMessage::CallRinging { call_id })) => { - tracing::info!(call_id = %call_id, "signal: ringing"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Ringing; - } - Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => { - tracing::info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::IncomingCall; - stats.incoming_call_id = Some(call_id); - stats.incoming_caller_fp = Some(caller_fingerprint); - stats.incoming_caller_alias = caller_alias; - } - Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => { - tracing::info!(call_id = %call_id, room = %room, "signal: call setup"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Connecting; - stats.incoming_call_id = Some(format!("{relay_addr}|{room}")); - } - Ok(Some(SignalMessage::Hangup { reason })) => { - tracing::info!(reason = ?reason, "signal: hangup"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Closed; - stats.incoming_call_id = None; - stats.incoming_caller_fp = None; - stats.incoming_caller_alias = None; - } - Ok(Some(_)) => {} - Ok(None) => { tracing::info!("signal: closed"); break; } - Err(e) => { tracing::error!("signal recv: {e}"); break; } - } - } - - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Closed; - }); - }) - .ok(); - - 0 // always return success — actual errors logged from the thread + match h.engine.start_signaling( + &relay_addr, + &seed_hex, + if token.is_empty() { None } else { Some(&token) }, + if alias.is_empty() { None } else { Some(&alias) }, + ) { + Ok(()) => 0, + Err(e) => { error!("start_signaling failed: {e}"); -1 } + } } /// Place a direct call to a target fingerprint.