fix: move ALL signaling code into JNI-spawned 8MB thread — zero Rust on caller stack
This commit is contained in:
@@ -363,9 +363,9 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativePingRelay<'a>(
|
|||||||
// ── Direct calling JNI functions ──
|
// ── Direct calling JNI functions ──
|
||||||
|
|
||||||
/// Start persistent signaling connection to relay for direct calls.
|
/// Start persistent signaling connection to relay for direct calls.
|
||||||
/// Returns 0 on success, -1 on error.
|
/// Returns 0 immediately — the actual work happens on a dedicated thread.
|
||||||
/// NOTE: No panic::catch_unwind here — Android's Kotlin dispatcher thread has
|
/// The JNI function MUST be minimal because Android's DefaultDispatch thread
|
||||||
/// very limited stack (~1MB) and the unwind machinery overflows it.
|
/// has a tiny stack that overflows with any Rust crypto/network code.
|
||||||
#[unsafe(no_mangle)]
|
#[unsafe(no_mangle)]
|
||||||
pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling<'a>(
|
pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling<'a>(
|
||||||
mut env: JNIEnv<'a>,
|
mut env: JNIEnv<'a>,
|
||||||
@@ -376,21 +376,132 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling
|
|||||||
token_j: JString,
|
token_j: JString,
|
||||||
alias_j: JString,
|
alias_j: JString,
|
||||||
) -> jint {
|
) -> jint {
|
||||||
let h = unsafe { handle_ref(handle) };
|
// Extract JNI strings — this is all we do on the caller's thread
|
||||||
let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default();
|
let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default();
|
||||||
let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default();
|
let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default();
|
||||||
let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default();
|
let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default();
|
||||||
let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default();
|
let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default();
|
||||||
|
|
||||||
match h.engine.start_signaling(
|
// Spawn a Java-level thread with large stack to run start_signaling.
|
||||||
&relay_addr,
|
// We CANNOT call start_signaling on this thread — even thread::spawn
|
||||||
&seed_hex,
|
// overflows the ~512KB DefaultDispatch stack with Rust's codegen.
|
||||||
if token.is_empty() { None } else { Some(&token) },
|
let h = unsafe { handle_ref(handle) };
|
||||||
if alias.is_empty() { None } else { Some(&alias) },
|
let state = h.engine.state.clone();
|
||||||
) {
|
let running = &h.engine.state.running;
|
||||||
Ok(()) => 0,
|
running.store(true, std::sync::atomic::Ordering::Release);
|
||||||
Err(e) => { error!("start_signaling failed: {e}"); -1 }
|
|
||||||
}
|
std::thread::Builder::new()
|
||||||
|
.name("wzp-signal-init".into())
|
||||||
|
.stack_size(8 * 1024 * 1024)
|
||||||
|
.spawn(move || {
|
||||||
|
use wzp_proto::{MediaTransport, SignalMessage};
|
||||||
|
|
||||||
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
|
||||||
|
let addr: std::net::SocketAddr = match relay_addr.parse() {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => { tracing::error!("bad relay addr: {e}"); return; }
|
||||||
|
};
|
||||||
|
let seed = if seed_hex.is_empty() {
|
||||||
|
wzp_crypto::Seed::generate()
|
||||||
|
} else {
|
||||||
|
match wzp_crypto::Seed::from_hex(&seed_hex) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => { tracing::error!("bad seed: {e}"); return; }
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let identity = seed.derive_identity();
|
||||||
|
let pub_id = identity.public_identity();
|
||||||
|
let identity_pub = *pub_id.signing.as_bytes();
|
||||||
|
let fp = pub_id.fingerprint.to_string();
|
||||||
|
|
||||||
|
tracing::info!(fingerprint = %fp, relay = %addr, "signal thread: starting");
|
||||||
|
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.expect("tokio runtime");
|
||||||
|
|
||||||
|
let signal_state = state.clone();
|
||||||
|
rt.block_on(async move {
|
||||||
|
let bind: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let endpoint = match wzp_transport::create_endpoint(bind, None) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => { tracing::error!("signal endpoint: {e}"); return; }
|
||||||
|
};
|
||||||
|
let client_cfg = wzp_transport::client_config();
|
||||||
|
let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => { tracing::error!("signal connect: {e}"); return; }
|
||||||
|
};
|
||||||
|
let transport = std::sync::Arc::new(wzp_transport::QuinnTransport::new(conn));
|
||||||
|
|
||||||
|
if !token.is_empty() {
|
||||||
|
let _ = transport.send_signal(&SignalMessage::AuthToken { token: token.clone() }).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let alias_val = if alias.is_empty() { None } else { Some(alias.clone()) };
|
||||||
|
let _ = transport.send_signal(&SignalMessage::RegisterPresence {
|
||||||
|
identity_pub,
|
||||||
|
signature: vec![],
|
||||||
|
alias: alias_val,
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
match transport.recv_signal().await {
|
||||||
|
Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => {
|
||||||
|
tracing::info!(fingerprint = %fp, "signal: registered");
|
||||||
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
|
stats.state = crate::stats::CallState::Registered;
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
tracing::error!("signal registration failed: {other:?}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if !signal_state.running.load(std::sync::atomic::Ordering::Relaxed) { break; }
|
||||||
|
match transport.recv_signal().await {
|
||||||
|
Ok(Some(SignalMessage::CallRinging { call_id })) => {
|
||||||
|
tracing::info!(call_id = %call_id, "signal: ringing");
|
||||||
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
|
stats.state = crate::stats::CallState::Ringing;
|
||||||
|
}
|
||||||
|
Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => {
|
||||||
|
tracing::info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call");
|
||||||
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
|
stats.state = crate::stats::CallState::IncomingCall;
|
||||||
|
stats.incoming_call_id = Some(call_id);
|
||||||
|
stats.incoming_caller_fp = Some(caller_fingerprint);
|
||||||
|
stats.incoming_caller_alias = caller_alias;
|
||||||
|
}
|
||||||
|
Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => {
|
||||||
|
tracing::info!(call_id = %call_id, room = %room, "signal: call setup");
|
||||||
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
|
stats.state = crate::stats::CallState::Connecting;
|
||||||
|
stats.incoming_call_id = Some(format!("{relay_addr}|{room}"));
|
||||||
|
}
|
||||||
|
Ok(Some(SignalMessage::Hangup { reason })) => {
|
||||||
|
tracing::info!(reason = ?reason, "signal: hangup");
|
||||||
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
|
stats.state = crate::stats::CallState::Closed;
|
||||||
|
stats.incoming_call_id = None;
|
||||||
|
stats.incoming_caller_fp = None;
|
||||||
|
stats.incoming_caller_alias = None;
|
||||||
|
}
|
||||||
|
Ok(Some(_)) => {}
|
||||||
|
Ok(None) => { tracing::info!("signal: closed"); break; }
|
||||||
|
Err(e) => { tracing::error!("signal recv: {e}"); break; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
|
stats.state = crate::stats::CallState::Closed;
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.ok();
|
||||||
|
|
||||||
|
0 // always return success — actual errors logged from the thread
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Place a direct call to a target fingerprint.
|
/// Place a direct call to a target fingerprint.
|
||||||
|
|||||||
Reference in New Issue
Block a user