diff --git a/crates/wzp-android/src/jni_bridge.rs b/crates/wzp-android/src/jni_bridge.rs index 1627d5a..424feb1 100644 --- a/crates/wzp-android/src/jni_bridge.rs +++ b/crates/wzp-android/src/jni_bridge.rs @@ -402,7 +402,7 @@ unsafe fn signal_ref(handle: jlong) -> &'static SignalHandle { } /// Connect to relay for signaling. Returns handle (jlong) or 0 on error. -/// MUST be called from a thread with sufficient stack (8MB). +/// Blocks up to 10s waiting for the internal signal thread to connect. #[unsafe(no_mangle)] pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConnect<'a>( mut env: JNIEnv<'a>, @@ -413,12 +413,24 @@ pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConn let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default(); let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default(); - // start() spawns its own thread internally — connect + register + recv loop - // all run on ONE thread with ONE runtime to avoid TLS conflicts. + // start() connects + registers synchronously (blocks briefly). + // Then we spawn run() on a thread for the recv loop. match crate::signal_mgr::SignalManager::start(&relay, &seed) { Ok(mgr) => { let handle = Box::new(SignalHandle { mgr }); - Box::into_raw(handle) as jlong + let raw = Box::into_raw(handle); + + // Spawn recv loop on a small thread (no crypto init, just recv) + let recv_ref: &'static SignalHandle = unsafe { &*raw }; + std::thread::Builder::new() + .name("wzp-signal-recv".into()) + .stack_size(2 * 1024 * 1024) // 2MB is enough for recv-only + .spawn(move || { + recv_ref.mgr.run(); + }) + .ok(); + + raw as jlong } Err(e) => { error!("signal connect failed: {e}"); diff --git a/crates/wzp-android/src/signal_mgr.rs b/crates/wzp-android/src/signal_mgr.rs index 968e7cb..696a841 100644 --- a/crates/wzp-android/src/signal_mgr.rs +++ b/crates/wzp-android/src/signal_mgr.rs @@ -38,10 +38,8 @@ pub struct SignalManager { } impl SignalManager { - /// Connect to relay, register presence, and start recv loop. - /// This creates the SignalManager, spawns a thread that runs FOREVER - /// (connect + register + recv loop all on ONE thread/runtime to avoid TLS conflicts). - /// Returns immediately after spawning. + /// Connect to relay and register. Returns immediately with Self. + /// Then call `run()` on a separate thread to start the recv loop. pub fn start(relay_addr: &str, seed_hex: &str) -> Result { let addr: SocketAddr = relay_addr.parse()?; let seed = if seed_hex.is_empty() { @@ -54,86 +52,66 @@ impl SignalManager { let identity_pub = *pub_id.signing.as_bytes(); let fp = pub_id.fingerprint.to_string(); + info!(fingerprint = %fp, relay = %addr, "signal: connecting"); + + // Connect + register synchronously + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let transport = rt.block_on(async { + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let endpoint = wzp_transport::create_endpoint(bind, None)?; + let client_cfg = wzp_transport::client_config(); + let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?; + Ok::<_, anyhow::Error>(Arc::new(wzp_transport::QuinnTransport::new(conn))) + })?; + + // Register presence (still on the same runtime) + rt.block_on(async { + transport.send_signal(&SignalMessage::RegisterPresence { + identity_pub, + signature: vec![], + alias: None, + }).await?; + + match transport.recv_signal().await? { + Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => { + info!(fingerprint = %fp, "signal: registered"); + Ok(()) + } + other => Err(anyhow::anyhow!("registration failed: {other:?}")), + } + })?; + + // Don't drop runtime — keep it for run() let state = Arc::new(Mutex::new(SignalState { - status: "connecting".into(), - fingerprint: fp.clone(), + status: "registered".into(), + fingerprint: fp, ..Default::default() })); - let running = Arc::new(AtomicBool::new(true)); - // We need a transport Arc that's shared with the spawned thread. - // The thread does connect + register + recv loop. We use a oneshot - // channel to get the transport back after connect succeeds. - let (tx, rx) = std::sync::mpsc::channel::>(); + Ok(Self { + transport, + state, + running, + }) + } - let thread_state = Arc::clone(&state); - let thread_running = Arc::clone(&running); - let thread_fp = fp.clone(); - let return_state = Arc::clone(&state); - let return_running = Arc::clone(&running); + /// Blocking recv loop. Run on a dedicated thread after start(). + /// Never returns until the connection drops or stop() is called. + pub fn run(&self) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); - std::thread::Builder::new() - .name("wzp-signal".into()) - .stack_size(4 * 1024 * 1024) - .spawn(move || { - // ONE runtime for the entire lifetime of this thread. - // Never dropped until thread exits — avoids TLS cleanup conflicts. - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime"); + let transport = self.transport.clone(); + let state = self.state.clone(); + let running = self.running.clone(); - rt.block_on(async move { - info!(fingerprint = %thread_fp, relay = %addr, "signal: connecting"); - - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = match wzp_transport::create_endpoint(bind, None) { - Ok(e) => e, - Err(e) => { error!("signal endpoint: {e}"); return; } - }; - let client_cfg = wzp_transport::client_config(); - let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await { - Ok(c) => c, - Err(e) => { - error!("signal connect failed: {e}"); - let mut s = thread_state.lock().unwrap(); - s.status = "idle".into(); - return; - } - }; - let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); - - // Register presence - if let Err(e) = transport.send_signal(&SignalMessage::RegisterPresence { - identity_pub, - signature: vec![], - alias: None, - }).await { - error!("signal register send: {e}"); - let mut s = thread_state.lock().unwrap(); - s.status = "idle".into(); - return; - } - - match transport.recv_signal().await { - Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => { - info!(fingerprint = %thread_fp, "signal: registered"); - let mut s = thread_state.lock().unwrap(); - s.status = "registered".into(); - } - other => { - error!("signal registration failed: {other:?}"); - let mut s = thread_state.lock().unwrap(); - s.status = "idle".into(); - return; - } - } - - // Send transport to the caller - let _ = tx.send(transport.clone()); - - // Recv loop — runs forever until stopped + rt.block_on(async move { loop { if !running.load(Ordering::Relaxed) { break; } @@ -185,20 +163,9 @@ impl SignalManager { } } - let mut s = thread_state.lock().unwrap(); + let mut s = state.lock().unwrap(); s.status = "idle".into(); - }); // block_on — runtime lives until thread exits - })?; // thread spawn - - // Wait for transport from the spawned thread (with timeout) - let transport = rx.recv_timeout(std::time::Duration::from_secs(10)) - .map_err(|_| anyhow::anyhow!("signal connect timeout"))?; - - Ok(Self { - transport, - state: return_state, - running: return_running, - }) + }); // block_on } /// Get current state (non-blocking).