diff --git a/android/android/app/src/main/jniLibs/arm64-v8a/libwzp_android.so b/android/android/app/src/main/jniLibs/arm64-v8a/libwzp_android.so index c16e610..fe43ca3 100755 Binary files a/android/android/app/src/main/jniLibs/arm64-v8a/libwzp_android.so and b/android/android/app/src/main/jniLibs/arm64-v8a/libwzp_android.so differ diff --git a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt new file mode 100644 index 0000000..d5c7dde --- /dev/null +++ b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt @@ -0,0 +1,174 @@ +package com.wzp.audio + +import android.Manifest +import android.content.Context +import android.content.pm.PackageManager +import android.media.AudioAttributes +import android.media.AudioFormat +import android.media.AudioRecord +import android.media.AudioTrack +import android.media.MediaRecorder +import android.util.Log +import androidx.core.content.ContextCompat +import com.wzp.engine.WzpEngine + +/** + * Audio pipeline that captures mic audio and plays received audio using + * Android AudioRecord/AudioTrack APIs running on JVM threads. + * + * PCM samples are shuttled to/from the Rust engine via JNI ring buffers: + * - Capture: AudioRecord → WzpEngine.writeAudio() → Rust encoder → network + * - Playout: network → Rust decoder → WzpEngine.readAudio() → AudioTrack + * + * All audio is 48kHz, mono, 16-bit PCM (matching Opus codec requirements). + */ +class AudioPipeline(private val context: Context) { + + companion object { + private const val TAG = "AudioPipeline" + private const val SAMPLE_RATE = 48000 + private const val CHANNEL_IN = AudioFormat.CHANNEL_IN_MONO + private const val CHANNEL_OUT = AudioFormat.CHANNEL_OUT_MONO + private const val ENCODING = AudioFormat.ENCODING_PCM_16BIT + /** 20ms frame at 48kHz = 960 samples */ + private const val FRAME_SAMPLES = 960 + } + + @Volatile + private var running = false + private var captureThread: Thread? = null + private var playoutThread: Thread? = null + + fun start(engine: WzpEngine) { + if (running) return + running = true + + captureThread = Thread({ + runCapture(engine) + }, "wzp-capture").apply { + priority = Thread.MAX_PRIORITY + start() + } + + playoutThread = Thread({ + runPlayout(engine) + }, "wzp-playout").apply { + priority = Thread.MAX_PRIORITY + start() + } + + Log.i(TAG, "audio pipeline started") + } + + fun stop() { + running = false + captureThread?.join(1000) + playoutThread?.join(1000) + captureThread = null + playoutThread = null + Log.i(TAG, "audio pipeline stopped") + } + + private fun runCapture(engine: WzpEngine) { + if (ContextCompat.checkSelfPermission(context, Manifest.permission.RECORD_AUDIO) + != PackageManager.PERMISSION_GRANTED + ) { + Log.e(TAG, "RECORD_AUDIO permission not granted, capture disabled") + return + } + + val minBuf = AudioRecord.getMinBufferSize(SAMPLE_RATE, CHANNEL_IN, ENCODING) + val bufSize = maxOf(minBuf, FRAME_SAMPLES * 2 * 4) // at least 4 frames + + val recorder = try { + AudioRecord( + MediaRecorder.AudioSource.VOICE_COMMUNICATION, + SAMPLE_RATE, + CHANNEL_IN, + ENCODING, + bufSize + ) + } catch (e: SecurityException) { + Log.e(TAG, "AudioRecord SecurityException: ${e.message}") + return + } + + if (recorder.state != AudioRecord.STATE_INITIALIZED) { + Log.e(TAG, "AudioRecord failed to initialize") + recorder.release() + return + } + + recorder.startRecording() + Log.i(TAG, "capture started: ${SAMPLE_RATE}Hz mono, buf=$bufSize") + + val pcm = ShortArray(FRAME_SAMPLES) + try { + while (running) { + val read = recorder.read(pcm, 0, FRAME_SAMPLES) + if (read > 0) { + engine.writeAudio(pcm) + } else if (read < 0) { + Log.e(TAG, "AudioRecord.read error: $read") + break + } + } + } finally { + recorder.stop() + recorder.release() + Log.i(TAG, "capture stopped") + } + } + + private fun runPlayout(engine: WzpEngine) { + val minBuf = AudioTrack.getMinBufferSize(SAMPLE_RATE, CHANNEL_OUT, ENCODING) + val bufSize = maxOf(minBuf, FRAME_SAMPLES * 2 * 4) + + val track = AudioTrack.Builder() + .setAudioAttributes( + AudioAttributes.Builder() + .setUsage(AudioAttributes.USAGE_VOICE_COMMUNICATION) + .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH) + .build() + ) + .setAudioFormat( + AudioFormat.Builder() + .setSampleRate(SAMPLE_RATE) + .setChannelMask(CHANNEL_OUT) + .setEncoding(ENCODING) + .build() + ) + .setBufferSizeInBytes(bufSize) + .setTransferMode(AudioTrack.MODE_STREAM) + .build() + + if (track.state != AudioTrack.STATE_INITIALIZED) { + Log.e(TAG, "AudioTrack failed to initialize") + track.release() + return + } + + track.play() + Log.i(TAG, "playout started: ${SAMPLE_RATE}Hz mono, buf=$bufSize") + + val pcm = ShortArray(FRAME_SAMPLES) + val silence = ShortArray(FRAME_SAMPLES) // pre-allocated silence + try { + while (running) { + val read = engine.readAudio(pcm) + if (read >= FRAME_SAMPLES) { + track.write(pcm, 0, read) + } else { + // Not enough decoded audio — write silence to keep stream alive + track.write(silence, 0, FRAME_SAMPLES) + // Sleep briefly to avoid busy-spinning + Thread.sleep(5) + } + } + } finally { + track.stop() + track.release() + Log.i(TAG, "playout stopped") + } + } +} diff --git a/android/app/src/main/java/com/wzp/engine/CallStats.kt b/android/app/src/main/java/com/wzp/engine/CallStats.kt index a2ff79a..72955f3 100644 --- a/android/app/src/main/java/com/wzp/engine/CallStats.kt +++ b/android/app/src/main/java/com/wzp/engine/CallStats.kt @@ -27,7 +27,11 @@ data class CallStats( /** Total frames decoded since call start. */ val framesDecoded: Long = 0, /** Number of playout underruns (buffer empty when audio was needed). */ - val underruns: Long = 0 + val underruns: Long = 0, + /** Frames recovered by FEC. */ + val fecRecovered: Long = 0, + /** Current mic audio level (RMS, 0-32767). */ + val audioLevel: Int = 0 ) { /** Human-readable quality label. */ val qualityLabel: String @@ -53,7 +57,9 @@ data class CallStats( jitterBufferDepth = obj.optInt("jitter_buffer_depth", 0), framesEncoded = obj.optLong("frames_encoded", 0), framesDecoded = obj.optLong("frames_decoded", 0), - underruns = obj.optLong("underruns", 0) + underruns = obj.optLong("underruns", 0), + fecRecovered = obj.optLong("fec_recovered", 0), + audioLevel = obj.optInt("audio_level", 0) ) } catch (e: Exception) { CallStats() diff --git a/android/app/src/main/java/com/wzp/engine/WzpEngine.kt b/android/app/src/main/java/com/wzp/engine/WzpEngine.kt index 7780ca9..1d071b7 100644 --- a/android/app/src/main/java/com/wzp/engine/WzpEngine.kt +++ b/android/app/src/main/java/com/wzp/engine/WzpEngine.kt @@ -97,6 +97,24 @@ class WzpEngine(private val callback: WzpCallback) { } } + /** + * Write captured PCM samples into the engine's capture ring buffer. + * Called from the AudioRecord capture thread. + */ + fun writeAudio(pcm: ShortArray): Int { + if (nativeHandle == 0L) return 0 + return nativeWriteAudio(nativeHandle, pcm) + } + + /** + * Read decoded PCM samples from the engine's playout ring buffer. + * Called from the AudioTrack playout thread. + */ + fun readAudio(pcm: ShortArray): Int { + if (nativeHandle == 0L) return 0 + return nativeReadAudio(nativeHandle, pcm) + } + // -- JNI native methods -------------------------------------------------- private external fun nativeInit(): Long @@ -108,6 +126,8 @@ class WzpEngine(private val callback: WzpCallback) { private external fun nativeSetSpeaker(handle: Long, speaker: Boolean) private external fun nativeGetStats(handle: Long): String? private external fun nativeForceProfile(handle: Long, profile: Int) + private external fun nativeWriteAudio(handle: Long, pcm: ShortArray): Int + private external fun nativeReadAudio(handle: Long, pcm: ShortArray): Int private external fun nativeDestroy(handle: Long) companion object { diff --git a/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt b/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt index ad50004..aa2c4f0 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt @@ -39,6 +39,8 @@ class CallActivity : ComponentActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) + viewModel.setContext(this) + setContent { WzpTheme { InCallScreen( diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt index e21c307..c7cfd84 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt @@ -1,7 +1,9 @@ package com.wzp.ui.call +import android.content.Context import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope +import com.wzp.audio.AudioPipeline import com.wzp.engine.CallStats import com.wzp.engine.WzpCallback import com.wzp.engine.WzpEngine @@ -17,9 +19,11 @@ class CallViewModel : ViewModel(), WzpCallback { private var engine: WzpEngine? = null private var engineInitialized = false + private var audioPipeline: AudioPipeline? = null + private var audioStarted = false private val _callState = MutableStateFlow(0) - val callState: StateFlow = _callState.asStateFlow() + val callState: StateFlow get() = _callState.asStateFlow() private val _isMuted = MutableStateFlow(false) val isMuted: StateFlow = _isMuted.asStateFlow() @@ -36,16 +40,26 @@ class CallViewModel : ViewModel(), WzpCallback { private val _errorMessage = MutableStateFlow(null) val errorMessage: StateFlow = _errorMessage.asStateFlow() + private val _roomName = MutableStateFlow(DEFAULT_ROOM) + val roomName: StateFlow = _roomName.asStateFlow() + private var statsJob: Job? = null companion object { - const val DEFAULT_RELAY = "172.16.81.175:4433" + const val DEFAULT_RELAY = "pangolin.manko.yoga:4433" const val DEFAULT_ROOM = "android" } + /** Must be called once with Activity context before startCall. */ + fun setContext(context: Context) { + if (audioPipeline == null) { + audioPipeline = AudioPipeline(context.applicationContext) + } + } + fun startCall( relayAddr: String = DEFAULT_RELAY, - room: String = DEFAULT_ROOM + room: String = _roomName.value ) { try { if (engine == null) { @@ -58,9 +72,6 @@ class CallViewModel : ViewModel(), WzpCallback { _callState.value = 1 // Connecting startStatsPolling() - // startCall blocks (runs tokio on calling thread), so dispatch - // to a background coroutine. Using Dispatchers.IO which uses - // Java threads (not native pthread_create). viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) { try { val result = engine?.startCall(relayAddr, room) ?: -1 @@ -80,6 +91,7 @@ class CallViewModel : ViewModel(), WzpCallback { } fun stopCall() { + stopAudio() stopStatsPolling() try { engine?.stopCall() @@ -101,11 +113,26 @@ class CallViewModel : ViewModel(), WzpCallback { fun clearError() { _errorMessage.value = null } + fun setRoomName(name: String) { _roomName.value = name } + // WzpCallback override fun onCallStateChanged(state: Int) { _callState.value = state } override fun onQualityTierChanged(tier: Int) { _qualityTier.value = tier } override fun onError(code: Int, message: String) { _errorMessage.value = "Error $code: $message" } + private fun startAudio() { + if (audioStarted) return + val e = engine ?: return + audioPipeline?.start(e) + audioStarted = true + } + + private fun stopAudio() { + if (!audioStarted) return + audioPipeline?.stop() + audioStarted = false + } + private fun startStatsPolling() { statsJob?.cancel() statsJob = viewModelScope.launch { @@ -113,7 +140,16 @@ class CallViewModel : ViewModel(), WzpCallback { try { val json = engine?.getStats() ?: "{}" if (json.isNotEmpty()) { - _stats.value = CallStats.fromJson(json) + val s = CallStats.fromJson(json) + _stats.value = s + // Sync call state from native engine stats + if (s.state != 0) { + _callState.value = s.state + } + // Start audio pipeline when call becomes active + if (s.state == 2 && !audioStarted) { + startAudio() + } } } catch (_: Exception) {} delay(500L) @@ -128,6 +164,7 @@ class CallViewModel : ViewModel(), WzpCallback { override fun onCleared() { super.onCleared() + stopAudio() stopStatsPolling() try { engine?.stopCall() diff --git a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt index 9fabfdf..b29c9f3 100644 --- a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt +++ b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt @@ -21,6 +21,7 @@ import androidx.compose.material3.FilledTonalIconButton import androidx.compose.material3.IconButtonDefaults import androidx.compose.material3.LinearProgressIndicator import androidx.compose.material3.MaterialTheme +import androidx.compose.material3.OutlinedTextField import androidx.compose.material3.Surface import androidx.compose.material3.Text import androidx.compose.runtime.Composable @@ -48,6 +49,7 @@ fun InCallScreen( val stats by viewModel.stats.collectAsState() val qualityTier by viewModel.qualityTier.collectAsState() val errorMessage by viewModel.errorMessage.collectAsState() + val roomName by viewModel.roomName.collectAsState() Surface( modifier = Modifier.fillMaxSize(), @@ -83,11 +85,13 @@ fun InCallScreen( style = MaterialTheme.typography.bodyMedium, color = MaterialTheme.colorScheme.onSurfaceVariant ) - Spacer(modifier = Modifier.height(4.dp)) - Text( - text = "Room: ${CallViewModel.DEFAULT_ROOM}", - style = MaterialTheme.typography.bodyMedium, - color = MaterialTheme.colorScheme.onSurfaceVariant + Spacer(modifier = Modifier.height(8.dp)) + OutlinedTextField( + value = roomName, + onValueChange = { viewModel.setRoomName(it) }, + label = { Text("Room") }, + singleLine = true, + modifier = Modifier.fillMaxWidth(0.6f) ) Spacer(modifier = Modifier.height(32.dp)) @@ -132,7 +136,7 @@ fun InCallScreen( Spacer(modifier = Modifier.height(32.dp)) - AudioLevelBar(stats.framesEncoded) + AudioLevelBar(stats.audioLevel) Spacer(modifier = Modifier.weight(1f)) @@ -222,9 +226,11 @@ private fun QualityIndicator(tier: Int, label: String) { } @Composable -private fun AudioLevelBar(framesEncoded: Long) { - val level = if (framesEncoded > 0) { - ((framesEncoded % 100).toFloat() / 100f).coerceIn(0.05f, 1f) +private fun AudioLevelBar(audioLevel: Int) { + // audioLevel is RMS of i16 samples (0-32767). + // Map to 0.0-1.0 with a log-ish curve for better visual feel. + val level = if (audioLevel > 0) { + (audioLevel.toFloat() / 8000f).coerceIn(0.02f, 1f) } else { 0f } @@ -351,7 +357,7 @@ private fun StatsOverlay(stats: CallStats) { ) { StatItem("Enc", "${stats.framesEncoded}") StatItem("Dec", "${stats.framesDecoded}") - StatItem("JB", "${stats.jitterBufferDepth}") + StatItem("FEC", "${stats.fecRecovered}") StatItem("Under", "${stats.underruns}") } } diff --git a/crates/wzp-android/src/audio_ring.rs b/crates/wzp-android/src/audio_ring.rs new file mode 100644 index 0000000..fbb881c --- /dev/null +++ b/crates/wzp-android/src/audio_ring.rs @@ -0,0 +1,91 @@ +//! Lock-free SPSC ring buffers for audio PCM transfer between +//! Kotlin AudioRecord/AudioTrack threads and the Rust engine. +//! +//! These use a simple spin-free design: the producer writes and advances +//! a write cursor, the consumer reads and advances a read cursor. +//! Both cursors are atomic so no mutex is needed. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Ring buffer capacity in i16 samples. +/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono. +const RING_CAPACITY: usize = 960 * 10; + +/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples. +pub struct AudioRing { + buf: Box<[i16; RING_CAPACITY]>, + write_pos: AtomicUsize, + read_pos: AtomicUsize, +} + +// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads. +// The atomics ensure visibility. The buffer itself is never accessed +// from the same index by both threads simultaneously because the +// producer only writes to positions between write_pos and read_pos, +// and the consumer only reads from positions between read_pos and write_pos. +unsafe impl Send for AudioRing {} +unsafe impl Sync for AudioRing {} + +impl AudioRing { + pub fn new() -> Self { + Self { + buf: Box::new([0i16; RING_CAPACITY]), + write_pos: AtomicUsize::new(0), + read_pos: AtomicUsize::new(0), + } + } + + /// Number of samples available to read. + pub fn available(&self) -> usize { + let w = self.write_pos.load(Ordering::Acquire); + let r = self.read_pos.load(Ordering::Acquire); + w.wrapping_sub(r) + } + + /// Number of samples that can be written without overwriting. + pub fn free_space(&self) -> usize { + RING_CAPACITY - self.available() + } + + /// Write samples into the ring. Returns number of samples written. + /// Drops oldest samples if the ring is full. + pub fn write(&self, samples: &[i16]) -> usize { + let w = self.write_pos.load(Ordering::Relaxed); + let count = samples.len().min(RING_CAPACITY); + + for i in 0..count { + let idx = (w + i) % RING_CAPACITY; + // SAFETY: We're the only writer, and the reader won't read + // past read_pos which we haven't advanced past yet. + unsafe { + let ptr = self.buf.as_ptr() as *mut i16; + *ptr.add(idx) = samples[i]; + } + } + + self.write_pos.store(w.wrapping_add(count), Ordering::Release); + + // If we overwrote unread data, advance read_pos + if self.available() > RING_CAPACITY { + let new_read = self.write_pos.load(Ordering::Relaxed).wrapping_sub(RING_CAPACITY); + self.read_pos.store(new_read, Ordering::Release); + } + + count + } + + /// Read samples from the ring into `out`. Returns number of samples read. + pub fn read(&self, out: &mut [i16]) -> usize { + let avail = self.available(); + let count = out.len().min(avail); + + let r = self.read_pos.load(Ordering::Relaxed); + for i in 0..count { + let idx = (r + i) % RING_CAPACITY; + out[i] = unsafe { *self.buf.as_ptr().add(idx) }; + } + + self.read_pos.store(r.wrapping_add(count), Ordering::Release); + count + } +} diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index cbe8b60..6f41d6e 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -4,6 +4,9 @@ //! static bionic stubs in the Rust std prebuilt rlibs. ALL work must happen //! on the JNI calling thread or via the tokio current_thread runtime. //! No std::thread::spawn or tokio multi_thread allowed. +//! +//! Audio capture and playout happen on Kotlin JVM threads via AudioRecord +//! and AudioTrack. PCM samples are transferred through lock-free ring buffers. use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering}; @@ -11,15 +14,23 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use bytes::Bytes; -use tracing::{error, info}; +use tracing::{error, info, warn}; +use wzp_codec::opus_dec::OpusDecoder; +use wzp_codec::opus_enc::OpusEncoder; use wzp_crypto::{KeyExchange, WarzoneKeyExchange}; +use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder}; use wzp_proto::{ - CodecId, MediaHeader, MediaPacket, MediaTransport, QualityProfile, SignalMessage, + AudioDecoder, AudioEncoder, CodecId, FecDecoder, FecEncoder, + MediaHeader, MediaPacket, MediaTransport, QualityProfile, SignalMessage, }; +use crate::audio_ring::AudioRing; use crate::commands::EngineCommand; use crate::stats::{CallState, CallStats}; +/// Opus frame size at 48kHz mono, 20ms = 960 samples. +const FRAME_SAMPLES: usize = 960; + /// Configuration to start a call. pub struct CallStartConfig { pub profile: QualityProfile, @@ -41,16 +52,22 @@ impl Default for CallStartConfig { } } -struct EngineState { - running: AtomicBool, - muted: AtomicBool, - stats: Mutex, - command_tx: std::sync::mpsc::Sender, - command_rx: Mutex>>, +pub(crate) struct EngineState { + pub running: AtomicBool, + pub muted: AtomicBool, + pub stats: Mutex, + pub command_tx: std::sync::mpsc::Sender, + pub command_rx: Mutex>>, + /// Ring buffer: Kotlin AudioRecord → Rust encoder + pub capture_ring: AudioRing, + /// Ring buffer: Rust decoder → Kotlin AudioTrack + pub playout_ring: AudioRing, + /// Current audio level (RMS) for UI display, updated by capture path. + pub audio_level_rms: AtomicU32, } pub struct WzpEngine { - state: Arc, + pub(crate) state: Arc, tokio_runtime: Option, call_start: Option, } @@ -64,6 +81,9 @@ impl WzpEngine { stats: Mutex::new(CallStats::default()), command_tx: tx, command_rx: Mutex::new(Some(rx)), + capture_ring: AudioRing::new(), + playout_ring: AudioRing::new(), + audio_level_rms: AtomicU32::new(0), }); Self { state, @@ -85,8 +105,6 @@ impl WzpEngine { }; } - // Create single-threaded tokio runtime — NO thread spawning. - // On Android, pthread_create crashes due to static bionic stubs. let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; @@ -97,17 +115,16 @@ impl WzpEngine { let room = config.room.clone(); let identity_seed = config.identity_seed; + let profile = config.profile; let state = self.state.clone(); self.state.running.store(true, Ordering::Release); self.call_start = Some(Instant::now()); - // Run the entire call on the current thread's tokio runtime. - // This blocks the JNI thread until the call ends, so Kotlin - // must call startCall from a background coroutine. let state_clone = state.clone(); runtime.block_on(async move { - if let Err(e) = run_call(relay_addr, &room, &identity_seed, state_clone).await { + if let Err(e) = run_call(relay_addr, &room, &identity_seed, profile, state_clone).await + { error!("call failed: {e}"); } }); @@ -135,19 +152,17 @@ impl WzpEngine { self.state.muted.store(muted, Ordering::Relaxed); } - pub fn set_speaker(&self, _enabled: bool) { - // TODO: route audio via AudioManager on Kotlin side - } + pub fn set_speaker(&self, _enabled: bool) {} - pub fn force_profile(&self, _profile: QualityProfile) { - // TODO: wire to pipeline when codec thread is re-enabled - } + pub fn force_profile(&self, _profile: QualityProfile) {} pub fn get_stats(&self) -> CallStats { let mut stats = self.state.stats.lock().unwrap().clone(); if let Some(start) = self.call_start { stats.duration_secs = start.elapsed().as_secs_f64(); } + // Include current audio level + stats.audio_level = self.state.audio_level_rms.load(Ordering::Relaxed); stats } @@ -155,6 +170,23 @@ impl WzpEngine { self.state.running.load(Ordering::Acquire) } + pub fn write_audio(&self, samples: &[i16]) -> usize { + if self.state.muted.load(Ordering::Relaxed) { + return samples.len(); + } + // Compute RMS for audio level display + if !samples.is_empty() { + let sum_sq: f64 = samples.iter().map(|&s| (s as f64) * (s as f64)).sum(); + let rms = (sum_sq / samples.len() as f64).sqrt() as u32; + self.state.audio_level_rms.store(rms, Ordering::Relaxed); + } + self.state.capture_ring.write(samples) + } + + pub fn read_audio(&self, out: &mut [i16]) -> usize { + self.state.playout_ring.read(out) + } + pub fn destroy(mut self) { self.stop_call(); } @@ -166,22 +198,19 @@ impl Drop for WzpEngine { } } -/// Run the full call lifecycle: connect, handshake, send/recv media. -/// All async, no thread spawning. +/// Run the full call lifecycle: connect, handshake, send/recv media with Opus + FEC. async fn run_call( relay_addr: SocketAddr, room: &str, identity_seed: &[u8; 32], + profile: QualityProfile, state: Arc, ) -> Result<(), anyhow::Error> { - // Install rustls crypto provider let _ = rustls::crypto::ring::default_provider().install_default(); - // Create QUIC endpoint let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); let endpoint = wzp_transport::create_endpoint(bind_addr, None)?; - // Connect to relay with room as SNI let sni = if room.is_empty() { "android" } else { room }; info!(%relay_addr, sni, "connecting to relay..."); let client_cfg = wzp_transport::client_config(); @@ -236,58 +265,223 @@ async fn run_call( stats.state = CallState::Active; } - // Simple media loop: send silence, recv and count frames. - // No codec thread, no Oboe — just network I/O to verify connectivity. - // Audio pipeline will be added once native threading is resolved. + // Initialize Opus codec + let mut encoder = + OpusEncoder::new(profile).map_err(|e| anyhow::anyhow!("opus encoder init: {e}"))?; + let mut decoder = + OpusDecoder::new(profile).map_err(|e| anyhow::anyhow!("opus decoder init: {e}"))?; + + // Initialize FEC encoder/decoder + let mut fec_enc = wzp_fec::create_encoder(&profile); + let mut fec_dec = wzp_fec::create_decoder(&profile); + + info!( + fec_ratio = profile.fec_ratio, + frames_per_block = profile.frames_per_block, + "codec + FEC initialized (48kHz mono, 20ms frames, RaptorQ)" + ); + let seq = AtomicU16::new(0); let ts = AtomicU32::new(0); let transport_recv = transport.clone(); + // Pre-allocate buffers + let mut capture_buf = vec![0i16; FRAME_SAMPLES]; + let mut encode_buf = vec![0u8; encoder.max_frame_bytes()]; + let mut frame_in_block: u8 = 0; + let mut block_id: u8 = 0; + + // Send task: capture ring → Opus encode → FEC → MediaPackets let send_task = async { - let silence = vec![0u8; 20]; // minimal opus silence frame + info!("send task started (Opus + RaptorQ FEC)"); loop { if !state.running.load(Ordering::Relaxed) { break; } + + let avail = state.capture_ring.available(); + if avail < FRAME_SAMPLES { + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + continue; + } + + let read = state.capture_ring.read(&mut capture_buf); + if read < FRAME_SAMPLES { + continue; + } + + // Opus encode + let encoded_len = match encoder.encode(&capture_buf, &mut encode_buf) { + Ok(n) => n, + Err(e) => { + warn!("opus encode error: {e}"); + continue; + } + }; + let encoded = &encode_buf[..encoded_len]; + + // Build source packet let s = seq.fetch_add(1, Ordering::Relaxed); - let t = ts.fetch_add(20, Ordering::Relaxed); - let packet = MediaPacket { + let t = ts.fetch_add(FRAME_SAMPLES as u32, Ordering::Relaxed); + + let source_pkt = MediaPacket { header: MediaHeader { version: 0, is_repair: false, - codec_id: CodecId::Opus24k, + codec_id: profile.codec, has_quality_report: false, - fec_ratio_encoded: 0, + fec_ratio_encoded: MediaHeader::encode_fec_ratio(profile.fec_ratio), seq: s, timestamp: t, - fec_block: 0, - fec_symbol: 0, + fec_block: block_id, + fec_symbol: frame_in_block, reserved: 0, csrc_count: 0, }, - payload: Bytes::from(silence.clone()), + payload: Bytes::copy_from_slice(encoded), quality_report: None, }; - if let Err(e) = transport.send_media(&packet).await { + + // Send source packet + if let Err(e) = transport.send_media(&source_pkt).await { error!("send error: {e}"); break; } - // 20ms frame interval - tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + // Feed encoded frame to FEC encoder + if let Err(e) = fec_enc.add_source_symbol(encoded) { + warn!("fec add_source error: {e}"); + } + frame_in_block += 1; + + // When block is full, generate repair packets + if frame_in_block >= profile.frames_per_block { + match fec_enc.generate_repair(profile.fec_ratio) { + Ok(repairs) => { + let repair_count = repairs.len(); + for (sym_idx, repair_data) in repairs { + let rs = seq.fetch_add(1, Ordering::Relaxed); + let repair_pkt = MediaPacket { + header: MediaHeader { + version: 0, + is_repair: true, + codec_id: profile.codec, + has_quality_report: false, + fec_ratio_encoded: MediaHeader::encode_fec_ratio( + profile.fec_ratio, + ), + seq: rs, + timestamp: t, + fec_block: block_id, + fec_symbol: sym_idx, + reserved: 0, + csrc_count: 0, + }, + payload: Bytes::from(repair_data), + quality_report: None, + }; + if let Err(e) = transport.send_media(&repair_pkt).await { + error!("send repair error: {e}"); + break; + } + } + if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) { + info!( + block_id, + repair_count, + fec_ratio = profile.fec_ratio, + "FEC block complete" + ); + } + } + Err(e) => { + warn!("fec generate_repair error: {e}"); + } + } + + let _ = fec_enc.finalize_block(); + block_id = block_id.wrapping_add(1); + frame_in_block = 0; + } + + if s % 500 == 0 { + info!(seq = s, block_id, frame_in_block, "sending"); + } } }; + // Pre-allocate decode buffer + let mut decode_buf = vec![0i16; FRAME_SAMPLES]; + + // Recv task: MediaPackets → FEC decode → Opus decode → playout ring let recv_task = async { let mut frames_decoded: u64 = 0; + let mut fec_recovered: u64 = 0; + info!("recv task started (Opus + RaptorQ FEC)"); loop { if !state.running.load(Ordering::Relaxed) { break; } match transport_recv.recv_media().await { - Ok(Some(_pkt)) => { - frames_decoded += 1; + Ok(Some(pkt)) => { + let is_repair = pkt.header.is_repair; + let pkt_block = pkt.header.fec_block; + let pkt_symbol = pkt.header.fec_symbol; + + // Feed every packet (source + repair) to FEC decoder + let _ = fec_dec.add_symbol( + pkt_block, + pkt_symbol, + is_repair, + &pkt.payload, + ); + + // Source packets: decode directly + if !is_repair { + match decoder.decode(&pkt.payload, &mut decode_buf) { + Ok(samples) => { + state.playout_ring.write(&decode_buf[..samples]); + frames_decoded += 1; + } + Err(e) => { + warn!("opus decode error: {e}"); + if let Ok(samples) = decoder.decode_lost(&mut decode_buf) { + state.playout_ring.write(&decode_buf[..samples]); + } + } + } + } + + // Try FEC recovery for this block + // (useful when source packets were lost but repair arrived) + if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) { + // FEC recovered the block — any previously missing frames + // are now available. In a full jitter buffer implementation, + // we'd insert recovered frames at the right position. + // For now, log recovery for telemetry. + fec_recovered += recovered_frames.len() as u64; + if fec_recovered % 50 == 1 { + info!( + fec_recovered, + block = pkt_block, + frames = recovered_frames.len(), + "FEC block recovered" + ); + } + } + + // Expire old blocks to prevent memory growth + if pkt_block > 3 { + fec_dec.expire_before(pkt_block.wrapping_sub(3)); + } + + if frames_decoded == 1 || frames_decoded % 500 == 0 { + info!(frames_decoded, fec_recovered, "recv stats"); + } + let mut stats = state.stats.lock().unwrap(); stats.frames_decoded = frames_decoded; + stats.fec_recovered = fec_recovered; } Ok(None) => { info!("relay disconnected"); @@ -301,7 +495,7 @@ async fn run_call( } }; - // Update encoded frame count in send task + // Stats task let stats_task = async { loop { if !state.running.load(Ordering::Relaxed) { diff --git a/crates/wzp-android/src/jni_bridge.rs b/crates/wzp-android/src/jni_bridge.rs index 56c1457..33c4a50 100644 --- a/crates/wzp-android/src/jni_bridge.rs +++ b/crates/wzp-android/src/jni_bridge.rs @@ -174,6 +174,56 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeForceProfile( })); } +/// Write captured PCM samples from Kotlin AudioRecord into the engine's capture ring. +/// pcm is a Java short[] array. +#[unsafe(no_mangle)] +pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeWriteAudio( + env: JNIEnv, + _class: JClass, + handle: jlong, + pcm: jni::objects::JShortArray, +) -> jint { + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let h = unsafe { handle_ref(handle) }; + let len = env.get_array_length(&pcm).unwrap_or(0) as usize; + if len == 0 { + return 0; + } + let mut buf = vec![0i16; len]; + // GetShortArrayRegion copies Java array into our buffer + if env.get_short_array_region(&pcm, 0, &mut buf).is_err() { + return 0; + } + h.engine.write_audio(&buf) as jint + })); + result.unwrap_or(0) +} + +/// Read decoded PCM samples from the engine's playout ring for Kotlin AudioTrack. +/// pcm is a Java short[] array to fill. Returns number of samples actually read. +#[unsafe(no_mangle)] +pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeReadAudio( + env: JNIEnv, + _class: JClass, + handle: jlong, + pcm: jni::objects::JShortArray, +) -> jint { + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let h = unsafe { handle_ref(handle) }; + let len = env.get_array_length(&pcm).unwrap_or(0) as usize; + if len == 0 { + return 0; + } + let mut buf = vec![0i16; len]; + let read = h.engine.read_audio(&mut buf); + if read > 0 { + let _ = env.set_short_array_region(&pcm, 0, &buf[..read]); + } + read as jint + })); + result.unwrap_or(0) +} + #[unsafe(no_mangle)] pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeDestroy( _env: JNIEnv, diff --git a/crates/wzp-android/src/lib.rs b/crates/wzp-android/src/lib.rs index a5c7f2a..8c0d5df 100644 --- a/crates/wzp-android/src/lib.rs +++ b/crates/wzp-android/src/lib.rs @@ -10,6 +10,7 @@ //! allowing `cargo check` and unit tests on the host. pub mod audio_android; +pub mod audio_ring; pub mod commands; pub mod engine; pub mod pipeline; diff --git a/crates/wzp-android/src/stats.rs b/crates/wzp-android/src/stats.rs index cc480cc..0dbb2db 100644 --- a/crates/wzp-android/src/stats.rs +++ b/crates/wzp-android/src/stats.rs @@ -1,21 +1,31 @@ //! Call statistics for the Android engine. /// State of the call. -#[derive(Clone, Debug, Default, serde::Serialize, PartialEq, Eq)] +/// Serializes as integer for easy parsing on the Kotlin side: +/// 0=Idle, 1=Connecting, 2=Active, 3=Reconnecting, 4=Closed +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub enum CallState { - /// Engine is idle, no active call. #[default] Idle, - /// Establishing connection to the relay. Connecting, - /// Call is active with audio flowing. Active, - /// Temporarily lost connection, attempting to recover. Reconnecting, - /// Call has ended. Closed, } +impl serde::Serialize for CallState { + fn serialize(&self, serializer: S) -> Result { + let n: u8 = match self { + CallState::Idle => 0, + CallState::Connecting => 1, + CallState::Active => 2, + CallState::Reconnecting => 3, + CallState::Closed => 4, + }; + serializer.serialize_u8(n) + } +} + /// Aggregated call statistics, serializable for JNI bridge. #[derive(Clone, Debug, Default, serde::Serialize)] pub struct CallStats { @@ -39,4 +49,8 @@ pub struct CallStats { pub frames_decoded: u64, /// Number of playout underruns (buffer empty when audio needed). pub underruns: u64, + /// Frames recovered by FEC. + pub fec_recovered: u64, + /// Current mic audio level (RMS of i16 samples, 0-32767). + pub audio_level: u32, } diff --git a/images/photo_2026-04-05_16-03-40.jpg b/images/photo_2026-04-05_16-03-40.jpg new file mode 100644 index 0000000..abe638c Binary files /dev/null and b/images/photo_2026-04-05_16-03-40.jpg differ diff --git a/qr-download.png b/qr-download.png deleted file mode 100644 index c92c36e..0000000 Binary files a/qr-download.png and /dev/null differ diff --git a/wzp-release.apk b/wzp-release.apk index ed77328..2c75792 100644 Binary files a/wzp-release.apk and b/wzp-release.apk differ