refactor: separate SignalManager from WzpEngine for direct calling
Some checks failed
Mirror to GitHub / mirror (push) Failing after 40s
Build Release Binaries / build-amd64 (push) Failing after 3m40s

SignalManager (NEW):
- Dedicated Rust struct with its own QUIC connection to _signal
- Separate JNI handle (nativeSignalConnect/GetState/PlaceCall/etc)
- Kotlin wrapper polls state every 500ms via getState() JSON
- Lives independently of WzpEngine — survives across calls
- connect() blocks briefly on 8MB thread, then recv loop runs on dedicated thread

WzpEngine (CLEANED):
- Back to pure media-only role (audio, codec, FEC, jitter)
- Removed start_signaling/place_call/answer_call methods
- Removed signal_transport/signal_fingerprint from EngineState

CallViewModel:
- Two separate managers: signalManager (persistent) + engine (per-call)
- Two separate polling loops: signalPollJob + statsJob
- Auto-connect to media room when signal polling detects "setup" state
- hangupDirectCall() ends media but keeps signal alive

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-09 09:34:36 +04:00
parent 3a6ae61f8d
commit abc96e8887
7 changed files with 542 additions and 326 deletions

View File

@@ -0,0 +1,97 @@
package com.wzp.engine
import org.json.JSONObject
/**
* Persistent signal connection for direct 1:1 calls.
* Separate from WzpEngine — survives across calls.
*
* Lifecycle: connect() → [placeCall/answerCall] → destroy()
*/
class SignalManager {
private var handle: Long = 0L
val isConnected: Boolean get() = handle != 0L
/**
* Connect to relay and register for direct calls.
* MUST be called from a thread with sufficient stack (8MB).
* Blocks briefly during QUIC connect + register, then returns.
*/
fun connect(relay: String, seedHex: String): Boolean {
if (handle != 0L) return true // already connected
handle = nativeSignalConnect(relay, seedHex)
return handle != 0L
}
/** Get current signal state as parsed object. Non-blocking. */
fun getState(): SignalState {
if (handle == 0L) return SignalState()
val json = nativeSignalGetState(handle) ?: return SignalState()
return try {
val obj = JSONObject(json)
SignalState(
status = obj.optString("status", "idle"),
fingerprint = obj.optString("fingerprint", ""),
incomingCallId = if (obj.isNull("incoming_call_id")) null else obj.optString("incoming_call_id"),
incomingCallerFp = if (obj.isNull("incoming_caller_fp")) null else obj.optString("incoming_caller_fp"),
incomingCallerAlias = if (obj.isNull("incoming_caller_alias")) null else obj.optString("incoming_caller_alias"),
callSetupRelay = if (obj.isNull("call_setup_relay")) null else obj.optString("call_setup_relay"),
callSetupRoom = if (obj.isNull("call_setup_room")) null else obj.optString("call_setup_room"),
callSetupId = if (obj.isNull("call_setup_id")) null else obj.optString("call_setup_id"),
)
} catch (e: Exception) {
SignalState()
}
}
/** Place a direct call to a target fingerprint. */
fun placeCall(targetFp: String): Int {
if (handle == 0L) return -1
return nativeSignalPlaceCall(handle, targetFp)
}
/** Answer an incoming call. mode: 0=Reject, 1=AcceptTrusted, 2=AcceptGeneric */
fun answerCall(callId: String, mode: Int = 2): Int {
if (handle == 0L) return -1
return nativeSignalAnswerCall(handle, callId, mode)
}
/** Send hangup signal. */
fun hangup() {
if (handle != 0L) nativeSignalHangup(handle)
}
/** Destroy the signal manager. */
fun destroy() {
if (handle != 0L) {
nativeSignalDestroy(handle)
handle = 0L
}
}
// JNI native methods
private external fun nativeSignalConnect(relay: String, seed: String): Long
private external fun nativeSignalGetState(handle: Long): String?
private external fun nativeSignalPlaceCall(handle: Long, targetFp: String): Int
private external fun nativeSignalAnswerCall(handle: Long, callId: String, mode: Int): Int
private external fun nativeSignalHangup(handle: Long)
private external fun nativeSignalDestroy(handle: Long)
companion object {
init { System.loadLibrary("wzp_android") }
}
}
/** Signal connection state. */
data class SignalState(
val status: String = "idle",
val fingerprint: String = "",
val incomingCallId: String? = null,
val incomingCallerFp: String? = null,
val incomingCallerAlias: String? = null,
val callSetupRelay: String? = null,
val callSetupRoom: String? = null,
val callSetupId: String? = null,
)

View File

@@ -141,9 +141,9 @@ class CallViewModel : ViewModel(), WzpCallback {
private val _targetFingerprint = MutableStateFlow("")
val targetFingerprint: StateFlow<String> = _targetFingerprint.asStateFlow()
/** Signal connection state: 0=idle, 5=registered, 6=ringing, 7=incoming */
private val _signalState = MutableStateFlow(0)
val signalState: StateFlow<Int> = _signalState.asStateFlow()
/** Signal state string: "idle", "registered", "ringing", "incoming", "setup" */
private val _signalState = MutableStateFlow("idle")
val signalState: StateFlow<String> = _signalState.asStateFlow()
/** Incoming call info */
private val _incomingCallId = MutableStateFlow<String?>(null)
@@ -155,37 +155,65 @@ class CallViewModel : ViewModel(), WzpCallback {
private val _incomingCallerAlias = MutableStateFlow<String?>(null)
val incomingCallerAlias: StateFlow<String?> = _incomingCallerAlias.asStateFlow()
/** Separate signal manager (persistent, survives calls) */
private var signalManager: com.wzp.engine.SignalManager? = null
private var signalPollJob: Job? = null
fun setCallMode(mode: Int) { _callMode.value = mode }
fun setTargetFingerprint(fp: String) { _targetFingerprint.value = fp }
/** Register on relay for direct calls */
fun registerForCalls() {
if (engine == null) {
engine = WzpEngine(this).also { it.init() }
}
val serverIdx = _selectedServer.value
val serverList = _servers.value
if (serverIdx >= serverList.size) return
val relay = serverList[serverIdx].address
val seed = _seedHex.value
val alias = _alias.value
// Start stats polling BEFORE blocking — startSignaling blocks the thread forever
startStatsPolling()
// Use a Java Thread with 8MB stack — blocks forever in signal recv loop
val resolvedRelay = resolveToIp(relay) ?: relay
// Connect on a thread with 8MB stack (QUIC + TLS needs it)
Thread(null, {
val result = engine?.startSignaling(resolvedRelay, seed, "", alias)
// Only reached if signaling disconnects
val mgr = com.wzp.engine.SignalManager()
val ok = mgr.connect(resolvedRelay, seed)
viewModelScope.launch {
if (result != 0) {
_errorMessage.value = "Signal connection lost"
if (ok) {
signalManager = mgr
startSignalPolling()
} else {
_errorMessage.value = "Failed to register on relay"
}
_signalState.value = 0
}
}, "wzp-register", 8 * 1024 * 1024).start()
}, "wzp-signal-connect", 8 * 1024 * 1024).start()
}
/** Poll signal manager state every 500ms */
private fun startSignalPolling() {
signalPollJob?.cancel()
signalPollJob = viewModelScope.launch {
while (isActive) {
val mgr = signalManager
if (mgr != null && mgr.isConnected) {
val state = mgr.getState()
_signalState.value = state.status
_incomingCallId.value = state.incomingCallId
_incomingCallerFp.value = state.incomingCallerFp
_incomingCallerAlias.value = state.incomingCallerAlias
// Auto-connect to media room when call is set up
if (state.status == "setup" && state.callSetupRelay != null && state.callSetupRoom != null) {
Log.i(TAG, "CallSetup: connecting to ${state.callSetupRelay} room ${state.callSetupRoom}")
startCallInternal(state.callSetupRelay, state.callSetupRoom)
}
}
delay(500L)
}
}
}
private fun stopSignalPolling() {
signalPollJob?.cancel()
signalPollJob = null
}
/** Place a direct call to the target fingerprint */
@@ -195,24 +223,28 @@ class CallViewModel : ViewModel(), WzpCallback {
_errorMessage.value = "Enter a fingerprint to call"
return
}
engine?.placeCall(target)
_signalState.value = 6 // Ringing
signalManager?.placeCall(target)
}
/** Answer an incoming direct call */
fun answerIncomingCall(mode: Int = 2) {
val callId = _incomingCallId.value ?: return
engine?.answerCall(callId, mode)
signalManager?.answerCall(callId, mode)
}
/** Reject an incoming direct call */
fun rejectIncomingCall() {
val callId = _incomingCallId.value ?: return
engine?.answerCall(callId, 0) // 0 = Reject
_signalState.value = 5 // Back to registered
_incomingCallId.value = null
_incomingCallerFp.value = null
_incomingCallerAlias.value = null
signalManager?.answerCall(callId, 0)
}
/** Hang up direct call — media ends, signal stays alive */
fun hangupDirectCall() {
signalManager?.hangup()
engine?.stopCall()
engine?.destroy()
engine = null
engineInitialized = false
}
companion object {
@@ -690,30 +722,10 @@ class CallViewModel : ViewModel(), WzpCallback {
val s = CallStats.fromJson(json)
lastCallDuration = s.durationSecs
_stats.value = s
// Track signal state changes for direct calling
if (s.state in 5..7) {
_signalState.value = s.state
// Don't update callState for signal-only states
} else if (s.state != 0) {
// Only update callState from media engine stats (not signal)
if (s.state != 0) {
_callState.value = s.state
}
// Incoming call detection
if (s.state == 7) { // IncomingCall
_incomingCallId.value = s.incomingCallId
_incomingCallerFp.value = s.incomingCallerFp
_incomingCallerAlias.value = s.incomingCallerAlias
}
// CallSetup: auto-connect to media room
if (s.state == 1 && s.incomingCallId != null && s.incomingCallId.contains("|")) {
// Format: "relay_addr|room_name"
val parts = s.incomingCallId.split("|", limit = 2)
if (parts.size == 2) {
val mediaRelay = parts[0]
val mediaRoom = parts[1]
Log.i(TAG, "CallSetup: connecting to $mediaRelay room $mediaRoom")
startCallInternal(mediaRelay, mediaRoom)
}
}
if (s.state == 2 && !audioStarted) {
startAudio()
}

View File

@@ -219,7 +219,7 @@ fun InCallScreen(
// Mode toggle: Room vs Direct Call
val callMode by viewModel.callMode.collectAsState()
val signalState by viewModel.signalState.collectAsState()
val signalState by viewModel.signalState.collectAsState() // "idle"/"registered"/"ringing"/etc
val targetFp by viewModel.targetFingerprint.collectAsState()
val incomingCallId by viewModel.incomingCallId.collectAsState()
val incomingCallerFp by viewModel.incomingCallerFp.collectAsState()
@@ -309,7 +309,7 @@ fun InCallScreen(
}
} else {
// ── Direct call mode ──
if (signalState < 5) {
if (signalState == "idle") {
// Not registered yet
SectionLabel("ALIAS")
OutlinedTextField(
@@ -333,7 +333,7 @@ fun InCallScreen(
color = Color.White
)
}
} else if (signalState == 5) {
} else if (signalState == "registered" || signalState == "incoming") {
// Registered — show dial pad
Text(
"\u2705 Registered — waiting for calls",
@@ -403,8 +403,7 @@ fun InCallScreen(
color = Color.White
)
}
} else if (signalState == 6) {
// Ringing
} else if (signalState == "ringing") {
Text(
"\uD83D\uDD14 Ringing...",
color = Yellow,
@@ -412,11 +411,10 @@ fun InCallScreen(
textAlign = TextAlign.Center,
modifier = Modifier.fillMaxWidth()
)
} else if (signalState == 7) {
// Incoming call (state 7 also handled above in registered view)
} else if (signalState == "setup") {
Text(
"\uD83D\uDCDE Incoming call...",
color = Green,
"Connecting to call...",
color = Accent,
style = MaterialTheme.typography.titleMedium,
textAlign = TextAlign.Center,
modifier = Modifier.fillMaxWidth()

View File

@@ -97,10 +97,6 @@ pub(crate) struct EngineState {
/// QUIC transport handle — stored so stop_call() can close it immediately,
/// triggering relay-side leave + RoomUpdate broadcast.
pub quic_transport: Mutex<Option<Arc<wzp_transport::QuinnTransport>>>,
/// Signal transport for direct calling — stored so place_call/answer_call can send.
pub signal_transport: Mutex<Option<Arc<wzp_transport::QuinnTransport>>>,
/// Our fingerprint (set during signaling registration).
pub signal_fingerprint: Mutex<Option<String>>,
}
pub struct WzpEngine {
@@ -122,8 +118,6 @@ impl WzpEngine {
playout_ring: AudioRing::new(),
audio_level_rms: AtomicU32::new(0),
quic_transport: Mutex::new(None),
signal_transport: Mutex::new(None),
signal_fingerprint: Mutex::new(None),
});
Self {
state,
@@ -250,203 +244,7 @@ impl WzpEngine {
}
/// Start persistent signaling connection for direct calls.
/// Spawns a background task that maintains the `_signal` connection.
/// Start persistent signaling for direct calls.
/// Blocks the calling thread (Kotlin provides a Thread with 8MB stack).
/// Same pattern as start_call: tokio block_on on the caller's thread.
pub fn start_signaling(
&mut self,
relay_addr: &str,
seed_hex: &str,
token: Option<&str>,
alias: Option<&str>,
) -> Result<(), anyhow::Error> {
use wzp_proto::{MediaTransport, SignalMessage};
let addr: SocketAddr = relay_addr.parse()?;
let seed = if seed_hex.is_empty() {
wzp_crypto::Seed::generate()
} else {
wzp_crypto::Seed::from_hex(seed_hex).map_err(|e| anyhow::anyhow!(e))?
};
let identity = seed.derive_identity();
let pub_id = identity.public_identity();
let identity_pub = *pub_id.signing.as_bytes();
let fp = pub_id.fingerprint.to_string();
let token = token.map(|s| s.to_string());
let alias = alias.map(|s| s.to_string());
let state = self.state.clone();
info!(fingerprint = %fp, relay = %addr, "starting signaling");
self.state.running.store(true, Ordering::Release);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let signal_state = state.clone();
rt.block_on(async move {
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = match wzp_transport::create_endpoint(bind, None) {
Ok(e) => e,
Err(e) => { error!("signal endpoint: {e}"); return; }
};
let client_cfg = wzp_transport::client_config();
let conn = match wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await {
Ok(c) => c,
Err(e) => { error!("signal connect: {e}"); return; }
};
let transport = std::sync::Arc::new(wzp_transport::QuinnTransport::new(conn));
// Auth if token provided
if let Some(ref tok) = token {
let _ = transport.send_signal(&SignalMessage::AuthToken { token: tok.clone() }).await;
}
// Register presence
let _ = transport.send_signal(&SignalMessage::RegisterPresence {
identity_pub,
signature: vec![],
alias: alias.clone(),
}).await;
// Wait for ack
match transport.recv_signal().await {
Ok(Some(SignalMessage::RegisterPresenceAck { success: true, .. })) => {
info!(fingerprint = %fp, "signal: registered");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Registered;
drop(stats);
// Store transport + fingerprint so place_call/answer_call can use them
*signal_state.signal_transport.lock().unwrap() = Some(transport.clone());
*signal_state.signal_fingerprint.lock().unwrap() = Some(fp.clone());
}
other => {
error!("signal registration failed: {other:?}");
return;
}
}
// Signal recv loop
loop {
if !signal_state.running.load(Ordering::Relaxed) {
break;
}
match transport.recv_signal().await {
Ok(Some(SignalMessage::CallRinging { call_id })) => {
info!(call_id = %call_id, "signal: ringing");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Ringing;
}
Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => {
info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::IncomingCall;
stats.incoming_call_id = Some(call_id);
stats.incoming_caller_fp = Some(caller_fingerprint);
stats.incoming_caller_alias = caller_alias;
}
Ok(Some(SignalMessage::DirectCallAnswer { call_id, accept_mode, .. })) => {
info!(call_id = %call_id, mode = ?accept_mode, "signal: call answered");
}
Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => {
info!(call_id = %call_id, room = %room, relay = %relay_addr, "signal: call setup");
// Connect to media room via the existing start_call mechanism
// Store the room info so Kotlin can call startCall with it
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Connecting;
// Store call setup info for Kotlin to pick up
stats.incoming_call_id = Some(format!("{relay_addr}|{room}"));
}
Ok(Some(SignalMessage::Hangup { reason })) => {
info!(reason = ?reason, "signal: call ended by remote");
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Closed;
stats.incoming_call_id = None;
stats.incoming_caller_fp = None;
stats.incoming_caller_alias = None;
}
Ok(Some(_)) => {}
Ok(None) => {
info!("signal: connection closed");
break;
}
Err(e) => {
error!("signal recv error: {e}");
break;
}
}
}
let mut stats = signal_state.stats.lock().unwrap();
stats.state = crate::stats::CallState::Closed;
}); // block_on
Ok(())
}
/// Place a direct call to a target fingerprint via the signal transport.
pub fn place_call(&self, target_fingerprint: &str) -> Result<(), anyhow::Error> {
use wzp_proto::SignalMessage;
let transport = self.state.signal_transport.lock().unwrap().clone()
.ok_or_else(|| anyhow::anyhow!("not registered"))?;
let caller_fp = self.state.signal_fingerprint.lock().unwrap().clone()
.unwrap_or_default();
let target = target_fingerprint.to_string();
let call_id = format!("{:016x}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
// Send on a separate thread since we can't block the UI thread
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime");
rt.block_on(async {
let _ = transport.send_signal(&SignalMessage::DirectCallOffer {
caller_fingerprint: caller_fp,
caller_alias: None,
target_fingerprint: target,
call_id,
identity_pub: [0u8; 32],
ephemeral_pub: [0u8; 32],
signature: vec![],
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
}).await;
});
});
Ok(())
}
/// Answer an incoming direct call via the signal transport.
pub fn answer_call(&self, call_id: &str, mode: wzp_proto::CallAcceptMode) -> Result<(), anyhow::Error> {
use wzp_proto::SignalMessage;
let transport = self.state.signal_transport.lock().unwrap().clone()
.ok_or_else(|| anyhow::anyhow!("not registered"))?;
let call_id = call_id.to_string();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime");
rt.block_on(async {
let _ = transport.send_signal(&SignalMessage::DirectCallAnswer {
call_id,
accept_mode: mode,
identity_pub: None,
ephemeral_pub: None,
signature: None,
chosen_profile: Some(wzp_proto::QualityProfile::GOOD),
}).await;
});
});
Ok(())
}
// Signal methods (start_signaling, place_call, answer_call) moved to signal_mgr.rs
pub fn set_mute(&self, muted: bool) {
self.state.muted.store(muted, Ordering::Relaxed);

View File

@@ -390,87 +390,132 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeGetFingerprint
// ── Direct calling JNI functions ──
/// Start persistent signaling connection to relay for direct calls.
/// Returns 0 immediately — the actual work happens on a dedicated thread.
/// The JNI function MUST be minimal because Android's DefaultDispatch thread
/// has a tiny stack that overflows with any Rust crypto/network code.
// ── SignalManager JNI functions ──
/// Opaque handle for SignalManager (separate from EngineHandle).
struct SignalHandle {
mgr: crate::signal_mgr::SignalManager,
}
unsafe fn signal_ref(handle: jlong) -> &'static SignalHandle {
unsafe { &*(handle as *const SignalHandle) }
}
/// Connect to relay for signaling. Returns handle (jlong) or 0 on error.
/// MUST be called from a thread with sufficient stack (8MB).
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartSignaling<'a>(
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalConnect<'a>(
mut env: JNIEnv<'a>,
_class: JClass,
relay_j: JString,
seed_j: JString,
) -> jlong {
let relay: String = env.get_string(&relay_j).map(|s| s.into()).unwrap_or_default();
let seed: String = env.get_string(&seed_j).map(|s| s.into()).unwrap_or_default();
match crate::signal_mgr::SignalManager::connect(&relay, &seed) {
Ok(mgr) => {
// Spawn recv loop on a dedicated thread
let mgr_ref = &mgr as *const crate::signal_mgr::SignalManager;
let handle = Box::new(SignalHandle { mgr });
let raw = Box::into_raw(handle);
// Get a reference for the recv thread
let recv_ref = unsafe { &(*raw).mgr };
std::thread::Builder::new()
.name("wzp-signal-recv".into())
.stack_size(4 * 1024 * 1024)
.spawn(move || {
recv_ref.run_recv_loop();
})
.ok();
raw as jlong
}
Err(e) => {
error!("signal connect failed: {e}");
0
}
}
}
/// Get signal state as JSON string.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalGetState<'a>(
mut env: JNIEnv<'a>,
_class: JClass,
handle: jlong,
relay_addr_j: JString,
seed_hex_j: JString,
token_j: JString,
alias_j: JString,
) -> jstring {
if handle == 0 { return JObject::null().into_raw(); }
let h = signal_ref(handle);
let json = h.mgr.get_state_json();
env.new_string(&json)
.map(|s| s.into_raw())
.unwrap_or(JObject::null().into_raw())
}
/// Place a direct call.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalPlaceCall<'a>(
mut env: JNIEnv<'a>,
_class: JClass,
handle: jlong,
target_j: JString,
) -> jint {
// Extract JNI strings — this is all we do on the caller's thread
let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default();
let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default();
let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default();
let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default();
// Use the existing start_call pattern — create engine, call start_signaling
// which spawns a thread internally. This is the same pattern that works for room calls.
let h = unsafe { handle_ref(handle) };
match h.engine.start_signaling(
&relay_addr,
&seed_hex,
if token.is_empty() { None } else { Some(&token) },
if alias.is_empty() { None } else { Some(&alias) },
) {
if handle == 0 { return -1; }
let h = signal_ref(handle);
let target: String = env.get_string(&target_j).map(|s| s.into()).unwrap_or_default();
match h.mgr.place_call(&target) {
Ok(()) => 0,
Err(e) => { error!("start_signaling failed: {e}"); -1 }
Err(e) => { error!("place_call: {e}"); -1 }
}
}
/// Place a direct call to a target fingerprint.
/// Returns 0 on success, -1 on error.
/// Answer an incoming call.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativePlaceCall<'a>(
mut env: JNIEnv<'a>,
_class: JClass,
handle: jlong,
target_fp_j: JString,
) -> jint {
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let h = unsafe { handle_ref(handle) };
let target: String = env.get_string(&target_fp_j).map(|s| s.into()).unwrap_or_default();
h.engine.place_call(&target)
}));
match result {
Ok(Ok(())) => 0,
Ok(Err(e)) => { error!("place_call failed: {e}"); -1 }
Err(_) => { error!("place_call panicked"); -1 }
}
}
/// Answer an incoming direct call.
/// mode: 0=Reject, 1=AcceptTrusted, 2=AcceptGeneric
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeAnswerCall<'a>(
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalAnswerCall<'a>(
mut env: JNIEnv<'a>,
_class: JClass,
handle: jlong,
call_id_j: JString,
mode: jint,
) -> jint {
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let h = unsafe { handle_ref(handle) };
let call_id: String = env.get_string(&call_id_j).map(|s| s.into()).unwrap_or_default();
let accept_mode = match mode {
0 => wzp_proto::CallAcceptMode::Reject,
1 => wzp_proto::CallAcceptMode::AcceptTrusted,
_ => wzp_proto::CallAcceptMode::AcceptGeneric,
};
h.engine.answer_call(&call_id, accept_mode)
}));
match result {
Ok(Ok(())) => 0,
Ok(Err(e)) => { error!("answer_call failed: {e}"); -1 }
Err(_) => { error!("answer_call panicked"); -1 }
if handle == 0 { return -1; }
let h = signal_ref(handle);
let call_id: String = env.get_string(&call_id_j).map(|s| s.into()).unwrap_or_default();
let accept_mode = match mode {
0 => wzp_proto::CallAcceptMode::Reject,
1 => wzp_proto::CallAcceptMode::AcceptTrusted,
_ => wzp_proto::CallAcceptMode::AcceptGeneric,
};
match h.mgr.answer_call(&call_id, accept_mode) {
Ok(()) => 0,
Err(e) => { error!("answer_call: {e}"); -1 }
}
}
/// Send hangup signal.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalHangup(
_env: JNIEnv,
_class: JClass,
handle: jlong,
) {
if handle == 0 { return; }
let h = signal_ref(handle);
h.mgr.hangup();
}
/// Destroy the signal manager and free resources.
#[unsafe(no_mangle)]
pub unsafe extern "system" fn Java_com_wzp_engine_SignalManager_nativeSignalDestroy(
_env: JNIEnv,
_class: JClass,
handle: jlong,
) {
if handle == 0 { return; }
let h = signal_ref(handle);
h.mgr.stop();
// Reclaim the Box
let _ = unsafe { Box::from_raw(handle as *mut SignalHandle) };
}

View File

@@ -14,5 +14,6 @@ pub mod audio_ring;
pub mod commands;
pub mod engine;
pub mod pipeline;
pub mod signal_mgr;
pub mod stats;
pub mod jni_bridge;

View File

@@ -0,0 +1,265 @@
//! Persistent signal connection manager for direct 1:1 calls.
//!
//! Separate from the media engine — survives across calls.
//! Connects to relay via `_signal` SNI, registers presence,
//! and handles call signaling (offer/answer/setup/hangup).
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{error, info, warn};
use wzp_proto::{MediaTransport, SignalMessage};
/// Signal connection status.
#[derive(Clone, Debug, Default, serde::Serialize)]
pub struct SignalState {
pub status: String, // "idle", "registered", "ringing", "incoming", "setup"
pub fingerprint: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub incoming_call_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub incoming_caller_fp: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub incoming_caller_alias: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub call_setup_relay: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub call_setup_room: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub call_setup_id: Option<String>,
}
/// Manages a persistent `_signal` QUIC connection to a relay.
pub struct SignalManager {
transport: Arc<wzp_transport::QuinnTransport>,
state: Arc<Mutex<SignalState>>,
running: Arc<AtomicBool>,
}
impl SignalManager {
/// Connect to relay, register presence, return immediately.
/// Call `run_recv_loop()` on a separate thread after this.
pub fn connect(relay_addr: &str, seed_hex: &str) -> Result<Self, anyhow::Error> {
let addr: SocketAddr = relay_addr.parse()?;
let seed = if seed_hex.is_empty() {
wzp_crypto::Seed::generate()
} else {
wzp_crypto::Seed::from_hex(seed_hex).map_err(|e| anyhow::anyhow!(e))?
};
let identity = seed.derive_identity();
let pub_id = identity.public_identity();
let identity_pub = *pub_id.signing.as_bytes();
let fp = pub_id.fingerprint.to_string();
info!(fingerprint = %fp, relay = %addr, "signal: connecting");
// Synchronous QUIC connect + register (runs on caller's thread)
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let transport = rt.block_on(async {
let bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
let endpoint = wzp_transport::create_endpoint(bind, None)?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, addr, "_signal", client_cfg).await?;
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
// Register presence
transport.send_signal(&SignalMessage::RegisterPresence {
identity_pub,
signature: vec![],
alias: None,
}).await?;
match transport.recv_signal().await? {
Some(SignalMessage::RegisterPresenceAck { success: true, .. }) => {
info!(fingerprint = %fp, "signal: registered");
}
other => {
return Err(anyhow::anyhow!("registration failed: {other:?}"));
}
}
Ok::<_, anyhow::Error>(transport)
})?;
// Don't drop the runtime — we need it for the recv loop
// Store it... actually, we'll create a new one in run_recv_loop
drop(rt);
let state = Arc::new(Mutex::new(SignalState {
status: "registered".into(),
fingerprint: fp,
..Default::default()
}));
Ok(Self {
transport,
state,
running: Arc::new(AtomicBool::new(true)),
})
}
/// Blocking signal recv loop. Run on a dedicated thread.
pub fn run_recv_loop(&self) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime for signal recv");
let transport = self.transport.clone();
let state = self.state.clone();
let running = self.running.clone();
rt.block_on(async move {
loop {
if !running.load(Ordering::Relaxed) { break; }
match transport.recv_signal().await {
Ok(Some(SignalMessage::CallRinging { call_id })) => {
info!(call_id = %call_id, "signal: ringing");
let mut s = state.lock().unwrap();
s.status = "ringing".into();
}
Ok(Some(SignalMessage::DirectCallOffer { caller_fingerprint, caller_alias, call_id, .. })) => {
info!(from = %caller_fingerprint, call_id = %call_id, "signal: incoming call");
let mut s = state.lock().unwrap();
s.status = "incoming".into();
s.incoming_call_id = Some(call_id);
s.incoming_caller_fp = Some(caller_fingerprint);
s.incoming_caller_alias = caller_alias;
}
Ok(Some(SignalMessage::DirectCallAnswer { call_id, accept_mode, .. })) => {
info!(call_id = %call_id, mode = ?accept_mode, "signal: call answered");
}
Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => {
info!(call_id = %call_id, room = %room, relay = %relay_addr, "signal: call setup");
let mut s = state.lock().unwrap();
s.status = "setup".into();
s.call_setup_relay = Some(relay_addr);
s.call_setup_room = Some(room);
s.call_setup_id = Some(call_id);
}
Ok(Some(SignalMessage::Hangup { reason })) => {
info!(reason = ?reason, "signal: hangup");
let mut s = state.lock().unwrap();
s.status = "registered".into();
s.incoming_call_id = None;
s.incoming_caller_fp = None;
s.incoming_caller_alias = None;
s.call_setup_relay = None;
s.call_setup_room = None;
s.call_setup_id = None;
}
Ok(Some(_)) => {}
Ok(None) => {
info!("signal: connection closed");
break;
}
Err(e) => {
error!("signal recv error: {e}");
break;
}
}
}
let mut s = state.lock().unwrap();
s.status = "idle".into();
});
}
/// Get current state (non-blocking).
pub fn get_state(&self) -> SignalState {
self.state.lock().unwrap().clone()
}
/// Get state as JSON string.
pub fn get_state_json(&self) -> String {
serde_json::to_string(&self.get_state()).unwrap_or_else(|_| "{}".into())
}
/// Place a direct call.
pub fn place_call(&self, target_fp: &str) -> Result<(), anyhow::Error> {
let fp = self.state.lock().unwrap().fingerprint.clone();
let target = target_fp.to_string();
let call_id = format!("{:016x}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
let transport = self.transport.clone();
// Send on a small thread (async send needs a runtime)
std::thread::Builder::new()
.name("wzp-call-send".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().expect("rt");
rt.block_on(async {
let _ = transport.send_signal(&SignalMessage::DirectCallOffer {
caller_fingerprint: fp,
caller_alias: None,
target_fingerprint: target,
call_id,
identity_pub: [0u8; 32],
ephemeral_pub: [0u8; 32],
signature: vec![],
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
}).await;
});
})?;
Ok(())
}
/// Answer an incoming call.
pub fn answer_call(&self, call_id: &str, mode: wzp_proto::CallAcceptMode) -> Result<(), anyhow::Error> {
let call_id = call_id.to_string();
let transport = self.transport.clone();
std::thread::Builder::new()
.name("wzp-answer-send".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().expect("rt");
rt.block_on(async {
let _ = transport.send_signal(&SignalMessage::DirectCallAnswer {
call_id,
accept_mode: mode,
identity_pub: None,
ephemeral_pub: None,
signature: None,
chosen_profile: Some(wzp_proto::QualityProfile::GOOD),
}).await;
});
})?;
Ok(())
}
/// Send hangup.
pub fn hangup(&self) {
let transport = self.transport.clone();
let state = self.state.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().expect("rt");
rt.block_on(async {
let _ = transport.send_signal(&SignalMessage::Hangup {
reason: wzp_proto::HangupReason::Normal,
}).await;
});
let mut s = state.lock().unwrap();
s.status = "registered".into();
s.incoming_call_id = None;
s.incoming_caller_fp = None;
s.incoming_caller_alias = None;
s.call_setup_relay = None;
s.call_setup_room = None;
s.call_setup_id = None;
});
}
/// Stop the signal connection.
pub fn stop(&self) {
self.running.store(false, Ordering::Release);
self.transport.connection().close(0u32.into(), b"shutdown");
}
}