fix: eliminate all native thread creation — run everything single-threaded
pthread_create crashes on Android due to static bionic __init_tcb stubs in the Rust std prebuilt rlibs. This is unfixable without rebuilding std. Solution: run the entire call (QUIC connect, handshake, media send/recv) on a single tokio current_thread runtime. The JNI startCall() now blocks, so Kotlin dispatches it to Dispatchers.IO (JVM thread, not pthread). Audio pipeline temporarily simplified to silence frames — will restore once threading is solved (either via Java Thread or rebuilding std). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -56,12 +56,22 @@ class CallViewModel : ViewModel(), WzpCallback {
|
|||||||
engineInitialized = true
|
engineInitialized = true
|
||||||
}
|
}
|
||||||
_callState.value = 1 // Connecting
|
_callState.value = 1 // Connecting
|
||||||
val result = engine?.startCall(relayAddr, room) ?: -1
|
startStatsPolling()
|
||||||
if (result == 0) {
|
|
||||||
startStatsPolling()
|
// startCall blocks (runs tokio on calling thread), so dispatch
|
||||||
} else {
|
// to a background coroutine. Using Dispatchers.IO which uses
|
||||||
_callState.value = 0
|
// Java threads (not native pthread_create).
|
||||||
_errorMessage.value = "Failed to start call (code $result)"
|
viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) {
|
||||||
|
try {
|
||||||
|
val result = engine?.startCall(relayAddr, room) ?: -1
|
||||||
|
if (result != 0) {
|
||||||
|
_callState.value = 0
|
||||||
|
_errorMessage.value = "Failed to start call (code $result)"
|
||||||
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
_callState.value = 0
|
||||||
|
_errorMessage.value = "Engine error: ${e.message}"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
_callState.value = 0
|
_callState.value = 0
|
||||||
|
|||||||
@@ -1,10 +1,9 @@
|
|||||||
//! Engine orchestrator — manages the call lifecycle.
|
//! Engine orchestrator — manages the call lifecycle.
|
||||||
//!
|
//!
|
||||||
//! The engine owns:
|
//! IMPORTANT: On Android, pthread_create crashes in shared libraries due to
|
||||||
//! - The Oboe audio backend (start/stop)
|
//! static bionic stubs in the Rust std prebuilt rlibs. ALL work must happen
|
||||||
//! - A codec thread running the `Pipeline`
|
//! on the JNI calling thread or via the tokio current_thread runtime.
|
||||||
//! - A tokio runtime for async network I/O
|
//! No std::thread::spawn or tokio multi_thread allowed.
|
||||||
//! - Command channel for control from the JNI/UI thread
|
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};
|
||||||
@@ -12,28 +11,21 @@ use std::sync::{Arc, Mutex};
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info};
|
||||||
use wzp_crypto::{KeyExchange, WarzoneKeyExchange};
|
use wzp_crypto::{KeyExchange, WarzoneKeyExchange};
|
||||||
use wzp_proto::{
|
use wzp_proto::{
|
||||||
CodecId, MediaHeader, MediaPacket, MediaTransport, QualityProfile, SignalMessage,
|
CodecId, MediaHeader, MediaPacket, MediaTransport, QualityProfile, SignalMessage,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::audio_android::{OboeBackend, FRAME_SAMPLES};
|
|
||||||
use crate::commands::EngineCommand;
|
use crate::commands::EngineCommand;
|
||||||
use crate::pipeline::Pipeline;
|
|
||||||
use crate::stats::{CallState, CallStats};
|
use crate::stats::{CallState, CallStats};
|
||||||
|
|
||||||
/// Configuration to start a call.
|
/// Configuration to start a call.
|
||||||
pub struct CallStartConfig {
|
pub struct CallStartConfig {
|
||||||
/// Initial quality profile.
|
|
||||||
pub profile: QualityProfile,
|
pub profile: QualityProfile,
|
||||||
/// Relay server address (host:port).
|
|
||||||
pub relay_addr: String,
|
pub relay_addr: String,
|
||||||
/// Room name (passed as SNI).
|
|
||||||
pub room: String,
|
pub room: String,
|
||||||
/// Authentication token for the relay.
|
|
||||||
pub auth_token: Vec<u8>,
|
pub auth_token: Vec<u8>,
|
||||||
/// 32-byte identity seed for key derivation.
|
|
||||||
pub identity_seed: [u8; 32],
|
pub identity_seed: [u8; 32],
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,23 +41,16 @@ impl Default for CallStartConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shared state between the engine owner and background threads.
|
|
||||||
struct EngineState {
|
struct EngineState {
|
||||||
running: AtomicBool,
|
running: AtomicBool,
|
||||||
connected: AtomicBool,
|
|
||||||
muted: AtomicBool,
|
muted: AtomicBool,
|
||||||
speaker: AtomicBool,
|
|
||||||
aec_enabled: AtomicBool,
|
|
||||||
agc_enabled: AtomicBool,
|
|
||||||
stats: Mutex<CallStats>,
|
stats: Mutex<CallStats>,
|
||||||
command_tx: std::sync::mpsc::Sender<EngineCommand>,
|
command_tx: std::sync::mpsc::Sender<EngineCommand>,
|
||||||
command_rx: Mutex<Option<std::sync::mpsc::Receiver<EngineCommand>>>,
|
command_rx: Mutex<Option<std::sync::mpsc::Receiver<EngineCommand>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The WarzonePhone Android engine.
|
|
||||||
pub struct WzpEngine {
|
pub struct WzpEngine {
|
||||||
state: Arc<EngineState>,
|
state: Arc<EngineState>,
|
||||||
codec_thread: Option<std::thread::JoinHandle<()>>,
|
|
||||||
tokio_runtime: Option<tokio::runtime::Runtime>,
|
tokio_runtime: Option<tokio::runtime::Runtime>,
|
||||||
call_start: Option<Instant>,
|
call_start: Option<Instant>,
|
||||||
}
|
}
|
||||||
@@ -75,19 +60,13 @@ impl WzpEngine {
|
|||||||
let (tx, rx) = std::sync::mpsc::channel();
|
let (tx, rx) = std::sync::mpsc::channel();
|
||||||
let state = Arc::new(EngineState {
|
let state = Arc::new(EngineState {
|
||||||
running: AtomicBool::new(false),
|
running: AtomicBool::new(false),
|
||||||
connected: AtomicBool::new(false),
|
|
||||||
muted: AtomicBool::new(false),
|
muted: AtomicBool::new(false),
|
||||||
speaker: AtomicBool::new(false),
|
|
||||||
aec_enabled: AtomicBool::new(true),
|
|
||||||
agc_enabled: AtomicBool::new(true),
|
|
||||||
stats: Mutex::new(CallStats::default()),
|
stats: Mutex::new(CallStats::default()),
|
||||||
command_tx: tx,
|
command_tx: tx,
|
||||||
command_rx: Mutex::new(Some(rx)),
|
command_rx: Mutex::new(Some(rx)),
|
||||||
});
|
});
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
state,
|
state,
|
||||||
codec_thread: None,
|
|
||||||
tokio_runtime: None,
|
tokio_runtime: None,
|
||||||
call_start: None,
|
call_start: None,
|
||||||
}
|
}
|
||||||
@@ -106,347 +85,62 @@ impl WzpEngine {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create tokio runtime — use current_thread to avoid pthread_create
|
// Create single-threaded tokio runtime — NO thread spawning.
|
||||||
// issues on Android (SEGV_ACCERR in __init_tcb with multi_thread).
|
// On Android, pthread_create crashes due to static bionic stubs.
|
||||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
.thread_name("wzp-net")
|
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
// Channels between codec thread and network tasks
|
|
||||||
let (send_tx, mut send_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(64);
|
|
||||||
let (recv_tx, recv_rx) = tokio::sync::mpsc::channel::<MediaPacket>(64);
|
|
||||||
|
|
||||||
// Shared sequence counter for outgoing packets
|
|
||||||
let seq_counter = Arc::new(AtomicU16::new(0));
|
|
||||||
let ts_counter = Arc::new(AtomicU32::new(0));
|
|
||||||
|
|
||||||
// Parse relay address
|
|
||||||
let relay_addr: SocketAddr = config.relay_addr.parse().map_err(|e| {
|
let relay_addr: SocketAddr = config.relay_addr.parse().map_err(|e| {
|
||||||
anyhow::anyhow!("invalid relay address '{}': {e}", config.relay_addr)
|
anyhow::anyhow!("invalid relay address '{}': {e}", config.relay_addr)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let room = config.room.clone();
|
let room = config.room.clone();
|
||||||
let identity_seed = config.identity_seed;
|
let identity_seed = config.identity_seed;
|
||||||
let state_net = self.state.clone();
|
let state = self.state.clone();
|
||||||
let seq_c = seq_counter.clone();
|
|
||||||
let ts_c = ts_counter.clone();
|
|
||||||
|
|
||||||
// Spawn the combined network task (connect + handshake + send/recv)
|
self.state.running.store(true, Ordering::Release);
|
||||||
runtime.spawn(async move {
|
self.call_start = Some(Instant::now());
|
||||||
// Install rustls crypto provider
|
|
||||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
|
||||||
|
|
||||||
// Create QUIC endpoint
|
// Run the entire call on the current thread's tokio runtime.
|
||||||
let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
// This blocks the JNI thread until the call ends, so Kotlin
|
||||||
let endpoint = match wzp_transport::create_endpoint(bind_addr, None) {
|
// must call startCall from a background coroutine.
|
||||||
Ok(ep) => ep,
|
let state_clone = state.clone();
|
||||||
Err(e) => {
|
runtime.block_on(async move {
|
||||||
error!("failed to create QUIC endpoint: {e}");
|
if let Err(e) = run_call(relay_addr, &room, &identity_seed, state_clone).await {
|
||||||
return;
|
error!("call failed: {e}");
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Connect to relay with room as SNI
|
|
||||||
let sni = if room.is_empty() { "android" } else { &room };
|
|
||||||
info!(%relay_addr, sni, "connecting to relay...");
|
|
||||||
let client_cfg = wzp_transport::client_config();
|
|
||||||
let conn = match wzp_transport::connect(&endpoint, relay_addr, sni, client_cfg).await {
|
|
||||||
Ok(c) => c,
|
|
||||||
Err(e) => {
|
|
||||||
error!("QUIC connect failed: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
info!("QUIC connected to relay");
|
|
||||||
|
|
||||||
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
|
||||||
|
|
||||||
// Crypto handshake: send CallOffer, receive CallAnswer
|
|
||||||
let mut kx = WarzoneKeyExchange::from_identity_seed(&identity_seed);
|
|
||||||
let ephemeral_pub = kx.generate_ephemeral();
|
|
||||||
let identity_pub = kx.identity_public_key();
|
|
||||||
|
|
||||||
// Sign (ephemeral_pub || "call-offer")
|
|
||||||
let mut sign_data = Vec::with_capacity(32 + 10);
|
|
||||||
sign_data.extend_from_slice(&ephemeral_pub);
|
|
||||||
sign_data.extend_from_slice(b"call-offer");
|
|
||||||
let signature = kx.sign(&sign_data);
|
|
||||||
|
|
||||||
let offer = SignalMessage::CallOffer {
|
|
||||||
identity_pub,
|
|
||||||
ephemeral_pub,
|
|
||||||
signature,
|
|
||||||
supported_profiles: vec![
|
|
||||||
QualityProfile::GOOD,
|
|
||||||
QualityProfile::DEGRADED,
|
|
||||||
QualityProfile::CATASTROPHIC,
|
|
||||||
],
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = transport.send_signal(&offer).await {
|
|
||||||
error!("failed to send CallOffer: {e}");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
info!("CallOffer sent, waiting for CallAnswer...");
|
|
||||||
|
|
||||||
// Receive CallAnswer
|
|
||||||
let answer = match transport.recv_signal().await {
|
|
||||||
Ok(Some(msg)) => msg,
|
|
||||||
Ok(None) => {
|
|
||||||
error!("connection closed before CallAnswer");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("failed to receive CallAnswer: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (relay_ephemeral_pub, _chosen_profile) = match answer {
|
|
||||||
SignalMessage::CallAnswer {
|
|
||||||
ephemeral_pub,
|
|
||||||
chosen_profile,
|
|
||||||
..
|
|
||||||
} => (ephemeral_pub, chosen_profile),
|
|
||||||
other => {
|
|
||||||
error!("expected CallAnswer, got {:?}", std::mem::discriminant(&other));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Derive crypto session (not encrypting media yet for simplicity)
|
|
||||||
let _session = match kx.derive_session(&relay_ephemeral_pub) {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
error!("session derivation failed: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("handshake complete, call active");
|
|
||||||
state_net.connected.store(true, Ordering::Release);
|
|
||||||
{
|
|
||||||
let mut stats = state_net.stats.lock().unwrap();
|
|
||||||
stats.state = CallState::Active;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn recv task
|
|
||||||
let recv_transport = transport.clone();
|
|
||||||
let recv_handle = tokio::spawn(async move {
|
|
||||||
loop {
|
|
||||||
match recv_transport.recv_media().await {
|
|
||||||
Ok(Some(pkt)) => {
|
|
||||||
if recv_tx.send(pkt).await.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
info!("relay disconnected (recv)");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("recv_media error: {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Send task runs in this task
|
|
||||||
while let Some(encoded) = send_rx.recv().await {
|
|
||||||
let seq = seq_c.fetch_add(1, Ordering::Relaxed);
|
|
||||||
let ts = ts_c.fetch_add(20, Ordering::Relaxed);
|
|
||||||
let packet = MediaPacket {
|
|
||||||
header: MediaHeader {
|
|
||||||
version: 0,
|
|
||||||
is_repair: false,
|
|
||||||
codec_id: CodecId::Opus24k,
|
|
||||||
has_quality_report: false,
|
|
||||||
fec_ratio_encoded: 0,
|
|
||||||
seq,
|
|
||||||
timestamp: ts,
|
|
||||||
fec_block: 0,
|
|
||||||
fec_symbol: 0,
|
|
||||||
reserved: 0,
|
|
||||||
csrc_count: 0,
|
|
||||||
},
|
|
||||||
payload: Bytes::from(encoded),
|
|
||||||
quality_report: None,
|
|
||||||
};
|
|
||||||
if let Err(e) = transport.send_media(&packet).await {
|
|
||||||
error!("send_media error: {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
recv_handle.abort();
|
|
||||||
transport.close().await.ok();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Take the command receiver
|
state.running.store(false, Ordering::Release);
|
||||||
let command_rx = self
|
{
|
||||||
.state
|
let mut stats = state.stats.lock().unwrap();
|
||||||
.command_rx
|
stats.state = CallState::Closed;
|
||||||
.lock()
|
}
|
||||||
.unwrap()
|
|
||||||
.take()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("command receiver already taken"))?;
|
|
||||||
|
|
||||||
// Start the codec thread
|
|
||||||
let state = self.state.clone();
|
|
||||||
let profile = config.profile;
|
|
||||||
let codec_thread = std::thread::Builder::new()
|
|
||||||
.name("wzp-codec".into())
|
|
||||||
.spawn(move || {
|
|
||||||
crate::audio_android::pin_to_big_core();
|
|
||||||
crate::audio_android::set_realtime_priority();
|
|
||||||
|
|
||||||
let mut audio = OboeBackend::new();
|
|
||||||
if let Err(e) = audio.start() {
|
|
||||||
error!("failed to start audio: {e}");
|
|
||||||
state.running.store(false, Ordering::Release);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut pipeline = match Pipeline::new(profile) {
|
|
||||||
Ok(p) => p,
|
|
||||||
Err(e) => {
|
|
||||||
error!("failed to create pipeline: {e}");
|
|
||||||
audio.stop();
|
|
||||||
state.running.store(false, Ordering::Release);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
state.running.store(true, Ordering::Release);
|
|
||||||
|
|
||||||
let mut prev_aec = true;
|
|
||||||
let mut prev_agc = true;
|
|
||||||
let mut capture_buf = vec![0i16; FRAME_SAMPLES];
|
|
||||||
let frame_duration = std::time::Duration::from_millis(20);
|
|
||||||
let mut recv_rx = recv_rx;
|
|
||||||
|
|
||||||
while state.running.load(Ordering::Relaxed) {
|
|
||||||
let loop_start = Instant::now();
|
|
||||||
|
|
||||||
// Process commands
|
|
||||||
while let Ok(cmd) = command_rx.try_recv() {
|
|
||||||
match cmd {
|
|
||||||
EngineCommand::SetMute(m) => {
|
|
||||||
state.muted.store(m, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
EngineCommand::SetSpeaker(s) => {
|
|
||||||
state.speaker.store(s, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
EngineCommand::ForceProfile(p) => {
|
|
||||||
pipeline.force_profile(p);
|
|
||||||
}
|
|
||||||
EngineCommand::Stop => {
|
|
||||||
state.running.store(false, Ordering::Release);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync AEC/AGC
|
|
||||||
let cur_aec = state.aec_enabled.load(Ordering::Relaxed);
|
|
||||||
if cur_aec != prev_aec {
|
|
||||||
pipeline.set_aec_enabled(cur_aec);
|
|
||||||
prev_aec = cur_aec;
|
|
||||||
}
|
|
||||||
let cur_agc = state.agc_enabled.load(Ordering::Relaxed);
|
|
||||||
if cur_agc != prev_agc {
|
|
||||||
pipeline.set_agc_enabled(cur_agc);
|
|
||||||
prev_agc = cur_agc;
|
|
||||||
}
|
|
||||||
|
|
||||||
if !state.running.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Capture → Encode → Send ---
|
|
||||||
let captured = audio.read_capture(&mut capture_buf);
|
|
||||||
if captured >= FRAME_SAMPLES {
|
|
||||||
let muted = state.muted.load(Ordering::Relaxed);
|
|
||||||
if let Some(encoded) = pipeline.encode_frame(&capture_buf, muted) {
|
|
||||||
let _ = send_tx.try_send(encoded);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Recv → Decode → Playout ---
|
|
||||||
while let Ok(pkt) = recv_rx.try_recv() {
|
|
||||||
pipeline.feed_packet(pkt);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(pcm) = pipeline.decode_frame() {
|
|
||||||
audio.write_playout(&pcm);
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Update stats ---
|
|
||||||
{
|
|
||||||
let pstats = pipeline.stats();
|
|
||||||
let mut stats = state.stats.lock().unwrap();
|
|
||||||
stats.frames_encoded = pstats.frames_encoded;
|
|
||||||
stats.frames_decoded = pstats.frames_decoded;
|
|
||||||
stats.underruns = pstats.underruns;
|
|
||||||
stats.jitter_buffer_depth = pstats.jitter_depth;
|
|
||||||
stats.quality_tier = pstats.quality_tier;
|
|
||||||
}
|
|
||||||
|
|
||||||
let elapsed = loop_start.elapsed();
|
|
||||||
if elapsed < frame_duration {
|
|
||||||
std::thread::sleep(frame_duration - elapsed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
audio.stop();
|
|
||||||
{
|
|
||||||
let mut stats = state.stats.lock().unwrap();
|
|
||||||
stats.state = CallState::Closed;
|
|
||||||
}
|
|
||||||
})?;
|
|
||||||
|
|
||||||
self.codec_thread = Some(codec_thread);
|
|
||||||
self.tokio_runtime = Some(runtime);
|
self.tokio_runtime = Some(runtime);
|
||||||
self.call_start = Some(Instant::now());
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop_call(&mut self) {
|
pub fn stop_call(&mut self) {
|
||||||
if !self.state.running.load(Ordering::Acquire) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
self.state.running.store(false, Ordering::Release);
|
self.state.running.store(false, Ordering::Release);
|
||||||
let _ = self.state.command_tx.send(EngineCommand::Stop);
|
let _ = self.state.command_tx.send(EngineCommand::Stop);
|
||||||
|
|
||||||
if let Some(handle) = self.codec_thread.take() {
|
|
||||||
let _ = handle.join();
|
|
||||||
}
|
|
||||||
if let Some(rt) = self.tokio_runtime.take() {
|
if let Some(rt) = self.tokio_runtime.take() {
|
||||||
rt.shutdown_timeout(std::time::Duration::from_secs(2));
|
rt.shutdown_background();
|
||||||
}
|
}
|
||||||
self.call_start = None;
|
self.call_start = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_mute(&self, muted: bool) {
|
pub fn set_mute(&self, muted: bool) {
|
||||||
let _ = self.state.command_tx.send(EngineCommand::SetMute(muted));
|
self.state.muted.store(muted, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_speaker(&self, enabled: bool) {
|
pub fn set_speaker(&self, _enabled: bool) {
|
||||||
let _ = self.state.command_tx.send(EngineCommand::SetSpeaker(enabled));
|
// TODO: route audio via AudioManager on Kotlin side
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_aec_enabled(&self, enabled: bool) {
|
pub fn force_profile(&self, _profile: QualityProfile) {
|
||||||
self.state.aec_enabled.store(enabled, Ordering::Relaxed);
|
// TODO: wire to pipeline when codec thread is re-enabled
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_agc_enabled(&self, enabled: bool) {
|
|
||||||
self.state.agc_enabled.store(enabled, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn force_profile(&self, profile: QualityProfile) {
|
|
||||||
let _ = self.state.command_tx.send(EngineCommand::ForceProfile(profile));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_stats(&self) -> CallStats {
|
pub fn get_stats(&self) -> CallStats {
|
||||||
@@ -471,3 +165,162 @@ impl Drop for WzpEngine {
|
|||||||
self.stop_call();
|
self.stop_call();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run the full call lifecycle: connect, handshake, send/recv media.
|
||||||
|
/// All async, no thread spawning.
|
||||||
|
async fn run_call(
|
||||||
|
relay_addr: SocketAddr,
|
||||||
|
room: &str,
|
||||||
|
identity_seed: &[u8; 32],
|
||||||
|
state: Arc<EngineState>,
|
||||||
|
) -> Result<(), anyhow::Error> {
|
||||||
|
// Install rustls crypto provider
|
||||||
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
|
||||||
|
// Create QUIC endpoint
|
||||||
|
let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let endpoint = wzp_transport::create_endpoint(bind_addr, None)?;
|
||||||
|
|
||||||
|
// Connect to relay with room as SNI
|
||||||
|
let sni = if room.is_empty() { "android" } else { room };
|
||||||
|
info!(%relay_addr, sni, "connecting to relay...");
|
||||||
|
let client_cfg = wzp_transport::client_config();
|
||||||
|
let conn = wzp_transport::connect(&endpoint, relay_addr, sni, client_cfg).await?;
|
||||||
|
info!("QUIC connected to relay");
|
||||||
|
|
||||||
|
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
||||||
|
|
||||||
|
// Crypto handshake
|
||||||
|
let mut kx = WarzoneKeyExchange::from_identity_seed(identity_seed);
|
||||||
|
let ephemeral_pub = kx.generate_ephemeral();
|
||||||
|
let identity_pub = kx.identity_public_key();
|
||||||
|
|
||||||
|
let mut sign_data = Vec::with_capacity(42);
|
||||||
|
sign_data.extend_from_slice(&ephemeral_pub);
|
||||||
|
sign_data.extend_from_slice(b"call-offer");
|
||||||
|
let signature = kx.sign(&sign_data);
|
||||||
|
|
||||||
|
let offer = SignalMessage::CallOffer {
|
||||||
|
identity_pub,
|
||||||
|
ephemeral_pub,
|
||||||
|
signature,
|
||||||
|
supported_profiles: vec![
|
||||||
|
QualityProfile::GOOD,
|
||||||
|
QualityProfile::DEGRADED,
|
||||||
|
QualityProfile::CATASTROPHIC,
|
||||||
|
],
|
||||||
|
};
|
||||||
|
transport.send_signal(&offer).await?;
|
||||||
|
info!("CallOffer sent, waiting for CallAnswer...");
|
||||||
|
|
||||||
|
let answer = transport
|
||||||
|
.recv_signal()
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("connection closed before CallAnswer"))?;
|
||||||
|
|
||||||
|
let relay_ephemeral_pub = match answer {
|
||||||
|
SignalMessage::CallAnswer { ephemeral_pub, .. } => ephemeral_pub,
|
||||||
|
other => {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"expected CallAnswer, got {:?}",
|
||||||
|
std::mem::discriminant(&other)
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let _session = kx.derive_session(&relay_ephemeral_pub)?;
|
||||||
|
info!("handshake complete, call active");
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut stats = state.stats.lock().unwrap();
|
||||||
|
stats.state = CallState::Active;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simple media loop: send silence, recv and count frames.
|
||||||
|
// No codec thread, no Oboe — just network I/O to verify connectivity.
|
||||||
|
// Audio pipeline will be added once native threading is resolved.
|
||||||
|
let seq = AtomicU16::new(0);
|
||||||
|
let ts = AtomicU32::new(0);
|
||||||
|
let transport_recv = transport.clone();
|
||||||
|
|
||||||
|
let send_task = async {
|
||||||
|
let silence = vec![0u8; 20]; // minimal opus silence frame
|
||||||
|
loop {
|
||||||
|
if !state.running.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let s = seq.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let t = ts.fetch_add(20, Ordering::Relaxed);
|
||||||
|
let packet = MediaPacket {
|
||||||
|
header: MediaHeader {
|
||||||
|
version: 0,
|
||||||
|
is_repair: false,
|
||||||
|
codec_id: CodecId::Opus24k,
|
||||||
|
has_quality_report: false,
|
||||||
|
fec_ratio_encoded: 0,
|
||||||
|
seq: s,
|
||||||
|
timestamp: t,
|
||||||
|
fec_block: 0,
|
||||||
|
fec_symbol: 0,
|
||||||
|
reserved: 0,
|
||||||
|
csrc_count: 0,
|
||||||
|
},
|
||||||
|
payload: Bytes::from(silence.clone()),
|
||||||
|
quality_report: None,
|
||||||
|
};
|
||||||
|
if let Err(e) = transport.send_media(&packet).await {
|
||||||
|
error!("send error: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// 20ms frame interval
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let recv_task = async {
|
||||||
|
let mut frames_decoded: u64 = 0;
|
||||||
|
loop {
|
||||||
|
if !state.running.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
match transport_recv.recv_media().await {
|
||||||
|
Ok(Some(_pkt)) => {
|
||||||
|
frames_decoded += 1;
|
||||||
|
let mut stats = state.stats.lock().unwrap();
|
||||||
|
stats.frames_decoded = frames_decoded;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
info!("relay disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("recv error: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Update encoded frame count in send task
|
||||||
|
let stats_task = async {
|
||||||
|
loop {
|
||||||
|
if !state.running.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut stats = state.stats.lock().unwrap();
|
||||||
|
stats.frames_encoded = seq.load(Ordering::Relaxed) as u64;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = send_task => {}
|
||||||
|
_ = recv_task => {}
|
||||||
|
_ = stats_task => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
transport.close().await.ok();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
BIN
wzp-release.apk
BIN
wzp-release.apk
Binary file not shown.
Reference in New Issue
Block a user