diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt index d59ddb2..220ab2f 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt @@ -172,19 +172,22 @@ class CallViewModel : ViewModel(), WzpCallback { val seed = _seedHex.value val resolvedRelay = resolveToIp(relay) ?: relay - // Connect on a thread with 8MB stack (QUIC + TLS needs it) - Thread(null, { - val mgr = com.wzp.engine.SignalManager() - val ok = mgr.connect(resolvedRelay, seed) - viewModelScope.launch { + // nativeSignalConnect blocks up to 10s (waits for QUIC connect + register). + // Internal thread does the actual work; we just wait for the result. + viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) { + try { + val mgr = com.wzp.engine.SignalManager() + val ok = mgr.connect(resolvedRelay, seed) if (ok) { signalManager = mgr startSignalPolling() } else { _errorMessage.value = "Failed to register on relay" } + } catch (e: Exception) { + _errorMessage.value = "Register error: ${e.message}" } - }, "wzp-signal-connect", 8 * 1024 * 1024).start() + } } /** Poll signal manager state every 500ms */ diff --git a/crates/wzp-android/src/jni_bridge.rs b/crates/wzp-android/src/jni_bridge.rs index e5f5b76..1627d5a 100644 --- a/crates/wzp-android/src/jni_bridge.rs +++ b/crates/wzp-android/src/jni_bridge.rs @@ -413,24 +413,12 @@ pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConn let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default(); let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default(); - match crate::signal_mgr::SignalManager::connect(&relay, &seed) { + // start() spawns its own thread internally — connect + register + recv loop + // all run on ONE thread with ONE runtime to avoid TLS conflicts. + match crate::signal_mgr::SignalManager::start(&relay, &seed) { Ok(mgr) => { - // Spawn recv loop on a dedicated thread - let mgr_ref = &mgr as *const crate::signal_mgr::SignalManager; let handle = Box::new(SignalHandle { mgr }); - let raw = Box::into_raw(handle); - - // Get a reference for the recv thread - let recv_ref = unsafe { &(*raw).mgr }; - std::thread::Builder::new() - .name("wzp-signal-recv".into()) - .stack_size(4 * 1024 * 1024) - .spawn(move || { - recv_ref.run_recv_loop(); - }) - .ok(); - - raw as jlong + Box::into_raw(handle) as jlong } Err(e) => { error!("signal connect failed: {e}"); diff --git a/crates/wzp-android/src/signal_mgr.rs b/crates/wzp-android/src/signal_mgr.rs index 0d0135f..968e7cb 100644 --- a/crates/wzp-android/src/signal_mgr.rs +++ b/crates/wzp-android/src/signal_mgr.rs @@ -38,9 +38,11 @@ pub struct SignalManager { } impl SignalManager { - /// Connect to relay, register presence, return immediately. - /// Call `run_recv_loop()` on a separate thread after this. - pub fn connect(relay_addr: &str, seed_hex: &str) -> Result { + /// Connect to relay, register presence, and start recv loop. + /// This creates the SignalManager, spawns a thread that runs FOREVER + /// (connect + register + recv loop all on ONE thread/runtime to avoid TLS conflicts). + /// Returns immediately after spawning. + pub fn start(relay_addr: &str, seed_hex: &str) -> Result { let addr: SocketAddr = relay_addr.parse()?; let seed = if seed_hex.is_empty() { wzp_crypto::Seed::generate() @@ -52,68 +54,86 @@ impl SignalManager { let identity_pub = *pub_id.signing.as_bytes(); let fp = pub_id.fingerprint.to_string(); - info!(fingerprint = %fp, relay = %addr, "signal: connecting"); - - // Synchronous QUIC connect + register (runs on caller's thread) - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let transport = rt.block_on(async { - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = wzp_transport::create_endpoint(bind, None)?; - let client_cfg = wzp_transport::client_config(); - let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?; - let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); - - // Register presence - transport.send_signal(&SignalMessage::RegisterPresence { - identity_pub, - signature: vec![], - alias: None, - }).await?; - - match transport.recv_signal().await? { - Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => { - info!(fingerprint = %fp, "signal: registered"); - } - other => { - return Err(anyhow::anyhow!("registration failed: {other:?}")); - } - } - - Ok::<_, anyhow::Error>(transport) - })?; - - // Don't drop the runtime — we need it for the recv loop - // Store it... actually, we'll create a new one in run_recv_loop - drop(rt); - let state = Arc::new(Mutex::new(SignalState { - status: "registered".into(), - fingerprint: fp, + status: "connecting".into(), + fingerprint: fp.clone(), ..Default::default() })); - Ok(Self { - transport, - state, - running: Arc::new(AtomicBool::new(true)), - }) - } + let running = Arc::new(AtomicBool::new(true)); - /// Blocking signal recv loop. Run on a dedicated thread. - pub fn run_recv_loop(&self) { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime for signal recv"); + // We need a transport Arc that's shared with the spawned thread. + // The thread does connect + register + recv loop. We use a oneshot + // channel to get the transport back after connect succeeds. + let (tx, rx) = std::sync::mpsc::channel::>(); - let transport = self.transport.clone(); - let state = self.state.clone(); - let running = self.running.clone(); + let thread_state = Arc::clone(&state); + let thread_running = Arc::clone(&running); + let thread_fp = fp.clone(); + let return_state = Arc::clone(&state); + let return_running = Arc::clone(&running); - rt.block_on(async move { + std::thread::Builder::new() + .name("wzp-signal".into()) + .stack_size(4 * 1024 * 1024) + .spawn(move || { + // ONE runtime for the entire lifetime of this thread. + // Never dropped until thread exits — avoids TLS cleanup conflicts. + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); + + rt.block_on(async move { + info!(fingerprint = %thread_fp, relay = %addr, "signal: connecting"); + + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let endpoint = match wzp_transport::create_endpoint(bind, None) { + Ok(e) => e, + Err(e) => { error!("signal endpoint: {e}"); return; } + }; + let client_cfg = wzp_transport::client_config(); + let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await { + Ok(c) => c, + Err(e) => { + error!("signal connect failed: {e}"); + let mut s = thread_state.lock().unwrap(); + s.status = "idle".into(); + return; + } + }; + let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); + + // Register presence + if let Err(e) = transport.send_signal(&SignalMessage::RegisterPresence { + identity_pub, + signature: vec![], + alias: None, + }).await { + error!("signal register send: {e}"); + let mut s = thread_state.lock().unwrap(); + s.status = "idle".into(); + return; + } + + match transport.recv_signal().await { + Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => { + info!(fingerprint = %thread_fp, "signal: registered"); + let mut s = thread_state.lock().unwrap(); + s.status = "registered".into(); + } + other => { + error!("signal registration failed: {other:?}"); + let mut s = thread_state.lock().unwrap(); + s.status = "idle".into(); + return; + } + } + + // Send transport to the caller + let _ = tx.send(transport.clone()); + + // Recv loop — runs forever until stopped loop { if !running.load(Ordering::Relaxed) { break; } @@ -165,9 +185,20 @@ impl SignalManager { } } - let mut s = state.lock().unwrap(); + let mut s = thread_state.lock().unwrap(); s.status = "idle".into(); - }); + }); // block_on — runtime lives until thread exits + })?; // thread spawn + + // Wait for transport from the spawned thread (with timeout) + let transport = rx.recv_timeout(std::time::Duration::from_secs(10)) + .map_err(|_| anyhow::anyhow!("signal connect timeout"))?; + + Ok(Self { + transport, + state: return_state, + running: return_running, + }) } /// Get current state (non-blocking).