feat: adaptive quality engine + codec indicator UI
Some checks failed
Mirror to GitHub / mirror (push) Failing after 38s
Build Release Binaries / build-amd64 (push) Failing after 2m17s

Wire AdaptiveQualityController into Android engine for auto codec
switching based on network quality reports. Add color-coded TX/RX
codec badges to the in-call screen showing active codecs and Auto mode.

- Recv task: ingest QualityReports, feed to controller, signal profile
  changes via AtomicU8 to send task
- Send task: check for pending profile switch at frame boundaries,
  update encoder/FEC/frame size
- Track peer codec from incoming packet headers
- Kotlin UI: codec badges (blue=studio, green=good, amber=degraded,
  red=catastrophic) with Auto tag
- Add .taskmaster to .gitignore

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-08 10:19:11 +04:00
parent f4cc3b1a6b
commit 0abecf7fd8
6 changed files with 214 additions and 12 deletions

25
.gitignore vendored
View File

@@ -4,3 +4,28 @@
*.swp *.swp
*.swo *.swo
*~ *~
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
dev-debug.log
# Dependency directories
node_modules/
# Environment variables
.env
# Editor directories and files
.idea
.vscode
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
# OS specific
# Taskmaster (local workflow tool)
.taskmaster/
.env.example

4
Cargo.lock generated
View File

@@ -4378,6 +4378,7 @@ dependencies = [
"rustls", "rustls",
"serde", "serde",
"serde_json", "serde_json",
"sha2",
"tokio", "tokio",
"toml", "toml",
"tower-http", "tower-http",
@@ -4397,10 +4398,13 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"ed25519-dalek",
"hkdf",
"quinn", "quinn",
"rcgen", "rcgen",
"rustls", "rustls",
"serde_json", "serde_json",
"sha2",
"tokio", "tokio",
"tracing", "tracing",
"wzp-proto", "wzp-proto",

View File

@@ -33,6 +33,12 @@ data class CallStats(
val fecRecovered: Long = 0, val fecRecovered: Long = 0,
/** Current mic audio level (RMS, 0-32767). */ /** Current mic audio level (RMS, 0-32767). */
val audioLevel: Int = 0, val audioLevel: Int = 0,
/** Our current outgoing codec (e.g. "Opus24k"). */
val currentCodec: String = "",
/** Last seen incoming codec from peers. */
val peerCodec: String = "",
/** Whether auto quality mode is active. */
val autoMode: Boolean = false,
/** Number of participants in the room. */ /** Number of participants in the room. */
val roomParticipantCount: Int = 0, val roomParticipantCount: Int = 0,
/** Participants in the room (fingerprint + optional alias). */ /** Participants in the room (fingerprint + optional alias). */
@@ -76,6 +82,9 @@ data class CallStats(
underruns = obj.optLong("underruns", 0), underruns = obj.optLong("underruns", 0),
fecRecovered = obj.optLong("fec_recovered", 0), fecRecovered = obj.optLong("fec_recovered", 0),
audioLevel = obj.optInt("audio_level", 0), audioLevel = obj.optInt("audio_level", 0),
currentCodec = obj.optString("current_codec", ""),
peerCodec = obj.optString("peer_codec", ""),
autoMode = obj.optBoolean("auto_mode", false),
roomParticipantCount = obj.optInt("room_participant_count", 0), roomParticipantCount = obj.optInt("room_participant_count", 0),
roomParticipants = parseParticipants(obj.optJSONArray("room_participants")) roomParticipants = parseParticipants(obj.optJSONArray("room_participants"))
) )

View File

@@ -463,7 +463,51 @@ fun InCallScreen(
Spacer(modifier = Modifier.height(12.dp)) Spacer(modifier = Modifier.height(12.dp))
// Stats // Codec + Stats
if (stats.currentCodec.isNotEmpty()) {
val codecLabel = formatCodecName(stats.currentCodec)
val peerLabel = if (stats.peerCodec.isNotEmpty()) formatCodecName(stats.peerCodec) else null
val autoTag = if (stats.autoMode) " [Auto]" else ""
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.Center,
verticalAlignment = Alignment.CenterVertically
) {
// Our codec badge
Surface(
shape = RoundedCornerShape(4.dp),
color = codecColor(stats.currentCodec)
) {
Text(
text = "TX $codecLabel$autoTag",
modifier = Modifier.padding(horizontal = 6.dp, vertical = 2.dp),
style = MaterialTheme.typography.labelSmall.copy(
fontFamily = FontFamily.Monospace,
fontSize = 10.sp
),
color = Color.White
)
}
if (peerLabel != null) {
Spacer(modifier = Modifier.width(6.dp))
Surface(
shape = RoundedCornerShape(4.dp),
color = codecColor(stats.peerCodec)
) {
Text(
text = "RX $peerLabel",
modifier = Modifier.padding(horizontal = 6.dp, vertical = 2.dp),
style = MaterialTheme.typography.labelSmall.copy(
fontFamily = FontFamily.Monospace,
fontSize = 10.sp
),
color = Color.White
)
}
}
}
Spacer(modifier = Modifier.height(4.dp))
}
Text( Text(
text = "TX: ${stats.framesEncoded} | RX: ${stats.framesDecoded}", text = "TX: ${stats.framesEncoded} | RX: ${stats.framesDecoded}",
style = MaterialTheme.typography.labelSmall.copy(fontFamily = FontFamily.Monospace), style = MaterialTheme.typography.labelSmall.copy(fontFamily = FontFamily.Monospace),
@@ -825,3 +869,25 @@ private fun DebugReportCard(
} }
} }
} }
/** Map Rust CodecId debug name to a human-readable label. */
private fun formatCodecName(codecId: String): String = when (codecId) {
"Opus64k" -> "Opus 64k"
"Opus48k" -> "Opus 48k"
"Opus32k" -> "Opus 32k"
"Opus24k" -> "Opus 24k"
"Opus16k" -> "Opus 16k"
"Opus6k" -> "Opus 6k"
"Codec2_3200" -> "C2 3.2k"
"Codec2_1200" -> "C2 1.2k"
else -> codecId
}
/** Color-code codec badges by quality tier. */
private fun codecColor(codecId: String): Color = when (codecId) {
"Opus64k", "Opus48k", "Opus32k" -> Color(0xFF0D6EFD) // blue — studio
"Opus24k", "Opus16k" -> Color(0xFF198754) // green — good
"Opus6k" -> Color(0xFFCC8800) // amber — degraded
"Codec2_3200", "Codec2_1200" -> Color(0xFFDC3545) // red — catastrophic
else -> Color(0xFF6C757D) // gray
}

View File

@@ -9,7 +9,7 @@
//! and AudioTrack. PCM samples are transferred through lock-free ring buffers. //! and AudioTrack. PCM samples are transferred through lock-free ring buffers.
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU16, AtomicU32, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Instant; use std::time::Instant;
@@ -19,8 +19,8 @@ use wzp_codec::agc::AutoGainControl;
use wzp_crypto::{KeyExchange, WarzoneKeyExchange}; use wzp_crypto::{KeyExchange, WarzoneKeyExchange};
use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder}; use wzp_fec::{RaptorQFecDecoder, RaptorQFecEncoder};
use wzp_proto::{ use wzp_proto::{
AudioDecoder, AudioEncoder, CodecId, FecDecoder, FecEncoder, AdaptiveQualityController, AudioDecoder, AudioEncoder, CodecId, FecDecoder, FecEncoder,
MediaHeader, MediaPacket, MediaTransport, QualityProfile, SignalMessage, MediaHeader, MediaPacket, MediaTransport, QualityController, QualityProfile, SignalMessage,
}; };
use crate::audio_ring::AudioRing; use crate::audio_ring::AudioRing;
@@ -30,6 +30,27 @@ use crate::stats::{CallState, CallStats};
/// Max frame size at 48kHz mono (40ms = 1920 samples, for Codec2/Opus6k). /// Max frame size at 48kHz mono (40ms = 1920 samples, for Codec2/Opus6k).
const MAX_FRAME_SAMPLES: usize = 1920; const MAX_FRAME_SAMPLES: usize = 1920;
/// Sentinel value: no profile change pending.
const PROFILE_NO_CHANGE: u8 = 0xFF;
/// All quality profiles in index order, for AtomicU8-based signaling.
const PROFILES: [QualityProfile; 6] = [
QualityProfile::STUDIO_64K, // 0
QualityProfile::STUDIO_48K, // 1
QualityProfile::STUDIO_32K, // 2
QualityProfile::GOOD, // 3
QualityProfile::DEGRADED, // 4
QualityProfile::CATASTROPHIC, // 5
];
fn profile_to_index(p: &QualityProfile) -> u8 {
PROFILES.iter().position(|pp| pp.codec == p.codec).map(|i| i as u8).unwrap_or(3)
}
fn index_to_profile(idx: u8) -> Option<QualityProfile> {
PROFILES.get(idx as usize).copied()
}
/// Compute frame samples at 48kHz for a given profile. /// Compute frame samples at 48kHz for a given profile.
fn frame_samples_for(profile: &QualityProfile) -> usize { fn frame_samples_for(profile: &QualityProfile) -> usize {
(profile.frame_duration_ms as usize) * 48 // 48000 / 1000 (profile.frame_duration_ms as usize) * 48 // 48000 / 1000
@@ -371,7 +392,7 @@ async fn run_call(
let mut capture_agc = AutoGainControl::new(); let mut capture_agc = AutoGainControl::new();
let mut playout_agc = AutoGainControl::new(); let mut playout_agc = AutoGainControl::new();
let frame_samples = frame_samples_for(&profile); let mut frame_samples = frame_samples_for(&profile);
info!( info!(
codec = ?profile.codec, codec = ?profile.codec,
fec_ratio = profile.fec_ratio, fec_ratio = profile.fec_ratio,
@@ -381,15 +402,27 @@ async fn run_call(
"codec + FEC + AGC initialized" "codec + FEC + AGC initialized"
); );
{
let mut stats = state.stats.lock().unwrap();
stats.current_codec = format!("{:?}", profile.codec);
stats.auto_mode = auto_profile;
}
let seq = AtomicU16::new(0); let seq = AtomicU16::new(0);
let ts = AtomicU32::new(0); let ts = AtomicU32::new(0);
let transport_recv = transport.clone(); let transport_recv = transport.clone();
// Adaptive quality: shared AtomicU8 between recv task (writer) and send task (reader).
// 0xFF = no change pending, 0-5 = index into PROFILES array.
let pending_profile = Arc::new(AtomicU8::new(PROFILE_NO_CHANGE));
let pending_profile_recv = pending_profile.clone();
// Pre-allocate buffers (sized for current profile) // Pre-allocate buffers (sized for current profile)
let mut capture_buf = vec![0i16; frame_samples]; let mut capture_buf = vec![0i16; frame_samples];
let mut encode_buf = vec![0u8; encoder.max_frame_bytes()]; let mut encode_buf = vec![0u8; encoder.max_frame_bytes()];
let mut frame_in_block: u8 = 0; let mut frame_in_block: u8 = 0;
let mut block_id: u8 = 0; let mut block_id: u8 = 0;
let mut current_profile = profile;
// Send task: capture ring → Opus encode → FEC → MediaPackets // Send task: capture ring → Opus encode → FEC → MediaPackets
// //
@@ -415,6 +448,39 @@ async fn run_call(
break; break;
} }
// Check for adaptive profile switch from recv task
if auto_profile {
let p = pending_profile.swap(PROFILE_NO_CHANGE, Ordering::Acquire);
if p != PROFILE_NO_CHANGE {
if let Some(new_profile) = index_to_profile(p) {
info!(
from = ?current_profile.codec,
to = ?new_profile.codec,
"auto: switching encoder profile"
);
if let Err(e) = encoder.set_profile(new_profile) {
warn!("encoder set_profile failed: {e}");
} else {
fec_enc = wzp_fec::create_encoder(&new_profile);
current_profile = new_profile;
let new_frame_samples = frame_samples_for(&new_profile);
if new_frame_samples != frame_samples {
frame_samples = new_frame_samples;
capture_buf.resize(frame_samples, 0);
}
encode_buf.resize(encoder.max_frame_bytes(), 0);
// Reset FEC block state for clean switch
frame_in_block = 0;
block_id = block_id.wrapping_add(1);
// Update stats with new codec
if let Ok(mut stats) = state.stats.lock() {
stats.current_codec = format!("{:?}", new_profile.codec);
}
}
}
}
}
let avail = state.capture_ring.available(); let avail = state.capture_ring.available();
if avail < frame_samples { if avail < frame_samples {
tokio::time::sleep(std::time::Duration::from_millis(5)).await; tokio::time::sleep(std::time::Duration::from_millis(5)).await;
@@ -457,9 +523,9 @@ async fn run_call(
header: MediaHeader { header: MediaHeader {
version: 0, version: 0,
is_repair: false, is_repair: false,
codec_id: profile.codec, codec_id: current_profile.codec,
has_quality_report: false, has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio(profile.fec_ratio), fec_ratio_encoded: MediaHeader::encode_fec_ratio(current_profile.fec_ratio),
seq: s, seq: s,
timestamp: t, timestamp: t,
fec_block: block_id, fec_block: block_id,
@@ -501,8 +567,8 @@ async fn run_call(
frame_in_block += 1; frame_in_block += 1;
// When block is full, generate repair packets // When block is full, generate repair packets
if frame_in_block >= profile.frames_per_block { if frame_in_block >= current_profile.frames_per_block {
match fec_enc.generate_repair(profile.fec_ratio) { match fec_enc.generate_repair(current_profile.fec_ratio) {
Ok(repairs) => { Ok(repairs) => {
let repair_count = repairs.len(); let repair_count = repairs.len();
for (sym_idx, repair_data) in repairs { for (sym_idx, repair_data) in repairs {
@@ -511,10 +577,10 @@ async fn run_call(
header: MediaHeader { header: MediaHeader {
version: 0, version: 0,
is_repair: true, is_repair: true,
codec_id: profile.codec, codec_id: current_profile.codec,
has_quality_report: false, has_quality_report: false,
fec_ratio_encoded: MediaHeader::encode_fec_ratio( fec_ratio_encoded: MediaHeader::encode_fec_ratio(
profile.fec_ratio, current_profile.fec_ratio,
), ),
seq: rs, seq: rs,
timestamp: t, timestamp: t,
@@ -537,7 +603,7 @@ async fn run_call(
info!( info!(
block_id, block_id,
repair_count, repair_count,
fec_ratio = profile.fec_ratio, fec_ratio = current_profile.fec_ratio,
"FEC block complete" "FEC block complete"
); );
} }
@@ -590,6 +656,8 @@ async fn run_call(
let mut last_recv_instant = Instant::now(); let mut last_recv_instant = Instant::now();
let mut max_recv_gap_ms: u64 = 0; let mut max_recv_gap_ms: u64 = 0;
let mut last_stats_log = Instant::now(); let mut last_stats_log = Instant::now();
let mut quality_ctrl = AdaptiveQualityController::new();
let mut last_peer_codec: Option<CodecId> = None;
info!("recv task started (Opus + RaptorQ FEC)"); info!("recv task started (Opus + RaptorQ FEC)");
loop { loop {
if !state.running.load(Ordering::Relaxed) { if !state.running.load(Ordering::Relaxed) {
@@ -612,6 +680,23 @@ async fn run_call(
); );
} }
// Adaptive quality: ingest quality reports from relay
if auto_profile {
if let Some(ref qr) = pkt.quality_report {
if let Some(new_profile) = quality_ctrl.observe(qr) {
let idx = profile_to_index(&new_profile);
info!(
loss = qr.loss_percent(),
rtt = qr.rtt_ms(),
tier = ?quality_ctrl.tier(),
to = ?new_profile.codec,
"auto: quality adapter recommends switch"
);
pending_profile_recv.store(idx, Ordering::Release);
}
}
}
let is_repair = pkt.header.is_repair; let is_repair = pkt.header.is_repair;
let pkt_block = pkt.header.fec_block; let pkt_block = pkt.header.fec_block;
let pkt_symbol = pkt.header.fec_symbol; let pkt_symbol = pkt.header.fec_symbol;
@@ -646,6 +731,13 @@ async fn run_call(
info!(from = ?decoder.codec_id(), to = ?pkt.header.codec_id, "recv: switching decoder"); info!(from = ?decoder.codec_id(), to = ?pkt.header.codec_id, "recv: switching decoder");
let _ = decoder.set_profile(switch_profile); let _ = decoder.set_profile(switch_profile);
} }
// Track peer codec for UI display
if last_peer_codec != Some(pkt.header.codec_id) {
last_peer_codec = Some(pkt.header.codec_id);
if let Ok(mut stats) = state.stats.lock() {
stats.peer_codec = format!("{:?}", pkt.header.codec_id);
}
}
match decoder.decode(&pkt.payload, &mut decode_buf) { match decoder.decode(&pkt.payload, &mut decode_buf) {
Ok(samples) => { Ok(samples) => {
playout_agc.process_frame(&mut decode_buf[..samples]); playout_agc.process_frame(&mut decode_buf[..samples]);

View File

@@ -59,6 +59,12 @@ pub struct CallStats {
pub capture_overflows: u64, pub capture_overflows: u64,
/// Current mic audio level (RMS of i16 samples, 0-32767). /// Current mic audio level (RMS of i16 samples, 0-32767).
pub audio_level: u32, pub audio_level: u32,
/// Our current outgoing codec name (e.g. "Opus24k", "Codec2_1200").
pub current_codec: String,
/// Last seen incoming codec from other participants.
pub peer_codec: String,
/// Whether auto quality mode is active.
pub auto_mode: bool,
/// Number of participants in the room (from last RoomUpdate). /// Number of participants in the room (from last RoomUpdate).
pub room_participant_count: u32, pub room_participant_count: u32,
/// Participant list (fingerprint + optional alias) serialized as JSON array. /// Participant list (fingerprint + optional alias) serialized as JSON array.