From abc96e8887b3d0d64f790248912650e7399529dd Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Thu, 9 Apr 2026 09:34:36 +0400 Subject: [PATCH] refactor: separate SignalManager from WzpEngine for direct calling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SignalManager (NEW): - Dedicated Rust struct with its own QUIC connection to _signal - Separate JNI handle (nativeSignalConnect/GetState/PlaceCall/etc) - Kotlin wrapper polls state every 500ms via getState() JSON - Lives independently of WzpEngine — survives across calls - connect() blocks briefly on 8MB thread, then recv loop runs on dedicated thread WzpEngine (CLEANED): - Back to pure media-only role (audio, codec, FEC, jitter) - Removed start_signaling/place_call/answer_call methods - Removed signal_transport/signal_fingerprint from EngineState CallViewModel: - Two separate managers: signalManager (persistent) + engine (per-call) - Two separate polling loops: signalPollJob + statsJob - Auto-connect to media room when signal polling detects "setup" state - hangupDirectCall() ends media but keeps signal alive Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/java/com/wzp/engine/SignalManager.kt | 97 +++++++ .../java/com/wzp/ui/call/CallViewModel.kt | 108 +++---- .../main/java/com/wzp/ui/call/InCallScreen.kt | 16 +- crates/wzp-android/src/engine.rs | 204 +------------- crates/wzp-android/src/jni_bridge.rs | 177 +++++++----- crates/wzp-android/src/lib.rs | 1 + crates/wzp-android/src/signal_mgr.rs | 265 ++++++++++++++++++ 7 files changed, 542 insertions(+), 326 deletions(-) create mode 100644 android/app/src/main/java/com/wzp/engine/SignalManager.kt create mode 100644 crates/wzp-android/src/signal_mgr.rs diff --git a/android/app/src/main/java/com/wzp/engine/SignalManager.kt b/android/app/src/main/java/com/wzp/engine/SignalManager.kt new file mode 100644 index 0000000..ec065b5 --- /dev/null +++ b/android/app/src/main/java/com/wzp/engine/SignalManager.kt @@ -0,0 +1,97 @@ +package com.wzp.engine + +import org.json.JSONObject + +/** + * Persistent signal connection for direct 1:1 calls. + * Separate from WzpEngine — survives across calls. + * + * Lifecycle: connect() → [placeCall/answerCall] → destroy() + */ +class SignalManager { + + private var handle: Long = 0L + + val isConnected: Boolean get() = handle != 0L + + /** + * Connect to relay and register for direct calls. + * MUST be called from a thread with sufficient stack (8MB). + * Blocks briefly during QUIC connect + register, then returns. + */ + fun connect(relay: String, seedHex: String): Boolean { + if (handle != 0L) return true // already connected + handle = nativeSignalConnect(relay, seedHex) + return handle != 0L + } + + /** Get current signal state as parsed object. Non-blocking. */ + fun getState(): SignalState { + if (handle == 0L) return SignalState() + val json = nativeSignalGetState(handle) ?: return SignalState() + return try { + val obj = JSONObject(json) + SignalState( + status = obj.optString("status", "idle"), + fingerprint = obj.optString("fingerprint", ""), + incomingCallId = if (obj.isNull("incoming_call_id")) null else obj.optString("incoming_call_id"), + incomingCallerFp = if (obj.isNull("incoming_caller_fp")) null else obj.optString("incoming_caller_fp"), + incomingCallerAlias = if (obj.isNull("incoming_caller_alias")) null else obj.optString("incoming_caller_alias"), + callSetupRelay = if (obj.isNull("call_setup_relay")) null else obj.optString("call_setup_relay"), + callSetupRoom = if (obj.isNull("call_setup_room")) null else obj.optString("call_setup_room"), + callSetupId = if (obj.isNull("call_setup_id")) null else obj.optString("call_setup_id"), + ) + } catch (e: Exception) { + SignalState() + } + } + + /** Place a direct call to a target fingerprint. */ + fun placeCall(targetFp: String): Int { + if (handle == 0L) return -1 + return nativeSignalPlaceCall(handle, targetFp) + } + + /** Answer an incoming call. mode: 0=Reject, 1=AcceptTrusted, 2=AcceptGeneric */ + fun answerCall(callId: String, mode: Int = 2): Int { + if (handle == 0L) return -1 + return nativeSignalAnswerCall(handle, callId, mode) + } + + /** Send hangup signal. */ + fun hangup() { + if (handle != 0L) nativeSignalHangup(handle) + } + + /** Destroy the signal manager. */ + fun destroy() { + if (handle != 0L) { + nativeSignalDestroy(handle) + handle = 0L + } + } + + // JNI native methods + private external fun nativeSignalConnect(relay: String, seed: String): Long + private external fun nativeSignalGetState(handle: Long): String? + private external fun nativeSignalPlaceCall(handle: Long, targetFp: String): Int + private external fun nativeSignalAnswerCall(handle: Long, callId: String, mode: Int): Int + private external fun nativeSignalHangup(handle: Long) + private external fun nativeSignalDestroy(handle: Long) + + companion object { + init { System.loadLibrary("wzp_android") } + } +} + +/** Signal connection state. */ +data class SignalState( + val status: String = "idle", + val fingerprint: String = "", + val incomingCallId: String? = null, + val incomingCallerFp: String? = null, + val incomingCallerAlias: String? = null, + val callSetupRelay: String? = null, + val callSetupRoom: String? = null, + val callSetupId: String? = null, +) diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt index 4b9b34d..d59ddb2 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt @@ -141,9 +141,9 @@ class CallViewModel : ViewModel(), WzpCallback { private val _targetFingerprint = MutableStateFlow("") val targetFingerprint: StateFlow = _targetFingerprint.asStateFlow() - /** Signal connection state: 0=idle, 5=registered, 6=ringing, 7=incoming */ - private val _signalState = MutableStateFlow(0) - val signalState: StateFlow = _signalState.asStateFlow() + /** Signal state string: "idle", "registered", "ringing", "incoming", "setup" */ + private val _signalState = MutableStateFlow("idle") + val signalState: StateFlow = _signalState.asStateFlow() /** Incoming call info */ private val _incomingCallId = MutableStateFlow(null) @@ -155,37 +155,65 @@ class CallViewModel : ViewModel(), WzpCallback { private val _incomingCallerAlias = MutableStateFlow(null) val incomingCallerAlias: StateFlow = _incomingCallerAlias.asStateFlow() + /** Separate signal manager (persistent, survives calls) */ + private var signalManager: com.wzp.engine.SignalManager? = null + private var signalPollJob: Job? = null + fun setCallMode(mode: Int) { _callMode.value = mode } fun setTargetFingerprint(fp: String) { _targetFingerprint.value = fp } /** Register on relay for direct calls */ fun registerForCalls() { - if (engine == null) { - engine = WzpEngine(this).also { it.init() } - } val serverIdx = _selectedServer.value val serverList = _servers.value if (serverIdx >= serverList.size) return val relay = serverList[serverIdx].address val seed = _seedHex.value - val alias = _alias.value - - // Start stats polling BEFORE blocking — startSignaling blocks the thread forever - startStatsPolling() - - // Use a Java Thread with 8MB stack — blocks forever in signal recv loop val resolvedRelay = resolveToIp(relay) ?: relay + + // Connect on a thread with 8MB stack (QUIC + TLS needs it) Thread(null, { - val result = engine?.startSignaling(resolvedRelay, seed, "", alias) - // Only reached if signaling disconnects + val mgr = com.wzp.engine.SignalManager() + val ok = mgr.connect(resolvedRelay, seed) viewModelScope.launch { - if (result != 0) { - _errorMessage.value = "Signal connection lost" + if (ok) { + signalManager = mgr + startSignalPolling() + } else { + _errorMessage.value = "Failed to register on relay" } - _signalState.value = 0 } - }, "wzp-register", 8 * 1024 * 1024).start() + }, "wzp-signal-connect", 8 * 1024 * 1024).start() + } + + /** Poll signal manager state every 500ms */ + private fun startSignalPolling() { + signalPollJob?.cancel() + signalPollJob = viewModelScope.launch { + while (isActive) { + val mgr = signalManager + if (mgr != null && mgr.isConnected) { + val state = mgr.getState() + _signalState.value = state.status + _incomingCallId.value = state.incomingCallId + _incomingCallerFp.value = state.incomingCallerFp + _incomingCallerAlias.value = state.incomingCallerAlias + + // Auto-connect to media room when call is set up + if (state.status == "setup" && state.callSetupRelay != null && state.callSetupRoom != null) { + Log.i(TAG, "CallSetup: connecting to ${state.callSetupRelay} room ${state.callSetupRoom}") + startCallInternal(state.callSetupRelay, state.callSetupRoom) + } + } + delay(500L) + } + } + } + + private fun stopSignalPolling() { + signalPollJob?.cancel() + signalPollJob = null } /** Place a direct call to the target fingerprint */ @@ -195,24 +223,28 @@ class CallViewModel : ViewModel(), WzpCallback { _errorMessage.value = "Enter a fingerprint to call" return } - engine?.placeCall(target) - _signalState.value = 6 // Ringing + signalManager?.placeCall(target) } /** Answer an incoming direct call */ fun answerIncomingCall(mode: Int = 2) { val callId = _incomingCallId.value ?: return - engine?.answerCall(callId, mode) + signalManager?.answerCall(callId, mode) } /** Reject an incoming direct call */ fun rejectIncomingCall() { val callId = _incomingCallId.value ?: return - engine?.answerCall(callId, 0) // 0 = Reject - _signalState.value = 5 // Back to registered - _incomingCallId.value = null - _incomingCallerFp.value = null - _incomingCallerAlias.value = null + signalManager?.answerCall(callId, 0) + } + + /** Hang up direct call — media ends, signal stays alive */ + fun hangupDirectCall() { + signalManager?.hangup() + engine?.stopCall() + engine?.destroy() + engine = null + engineInitialized = false } companion object { @@ -690,30 +722,10 @@ class CallViewModel : ViewModel(), WzpCallback { val s = CallStats.fromJson(json) lastCallDuration = s.durationSecs _stats.value = s - // Track signal state changes for direct calling - if (s.state in 5..7) { - _signalState.value = s.state - // Don't update callState for signal-only states - } else if (s.state != 0) { + // Only update callState from media engine stats (not signal) + if (s.state != 0) { _callState.value = s.state } - // Incoming call detection - if (s.state == 7) { // IncomingCall - _incomingCallId.value = s.incomingCallId - _incomingCallerFp.value = s.incomingCallerFp - _incomingCallerAlias.value = s.incomingCallerAlias - } - // CallSetup: auto-connect to media room - if (s.state == 1 && s.incomingCallId != null && s.incomingCallId.contains("|")) { - // Format: "relay_addr|room_name" - val parts = s.incomingCallId.split("|", limit = 2) - if (parts.size == 2) { - val mediaRelay = parts[0] - val mediaRoom = parts[1] - Log.i(TAG, "CallSetup: connecting to $mediaRelay room $mediaRoom") - startCallInternal(mediaRelay, mediaRoom) - } - } if (s.state == 2 && !audioStarted) { startAudio() } diff --git a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt index d4b3740..e8d582e 100644 --- a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt +++ b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt @@ -219,7 +219,7 @@ fun InCallScreen( // Mode toggle: Room vs Direct Call val callMode by viewModel.callMode.collectAsState() - val signalState by viewModel.signalState.collectAsState() + val signalState by viewModel.signalState.collectAsState() // "idle"/"registered"/"ringing"/etc val targetFp by viewModel.targetFingerprint.collectAsState() val incomingCallId by viewModel.incomingCallId.collectAsState() val incomingCallerFp by viewModel.incomingCallerFp.collectAsState() @@ -309,7 +309,7 @@ fun InCallScreen( } } else { // ── Direct call mode ── - if (signalState < 5) { + if (signalState == "idle") { // Not registered yet SectionLabel("ALIAS") OutlinedTextField( @@ -333,7 +333,7 @@ fun InCallScreen( color = Color.White ) } - } else if (signalState == 5) { + } else if (signalState == "registered" || signalState == "incoming") { // Registered — show dial pad Text( "\u2705 Registered — waiting for calls", @@ -403,8 +403,7 @@ fun InCallScreen( color = Color.White ) } - } else if (signalState == 6) { - // Ringing + } else if (signalState == "ringing") { Text( "\uD83D\uDD14 Ringing...", color = Yellow, @@ -412,11 +411,10 @@ fun InCallScreen( textAlign = TextAlign.Center, modifier = Modifier.fillMaxWidth() ) - } else if (signalState == 7) { - // Incoming call (state 7 also handled above in registered view) + } else if (signalState == "setup") { Text( - "\uD83D\uDCDE Incoming call...", - color = Green, + "Connecting to call...", + color = Accent, style = MaterialTheme.typography.titleMedium, textAlign = TextAlign.Center, modifier = Modifier.fillMaxWidth() diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index c88bafb..a6b3d45 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -97,10 +97,6 @@ pub(crate) struct EngineState { /// QUIC transport handle — stored so stop_call() can close it immediately, /// triggering relay-side leave + RoomUpdate broadcast. pub quic_transport: Mutex>>, - /// Signal transport for direct calling — stored so place_call/answer_call can send. - pub signal_transport: Mutex>>, - /// Our fingerprint (set during signaling registration). - pub signal_fingerprint: Mutex>, } pub struct WzpEngine { @@ -122,8 +118,6 @@ impl WzpEngine { playout_ring: AudioRing::new(), audio_level_rms: AtomicU32::new(0), quic_transport: Mutex::new(None), - signal_transport: Mutex::new(None), - signal_fingerprint: Mutex::new(None), }); Self { state, @@ -250,203 +244,7 @@ impl WzpEngine { } /// Start persistent signaling connection for direct calls. - /// Spawns a background task that maintains the `_signal` connection. - /// Start persistent signaling for direct calls. - /// Blocks the calling thread (Kotlin provides a Thread with 8MB stack). - /// Same pattern as start_call: tokio block_on on the caller's thread. - pub fn start_signaling( - &mut self, - relay_addr: &str, - seed_hex: &str, - token: Option<&str>, - alias: Option<&str>, - ) -> Result<(), anyhow::Error> { - use wzp_proto::{MediaTransport, SignalMessage}; - - - let addr: SocketAddr = relay_addr.parse()?; - let seed = if seed_hex.is_empty() { - wzp_crypto::Seed::generate() - } else { - wzp_crypto::Seed::from_hex(seed_hex).map_err(|e| anyhow::anyhow!(e))? - }; - let identity = seed.derive_identity(); - let pub_id = identity.public_identity(); - let identity_pub = *pub_id.signing.as_bytes(); - let fp = pub_id.fingerprint.to_string(); - let token = token.map(|s| s.to_string()); - let alias = alias.map(|s| s.to_string()); - let state = self.state.clone(); - - info!(fingerprint = %fp, relay = %addr, "starting signaling"); - - self.state.running.store(true, Ordering::Release); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let signal_state = state.clone(); - rt.block_on(async move { - let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let endpoint = match wzp_transport::create_endpoint(bind, None) { - Ok(e) => e, - Err(e) => { error!("signal endpoint: {e}"); return; } - }; - let client_cfg = wzp_transport::client_config(); - let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await { - Ok(c) => c, - Err(e) => { error!("signal connect: {e}"); return; } - }; - let transport = std::sync::Arc::new(wzp_transport::QuinnTransport::new(conn)); - - // Auth if token provided - if let Some(ref tok) = token { - let _ = transport.send_signal(&SignalMessage::AuthToken { token: tok.clone() }).await; - } - - // Register presence - let _ = transport.send_signal(&SignalMessage::RegisterPresence { - identity_pub, - signature: vec![], - alias: alias.clone(), - }).await; - - // Wait for ack - match transport.recv_signal().await { - Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => { - info!(fingerprint = %fp, "signal: registered"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Registered; - drop(stats); - // Store transport + fingerprint so place_call/answer_call can use them - *signal_state.signal_transport.lock().unwrap() = Some(transport.clone()); - *signal_state.signal_fingerprint.lock().unwrap() = Some(fp.clone()); - } - other => { - error!("signal registration failed: {other:?}"); - return; - } - } - - // Signal recv loop - loop { - if !signal_state.running.load(Ordering::Relaxed) { - break; - } - match transport.recv_signal().await { - Ok(Some(SignalMessage::CallRinging { call_id })) => { - info!(call_id = %call_id, "signal: ringing"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Ringing; - } - Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => { - info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::IncomingCall; - stats.incoming_call_id = Some(call_id); - stats.incoming_caller_fp = Some(caller_fingerprint); - stats.incoming_caller_alias = caller_alias; - } - Ok(Some(SignalMessage::DirectCallAnswer { call_id, accept_mode, .. })) => { - info!(call_id = %call_id, mode = ?accept_mode, "signal: call answered"); - } - Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => { - info!(call_id = %call_id, room = %room, relay = %relay_addr, "signal: call setup"); - // Connect to media room via the existing start_call mechanism - // Store the room info so Kotlin can call startCall with it - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Connecting; - // Store call setup info for Kotlin to pick up - stats.incoming_call_id = Some(format!("{relay_addr}|{room}")); - } - Ok(Some(SignalMessage::Hangup { reason })) => { - info!(reason = ?reason, "signal: call ended by remote"); - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Closed; - stats.incoming_call_id = None; - stats.incoming_caller_fp = None; - stats.incoming_caller_alias = None; - } - Ok(Some(_)) => {} - Ok(None) => { - info!("signal: connection closed"); - break; - } - Err(e) => { - error!("signal recv error: {e}"); - break; - } - } - } - - let mut stats = signal_state.stats.lock().unwrap(); - stats.state = crate::stats::CallState::Closed; - }); // block_on - - Ok(()) - } - - /// Place a direct call to a target fingerprint via the signal transport. - pub fn place_call(&self, target_fingerprint: &str) -> Result<(), anyhow::Error> { - use wzp_proto::SignalMessage; - - let transport = self.state.signal_transport.lock().unwrap().clone() - .ok_or_else(|| anyhow::anyhow!("not registered"))?; - let caller_fp = self.state.signal_fingerprint.lock().unwrap().clone() - .unwrap_or_default(); - let target = target_fingerprint.to_string(); - let call_id = format!("{:016x}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); - - // Send on a separate thread since we can't block the UI thread - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime"); - rt.block_on(async { - let _ = transport.send_signal(&SignalMessage::DirectCallOffer { - caller_fingerprint: caller_fp, - caller_alias: None, - target_fingerprint: target, - call_id, - identity_pub: [0u8; 32], - ephemeral_pub: [0u8; 32], - signature: vec![], - supported_profiles: vec![wzp_proto::QualityProfile::GOOD], - }).await; - }); - }); - Ok(()) - } - - /// Answer an incoming direct call via the signal transport. - pub fn answer_call(&self, call_id: &str, mode: wzp_proto::CallAcceptMode) -> Result<(), anyhow::Error> { - use wzp_proto::SignalMessage; - - let transport = self.state.signal_transport.lock().unwrap().clone() - .ok_or_else(|| anyhow::anyhow!("not registered"))?; - let call_id = call_id.to_string(); - - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("tokio runtime"); - rt.block_on(async { - let _ = transport.send_signal(&SignalMessage::DirectCallAnswer { - call_id, - accept_mode: mode, - identity_pub: None, - ephemeral_pub: None, - signature: None, - chosen_profile: Some(wzp_proto::QualityProfile::GOOD), - }).await; - }); - }); - Ok(()) - } + // Signal methods (start_signaling, place_call, answer_call) moved to signal_mgr.rs pub fn set_mute(&self, muted: bool) { self.state.muted.store(muted, Ordering::Relaxed); diff --git a/crates/wzp-android/src/jni_bridge.rs b/crates/wzp-android/src/jni_bridge.rs index 5fe7469..e5f5b76 100644 --- a/crates/wzp-android/src/jni_bridge.rs +++ b/crates/wzp-android/src/jni_bridge.rs @@ -390,87 +390,132 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeGetFingerprint // ── Direct calling JNI functions ── -/// Start persistent signaling connection to relay for direct calls. -/// Returns 0 immediately — the actual work happens on a dedicated thread. -/// The JNI function MUST be minimal because Android's DefaultDispatch thread -/// has a tiny stack that overflows with any Rust crypto/network code. +// ── SignalManager JNI functions ── + +/// Opaque handle for SignalManager (separate from EngineHandle). +struct SignalHandle { + mgr: crate::signal_mgr::SignalManager, +} + +unsafe fn signal_ref(handle: jlong) -> &'static SignalHandle { + unsafe { &*(handle as *const SignalHandle) } +} + +/// Connect to relay for signaling. Returns handle (jlong) or 0 on error. +/// MUST be called from a thread with sufficient stack (8MB). #[unsafe(no_mangle)] -pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling<'a>( +pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConnect<'a>( + mut env: JNIEnv<'a>, + _class: JClass, + relay_j: JString, + seed_j: JString, +) -> jlong { + let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default(); + let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default(); + + match crate::signal_mgr::SignalManager::connect(&relay, &seed) { + Ok(mgr) => { + // Spawn recv loop on a dedicated thread + let mgr_ref = &mgr as *const crate::signal_mgr::SignalManager; + let handle = Box::new(SignalHandle { mgr }); + let raw = Box::into_raw(handle); + + // Get a reference for the recv thread + let recv_ref = unsafe { &(*raw).mgr }; + std::thread::Builder::new() + .name("wzp-signal-recv".into()) + .stack_size(4 * 1024 * 1024) + .spawn(move || { + recv_ref.run_recv_loop(); + }) + .ok(); + + raw as jlong + } + Err(e) => { + error!("signal connect failed: {e}"); + 0 + } + } +} + +/// Get signal state as JSON string. +#[unsafe(no_mangle)] +pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalGetState<'a>( mut env: JNIEnv<'a>, _class: JClass, handle: jlong, - relay_addr_j: JString, - seed_hex_j: JString, - token_j: JString, - alias_j: JString, +) -> jstring { + if handle == 0 { return JObject::null().into_raw(); } + let h = signal_ref(handle); + let json = h.mgr.get_state_json(); + env.new_string(&json) + .map(|s| s.into_raw()) + .unwrap_or(JObject::null().into_raw()) +} + +/// Place a direct call. +#[unsafe(no_mangle)] +pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalPlaceCall<'a>( + mut env: JNIEnv<'a>, + _class: JClass, + handle: jlong, + target_j: JString, ) -> jint { - // Extract JNI strings — this is all we do on the caller's thread - let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default(); - let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default(); - let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default(); - let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default(); - - // Use the existing start_call pattern — create engine, call start_signaling - // which spawns a thread internally. This is the same pattern that works for room calls. - let h = unsafe { handle_ref(handle) }; - - match h.engine.start_signaling( - &relay_addr, - &seed_hex, - if token.is_empty() { None } else { Some(&token) }, - if alias.is_empty() { None } else { Some(&alias) }, - ) { + if handle == 0 { return -1; } + let h = signal_ref(handle); + let target: String = env.get_string(&target_j).map(|s| s.into()).unwrap_or_default(); + match h.mgr.place_call(&target) { Ok(()) => 0, - Err(e) => { error!("start_signaling failed: {e}"); -1 } + Err(e) => { error!("place_call: {e}"); -1 } } } -/// Place a direct call to a target fingerprint. -/// Returns 0 on success, -1 on error. +/// Answer an incoming call. #[unsafe(no_mangle)] -pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativePlaceCall<'a>( - mut env: JNIEnv<'a>, - _class: JClass, - handle: jlong, - target_fp_j: JString, -) -> jint { - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - let h = unsafe { handle_ref(handle) }; - let target: String = env.get_string(&target_fp_j).map(|s| s.into()).unwrap_or_default(); - h.engine.place_call(&target) - })); - - match result { - Ok(Ok(())) => 0, - Ok(Err(e)) => { error!("place_call failed: {e}"); -1 } - Err(_) => { error!("place_call panicked"); -1 } - } -} - -/// Answer an incoming direct call. -/// mode: 0=Reject, 1=AcceptTrusted, 2=AcceptGeneric -#[unsafe(no_mangle)] -pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeAnswerCall<'a>( +pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalAnswerCall<'a>( mut env: JNIEnv<'a>, _class: JClass, handle: jlong, call_id_j: JString, mode: jint, ) -> jint { - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - let h = unsafe { handle_ref(handle) }; - let call_id: String = env.get_string(&call_id_j).map(|s| s.into()).unwrap_or_default(); - let accept_mode = match mode { - 0 => wzp_proto::CallAcceptMode::Reject, - 1 => wzp_proto::CallAcceptMode::AcceptTrusted, - _ => wzp_proto::CallAcceptMode::AcceptGeneric, - }; - h.engine.answer_call(&call_id, accept_mode) - })); - - match result { - Ok(Ok(())) => 0, - Ok(Err(e)) => { error!("answer_call failed: {e}"); -1 } - Err(_) => { error!("answer_call panicked"); -1 } + if handle == 0 { return -1; } + let h = signal_ref(handle); + let call_id: String = env.get_string(&call_id_j).map(|s| s.into()).unwrap_or_default(); + let accept_mode = match mode { + 0 => wzp_proto::CallAcceptMode::Reject, + 1 => wzp_proto::CallAcceptMode::AcceptTrusted, + _ => wzp_proto::CallAcceptMode::AcceptGeneric, + }; + match h.mgr.answer_call(&call_id, accept_mode) { + Ok(()) => 0, + Err(e) => { error!("answer_call: {e}"); -1 } } } + +/// Send hangup signal. +#[unsafe(no_mangle)] +pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalHangup( + _env: JNIEnv, + _class: JClass, + handle: jlong, +) { + if handle == 0 { return; } + let h = signal_ref(handle); + h.mgr.hangup(); +} + +/// Destroy the signal manager and free resources. +#[unsafe(no_mangle)] +pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalDestroy( + _env: JNIEnv, + _class: JClass, + handle: jlong, +) { + if handle == 0 { return; } + let h = signal_ref(handle); + h.mgr.stop(); + // Reclaim the Box + let _ = unsafe { Box::from_raw(handle as *mut SignalHandle) }; +} diff --git a/crates/wzp-android/src/lib.rs b/crates/wzp-android/src/lib.rs index 8c0d5df..7c2930e 100644 --- a/crates/wzp-android/src/lib.rs +++ b/crates/wzp-android/src/lib.rs @@ -14,5 +14,6 @@ pub mod audio_ring; pub mod commands; pub mod engine; pub mod pipeline; +pub mod signal_mgr; pub mod stats; pub mod jni_bridge; diff --git a/crates/wzp-android/src/signal_mgr.rs b/crates/wzp-android/src/signal_mgr.rs new file mode 100644 index 0000000..0d0135f --- /dev/null +++ b/crates/wzp-android/src/signal_mgr.rs @@ -0,0 +1,265 @@ +//! Persistent signal connection manager for direct 1:1 calls. +//! +//! Separate from the media engine — survives across calls. +//! Connects to relay via `_signal` SNI, registers presence, +//! and handles call signaling (offer/answer/setup/hangup). + +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use tracing::{error, info, warn}; +use wzp_proto::{MediaTransport, SignalMessage}; + +/// Signal connection status. +#[derive(Clone, Debug, Default, serde::Serialize)] +pub struct SignalState { + pub status: String, // "idle", "registered", "ringing", "incoming", "setup" + pub fingerprint: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub incoming_call_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub incoming_caller_fp: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub incoming_caller_alias: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub call_setup_relay: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub call_setup_room: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub call_setup_id: Option, +} + +/// Manages a persistent `_signal` QUIC connection to a relay. +pub struct SignalManager { + transport: Arc, + state: Arc>, + running: Arc, +} + +impl SignalManager { + /// Connect to relay, register presence, return immediately. + /// Call `run_recv_loop()` on a separate thread after this. + pub fn connect(relay_addr: &str, seed_hex: &str) -> Result { + let addr: SocketAddr = relay_addr.parse()?; + let seed = if seed_hex.is_empty() { + wzp_crypto::Seed::generate() + } else { + wzp_crypto::Seed::from_hex(seed_hex).map_err(|e| anyhow::anyhow!(e))? + }; + let identity = seed.derive_identity(); + let pub_id = identity.public_identity(); + let identity_pub = *pub_id.signing.as_bytes(); + let fp = pub_id.fingerprint.to_string(); + + info!(fingerprint = %fp, relay = %addr, "signal: connecting"); + + // Synchronous QUIC connect + register (runs on caller's thread) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let transport = rt.block_on(async { + let bind: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let endpoint = wzp_transport::create_endpoint(bind, None)?; + let client_cfg = wzp_transport::client_config(); + let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?; + let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); + + // Register presence + transport.send_signal(&SignalMessage::RegisterPresence { + identity_pub, + signature: vec![], + alias: None, + }).await?; + + match transport.recv_signal().await? { + Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => { + info!(fingerprint = %fp, "signal: registered"); + } + other => { + return Err(anyhow::anyhow!("registration failed: {other:?}")); + } + } + + Ok::<_, anyhow::Error>(transport) + })?; + + // Don't drop the runtime — we need it for the recv loop + // Store it... actually, we'll create a new one in run_recv_loop + drop(rt); + + let state = Arc::new(Mutex::new(SignalState { + status: "registered".into(), + fingerprint: fp, + ..Default::default() + })); + + Ok(Self { + transport, + state, + running: Arc::new(AtomicBool::new(true)), + }) + } + + /// Blocking signal recv loop. Run on a dedicated thread. + pub fn run_recv_loop(&self) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime for signal recv"); + + let transport = self.transport.clone(); + let state = self.state.clone(); + let running = self.running.clone(); + + rt.block_on(async move { + loop { + if !running.load(Ordering::Relaxed) { break; } + + match transport.recv_signal().await { + Ok(Some(SignalMessage::CallRinging { call_id })) => { + info!(call_id = %call_id, "signal: ringing"); + let mut s = state.lock().unwrap(); + s.status = "ringing".into(); + } + Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => { + info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call"); + let mut s = state.lock().unwrap(); + s.status = "incoming".into(); + s.incoming_call_id = Some(call_id); + s.incoming_caller_fp = Some(caller_fingerprint); + s.incoming_caller_alias = caller_alias; + } + Ok(Some(SignalMessage::DirectCallAnswer { call_id, accept_mode, .. })) => { + info!(call_id = %call_id, mode = ?accept_mode, "signal: call answered"); + } + Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => { + info!(call_id = %call_id, room = %room, relay = %relay_addr, "signal: call setup"); + let mut s = state.lock().unwrap(); + s.status = "setup".into(); + s.call_setup_relay = Some(relay_addr); + s.call_setup_room = Some(room); + s.call_setup_id = Some(call_id); + } + Ok(Some(SignalMessage::Hangup { reason })) => { + info!(reason = ?reason, "signal: hangup"); + let mut s = state.lock().unwrap(); + s.status = "registered".into(); + s.incoming_call_id = None; + s.incoming_caller_fp = None; + s.incoming_caller_alias = None; + s.call_setup_relay = None; + s.call_setup_room = None; + s.call_setup_id = None; + } + Ok(Some(_)) => {} + Ok(None) => { + info!("signal: connection closed"); + break; + } + Err(e) => { + error!("signal recv error: {e}"); + break; + } + } + } + + let mut s = state.lock().unwrap(); + s.status = "idle".into(); + }); + } + + /// Get current state (non-blocking). + pub fn get_state(&self) -> SignalState { + self.state.lock().unwrap().clone() + } + + /// Get state as JSON string. + pub fn get_state_json(&self) -> String { + serde_json::to_string(&self.get_state()).unwrap_or_else(|_| "{}".into()) + } + + /// Place a direct call. + pub fn place_call(&self, target_fp: &str) -> Result<(), anyhow::Error> { + let fp = self.state.lock().unwrap().fingerprint.clone(); + let target = target_fp.to_string(); + let call_id = format!("{:016x}", std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); + let transport = self.transport.clone(); + + // Send on a small thread (async send needs a runtime) + std::thread::Builder::new() + .name("wzp-call-send".into()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().expect("rt"); + rt.block_on(async { + let _ = transport.send_signal(&SignalMessage::DirectCallOffer { + caller_fingerprint: fp, + caller_alias: None, + target_fingerprint: target, + call_id, + identity_pub: [0u8; 32], + ephemeral_pub: [0u8; 32], + signature: vec![], + supported_profiles: vec![wzp_proto::QualityProfile::GOOD], + }).await; + }); + })?; + Ok(()) + } + + /// Answer an incoming call. + pub fn answer_call(&self, call_id: &str, mode: wzp_proto::CallAcceptMode) -> Result<(), anyhow::Error> { + let call_id = call_id.to_string(); + let transport = self.transport.clone(); + + std::thread::Builder::new() + .name("wzp-answer-send".into()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().expect("rt"); + rt.block_on(async { + let _ = transport.send_signal(&SignalMessage::DirectCallAnswer { + call_id, + accept_mode: mode, + identity_pub: None, + ephemeral_pub: None, + signature: None, + chosen_profile: Some(wzp_proto::QualityProfile::GOOD), + }).await; + }); + })?; + Ok(()) + } + + /// Send hangup. + pub fn hangup(&self) { + let transport = self.transport.clone(); + let state = self.state.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all().build().expect("rt"); + rt.block_on(async { + let _ = transport.send_signal(&SignalMessage::Hangup { + reason: wzp_proto::HangupReason::Normal, + }).await; + }); + let mut s = state.lock().unwrap(); + s.status = "registered".into(); + s.incoming_call_id = None; + s.incoming_caller_fp = None; + s.incoming_caller_alias = None; + s.call_setup_relay = None; + s.call_setup_room = None; + s.call_setup_id = None; + }); + } + + /// Stop the signal connection. + pub fn stop(&self) { + self.running.store(false, Ordering::Release); + self.transport.connection().close(0u32.into(), b"shutdown"); + } +}