fix: single thread for entire signal lifecycle — runtime never dropped (libcrypto TLS fix)
Some checks failed
Mirror to GitHub / mirror (push) Failing after 37s
Build Release Binaries / build-amd64 (push) Failing after 3m52s

This commit is contained in:
Siavash Sameni
2026-04-09 10:11:33 +04:00
parent b53eae9192
commit 46c9ee1be3
2 changed files with 85 additions and 72 deletions

View File

@@ -413,24 +413,12 @@ pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConn
let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default(); let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default();
let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default(); let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default();
// start() connects + registers synchronously (blocks briefly). // start() spawns an internal thread (connect+register+recv, ONE runtime, never dropped).
// Then we spawn run() on a thread for the recv loop. // Blocks up to 10s waiting for the connect+register to complete.
match crate::signal_mgr::SignalManager::start(&relay, &seed) { match crate::signal_mgr::SignalManager::start(&relay, &seed) {
Ok(mgr) => { Ok(mgr) => {
let handle = Box::new(SignalHandle { mgr }); let handle = Box::new(SignalHandle { mgr });
let raw = Box::into_raw(handle); Box::into_raw(handle) as jlong
// Spawn recv loop on a small thread (no crypto init, just recv)
let recv_ref: &'static SignalHandle = unsafe { &*raw };
std::thread::Builder::new()
.name("wzp-signal-recv".into())
.stack_size(2 * 1024 * 1024) // 2MB is enough for recv-only
.spawn(move || {
recv_ref.mgr.run();
})
.ok();
raw as jlong
} }
Err(e) => { Err(e) => {
error!("signal connect failed: {e}"); error!("signal connect failed: {e}");

View File

@@ -38,8 +38,9 @@ pub struct SignalManager {
} }
impl SignalManager { impl SignalManager {
/// Connect to relay and register. Returns immediately with Self. /// Create SignalManager and start connect+register+recv on a background thread.
/// Then call `run()` on a separate thread to start the recv loop. /// Blocks up to 10s until connect+register completes. The internal thread runs forever.
/// CRITICAL: tokio runtime must never be dropped on Android (libcrypto TLS conflict).
pub fn start(relay_addr: &str, seed_hex: &str) -> Result<Self, anyhow::Error> { pub fn start(relay_addr: &str, seed_hex: &str) -> Result<Self, anyhow::Error> {
let addr: SocketAddr = relay_addr.parse()?; let addr: SocketAddr = relay_addr.parse()?;
let seed = if seed_hex.is_empty() { let seed = if seed_hex.is_empty() {
@@ -52,66 +53,79 @@ impl SignalManager {
let identity_pub = *pub_id.signing.as_bytes(); let identity_pub = *pub_id.signing.as_bytes();
let fp = pub_id.fingerprint.to_string(); let fp = pub_id.fingerprint.to_string();
info!(fingerprint = %fp, relay = %addr, "signal: connecting");
// Connect + register synchronously
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let transport = rt.block_on(async {
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = wzp_transport::create_endpoint(bind, None)?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?;
Ok::<_, anyhow::Error>(Arc::new(wzp_transport::QuinnTransport::new(conn)))
})?;
// Register presence (still on the same runtime)
rt.block_on(async {
transport.send_signal(&SignalMessage::RegisterPresence {
identity_pub,
signature: vec![],
alias: None,
}).await?;
match transport.recv_signal().await? {
Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => {
info!(fingerprint = %fp, "signal: registered");
Ok(())
}
other => Err(anyhow::anyhow!("registration failed: {other:?}")),
}
})?;
// Don't drop runtime — keep it for run()
let state = Arc::new(Mutex::new(SignalState { let state = Arc::new(Mutex::new(SignalState {
status: "registered".into(), status: "connecting".into(),
fingerprint: fp, fingerprint: fp.clone(),
..Default::default() ..Default::default()
})); }));
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
Ok(Self { // Channel to receive transport after connect succeeds
transport, let (transport_tx, transport_rx) = std::sync::mpsc::channel();
state,
running,
})
}
/// Blocking recv loop. Run on a dedicated thread after start(). let bg_state = Arc::clone(&state);
/// Never returns until the connection drops or stop() is called. let bg_running = Arc::clone(&running);
pub fn run(&self) { let ret_state = Arc::clone(&state);
let rt = tokio::runtime::Builder::new_current_thread() let ret_running = Arc::clone(&running);
.enable_all()
.build()
.expect("tokio runtime");
let transport = self.transport.clone(); // ONE thread, ONE runtime, NEVER dropped.
let state = self.state.clone(); // Connect + register + recv loop all happen here.
let running = self.running.clone(); std::thread::Builder::new()
.name("wzp-signal".into())
.stack_size(4 * 1024 * 1024)
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime");
rt.block_on(async move { rt.block_on(async move {
info!(fingerprint = %fp, relay = %addr, "signal: connecting");
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = match wzp_transport::create_endpoint(bind, None) {
Ok(e) => e,
Err(e) => {
error!("signal endpoint: {e}");
bg_state.lock().unwrap().status = "idle".into();
return;
}
};
let client_cfg = wzp_transport::client_config();
let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await {
Ok(c) => c,
Err(e) => {
error!("signal connect: {e}");
bg_state.lock().unwrap().status = "idle".into();
return;
}
};
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
// Register
if let Err(e) = transport.send_signal(&SignalMessage::RegisterPresence {
identity_pub, signature: vec![], alias: None,
}).await {
error!("signal register: {e}");
bg_state.lock().unwrap().status = "idle".into();
return;
}
match transport.recv_signal().await {
Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => {
info!(fingerprint = %fp, "signal: registered");
bg_state.lock().unwrap().status = "registered".into();
// Send transport to caller
let _ = transport_tx.send(transport.clone());
}
other => {
error!("signal registration failed: {other:?}");
bg_state.lock().unwrap().status = "idle".into();
return;
}
}
// Recv loop — runs forever
loop { loop {
if !running.load(Ordering::Relaxed) { break; } if !running.load(Ordering::Relaxed) { break; }
@@ -163,9 +177,20 @@ impl SignalManager {
} }
} }
let mut s = state.lock().unwrap(); bg_state.lock().unwrap().status = "idle".into();
s.status = "idle".into(); }); // block_on
}); // block_on
// Runtime intentionally NOT dropped — lives until thread exits.
// This prevents ring/libcrypto TLS cleanup conflict on Android.
// The thread is parked here forever (block_on returned = connection lost).
std::thread::park();
})?; // thread spawn
// Wait for transport (up to 10s)
let transport = transport_rx.recv_timeout(std::time::Duration::from_secs(10))
.map_err(|_| anyhow::anyhow!("signal connect timeout — check relay address"))?;
Ok(Self { transport, state: ret_state, running: ret_running })
} }
/// Get current state (non-blocking). /// Get current state (non-blocking).