fix: use block_on pattern for signaling (same as start_call) — no thread::spawn
Some checks failed
Mirror to GitHub / mirror (push) Failing after 37s
Build Release Binaries / build-amd64 (push) Failing after 3m50s

This commit is contained in:
Siavash Sameni
2026-04-09 08:33:08 +04:00
parent f44306cc17
commit ed09c2e8cc
2 changed files with 36 additions and 159 deletions

View File

@@ -246,6 +246,9 @@ impl WzpEngine {
/// Start persistent signaling connection for direct calls.
/// Maintains the `_signal` connection on the calling thread (no background task is spawned).
/// Start persistent signaling for direct calls.
/// Blocks the calling thread (Kotlin provides a Thread with 8MB stack).
/// Same pattern as start_call: tokio block_on on the caller's thread.
pub fn start_signaling(
&mut self,
relay_addr: &str,
@@ -253,52 +256,34 @@ impl WzpEngine {
token: Option<&str>,
alias: Option<&str>,
) -> Result<(), anyhow::Error> {
// Capture lightweight params only — all crypto work happens on the spawned thread
let addr_str = relay_addr.to_string();
let seed_str = seed_hex.to_string();
use wzp_proto::{MediaTransport, SignalMessage};
let _ = rustls::crypto::ring::default_provider().install_default();
let addr: SocketAddr = relay_addr.parse()?;
let seed = if seed_hex.is_empty() {
wzp_crypto::Seed::generate()
} else {
wzp_crypto::Seed::from_hex(seed_hex).map_err(|e| anyhow::anyhow!(e))?
};
let identity = seed.derive_identity();
let pub_id = identity.public_identity();
let identity_pub = *pub_id.signing.as_bytes();
let fp = pub_id.fingerprint.to_string();
let token = token.map(|s| s.to_string());
let alias = alias.map(|s| s.to_string());
let state = self.state.clone();
info!(fingerprint = %fp, relay = %addr, "starting signaling");
self.state.running.store(true, Ordering::Release);
// Spawn on a dedicated thread — Android's Kotlin dispatcher has ~1MB stack,
// too small for rustls + QUIC + tokio + crypto. Do ALL work on this thread.
std::thread::Builder::new()
.name("wzp-signal".into())
.stack_size(8 * 1024 * 1024) // 8MB stack
.spawn(move || {
use wzp_proto::{MediaTransport, SignalMessage};
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let _ = rustls::crypto::ring::default_provider().install_default();
let addr: SocketAddr = match addr_str.parse() {
Ok(a) => a,
Err(e) => { error!("bad relay addr: {e}"); return; }
};
let seed = if seed_str.is_empty() {
wzp_crypto::Seed::generate()
} else {
match wzp_crypto::Seed::from_hex(&seed_str) {
Ok(s) => s,
Err(e) => { error!("bad seed: {e}"); return; }
}
};
let identity = seed.derive_identity();
let pub_id = identity.public_identity();
let identity_pub = *pub_id.signing.as_bytes();
let fp = pub_id.fingerprint.to_string();
info!(fingerprint = %fp, relay = %addr, "starting signaling");
let signal_state = state.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime");
rt.block_on(async move {
let _ = rustls::crypto::ring::default_provider().install_default();
let signal_state = state.clone();
rt.block_on(async move {
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = match wzp_transport::create_endpoint(bind, None) {
Ok(e) => e,
@@ -389,8 +374,7 @@ impl WzpEngine {
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Closed;
}); // block_on
})?; // thread spawn
}); // block_on
Ok(())
}

View File

@@ -382,126 +382,19 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling
let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default();
let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default();
// Spawn a Java-level thread with large stack to run start_signaling.
// We CANNOT call start_signaling on this thread — even thread::spawn
// overflows the ~512KB DefaultDispatch stack with Rust's codegen.
// Use the existing start_call pattern — call start_signaling directly on this thread;
// it blocks in tokio block_on rather than spawning a thread. This is the same pattern that works for room calls.
let h = unsafe { handle_ref(handle) };
let state = h.engine.state.clone();
let running = &h.engine.state.running;
running.store(true, std::sync::atomic::Ordering::Release);
std::thread::Builder::new()
.name("wzp-signal-init".into())
.stack_size(8 * 1024 * 1024)
.spawn(move || {
use wzp_proto::{MediaTransport, SignalMessage};
let _ = rustls::crypto::ring::default_provider().install_default();
let addr: std::net::SocketAddr = match relay_addr.parse() {
Ok(a) => a,
Err(e) => { tracing::error!("bad relay addr: {e}"); return; }
};
let seed = if seed_hex.is_empty() {
wzp_crypto::Seed::generate()
} else {
match wzp_crypto::Seed::from_hex(&seed_hex) {
Ok(s) => s,
Err(e) => { tracing::error!("bad seed: {e}"); return; }
}
};
let identity = seed.derive_identity();
let pub_id = identity.public_identity();
let identity_pub = *pub_id.signing.as_bytes();
let fp = pub_id.fingerprint.to_string();
tracing::info!(fingerprint = %fp, relay = %addr, "signal thread: starting");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime");
let signal_state = state.clone();
rt.block_on(async move {
let bind: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = match wzp_transport::create_endpoint(bind, None) {
Ok(e) => e,
Err(e) => { tracing::error!("signal endpoint: {e}"); return; }
};
let client_cfg = wzp_transport::client_config();
let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await {
Ok(c) => c,
Err(e) => { tracing::error!("signal connect: {e}"); return; }
};
let transport = std::sync::Arc::new(wzp_transport::QuinnTransport::new(conn));
if !token.is_empty() {
let _ = transport.send_signal(&SignalMessage::AuthToken { token: token.clone() }).await;
}
let alias_val = if alias.is_empty() { None } else { Some(alias.clone()) };
let _ = transport.send_signal(&SignalMessage::RegisterPresence {
identity_pub,
signature: vec![],
alias: alias_val,
}).await;
match transport.recv_signal().await {
Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => {
tracing::info!(fingerprint = %fp, "signal: registered");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Registered;
}
other => {
tracing::error!("signal registration failed: {other:?}");
return;
}
}
loop {
if !signal_state.running.load(std::sync::atomic::Ordering::Relaxed) { break; }
match transport.recv_signal().await {
Ok(Some(SignalMessage::CallRinging { call_id })) => {
tracing::info!(call_id = %call_id, "signal: ringing");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Ringing;
}
Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => {
tracing::info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::IncomingCall;
stats.incoming_call_id = Some(call_id);
stats.incoming_caller_fp = Some(caller_fingerprint);
stats.incoming_caller_alias = caller_alias;
}
Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => {
tracing::info!(call_id = %call_id, room = %room, "signal: call setup");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Connecting;
stats.incoming_call_id = Some(format!("{relay_addr}|{room}"));
}
Ok(Some(SignalMessage::Hangup { reason })) => {
tracing::info!(reason = ?reason, "signal: hangup");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Closed;
stats.incoming_call_id = None;
stats.incoming_caller_fp = None;
stats.incoming_caller_alias = None;
}
Ok(Some(_)) => {}
Ok(None) => { tracing::info!("signal: closed"); break; }
Err(e) => { tracing::error!("signal recv: {e}"); break; }
}
}
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Closed;
});
})
.ok();
0 // always return success — actual errors logged from the thread
match h.engine.start_signaling(
&relay_addr,
&seed_hex,
if token.is_empty() { None } else { Some(&token) },
if alias.is_empty() { None } else { Some(&alias) },
) {
Ok(()) => 0,
Err(e) => { error!("start_signaling failed: {e}"); -1 }
}
}
/// Place a direct call to a target fingerprint.