fix: single thread+runtime for signal lifecycle — avoids ring/libcrypto TLS conflict on pthread_exit
This commit is contained in:
@@ -172,19 +172,22 @@ class CallViewModel : ViewModel(), WzpCallback {
|
|||||||
val seed = _seedHex.value
|
val seed = _seedHex.value
|
||||||
val resolvedRelay = resolveToIp(relay) ?: relay
|
val resolvedRelay = resolveToIp(relay) ?: relay
|
||||||
|
|
||||||
// Connect on a thread with 8MB stack (QUIC + TLS needs it)
|
// nativeSignalConnect blocks up to 10s (waits for QUIC connect + register).
|
||||||
Thread(null, {
|
// Internal thread does the actual work; we just wait for the result.
|
||||||
|
viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) {
|
||||||
|
try {
|
||||||
val mgr = com.wzp.engine.SignalManager()
|
val mgr = com.wzp.engine.SignalManager()
|
||||||
val ok = mgr.connect(resolvedRelay, seed)
|
val ok = mgr.connect(resolvedRelay, seed)
|
||||||
viewModelScope.launch {
|
|
||||||
if (ok) {
|
if (ok) {
|
||||||
signalManager = mgr
|
signalManager = mgr
|
||||||
startSignalPolling()
|
startSignalPolling()
|
||||||
} else {
|
} else {
|
||||||
_errorMessage.value = "Failed to register on relay"
|
_errorMessage.value = "Failed to register on relay"
|
||||||
}
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
_errorMessage.value = "Register error: ${e.message}"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, "wzp-signal-connect", 8 * 1024 * 1024).start()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Poll signal manager state every 500ms */
|
/** Poll signal manager state every 500ms */
|
||||||
|
|||||||
@@ -413,24 +413,12 @@ pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConn
|
|||||||
let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default();
|
let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default();
|
||||||
let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default();
|
let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default();
|
||||||
|
|
||||||
match crate::signal_mgr::SignalManager::connect(&relay, &seed) {
|
// start() spawns its own thread internally — connect + register + recv loop
|
||||||
|
// all run on ONE thread with ONE runtime to avoid TLS conflicts.
|
||||||
|
match crate::signal_mgr::SignalManager::start(&relay, &seed) {
|
||||||
Ok(mgr) => {
|
Ok(mgr) => {
|
||||||
// Spawn recv loop on a dedicated thread
|
|
||||||
let mgr_ref = &mgr as *const crate::signal_mgr::SignalManager;
|
|
||||||
let handle = Box::new(SignalHandle { mgr });
|
let handle = Box::new(SignalHandle { mgr });
|
||||||
let raw = Box::into_raw(handle);
|
Box::into_raw(handle) as jlong
|
||||||
|
|
||||||
// Get a reference for the recv thread
|
|
||||||
let recv_ref = unsafe { &(*raw).mgr };
|
|
||||||
std::thread::Builder::new()
|
|
||||||
.name("wzp-signal-recv".into())
|
|
||||||
.stack_size(4 * 1024 * 1024)
|
|
||||||
.spawn(move || {
|
|
||||||
recv_ref.run_recv_loop();
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
raw as jlong
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("signal connect failed: {e}");
|
error!("signal connect failed: {e}");
|
||||||
|
|||||||
@@ -38,9 +38,11 @@ pub struct SignalManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SignalManager {
|
impl SignalManager {
|
||||||
/// Connect to relay, register presence, return immediately.
|
/// Connect to relay, register presence, and start recv loop.
|
||||||
/// Call `run_recv_loop()` on a separate thread after this.
|
/// This creates the SignalManager and spawns a thread that runs until stopped
|
||||||
pub fn connect(relay_addr: &str, seed_hex: &str) -> Result<Self, anyhow::Error> {
|
/// (connect + register + recv loop all on ONE thread/runtime to avoid TLS conflicts).
|
||||||
|
/// Blocks for up to 10 seconds while the spawned thread connects and registers, then returns.
|
||||||
|
pub fn start(relay_addr: &str, seed_hex: &str) -> Result<Self, anyhow::Error> {
|
||||||
let addr: SocketAddr = relay_addr.parse()?;
|
let addr: SocketAddr = relay_addr.parse()?;
|
||||||
let seed = if seed_hex.is_empty() {
|
let seed = if seed_hex.is_empty() {
|
||||||
wzp_crypto::Seed::generate()
|
wzp_crypto::Seed::generate()
|
||||||
@@ -52,68 +54,86 @@ impl SignalManager {
|
|||||||
let identity_pub = *pub_id.signing.as_bytes();
|
let identity_pub = *pub_id.signing.as_bytes();
|
||||||
let fp = pub_id.fingerprint.to_string();
|
let fp = pub_id.fingerprint.to_string();
|
||||||
|
|
||||||
info!(fingerprint = %fp, relay = %addr, "signal: connecting");
|
|
||||||
|
|
||||||
// Synchronous QUIC connect + register (runs on caller's thread)
|
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
|
||||||
.enable_all()
|
|
||||||
.build()?;
|
|
||||||
|
|
||||||
let transport = rt.block_on(async {
|
|
||||||
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
|
||||||
let endpoint = wzp_transport::create_endpoint(bind, None)?;
|
|
||||||
let client_cfg = wzp_transport::client_config();
|
|
||||||
let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?;
|
|
||||||
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
|
||||||
|
|
||||||
// Register presence
|
|
||||||
transport.send_signal(&SignalMessage::RegisterPresence {
|
|
||||||
identity_pub,
|
|
||||||
signature: vec![],
|
|
||||||
alias: None,
|
|
||||||
}).await?;
|
|
||||||
|
|
||||||
match transport.recv_signal().await? {
|
|
||||||
Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => {
|
|
||||||
info!(fingerprint = %fp, "signal: registered");
|
|
||||||
}
|
|
||||||
other => {
|
|
||||||
return Err(anyhow::anyhow!("registration failed: {other:?}"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok::<_, anyhow::Error>(transport)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Don't drop the runtime — we need it for the recv loop
|
|
||||||
// Store it... actually, we'll create a new one in run_recv_loop
|
|
||||||
drop(rt);
|
|
||||||
|
|
||||||
let state = Arc::new(Mutex::new(SignalState {
|
let state = Arc::new(Mutex::new(SignalState {
|
||||||
status: "registered".into(),
|
status: "connecting".into(),
|
||||||
fingerprint: fp,
|
fingerprint: fp.clone(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}));
|
}));
|
||||||
|
|
||||||
Ok(Self {
|
let running = Arc::new(AtomicBool::new(true));
|
||||||
transport,
|
|
||||||
state,
|
|
||||||
running: Arc::new(AtomicBool::new(true)),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Blocking signal recv loop. Run on a dedicated thread.
|
// We need a transport Arc that's shared with the spawned thread.
|
||||||
pub fn run_recv_loop(&self) {
|
// The thread does connect + register + recv loop. We use a single-use std mpsc
|
||||||
|
// channel to get the transport back after connect succeeds.
|
||||||
|
let (tx, rx) = std::sync::mpsc::channel::<Arc<wzp_transport::QuinnTransport>>();
|
||||||
|
|
||||||
|
let thread_state = Arc::clone(&state);
|
||||||
|
let thread_running = Arc::clone(&running);
|
||||||
|
let thread_fp = fp.clone();
|
||||||
|
let return_state = Arc::clone(&state);
|
||||||
|
let return_running = Arc::clone(&running);
|
||||||
|
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("wzp-signal".into())
|
||||||
|
.stack_size(4 * 1024 * 1024)
|
||||||
|
.spawn(move || {
|
||||||
|
// ONE runtime for the entire lifetime of this thread.
|
||||||
|
// Never dropped until thread exits — avoids TLS cleanup conflicts.
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()
|
.build()
|
||||||
.expect("tokio runtime for signal recv");
|
.expect("tokio runtime");
|
||||||
|
|
||||||
let transport = self.transport.clone();
|
|
||||||
let state = self.state.clone();
|
|
||||||
let running = self.running.clone();
|
|
||||||
|
|
||||||
rt.block_on(async move {
|
rt.block_on(async move {
|
||||||
|
info!(fingerprint = %thread_fp, relay = %addr, "signal: connecting");
|
||||||
|
|
||||||
|
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let endpoint = match wzp_transport::create_endpoint(bind, None) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => { error!("signal endpoint: {e}"); return; }
|
||||||
|
};
|
||||||
|
let client_cfg = wzp_transport::client_config();
|
||||||
|
let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
error!("signal connect failed: {e}");
|
||||||
|
let mut s = thread_state.lock().unwrap();
|
||||||
|
s.status = "idle".into();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
||||||
|
|
||||||
|
// Register presence
|
||||||
|
if let Err(e) = transport.send_signal(&SignalMessage::RegisterPresence {
|
||||||
|
identity_pub,
|
||||||
|
signature: vec![],
|
||||||
|
alias: None,
|
||||||
|
}).await {
|
||||||
|
error!("signal register send: {e}");
|
||||||
|
let mut s = thread_state.lock().unwrap();
|
||||||
|
s.status = "idle".into();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match transport.recv_signal().await {
|
||||||
|
Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => {
|
||||||
|
info!(fingerprint = %thread_fp, "signal: registered");
|
||||||
|
let mut s = thread_state.lock().unwrap();
|
||||||
|
s.status = "registered".into();
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
error!("signal registration failed: {other:?}");
|
||||||
|
let mut s = thread_state.lock().unwrap();
|
||||||
|
s.status = "idle".into();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send transport to the caller
|
||||||
|
let _ = tx.send(transport.clone());
|
||||||
|
|
||||||
|
// Recv loop — runs forever until stopped
|
||||||
loop {
|
loop {
|
||||||
if !running.load(Ordering::Relaxed) { break; }
|
if !running.load(Ordering::Relaxed) { break; }
|
||||||
|
|
||||||
@@ -165,9 +185,20 @@ impl SignalManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut s = state.lock().unwrap();
|
let mut s = thread_state.lock().unwrap();
|
||||||
s.status = "idle".into();
|
s.status = "idle".into();
|
||||||
});
|
}); // block_on — runtime lives until thread exits
|
||||||
|
})?; // thread spawn
|
||||||
|
|
||||||
|
// Wait for transport from the spawned thread (with timeout)
|
||||||
|
let transport = rx.recv_timeout(std::time::Duration::from_secs(10))
|
||||||
|
.map_err(|_| anyhow::anyhow!("signal connect timeout"))?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
transport,
|
||||||
|
state: return_state,
|
||||||
|
running: return_running,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get current state (non-blocking).
|
/// Get current state (non-blocking).
|
||||||
|
|||||||
Reference in New Issue
Block a user