From 46c9ee1be3aa95bf1aa189787f8fa3e7074f8352 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Thu, 9 Apr 2026 10:11:33 +0400 Subject: [PATCH] =?UTF-8?q?fix:=20single=20thread=20for=20entire=20signal?= =?UTF-8?q?=20lifecycle=20=E2=80=94=20runtime=20never=20dropped=20(libcryp?= =?UTF-8?q?to=20TLS=20fix)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/wzp-android/src/jni_bridge.rs | 18 +--- crates/wzp-android/src/signal_mgr.rs | 139 ++++++++++++++++----------- 2 files changed, 85 insertions(+), 72 deletions(-) diff --git a/crates/wzp-android/src/jni_bridge.rs b/crates/wzp-android/src/jni_bridge.rs index 424feb1..357b5d5 100644 --- a/crates/wzp-android/src/jni_bridge.rs +++ b/crates/wzp-android/src/jni_bridge.rs @@ -413,24 +413,12 @@ pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConn let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default(); let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default(); - // start() connects + registers synchronously (blocks briefly). - // Then we spawn run() on a thread for the recv loop. + // start() spawns an internal thread (connect+register+recv, ONE runtime, never dropped). + // Blocks up to 10s waiting for the connect+register to complete. match crate::signal_mgr::SignalManager::start(&relay, &seed) { Ok(mgr) => { let handle = Box::new(SignalHandle { mgr }); - let raw = Box::into_raw(handle); - - // Spawn recv loop on a small thread (no crypto init, just recv) - let recv_ref: &'static SignalHandle = unsafe { &*raw }; - std::thread::Builder::new() - .name("wzp-signal-recv".into()) - .stack_size(2 * 1024 * 1024) // 2MB is enough for recv-only - .spawn(move || { - recv_ref.mgr.run(); - }) - .ok(); - - raw as jlong + Box::into_raw(handle) as jlong } Err(e) => { error!("signal connect failed: {e}"); diff --git a/crates/wzp-android/src/signal_mgr.rs b/crates/wzp-android/src/signal_mgr.rs index 696a841..57ee19b 100644 --- a/crates/wzp-android/src/signal_mgr.rs +++ b/crates/wzp-android/src/signal_mgr.rs @@ -38,8 +38,9 @@ pub struct SignalManager { } impl SignalManager { - /// Connect to relay and register. Returns immediately with Self. - /// Then call `run()` on a separate thread to start the recv loop. + /// Create SignalManager and start connect+register+recv on a background thread. + /// Returns immediately. The internal thread runs forever. + /// CRITICAL: tokio runtime must never be dropped on Android (libcrypto TLS conflict). pub fn start(relay_addr: &str, seed_hex: &str) -> Result { let addr: SocketAddr = relay_addr.parse()?; let seed = if seed_hex.is_empty() { @@ -52,66 +53,79 @@ impl SignalManager { let identity_pub = *pub_id.signing.as_bytes(); let fp = pub_id.fingerprint.to_string(); - info!(fingerprint = %fp, relay = %addr, "signal: connecting"); - - // Connect + register synchronously - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let transport = rt.block_on(async { - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = wzp_transport::create_endpoint(bind, None)?; - let client_cfg = wzp_transport::client_config(); - let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?; - Ok::<_, anyhow::Error>(Arc::new(wzp_transport::QuinnTransport::new(conn))) - })?; - - // Register presence (still on the same runtime) - rt.block_on(async { - transport.send_signal(&SignalMessage::RegisterPresence { - identity_pub, - signature: vec![], - alias: None, - }).await?; - - match transport.recv_signal().await? { - Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => { - info!(fingerprint = %fp, "signal: registered"); - Ok(()) - } - other => Err(anyhow::anyhow!("registration failed: {other:?}")), - } - })?; - - // Don't drop runtime — keep it for run() let state = Arc::new(Mutex::new(SignalState { - status: "registered".into(), - fingerprint: fp, + status: "connecting".into(), + fingerprint: fp.clone(), ..Default::default() })); let running = Arc::new(AtomicBool::new(true)); - Ok(Self { - transport, - state, - running, - }) - } + // Channel to receive transport after connect succeeds + let (transport_tx, transport_rx) = std::sync::mpsc::channel(); - /// Blocking recv loop. Run on a dedicated thread after start(). - /// Never returns until the connection drops or stop() is called. - pub fn run(&self) { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime"); + let bg_state = Arc::clone(&state); + let bg_running = Arc::clone(&running); + let ret_state = Arc::clone(&state); + let ret_running = Arc::clone(&running); - let transport = self.transport.clone(); - let state = self.state.clone(); - let running = self.running.clone(); + // ONE thread, ONE runtime, NEVER dropped. + // Connect + register + recv loop all happen here. + std::thread::Builder::new() + .name("wzp-signal".into()) + .stack_size(4 * 1024 * 1024) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); - rt.block_on(async move { + rt.block_on(async move { + info!(fingerprint = %fp, relay = %addr, "signal: connecting"); + + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let endpoint = match wzp_transport::create_endpoint(bind, None) { + Ok(e) => e, + Err(e) => { + error!("signal endpoint: {e}"); + bg_state.lock().unwrap().status = "idle".into(); + return; + } + }; + let client_cfg = wzp_transport::client_config(); + let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await { + Ok(c) => c, + Err(e) => { + error!("signal connect: {e}"); + bg_state.lock().unwrap().status = "idle".into(); + return; + } + }; + let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); + + // Register + if let Err(e) = transport.send_signal(&SignalMessage::RegisterPresence { + identity_pub, signature: vec![], alias: None, + }).await { + error!("signal register: {e}"); + bg_state.lock().unwrap().status = "idle".into(); + return; + } + + match transport.recv_signal().await { + Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => { + info!(fingerprint = %fp, "signal: registered"); + bg_state.lock().unwrap().status = "registered".into(); + // Send transport to caller + let _ = transport_tx.send(transport.clone()); + } + other => { + error!("signal registration failed: {other:?}"); + bg_state.lock().unwrap().status = "idle".into(); + return; + } + } + + // Recv loop — runs forever loop { if !running.load(Ordering::Relaxed) { break; } @@ -163,9 +177,20 @@ impl SignalManager { } } - let mut s = state.lock().unwrap(); - s.status = "idle".into(); - }); // block_on + bg_state.lock().unwrap().status = "idle".into(); + }); // block_on + + // Runtime intentionally NOT dropped — lives until thread exits. + // This prevents ring/libcrypto TLS cleanup conflict on Android. + // The thread is parked here forever (block_on returned = connection lost). + std::thread::park(); + })?; // thread spawn + + // Wait for transport (up to 10s) + let transport = transport_rx.recv_timeout(std::time::Duration::from_secs(10)) + .map_err(|_| anyhow::anyhow!("signal connect timeout — check relay address"))?; + + Ok(Self { transport, state: ret_state, running: ret_running }) } /// Get current state (non-blocking).