fix: split start() into connect + register (run inline on the caller's thread) and run() (recv loop on a separate thread) — avoids a stack overflow in the thread::spawn closure
This commit is contained in:
@@ -402,7 +402,7 @@ unsafe fn signal_ref(handle: jlong) -> &'static SignalHandle {
|
||||
}
|
||||
|
||||
/// Connect to relay for signaling. Returns handle (jlong) or 0 on error.
|
||||
/// MUST be called from a thread with sufficient stack (8MB).
|
||||
/// Blocks up to 10s waiting for the internal signal thread to connect.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConnect<'a>(
|
||||
mut env: JNIEnv<'a>,
|
||||
@@ -413,12 +413,24 @@ pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConn
|
||||
let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default();
|
||||
let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default();
|
||||
|
||||
// start() spawns its own thread internally — connect + register + recv loop
|
||||
// all run on ONE thread with ONE runtime to avoid TLS conflicts.
|
||||
// start() connects + registers synchronously (blocks briefly).
|
||||
// Then we spawn run() on a thread for the recv loop.
|
||||
match crate::signal_mgr::SignalManager::start(&relay, &seed) {
|
||||
Ok(mgr) => {
|
||||
let handle = Box::new(SignalHandle { mgr });
|
||||
Box::into_raw(handle) as jlong
|
||||
let raw = Box::into_raw(handle);
|
||||
|
||||
// Spawn recv loop on a small thread (no crypto init, just recv)
|
||||
let recv_ref: &'static SignalHandle = unsafe { &*raw };
|
||||
std::thread::Builder::new()
|
||||
.name("wzp-signal-recv".into())
|
||||
.stack_size(2 * 1024 * 1024) // 2MB is enough for recv-only
|
||||
.spawn(move || {
|
||||
recv_ref.mgr.run();
|
||||
})
|
||||
.ok();
|
||||
|
||||
raw as jlong
|
||||
}
|
||||
Err(e) => {
|
||||
error!("signal connect failed: {e}");
|
||||
|
||||
@@ -38,10 +38,8 @@ pub struct SignalManager {
|
||||
}
|
||||
|
||||
impl SignalManager {
|
||||
/// Connect to relay, register presence, and start recv loop.
|
||||
/// This creates the SignalManager, spawns a thread that runs FOREVER
|
||||
/// (connect + register + recv loop all on ONE thread/runtime to avoid TLS conflicts).
|
||||
/// Returns immediately after spawning.
|
||||
/// Connect to relay and register. Returns immediately with Self.
|
||||
/// Then call `run()` on a separate thread to start the recv loop.
|
||||
pub fn start(relay_addr: &str, seed_hex: &str) -> Result<Self, anyhow::Error> {
|
||||
let addr: SocketAddr = relay_addr.parse()?;
|
||||
let seed = if seed_hex.is_empty() {
|
||||
@@ -54,86 +52,66 @@ impl SignalManager {
|
||||
let identity_pub = *pub_id.signing.as_bytes();
|
||||
let fp = pub_id.fingerprint.to_string();
|
||||
|
||||
info!(fingerprint = %fp, relay = %addr, "signal: connecting");
|
||||
|
||||
// Connect + register synchronously
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let transport = rt.block_on(async {
|
||||
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||
let endpoint = wzp_transport::create_endpoint(bind, None)?;
|
||||
let client_cfg = wzp_transport::client_config();
|
||||
let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?;
|
||||
Ok::<_, anyhow::Error>(Arc::new(wzp_transport::QuinnTransport::new(conn)))
|
||||
})?;
|
||||
|
||||
// Register presence (still on the same runtime)
|
||||
rt.block_on(async {
|
||||
transport.send_signal(&SignalMessage::RegisterPresence {
|
||||
identity_pub,
|
||||
signature: vec![],
|
||||
alias: None,
|
||||
}).await?;
|
||||
|
||||
match transport.recv_signal().await? {
|
||||
Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => {
|
||||
info!(fingerprint = %fp, "signal: registered");
|
||||
Ok(())
|
||||
}
|
||||
other => Err(anyhow::anyhow!("registration failed: {other:?}")),
|
||||
}
|
||||
})?;
|
||||
|
||||
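// Intent: keep the runtime alive for run(). NOTE(review): `rt` is a local here and is not stored in Self, so it is dropped when start() returns, and run() builds its own runtime — confirm the transport still works after that.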
|
||||
let state = Arc::new(Mutex::new(SignalState {
|
||||
status: "connecting".into(),
|
||||
fingerprint: fp.clone(),
|
||||
status: "registered".into(),
|
||||
fingerprint: fp,
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
|
||||
// We need a transport Arc that's shared with the spawned thread.
|
||||
// The thread does connect + register + recv loop. We use a oneshot
|
||||
// channel to get the transport back after connect succeeds.
|
||||
let (tx, rx) = std::sync::mpsc::channel::<Arc<wzp_transport::QuinnTransport>>();
|
||||
Ok(Self {
|
||||
transport,
|
||||
state,
|
||||
running,
|
||||
})
|
||||
}
|
||||
|
||||
let thread_state = Arc::clone(&state);
|
||||
let thread_running = Arc::clone(&running);
|
||||
let thread_fp = fp.clone();
|
||||
let return_state = Arc::clone(&state);
|
||||
let return_running = Arc::clone(&running);
|
||||
/// Blocking recv loop. Run on a dedicated thread after start().
|
||||
/// Never returns until the connection drops or stop() is called.
|
||||
pub fn run(&self) {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("tokio runtime");
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("wzp-signal".into())
|
||||
.stack_size(4 * 1024 * 1024)
|
||||
.spawn(move || {
|
||||
// ONE runtime for the entire lifetime of this thread.
|
||||
// Never dropped until thread exits — avoids TLS cleanup conflicts.
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("tokio runtime");
|
||||
let transport = self.transport.clone();
|
||||
let state = self.state.clone();
|
||||
let running = self.running.clone();
|
||||
|
||||
rt.block_on(async move {
|
||||
info!(fingerprint = %thread_fp, relay = %addr, "signal: connecting");
|
||||
|
||||
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||
let endpoint = match wzp_transport::create_endpoint(bind, None) {
|
||||
Ok(e) => e,
|
||||
Err(e) => { error!("signal endpoint: {e}"); return; }
|
||||
};
|
||||
let client_cfg = wzp_transport::client_config();
|
||||
let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
error!("signal connect failed: {e}");
|
||||
let mut s = thread_state.lock().unwrap();
|
||||
s.status = "idle".into();
|
||||
return;
|
||||
}
|
||||
};
|
||||
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
||||
|
||||
// Register presence
|
||||
if let Err(e) = transport.send_signal(&SignalMessage::RegisterPresence {
|
||||
identity_pub,
|
||||
signature: vec![],
|
||||
alias: None,
|
||||
}).await {
|
||||
error!("signal register send: {e}");
|
||||
let mut s = thread_state.lock().unwrap();
|
||||
s.status = "idle".into();
|
||||
return;
|
||||
}
|
||||
|
||||
match transport.recv_signal().await {
|
||||
Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => {
|
||||
info!(fingerprint = %thread_fp, "signal: registered");
|
||||
let mut s = thread_state.lock().unwrap();
|
||||
s.status = "registered".into();
|
||||
}
|
||||
other => {
|
||||
error!("signal registration failed: {other:?}");
|
||||
let mut s = thread_state.lock().unwrap();
|
||||
s.status = "idle".into();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Send transport to the caller
|
||||
let _ = tx.send(transport.clone());
|
||||
|
||||
// Recv loop — runs forever until stopped
|
||||
rt.block_on(async move {
|
||||
loop {
|
||||
if !running.load(Ordering::Relaxed) { break; }
|
||||
|
||||
@@ -185,20 +163,9 @@ impl SignalManager {
|
||||
}
|
||||
}
|
||||
|
||||
let mut s = thread_state.lock().unwrap();
|
||||
let mut s = state.lock().unwrap();
|
||||
s.status = "idle".into();
|
||||
}); // block_on — runtime lives until thread exits
|
||||
})?; // thread spawn
|
||||
|
||||
// Wait for transport from the spawned thread (with timeout)
|
||||
let transport = rx.recv_timeout(std::time::Duration::from_secs(10))
|
||||
.map_err(|_| anyhow::anyhow!("signal connect timeout"))?;
|
||||
|
||||
Ok(Self {
|
||||
transport,
|
||||
state: return_state,
|
||||
running: return_running,
|
||||
})
|
||||
}); // block_on
|
||||
}
|
||||
|
||||
/// Get current state (non-blocking).
|
||||
|
||||
Reference in New Issue
Block a user