feat: Android VoIP client — Phase 1 (audio quality, network adaptation, crate skeleton)

- New wzp-android crate with Oboe C++ backend, lock-free SPSC ring buffers,
  engine orchestrator, codec pipeline, and Android Gradle project structure
- AEC (NLMS adaptive filter), AGC (two-stage with fast attack/slow release),
  windowed-sinc FIR resampler replacing linear interpolation (wzp-codec)
- Opus encoder tuning: complexity 7 default, set_expected_loss support
- Mobile jitter buffer: asymmetric EMA (fast up/slow down), handoff spike
  detection with 2s cooldown, configurable safety margin
- Network-aware quality control: cellular-specific thresholds, faster
  downgrade on cellular, proactive tier drop on WiFi→cellular handoff,
  FEC ratio boost during network transitions
- Handoff detection in PathMonitor via RTT jitter spike analysis

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude
2026-04-04 18:07:55 +00:00
parent aa09275015
commit 26e9c55f1f
31 changed files with 2775 additions and 245 deletions

View File

@@ -0,0 +1,30 @@
[package]
name = "wzp-android"
version.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
description = "WarzonePhone Android native VoIP engine — Oboe audio, JNI bridge, call pipeline"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
wzp-proto = { workspace = true }
wzp-codec = { workspace = true }
wzp-fec = { workspace = true }
wzp-crypto = { workspace = true }
wzp-transport = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
bytes = { workspace = true }
serde = { workspace = true }
serde_json = "1"
thiserror = { workspace = true }
async-trait = { workspace = true }
anyhow = "1"
libc = "0.2"
[build-dependencies]
cc = "1"

View File

@@ -0,0 +1,20 @@
fn main() {
let target = std::env::var("TARGET").unwrap_or_default();
if target.contains("android") {
// Real Oboe build for Android targets
cc::Build::new()
.cpp(true)
.std("c++17")
.file("cpp/oboe_bridge.cpp")
.include("cpp")
.compile("oboe_bridge");
} else {
// Stub for host builds / testing
cc::Build::new()
.cpp(true)
.std("c++17")
.file("cpp/oboe_stub.cpp")
.include("cpp")
.compile("oboe_bridge");
}
}

View File

@@ -0,0 +1,278 @@
// Full Oboe implementation for Android
// This file is compiled only when targeting Android
#include "oboe_bridge.h"
#ifdef __ANDROID__
#include <oboe/Oboe.h>
#include <android/log.h>
#include <cstring>
#include <atomic>
#define LOG_TAG "wzp-oboe"
#define LOGI(...) __android_log_print(ANDROID_LOG_INFO, LOG_TAG, __VA_ARGS__)
#define LOGW(...) __android_log_print(ANDROID_LOG_WARN, LOG_TAG, __VA_ARGS__)
#define LOGE(...) __android_log_print(ANDROID_LOG_ERROR, LOG_TAG, __VA_ARGS__)
// ---------------------------------------------------------------------------
// Ring buffer helpers (SPSC, lock-free)
// ---------------------------------------------------------------------------
static inline int32_t ring_available_read(const wzp_atomic_int* write_idx,
const wzp_atomic_int* read_idx,
int32_t capacity) {
int32_t w = std::atomic_load_explicit(write_idx, std::memory_order_acquire);
int32_t r = std::atomic_load_explicit(read_idx, std::memory_order_relaxed);
int32_t avail = w - r;
if (avail < 0) avail += capacity;
return avail;
}
static inline int32_t ring_available_write(const wzp_atomic_int* write_idx,
const wzp_atomic_int* read_idx,
int32_t capacity) {
return capacity - 1 - ring_available_read(write_idx, read_idx, capacity);
}
static inline void ring_write(int16_t* buf, int32_t capacity,
wzp_atomic_int* write_idx, const wzp_atomic_int* read_idx,
const int16_t* src, int32_t count) {
int32_t w = std::atomic_load_explicit(write_idx, std::memory_order_relaxed);
for (int32_t i = 0; i < count; i++) {
buf[w] = src[i];
w++;
if (w >= capacity) w = 0;
}
std::atomic_store_explicit(write_idx, w, std::memory_order_release);
}
static inline void ring_read(int16_t* buf, int32_t capacity,
const wzp_atomic_int* write_idx, wzp_atomic_int* read_idx,
int16_t* dst, int32_t count) {
int32_t r = std::atomic_load_explicit(read_idx, std::memory_order_relaxed);
for (int32_t i = 0; i < count; i++) {
dst[i] = buf[r];
r++;
if (r >= capacity) r = 0;
}
std::atomic_store_explicit(read_idx, r, std::memory_order_release);
}
// ---------------------------------------------------------------------------
// Global state
// ---------------------------------------------------------------------------
static std::shared_ptr<oboe::AudioStream> g_capture_stream;
static std::shared_ptr<oboe::AudioStream> g_playout_stream;
static const WzpOboeRings* g_rings = nullptr;
static std::atomic<bool> g_running{false};
static std::atomic<float> g_capture_latency_ms{0.0f};
static std::atomic<float> g_playout_latency_ms{0.0f};
// ---------------------------------------------------------------------------
// Capture callback
// ---------------------------------------------------------------------------
class CaptureCallback : public oboe::AudioStreamDataCallback {
public:
oboe::DataCallbackResult onAudioReady(
oboe::AudioStream* stream,
void* audioData,
int32_t numFrames) override {
if (!g_running.load(std::std::memory_order_relaxed) || !g_rings) {
return oboe::DataCallbackResult::Stop;
}
const int16_t* src = static_cast<const int16_t*>(audioData);
int32_t avail = ring_available_write(g_rings->capture_write_idx,
g_rings->capture_read_idx,
g_rings->capture_capacity);
int32_t to_write = (numFrames < avail) ? numFrames : avail;
if (to_write > 0) {
ring_write(g_rings->capture_buf, g_rings->capture_capacity,
g_rings->capture_write_idx, g_rings->capture_read_idx,
src, to_write);
}
// Update latency estimate
auto result = stream->calculateLatencyMillis();
if (result) {
g_capture_latency_ms.store(static_cast<float>(result.value()),
std::std::memory_order_relaxed);
}
return oboe::DataCallbackResult::Continue;
}
};
// ---------------------------------------------------------------------------
// Playout callback
// ---------------------------------------------------------------------------
class PlayoutCallback : public oboe::AudioStreamDataCallback {
public:
oboe::DataCallbackResult onAudioReady(
oboe::AudioStream* stream,
void* audioData,
int32_t numFrames) override {
if (!g_running.load(std::std::memory_order_relaxed) || !g_rings) {
memset(audioData, 0, numFrames * sizeof(int16_t));
return oboe::DataCallbackResult::Stop;
}
int16_t* dst = static_cast<int16_t*>(audioData);
int32_t avail = ring_available_read(g_rings->playout_write_idx,
g_rings->playout_read_idx,
g_rings->playout_capacity);
int32_t to_read = (numFrames < avail) ? numFrames : avail;
if (to_read > 0) {
ring_read(g_rings->playout_buf, g_rings->playout_capacity,
g_rings->playout_write_idx, g_rings->playout_read_idx,
dst, to_read);
}
// Fill remainder with silence on underrun
if (to_read < numFrames) {
memset(dst + to_read, 0, (numFrames - to_read) * sizeof(int16_t));
}
// Update latency estimate
auto result = stream->calculateLatencyMillis();
if (result) {
g_playout_latency_ms.store(static_cast<float>(result.value()),
std::std::memory_order_relaxed);
}
return oboe::DataCallbackResult::Continue;
}
};
static CaptureCallback g_capture_cb;
static PlayoutCallback g_playout_cb;
// ---------------------------------------------------------------------------
// Public C API
// ---------------------------------------------------------------------------
int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) {
if (g_running.load(std::std::memory_order_relaxed)) {
LOGW("wzp_oboe_start: already running");
return -1;
}
g_rings = rings;
// Build capture stream
oboe::AudioStreamBuilder captureBuilder;
captureBuilder.setDirection(oboe::Direction::Input)
->setPerformanceMode(oboe::PerformanceMode::LowLatency)
->setSharingMode(oboe::SharingMode::Exclusive)
->setFormat(oboe::AudioFormat::I16)
->setChannelCount(config->channel_count)
->setSampleRate(config->sample_rate)
->setFramesPerDataCallback(config->frames_per_burst)
->setInputPreset(oboe::InputPreset::VoiceCommunication)
->setDataCallback(&g_capture_cb);
oboe::Result result = captureBuilder.openStream(g_capture_stream);
if (result != oboe::Result::OK) {
LOGE("Failed to open capture stream: %s", oboe::convertToText(result));
return -2;
}
// Build playout stream
oboe::AudioStreamBuilder playoutBuilder;
playoutBuilder.setDirection(oboe::Direction::Output)
->setPerformanceMode(oboe::PerformanceMode::LowLatency)
->setSharingMode(oboe::SharingMode::Exclusive)
->setFormat(oboe::AudioFormat::I16)
->setChannelCount(config->channel_count)
->setSampleRate(config->sample_rate)
->setFramesPerDataCallback(config->frames_per_burst)
->setUsage(oboe::Usage::VoiceCommunication)
->setDataCallback(&g_playout_cb);
result = playoutBuilder.openStream(g_playout_stream);
if (result != oboe::Result::OK) {
LOGE("Failed to open playout stream: %s", oboe::convertToText(result));
g_capture_stream->close();
g_capture_stream.reset();
return -3;
}
g_running.store(true, std::std::memory_order_release);
// Start both streams
result = g_capture_stream->requestStart();
if (result != oboe::Result::OK) {
LOGE("Failed to start capture: %s", oboe::convertToText(result));
g_running.store(false, std::std::memory_order_release);
g_capture_stream->close();
g_playout_stream->close();
g_capture_stream.reset();
g_playout_stream.reset();
return -4;
}
result = g_playout_stream->requestStart();
if (result != oboe::Result::OK) {
LOGE("Failed to start playout: %s", oboe::convertToText(result));
g_running.store(false, std::std::memory_order_release);
g_capture_stream->requestStop();
g_capture_stream->close();
g_playout_stream->close();
g_capture_stream.reset();
g_playout_stream.reset();
return -5;
}
LOGI("Oboe started: sr=%d burst=%d ch=%d",
config->sample_rate, config->frames_per_burst, config->channel_count);
return 0;
}
void wzp_oboe_stop(void) {
g_running.store(false, std::std::memory_order_release);
if (g_capture_stream) {
g_capture_stream->requestStop();
g_capture_stream->close();
g_capture_stream.reset();
}
if (g_playout_stream) {
g_playout_stream->requestStop();
g_playout_stream->close();
g_playout_stream.reset();
}
g_rings = nullptr;
LOGI("Oboe stopped");
}
float wzp_oboe_capture_latency_ms(void) {
return g_capture_latency_ms.load(std::std::memory_order_relaxed);
}
float wzp_oboe_playout_latency_ms(void) {
return g_playout_latency_ms.load(std::std::memory_order_relaxed);
}
int wzp_oboe_is_running(void) {
return g_running.load(std::std::memory_order_relaxed) ? 1 : 0;
}
#else
// Non-Android fallback — should not be reached; oboe_stub.cpp is used instead.
// Provide empty implementations just in case.
int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) {
(void)config; (void)rings;
return -99;
}
void wzp_oboe_stop(void) {}
float wzp_oboe_capture_latency_ms(void) { return 0.0f; }
float wzp_oboe_playout_latency_ms(void) { return 0.0f; }
int wzp_oboe_is_running(void) { return 0; }
#endif // __ANDROID__

View File

@@ -0,0 +1,43 @@
#ifndef WZP_OBOE_BRIDGE_H
#define WZP_OBOE_BRIDGE_H
#include <stdint.h>
#ifdef __cplusplus
#include <atomic>
typedef std::atomic<int32_t> wzp_atomic_int;
extern "C" {
#else
#include <stdatomic.h>
typedef atomic_int wzp_atomic_int;
#endif
typedef struct {
int32_t sample_rate;
int32_t frames_per_burst;
int32_t channel_count;
} WzpOboeConfig;
typedef struct {
int16_t* capture_buf;
int32_t capture_capacity;
wzp_atomic_int* capture_write_idx;
wzp_atomic_int* capture_read_idx;
int16_t* playout_buf;
int32_t playout_capacity;
wzp_atomic_int* playout_write_idx;
wzp_atomic_int* playout_read_idx;
} WzpOboeRings;
int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings);
void wzp_oboe_stop(void);
float wzp_oboe_capture_latency_ms(void);
float wzp_oboe_playout_latency_ms(void);
int wzp_oboe_is_running(void);
#ifdef __cplusplus
}
#endif
#endif // WZP_OBOE_BRIDGE_H

View File

@@ -0,0 +1,27 @@
// Stub implementation for non-Android host builds (testing, cargo check, etc.)
#include "oboe_bridge.h"
#include <stdio.h>
int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) {
(void)config;
(void)rings;
fprintf(stderr, "wzp_oboe_start: stub (not on Android)\n");
return 0;
}
void wzp_oboe_stop(void) {
fprintf(stderr, "wzp_oboe_stop: stub (not on Android)\n");
}
float wzp_oboe_capture_latency_ms(void) {
return 0.0f;
}
float wzp_oboe_playout_latency_ms(void) {
return 0.0f;
}
int wzp_oboe_is_running(void) {
return 0;
}

View File

@@ -0,0 +1,424 @@
//! Lock-free SPSC ring buffer audio backend for Android (Oboe).
//!
//! The ring buffers are shared between Rust and C++: the Oboe callbacks
//! (running on a high-priority audio thread) read/write directly into
//! the buffers via atomic indices, while the Rust codec thread on the
//! other side does the same.
use std::sync::atomic::{AtomicI32, Ordering};
use tracing::info;
#[allow(unused_imports)]
use tracing::warn;
/// Number of samples per 20 ms frame at 48 kHz mono.
pub const FRAME_SAMPLES: usize = 960;
/// Default ring buffer capacity: 8 frames = 160 ms at 48 kHz.
const RING_CAPACITY: usize = 7680;
// ---------------------------------------------------------------------------
// FFI declarations matching oboe_bridge.h
// ---------------------------------------------------------------------------
#[repr(C)]
#[allow(non_snake_case)]
struct WzpOboeConfig {
sample_rate: i32,
frames_per_burst: i32,
channel_count: i32,
}
#[repr(C)]
#[allow(non_snake_case)]
struct WzpOboeRings {
capture_buf: *mut i16,
capture_capacity: i32,
capture_write_idx: *mut AtomicI32,
capture_read_idx: *mut AtomicI32,
playout_buf: *mut i16,
playout_capacity: i32,
playout_write_idx: *mut AtomicI32,
playout_read_idx: *mut AtomicI32,
}
unsafe impl Send for WzpOboeRings {}
unsafe impl Sync for WzpOboeRings {}
unsafe extern "C" {
fn wzp_oboe_start(config: *const WzpOboeConfig, rings: *const WzpOboeRings) -> i32;
fn wzp_oboe_stop();
fn wzp_oboe_capture_latency_ms() -> f32;
fn wzp_oboe_playout_latency_ms() -> f32;
fn wzp_oboe_is_running() -> i32;
}
// ---------------------------------------------------------------------------
// SPSC Ring Buffer
// ---------------------------------------------------------------------------
/// Single-producer single-consumer lock-free ring buffer.
///
/// The producer calls `write()` and the consumer calls `read()`.
/// Atomics use acquire/release ordering to ensure correct visibility
/// across the Oboe audio thread and the Rust codec thread.
pub struct RingBuffer {
buf: Vec<i16>,
capacity: usize,
write_idx: AtomicI32,
read_idx: AtomicI32,
}
impl RingBuffer {
/// Create a new ring buffer with the given capacity (in samples).
///
/// The actual usable capacity is `capacity - 1` to distinguish
/// full from empty.
pub fn new(capacity: usize) -> Self {
Self {
buf: vec![0i16; capacity],
capacity,
write_idx: AtomicI32::new(0),
read_idx: AtomicI32::new(0),
}
}
/// Number of samples available to read.
pub fn available_read(&self) -> usize {
let w = self.write_idx.load(Ordering::Acquire);
let r = self.read_idx.load(Ordering::Relaxed);
let avail = w - r;
if avail < 0 {
(avail + self.capacity as i32) as usize
} else {
avail as usize
}
}
/// Number of samples that can be written before the buffer is full.
pub fn available_write(&self) -> usize {
self.capacity - 1 - self.available_read()
}
/// Write samples into the ring buffer (producer side).
///
/// Returns the number of samples actually written (may be less than
/// `data.len()` if the buffer is nearly full).
pub fn write(&self, data: &[i16]) -> usize {
let avail = self.available_write();
let count = data.len().min(avail);
if count == 0 {
return 0;
}
let mut w = self.write_idx.load(Ordering::Relaxed) as usize;
let cap = self.capacity;
let buf_ptr = self.buf.as_ptr() as *mut i16;
for i in 0..count {
// SAFETY: w is always in [0, capacity) and we are the sole producer.
unsafe {
*buf_ptr.add(w) = data[i];
}
w += 1;
if w >= cap {
w = 0;
}
}
self.write_idx.store(w as i32, Ordering::Release);
count
}
/// Read samples from the ring buffer (consumer side).
///
/// Returns the number of samples actually read (may be less than
/// `out.len()` if the buffer doesn't have enough data).
pub fn read(&self, out: &mut [i16]) -> usize {
let avail = self.available_read();
let count = out.len().min(avail);
if count == 0 {
return 0;
}
let mut r = self.read_idx.load(Ordering::Relaxed) as usize;
let cap = self.capacity;
let buf_ptr = self.buf.as_ptr();
for i in 0..count {
// SAFETY: r is always in [0, capacity) and we are the sole consumer.
unsafe {
out[i] = *buf_ptr.add(r);
}
r += 1;
if r >= cap {
r = 0;
}
}
self.read_idx.store(r as i32, Ordering::Release);
count
}
/// Get a raw pointer to the buffer data (for FFI).
fn buf_ptr(&self) -> *mut i16 {
self.buf.as_ptr() as *mut i16
}
/// Get a raw pointer to the write index atomic (for FFI).
fn write_idx_ptr(&self) -> *mut AtomicI32 {
&self.write_idx as *const AtomicI32 as *mut AtomicI32
}
/// Get a raw pointer to the read index atomic (for FFI).
fn read_idx_ptr(&self) -> *mut AtomicI32 {
&self.read_idx as *const AtomicI32 as *mut AtomicI32
}
}
// SAFETY: The ring buffer is designed for SPSC use where producer and consumer
// are on different threads. The atomic indices provide the synchronization.
unsafe impl Send for RingBuffer {}
unsafe impl Sync for RingBuffer {}
// ---------------------------------------------------------------------------
// Oboe Backend
// ---------------------------------------------------------------------------
/// Oboe-based audio backend for Android.
///
/// Owns two SPSC ring buffers (capture and playout) that are shared with
/// the C++ Oboe callbacks via raw pointers. The Oboe callbacks run on
/// high-priority audio threads managed by the Android audio system.
pub struct OboeBackend {
capture_ring: RingBuffer,
playout_ring: RingBuffer,
started: bool,
}
impl OboeBackend {
/// Create a new backend with default ring buffer sizes (160 ms each).
pub fn new() -> Self {
Self {
capture_ring: RingBuffer::new(RING_CAPACITY),
playout_ring: RingBuffer::new(RING_CAPACITY),
started: false,
}
}
/// Start Oboe audio streams.
///
/// This sets up the ring buffer pointers and calls into the C++ layer
/// to open and start the capture and playout Oboe streams.
pub fn start(&mut self) -> Result<(), anyhow::Error> {
if self.started {
return Ok(());
}
let config = WzpOboeConfig {
sample_rate: 48_000,
frames_per_burst: FRAME_SAMPLES as i32,
channel_count: 1,
};
let rings = WzpOboeRings {
capture_buf: self.capture_ring.buf_ptr(),
capture_capacity: self.capture_ring.capacity as i32,
capture_write_idx: self.capture_ring.write_idx_ptr(),
capture_read_idx: self.capture_ring.read_idx_ptr(),
playout_buf: self.playout_ring.buf_ptr(),
playout_capacity: self.playout_ring.capacity as i32,
playout_write_idx: self.playout_ring.write_idx_ptr(),
playout_read_idx: self.playout_ring.read_idx_ptr(),
};
let ret = unsafe { wzp_oboe_start(&config, &rings) };
if ret != 0 {
return Err(anyhow::anyhow!("wzp_oboe_start failed with code {}", ret));
}
self.started = true;
info!("Oboe backend started");
Ok(())
}
/// Stop Oboe audio streams.
pub fn stop(&mut self) {
if !self.started {
return;
}
unsafe { wzp_oboe_stop() };
self.started = false;
info!("Oboe backend stopped");
}
/// Read captured audio samples from the capture ring buffer.
///
/// Returns the number of samples actually read. The caller should
/// provide a buffer of at least `FRAME_SAMPLES` (960) samples.
pub fn read_capture(&self, out: &mut [i16]) -> usize {
self.capture_ring.read(out)
}
/// Write audio samples to the playout ring buffer.
///
/// Returns the number of samples actually written.
pub fn write_playout(&self, samples: &[i16]) -> usize {
self.playout_ring.write(samples)
}
/// Get the current capture latency in milliseconds (from Oboe).
#[allow(unused)]
pub fn capture_latency_ms(&self) -> f32 {
unsafe { wzp_oboe_capture_latency_ms() }
}
/// Get the current playout latency in milliseconds (from Oboe).
#[allow(unused)]
pub fn playout_latency_ms(&self) -> f32 {
unsafe { wzp_oboe_playout_latency_ms() }
}
/// Check if the Oboe streams are currently running.
#[allow(unused)]
pub fn is_running(&self) -> bool {
unsafe { wzp_oboe_is_running() != 0 }
}
}
impl Drop for OboeBackend {
fn drop(&mut self) {
self.stop();
}
}
// ---------------------------------------------------------------------------
// Thread affinity / priority helpers
// ---------------------------------------------------------------------------
/// Pin the current thread to the highest-numbered CPU cores (big cores on
/// ARM big.LITTLE architectures). Falls back silently on failure.
#[allow(unused)]
pub fn pin_to_big_core() {
#[cfg(target_os = "android")]
{
unsafe {
let num_cpus = libc::sysconf(libc::_SC_NPROCESSORS_ONLN);
if num_cpus <= 0 {
warn!("pin_to_big_core: could not determine CPU count");
return;
}
let num_cpus = num_cpus as usize;
// Target the upper half of CPUs (big cores on most big.LITTLE SoCs)
let start = num_cpus / 2;
let mut set: libc::cpu_set_t = std::mem::zeroed();
libc::CPU_ZERO(&mut set);
for cpu in start..num_cpus {
libc::CPU_SET(cpu, &mut set);
}
let ret = libc::sched_setaffinity(
0, // current thread
std::mem::size_of::<libc::cpu_set_t>(),
&set,
);
if ret != 0 {
warn!("sched_setaffinity failed: {}", std::io::Error::last_os_error());
} else {
info!(start, num_cpus, "pinned to big cores");
}
}
}
#[cfg(not(target_os = "android"))]
{
// No-op on non-Android
}
}
/// Attempt to set SCHED_FIFO real-time priority for the current thread.
/// Falls back silently on failure (requires appropriate permissions on Android).
#[allow(unused)]
pub fn set_realtime_priority() {
#[cfg(target_os = "android")]
{
unsafe {
let param = libc::sched_param {
sched_priority: 2, // Low RT priority — enough for audio, safe
};
let ret = libc::sched_setscheduler(0, libc::SCHED_FIFO, &param);
if ret != 0 {
warn!(
"sched_setscheduler(SCHED_FIFO) failed: {}",
std::io::Error::last_os_error()
);
} else {
info!("set SCHED_FIFO priority 2");
}
}
}
#[cfg(not(target_os = "android"))]
{
// No-op on non-Android
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ring_buffer_write_read() {
let ring = RingBuffer::new(16);
let data = [1i16, 2, 3, 4, 5];
assert_eq!(ring.write(&data), 5);
assert_eq!(ring.available_read(), 5);
let mut out = [0i16; 5];
assert_eq!(ring.read(&mut out), 5);
assert_eq!(out, [1, 2, 3, 4, 5]);
assert_eq!(ring.available_read(), 0);
}
#[test]
fn ring_buffer_wraparound() {
let ring = RingBuffer::new(8);
let data = [10i16, 20, 30, 40, 50, 60]; // 6 samples, capacity 8 (usable 7)
assert_eq!(ring.write(&data), 6);
let mut out = [0i16; 4];
assert_eq!(ring.read(&mut out), 4);
assert_eq!(out, [10, 20, 30, 40]);
// Now write more, which should wrap around
let data2 = [70i16, 80, 90, 100];
assert_eq!(ring.write(&data2), 4);
let mut out2 = [0i16; 6];
assert_eq!(ring.read(&mut out2), 6);
assert_eq!(out2, [50, 60, 70, 80, 90, 100]);
}
#[test]
fn ring_buffer_full() {
let ring = RingBuffer::new(4); // usable capacity = 3
let data = [1i16, 2, 3, 4, 5];
assert_eq!(ring.write(&data), 3); // Only 3 fit
assert_eq!(ring.available_write(), 0);
}
#[test]
fn oboe_backend_stub_start_stop() {
let mut backend = OboeBackend::new();
backend.start().expect("stub start should succeed");
assert!(backend.started);
backend.stop();
assert!(!backend.started);
}
}

View File

@@ -0,0 +1,15 @@
//! Engine commands sent from the JNI/UI thread to the engine.
use wzp_proto::QualityProfile;
/// Commands that can be sent to the running engine.
pub enum EngineCommand {
/// Mute or unmute the microphone.
SetMute(bool),
/// Enable or disable speaker (loudspeaker) mode.
SetSpeaker(bool),
/// Force a specific quality profile (overrides adaptive logic).
ForceProfile(QualityProfile),
/// Stop the call and shut down the engine.
Stop,
}

View File

@@ -0,0 +1,357 @@
//! Engine orchestrator — manages the call lifecycle.
//!
//! The engine owns:
//! - The Oboe audio backend (start/stop)
//! - A codec thread running the `Pipeline`
//! - A tokio runtime for async network I/O
//! - Command channel for control from the JNI/UI thread
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tracing::{error, info, warn};
use wzp_proto::QualityProfile;
use crate::audio_android::{OboeBackend, FRAME_SAMPLES};
use crate::commands::EngineCommand;
use crate::pipeline::Pipeline;
use crate::stats::{CallState, CallStats};
/// Configuration to start a call.
pub struct CallStartConfig {
/// Initial quality profile.
pub profile: QualityProfile,
/// Relay server address (host:port).
pub relay_addr: String,
/// Authentication token for the relay.
pub auth_token: Vec<u8>,
/// 32-byte identity seed for key derivation.
pub identity_seed: [u8; 32],
}
impl Default for CallStartConfig {
fn default() -> Self {
Self {
profile: QualityProfile::GOOD,
relay_addr: String::new(),
auth_token: Vec::new(),
identity_seed: [0u8; 32],
}
}
}
/// Shared state between the engine owner and background threads.
struct EngineState {
running: AtomicBool,
muted: AtomicBool,
speaker: AtomicBool,
stats: Mutex<CallStats>,
command_tx: std::sync::mpsc::Sender<EngineCommand>,
command_rx: Mutex<Option<std::sync::mpsc::Receiver<EngineCommand>>>,
}
/// The WarzonePhone Android engine.
///
/// Manages the entire call pipeline: audio capture/playout via Oboe,
/// codec encode/decode, FEC, jitter buffer, and network transport.
///
/// Thread model:
/// - **UI/JNI thread**: calls `start_call`, `stop_call`, `set_mute`, etc.
/// - **Codec thread**: runs `Pipeline` encode/decode loop, reads/writes ring buffers
/// - **Tokio runtime** (2 worker threads): async network send/recv
pub struct WzpEngine {
state: Arc<EngineState>,
codec_thread: Option<std::thread::JoinHandle<()>>,
#[allow(unused)]
tokio_runtime: Option<tokio::runtime::Runtime>,
call_start: Option<Instant>,
}
impl WzpEngine {
/// Create a new idle engine.
pub fn new() -> Self {
let (tx, rx) = std::sync::mpsc::channel();
let state = Arc::new(EngineState {
running: AtomicBool::new(false),
muted: AtomicBool::new(false),
speaker: AtomicBool::new(false),
stats: Mutex::new(CallStats::default()),
command_tx: tx,
command_rx: Mutex::new(Some(rx)),
});
Self {
state,
codec_thread: None,
tokio_runtime: None,
call_start: None,
}
}
/// Start a call with the given configuration.
///
/// This creates the tokio runtime, starts the Oboe audio backend,
/// and spawns the codec thread.
pub fn start_call(&mut self, config: CallStartConfig) -> Result<(), anyhow::Error> {
if self.state.running.load(Ordering::Acquire) {
return Err(anyhow::anyhow!("call already active"));
}
// Update state
{
let mut stats = self.state.stats.lock().unwrap();
*stats = CallStats {
state: CallState::Connecting,
..Default::default()
};
}
// Create tokio runtime with 2 worker threads
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.thread_name("wzp-net")
.enable_all()
.build()?;
// Create async channels for network send/recv
let (send_tx, mut _send_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(64);
let (_recv_tx, mut recv_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(64);
// Spawn network tasks (placeholder — will use wzp-transport)
let _relay_addr = config.relay_addr.clone();
runtime.spawn(async move {
// Network send task: reads from send_rx, sends via transport
// This will be implemented when wzp-transport Android support is added
while let Some(_packet) = _send_rx.recv().await {
// TODO: send via wzp-transport
}
});
let recv_tx_clone = _recv_tx.clone();
runtime.spawn(async move {
// Network recv task: reads from transport, writes to recv_rx
// This will be implemented when wzp-transport Android support is added
let _tx = recv_tx_clone;
// TODO: recv from wzp-transport and forward
});
// Take the command receiver (it can only be taken once)
let command_rx = self
.state
.command_rx
.lock()
.unwrap()
.take()
.ok_or_else(|| anyhow::anyhow!("command receiver already taken"))?;
// Start the codec thread
let state = self.state.clone();
let profile = config.profile;
let codec_thread = std::thread::Builder::new()
.name("wzp-codec".into())
.spawn(move || {
// Pin to big cores and set RT priority on Android
crate::audio_android::pin_to_big_core();
crate::audio_android::set_realtime_priority();
// Create audio backend
let mut audio = OboeBackend::new();
if let Err(e) = audio.start() {
error!("failed to start audio: {e}");
state.running.store(false, Ordering::Release);
return;
}
// Create pipeline
let mut pipeline = match Pipeline::new(profile) {
Ok(p) => p,
Err(e) => {
error!("failed to create pipeline: {e}");
audio.stop();
state.running.store(false, Ordering::Release);
return;
}
};
state.running.store(true, Ordering::Release);
{
let mut stats = state.stats.lock().unwrap();
stats.state = CallState::Active;
}
info!("codec thread started");
let mut capture_buf = vec![0i16; FRAME_SAMPLES];
#[allow(unused_assignments)]
let mut recv_buf: Vec<u8> = Vec::new();
// Main codec loop: 20ms per iteration
let frame_duration = std::time::Duration::from_millis(20);
while state.running.load(Ordering::Relaxed) {
let loop_start = Instant::now();
// Process commands (non-blocking)
while let Ok(cmd) = command_rx.try_recv() {
match cmd {
EngineCommand::SetMute(m) => {
state.muted.store(m, Ordering::Relaxed);
info!(muted = m, "mute toggled");
}
EngineCommand::SetSpeaker(s) => {
state.speaker.store(s, Ordering::Relaxed);
info!(speaker = s, "speaker toggled");
}
EngineCommand::ForceProfile(p) => {
pipeline.force_profile(p);
info!(?p, "profile forced");
}
EngineCommand::Stop => {
info!("stop command received");
state.running.store(false, Ordering::Release);
break;
}
}
}
if !state.running.load(Ordering::Relaxed) {
break;
}
// --- Capture → Encode → Send ---
let captured = audio.read_capture(&mut capture_buf);
if captured >= FRAME_SAMPLES {
let muted = state.muted.load(Ordering::Relaxed);
if let Some(encoded) = pipeline.encode_frame(&capture_buf, muted) {
// Send to network (best-effort)
let _ = send_tx.try_send(encoded);
}
}
// --- Recv → Decode → Playout ---
// Drain received packets from the network channel
while let Ok(data) = recv_rx.try_recv() {
recv_buf = data;
// Deserialize the packet and feed to pipeline
// For now, feed raw bytes — full MediaPacket deserialization
// will be added with the transport integration
let _ = &recv_buf; // suppress unused warning
}
// Decode from jitter buffer
if let Some(pcm) = pipeline.decode_frame() {
audio.write_playout(&pcm);
}
// --- Update stats ---
{
let pstats = pipeline.stats();
let mut stats = state.stats.lock().unwrap();
stats.frames_encoded = pstats.frames_encoded;
stats.frames_decoded = pstats.frames_decoded;
stats.underruns = pstats.underruns;
stats.jitter_buffer_depth = pstats.jitter_depth;
stats.quality_tier = pstats.quality_tier;
}
// Sleep for remainder of the 20ms frame period
let elapsed = loop_start.elapsed();
if elapsed < frame_duration {
std::thread::sleep(frame_duration - elapsed);
}
}
// Cleanup
audio.stop();
{
let mut stats = state.stats.lock().unwrap();
stats.state = CallState::Closed;
}
info!("codec thread exited");
})?;
self.codec_thread = Some(codec_thread);
self.tokio_runtime = Some(runtime);
self.call_start = Some(Instant::now());
info!("call started");
Ok(())
}
/// Stop the current call and clean up all resources.
pub fn stop_call(&mut self) {
if !self.state.running.load(Ordering::Acquire) {
return;
}
// Signal stop
self.state.running.store(false, Ordering::Release);
let _ = self.state.command_tx.send(EngineCommand::Stop);
// Join codec thread
if let Some(handle) = self.codec_thread.take() {
if let Err(e) = handle.join() {
warn!("codec thread panicked: {e:?}");
}
}
// Shut down tokio runtime
if let Some(rt) = self.tokio_runtime.take() {
rt.shutdown_timeout(std::time::Duration::from_secs(2));
}
self.call_start = None;
info!("call stopped");
}
/// Set microphone mute state.
pub fn set_mute(&self, muted: bool) {
let _ = self.state.command_tx.send(EngineCommand::SetMute(muted));
}
/// Set speaker (loudspeaker) mode.
#[allow(unused)]
pub fn set_speaker(&self, enabled: bool) {
let _ = self
.state
.command_tx
.send(EngineCommand::SetSpeaker(enabled));
}
/// Force a specific quality profile (overrides adaptive logic).
#[allow(unused)]
pub fn force_profile(&self, profile: QualityProfile) {
let _ = self
.state
.command_tx
.send(EngineCommand::ForceProfile(profile));
}
/// Get a snapshot of the current call statistics.
pub fn get_stats(&self) -> CallStats {
let mut stats = self.state.stats.lock().unwrap().clone();
// Update duration from wall clock
if let Some(start) = self.call_start {
stats.duration_secs = start.elapsed().as_secs_f64();
}
stats
}
/// Check if a call is currently active.
pub fn is_active(&self) -> bool {
self.state.running.load(Ordering::Acquire)
}
/// Destroy the engine, stopping any active call.
pub fn destroy(mut self) {
self.stop_call();
info!("engine destroyed");
}
}
impl Drop for WzpEngine {
fn drop(&mut self) {
self.stop_call();
}
}

View File

@@ -0,0 +1,17 @@
//! WarzonePhone Android native VoIP engine.
//!
//! Provides:
//! - Oboe audio backend with lock-free SPSC ring buffers
//! - Engine orchestrator managing call lifecycle
//! - Codec pipeline thread (encode/decode/FEC/jitter)
//! - Call statistics and command interface
//!
//! On non-Android targets, the Oboe C++ layer compiles as a stub,
//! allowing `cargo check` and unit tests on the host.
pub mod audio_android;
pub mod commands;
pub mod engine;
pub mod pipeline;
pub mod stats;
// pub mod jni_bridge; // Added later by Agent 4

View File

@@ -0,0 +1,224 @@
//! Codec pipeline — encode/decode with FEC and jitter buffer.
//!
//! Runs on a dedicated thread, processing 20 ms frames at 48 kHz.
//! The pipeline is NOT Send/Sync (Opus encoder state) — it is owned
//! exclusively by the codec thread.
use tracing::{debug, warn};
use wzp_codec::{AdaptiveDecoder, AdaptiveEncoder};
use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder};
use wzp_proto::jitter::{JitterBuffer, PlayoutResult};
use wzp_proto::quality::AdaptiveQualityController;
use wzp_proto::traits::{AudioDecoder, AudioEncoder, FecDecoder, FecEncoder};
use wzp_proto::traits::QualityController;
use wzp_proto::{MediaPacket, QualityProfile};
use crate::audio_android::FRAME_SAMPLES;
/// Maximum encoded frame size (Opus worst case at highest bitrate).
const MAX_ENCODED_BYTES: usize = 1275;
/// Pipeline statistics snapshot.
#[derive(Clone, Debug, Default)]
pub struct PipelineStats {
pub frames_encoded: u64,
pub frames_decoded: u64,
pub underruns: u64,
pub jitter_depth: usize,
pub quality_tier: u8,
}
/// The codec pipeline: encode, FEC, jitter buffer, decode.
///
/// This struct is owned by the codec thread and not shared.
pub struct Pipeline {
encoder: AdaptiveEncoder,
decoder: AdaptiveDecoder,
fec_encoder: RaptorQFecEncoder,
fec_decoder: RaptorQFecDecoder,
jitter_buffer: JitterBuffer,
quality_ctrl: AdaptiveQualityController,
// Pre-allocated scratch buffers
capture_buf: Vec<i16>,
#[allow(dead_code)]
playout_buf: Vec<i16>,
encode_out: Vec<u8>,
// Stats counters
frames_encoded: u64,
frames_decoded: u64,
underruns: u64,
}
impl Pipeline {
/// Create a new pipeline configured for the given quality profile.
pub fn new(profile: QualityProfile) -> Result<Self, anyhow::Error> {
let encoder = AdaptiveEncoder::new(profile)
.map_err(|e| anyhow::anyhow!("encoder init: {e}"))?;
let decoder = AdaptiveDecoder::new(profile)
.map_err(|e| anyhow::anyhow!("decoder init: {e}"))?;
let fec_encoder =
RaptorQFecEncoder::with_defaults(profile.frames_per_block as usize);
let fec_decoder =
RaptorQFecDecoder::with_defaults(profile.frames_per_block as usize);
let jitter_buffer = JitterBuffer::new(10, 250, 3);
let quality_ctrl = AdaptiveQualityController::new();
Ok(Self {
encoder,
decoder,
fec_encoder,
fec_decoder,
jitter_buffer,
quality_ctrl,
capture_buf: vec![0i16; FRAME_SAMPLES],
playout_buf: vec![0i16; FRAME_SAMPLES],
encode_out: vec![0u8; MAX_ENCODED_BYTES],
frames_encoded: 0,
frames_decoded: 0,
underruns: 0,
})
}
/// Encode a PCM frame into a compressed packet.
///
/// If `muted` is true, a silence frame is encoded (all zeros).
/// Returns the encoded bytes, or `None` on encoder error.
pub fn encode_frame(&mut self, pcm: &[i16], muted: bool) -> Option<Vec<u8>> {
let input = if muted {
// Zero the capture buffer for silence
for s in self.capture_buf.iter_mut() {
*s = 0;
}
&self.capture_buf[..]
} else {
pcm
};
match self.encoder.encode(input, &mut self.encode_out) {
Ok(n) => {
self.frames_encoded += 1;
let encoded = self.encode_out[..n].to_vec();
// Feed into FEC encoder
if let Err(e) = self.fec_encoder.add_source_symbol(&encoded) {
warn!("FEC encode error: {e}");
}
Some(encoded)
}
Err(e) => {
warn!("encode error: {e}");
None
}
}
}
/// Feed a received media packet into the jitter buffer.
pub fn feed_packet(&mut self, packet: MediaPacket) {
// Feed FEC symbols if present
let header = &packet.header;
if header.fec_block != 0 || header.fec_symbol != 0 {
let is_repair = header.is_repair;
if let Err(e) = self.fec_decoder.add_symbol(
header.fec_block,
header.fec_symbol,
is_repair,
&packet.payload,
) {
debug!("FEC symbol feed error: {e}");
}
}
self.jitter_buffer.push(packet);
}
/// Decode the next frame from the jitter buffer.
///
/// Returns decoded PCM samples, or `None` if the buffer is not ready.
pub fn decode_frame(&mut self) -> Option<Vec<i16>> {
match self.jitter_buffer.pop() {
PlayoutResult::Packet(pkt) => {
let mut pcm = vec![0i16; FRAME_SAMPLES];
match self.decoder.decode(&pkt.payload, &mut pcm) {
Ok(n) => {
self.frames_decoded += 1;
pcm.truncate(n);
Some(pcm)
}
Err(e) => {
warn!("decode error: {e}");
// Attempt PLC
self.generate_plc()
}
}
}
PlayoutResult::Missing { seq } => {
debug!(seq, "jitter buffer: missing packet, generating PLC");
self.generate_plc()
}
PlayoutResult::NotReady => {
self.underruns += 1;
None
}
}
}
/// Generate packet loss concealment output.
fn generate_plc(&mut self) -> Option<Vec<i16>> {
let mut pcm = vec![0i16; FRAME_SAMPLES];
match self.decoder.decode_lost(&mut pcm) {
Ok(n) => {
self.frames_decoded += 1;
pcm.truncate(n);
Some(pcm)
}
Err(e) => {
warn!("PLC error: {e}");
None
}
}
}
/// Feed a quality report into the adaptive quality controller.
///
/// Returns a new profile if a tier transition occurred.
#[allow(unused)]
pub fn observe_quality(
&mut self,
report: &wzp_proto::QualityReport,
) -> Option<QualityProfile> {
let new_profile = self.quality_ctrl.observe(report);
if let Some(ref profile) = new_profile {
if let Err(e) = self.encoder.set_profile(*profile) {
warn!("encoder set_profile error: {e}");
}
if let Err(e) = self.decoder.set_profile(*profile) {
warn!("decoder set_profile error: {e}");
}
}
new_profile
}
/// Force a specific quality profile.
#[allow(unused)]
pub fn force_profile(&mut self, profile: QualityProfile) {
self.quality_ctrl.force_profile(profile);
if let Err(e) = self.encoder.set_profile(profile) {
warn!("encoder set_profile error: {e}");
}
if let Err(e) = self.decoder.set_profile(profile) {
warn!("decoder set_profile error: {e}");
}
}
/// Get current pipeline statistics.
pub fn stats(&self) -> PipelineStats {
PipelineStats {
frames_encoded: self.frames_encoded,
frames_decoded: self.frames_decoded,
underruns: self.underruns,
jitter_depth: self.jitter_buffer.stats().current_depth,
quality_tier: self.quality_ctrl.tier() as u8,
}
}
}

View File

@@ -0,0 +1,42 @@
//! Call statistics for the Android engine.
/// State of the call.
#[derive(Clone, Debug, Default, serde::Serialize, PartialEq, Eq)]
pub enum CallState {
/// Engine is idle, no active call.
#[default]
Idle,
/// Establishing connection to the relay.
Connecting,
/// Call is active with audio flowing.
Active,
/// Temporarily lost connection, attempting to recover.
Reconnecting,
/// Call has ended.
Closed,
}
/// Aggregated call statistics, serializable for JNI bridge.
#[derive(Clone, Debug, Default, serde::Serialize)]
pub struct CallStats {
/// Current call state.
pub state: CallState,
/// Call duration in seconds.
pub duration_secs: f64,
/// Current quality tier (0=GOOD, 1=DEGRADED, 2=CATASTROPHIC).
pub quality_tier: u8,
/// Observed packet loss percentage.
pub loss_pct: f32,
/// Smoothed round-trip time in milliseconds.
pub rtt_ms: u32,
/// Jitter in milliseconds.
pub jitter_ms: u32,
/// Current jitter buffer depth in packets.
pub jitter_buffer_depth: usize,
/// Total frames encoded since call start.
pub frames_encoded: u64,
/// Total frames decoded since call start.
pub frames_decoded: u64,
/// Number of playout underruns (buffer empty when audio needed).
pub underruns: u64,
}