refactor: extract shared engine helpers, federation clone-before-send, constants
Some checks failed
Mirror to GitHub / mirror (push) Failing after 30s
Build Release Binaries / build-amd64 (push) Failing after 3m48s

Engine deduplication (PRD-engine-dedup.md):
- build_call_config(): shared CallConfig construction (was 23 lines × 2)
- codec_to_profile(): shared CodecId → QualityProfile mapping (was 19 lines × 2)
- run_signal_task(): shared signal handler (was 48 lines × 2)
- Net -39 lines from engine.rs, 6 duplicated blocks → single-line calls

Quick wins from REFACTOR-codebase-audit.md:
- 6 magic number constants extracted (CAPTURE_POLL_MS, RECV_TIMEOUT_MS, etc.)
- DRED_POLL_INTERVAL moved from 2 local defs to 1 module-level const
- federation.rs: forward_to_peers, broadcast_signal, send_signal_to_peer
  now clone peer list and release lock before sending (was holding Mutex
  across async I/O — last lock-during-send pattern eliminated)
- main.rs: close_transport() helper replaces 12 silent .ok() calls with
  debug-level logging

314 tests passing, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-13 15:22:44 +04:00
parent fdb78e08bd
commit ba12aae439
4 changed files with 317 additions and 204 deletions

View File

@@ -213,16 +213,19 @@ impl FederationManager {
/// `origin_relay_fp` against its own fp and drops self-sourced
/// forwards.
pub async fn broadcast_signal(&self, msg: &wzp_proto::SignalMessage) -> usize {
let peers: Vec<(String, String, Arc<QuinnTransport>)> = {
let links = self.peer_links.lock().await;
links.iter().map(|(fp, l)| (fp.clone(), l.label.clone(), l.transport.clone())).collect()
}; // lock released
let mut count = 0;
for (fp, link) in links.iter() {
match link.transport.send_signal(msg).await {
for (fp, label, transport) in &peers {
match transport.send_signal(msg).await {
Ok(()) => {
count += 1;
tracing::debug!(peer = %link.label, %fp, "federation: broadcast signal ok");
tracing::debug!(peer = %label, %fp, "federation: broadcast signal ok");
}
Err(e) => {
tracing::warn!(peer = %link.label, %fp, error = %e, "federation: broadcast signal failed");
tracing::warn!(peer = %label, %fp, error = %e, "federation: broadcast signal failed");
}
}
}
@@ -243,10 +246,12 @@ impl FederationManager {
msg: &wzp_proto::SignalMessage,
) -> Result<(), String> {
let normalized = normalize_fp(peer_relay_fp);
let transport = {
let links = self.peer_links.lock().await;
match links.get(&normalized) {
Some(link) => link
.transport
links.get(&normalized).map(|l| l.transport.clone())
}; // lock released
match transport {
Some(t) => t
.send_signal(msg)
.await
.map_err(|e| format!("send to peer {normalized}: {e}")),
@@ -403,20 +408,22 @@ impl FederationManager {
/// or rate limiting; the body currently forwards on `room_hash` alone
/// because that's what the wire format carries.
pub async fn forward_to_peers(&self, _room_name: &str, room_hash: &[u8; 8], media_data: &Bytes) {
let peers: Vec<(String, Arc<QuinnTransport>)> = {
let links = self.peer_links.lock().await;
if links.is_empty() {
return;
}
for (_fp, link) in links.iter() {
if links.is_empty() { return; }
links.values().map(|l| (l.label.clone(), l.transport.clone())).collect()
}; // lock released
for (label, transport) in &peers {
let mut tagged = Vec::with_capacity(8 + media_data.len());
tagged.extend_from_slice(room_hash);
tagged.extend_from_slice(media_data);
match link.transport.send_raw_datagram(&tagged) {
match transport.send_raw_datagram(&tagged) {
Ok(()) => {
self.metrics.federation_packets_forwarded
.with_label_values(&[&link.label, "out"]).inc();
.with_label_values(&[label, "out"]).inc();
}
Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
Err(e) => warn!(peer = %label, "federation send error: {e}"),
}
}
}
@@ -483,9 +490,12 @@ async fn run_room_event_dispatcher(
let participants = fm.room_mgr.local_participant_list(&room);
info!(room = %room, count = participants.len(), "global room now active, announcing to peers");
let msg = SignalMessage::GlobalRoomActive { room, participants };
let transports: Vec<Arc<QuinnTransport>> = {
let links = fm.peer_links.lock().await;
for link in links.values() {
let _ = link.transport.send_signal(&msg).await;
links.values().map(|l| l.transport.clone()).collect()
};
for t in &transports {
let _ = t.send_signal(&msg).await;
}
}
}
@@ -493,9 +503,12 @@ async fn run_room_event_dispatcher(
if fm.is_global_room(&room) {
info!(room = %room, "global room now inactive, announcing to peers");
let msg = SignalMessage::GlobalRoomInactive { room };
let transports: Vec<Arc<QuinnTransport>> = {
let links = fm.peer_links.lock().await;
for link in links.values() {
let _ = link.transport.send_signal(&msg).await;
links.values().map(|l| l.transport.clone()).collect()
};
for t in &transports {
let _ = t.send_signal(&msg).await;
}
}
}

View File

@@ -23,6 +23,13 @@ use wzp_relay::presence::PresenceRegistry;
use wzp_relay::room::{self, RoomManager};
use wzp_relay::session_mgr::SessionManager;
/// Close a transport gracefully, logging any error at debug level.
async fn close_transport(t: &dyn wzp_proto::MediaTransport, context: &str) {
if let Err(e) = t.close().await {
tracing::debug!(context, error = %e, "transport close (non-fatal)");
}
}
/// Parsed CLI result — config + identity path.
struct CliResult {
config: RelayConfig,
@@ -908,7 +915,7 @@ async fn main() -> anyhow::Result<()> {
}
}
}
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
@@ -1475,7 +1482,7 @@ async fn main() -> anyhow::Result<()> {
reg.unregister_local(&client_fp);
}
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
@@ -1499,14 +1506,14 @@ async fn main() -> anyhow::Result<()> {
Err(e) => {
metrics.auth_attempts.with_label_values(&["fail"]).inc();
error!(%addr, "auth failed: {e}");
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
}
}
Ok(Some(_)) => {
error!(%addr, "expected AuthToken as first signal, got something else");
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
Ok(None) => {
@@ -1515,7 +1522,7 @@ async fn main() -> anyhow::Result<()> {
}
Err(e) => {
error!(%addr, "signal recv error during auth: {e}");
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
}
@@ -1537,7 +1544,7 @@ async fn main() -> anyhow::Result<()> {
}
Err(e) => {
error!(%addr, "handshake failed: {e}");
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
};
@@ -1561,7 +1568,7 @@ async fn main() -> anyhow::Result<()> {
};
if !authorized {
warn!(%addr, room = %room_name, fp = %participant_fp, "rejected: not authorized for this call room");
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
info!(%addr, room = %room_name, fp = %participant_fp, "authorized for call room");
@@ -1602,7 +1609,7 @@ async fn main() -> anyhow::Result<()> {
tokio::select! { _ = up => {} _ = dn => {} }
stats_handle.abort();
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
} else {
// Room mode — enforce max sessions, then join room
let session_id = {
@@ -1611,7 +1618,7 @@ async fn main() -> anyhow::Result<()> {
Ok(id) => id,
Err(e) => {
error!(%addr, room = %room_name, "session rejected: {e}");
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
}
@@ -1626,7 +1633,7 @@ async fn main() -> anyhow::Result<()> {
metrics.active_sessions.dec();
let mut smgr = session_mgr.lock().await;
smgr.remove_session(session_id);
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
}
@@ -1676,7 +1683,7 @@ async fn main() -> anyhow::Result<()> {
metrics.active_sessions.dec();
let mut smgr = session_mgr.lock().await;
smgr.remove_session(session_id);
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
return;
}
}
@@ -1731,7 +1738,7 @@ async fn main() -> anyhow::Result<()> {
smgr.remove_session(session_id);
}
transport.close().await.ok();
close_transport(&*transport, "cleanup").await;
}
});
}

View File

@@ -30,6 +30,14 @@ use wzp_proto::traits::{AudioDecoder, QualityController};
use wzp_proto::{AdaptiveQualityController, CodecId, MediaTransport, QualityProfile};
const FRAME_SAMPLES_40MS: usize = 1920;
const CAPTURE_POLL_MS: u64 = 5;
const RECV_TIMEOUT_MS: u64 = 100;
const SIGNAL_TIMEOUT_MS: u64 = 200;
#[cfg_attr(not(target_os = "android"), allow(dead_code))]
const CONNECT_TIMEOUT_SECS: u64 = 10;
#[cfg_attr(not(target_os = "android"), allow(dead_code))]
const HEARTBEAT_INTERVAL_SECS: u64 = 2;
const DRED_POLL_INTERVAL: u32 = 25;
/// Profile index mapping for the AtomicU8 adaptive-quality bridge.
const PROFILE_NO_CHANGE: u8 = 0xFF;
@@ -78,6 +86,101 @@ fn resolve_quality(quality: &str) -> Option<QualityProfile> {
}
}
/// Build a CallConfig from a quality string. Used by both Android and desktop send tasks.
fn build_call_config(quality: &str) -> CallConfig {
let profile = resolve_quality(quality);
match profile {
Some(p) => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::from_profile(p)
},
None => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::default()
},
}
}
/// Map a received codec ID to the corresponding QualityProfile.
/// Used by recv tasks when the peer switches codecs.
fn codec_to_profile(codec: CodecId) -> QualityProfile {
match codec {
CodecId::Opus24k => QualityProfile::GOOD,
CodecId::Opus6k => QualityProfile::DEGRADED,
CodecId::Opus32k => QualityProfile::STUDIO_32K,
CodecId::Opus48k => QualityProfile::STUDIO_48K,
CodecId::Opus64k => QualityProfile::STUDIO_64K,
CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC,
CodecId::Codec2_3200 => QualityProfile {
codec: CodecId::Codec2_3200,
fec_ratio: 0.5,
frame_duration_ms: 20,
frames_per_block: 5,
},
other => QualityProfile { codec: other, ..QualityProfile::GOOD },
}
}
/// Signal handler task -- shared between Android and desktop.
/// Handles RoomUpdate (participant list), QualityDirective (relay-pushed
/// codec switch), and Hangup from the relay signal stream.
async fn run_signal_task(
transport: Arc<wzp_transport::QuinnTransport>,
running: Arc<AtomicBool>,
pending_profile: Arc<AtomicU8>,
participants: Arc<Mutex<Vec<ParticipantInfo>>>,
event_cb: Arc<dyn Fn(&str, &str) + Send + Sync>,
) {
loop {
if !running.load(Ordering::Relaxed) {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(SIGNAL_TIMEOUT_MS),
transport.recv_signal(),
)
.await
{
Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate {
participants: parts,
..
}))) => {
let mut seen = std::collections::HashSet::new();
let unique: Vec<ParticipantInfo> = parts
.into_iter()
.filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone())))
.map(|p| ParticipantInfo {
fingerprint: p.fingerprint,
alias: p.alias,
relay_label: p.relay_label,
})
.collect();
let count = unique.len();
*participants.lock().await = unique;
event_cb("room-update", &format!("{count} participants"));
}
Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
}))) => {
let idx = profile_to_index(&recommended_profile);
info!(
codec = ?recommended_profile.codec,
reason = reason.as_deref().unwrap_or(""),
"relay quality directive: switching profile"
);
pending_profile.store(idx, Ordering::Release);
}
Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break,
Ok(Err(_)) => break,
Err(_) => {}
}
}
}
/// Wrapper to make non-Sync audio handles safe to store in shared state.
/// The audio handle is only accessed from the thread that created it (drop),
/// never shared across threads — Sync is safe.
@@ -395,7 +498,7 @@ impl CallEngine {
};
let client_config = wzp_transport::client_config();
let conn = match tokio::time::timeout(
std::time::Duration::from_secs(10),
std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS),
wzp_transport::connect(&endpoint, relay_addr, &room, client_config),
).await {
Ok(Ok(c)) => c,
@@ -404,8 +507,8 @@ impl CallEngine {
return Err(e.into());
}
Err(_) => {
error!("connect TIMED OUT after 10s — QUIC handshake never completed. Relay may be unreachable from this endpoint.");
return Err(anyhow::anyhow!("QUIC connect timeout (10s)"));
error!("connect TIMED OUT after {CONNECT_TIMEOUT_SECS}s — QUIC handshake never completed. Relay may be unreachable from this endpoint.");
return Err(anyhow::anyhow!("QUIC connect timeout ({CONNECT_TIMEOUT_SECS}s)"));
}
};
info!(t_ms = call_t0.elapsed().as_millis(), "first-join diag: QUIC connection established, performing handshake");
@@ -525,19 +628,7 @@ impl CallEngine {
let send_app = app.clone();
let send_pending_profile = pending_profile.clone();
tokio::spawn(async move {
let profile = resolve_quality(&send_quality);
let config = match profile {
Some(p) => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::from_profile(p)
},
None => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::default()
},
};
let config = build_call_config(&send_quality);
let mut frame_samples = (config.profile.frame_duration_ms as usize) * 48;
info!(codec = ?config.profile.codec, frame_samples, t_ms = send_t0.elapsed().as_millis(), "first-join diag: send task spawned (android/oboe)");
*send_tx_codec.lock().await = format!("{:?}", config.profile.codec);
@@ -552,7 +643,6 @@ impl CallEngine {
// expected-loss hint based on real-time network conditions.
let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec);
let mut frames_since_dred_poll: u32 = 0;
const DRED_POLL_INTERVAL: u32 = 25;
let mut heartbeat = std::time::Instant::now();
let mut last_rms: u32 = 0;
@@ -576,7 +666,7 @@ impl CallEngine {
// like Opus6k to produce ~11 frames/s instead of 25).
if crate::wzp_native::audio_capture_available() < frame_samples {
short_reads += 1;
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
tokio::time::sleep(std::time::Duration::from_millis(CAPTURE_POLL_MS)).await;
continue;
}
let read = crate::wzp_native::audio_read_capture(&mut buf[..frame_samples]);
@@ -693,7 +783,7 @@ impl CallEngine {
}
// Heartbeat every 2s with capture+encode+send state
if heartbeat.elapsed() >= std::time::Duration::from_secs(2) {
if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) {
let fs = send_fs.load(Ordering::Relaxed);
let drops = send_drops.load(Ordering::Relaxed);
info!(
@@ -810,7 +900,7 @@ impl CallEngine {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(100),
std::time::Duration::from_millis(RECV_TIMEOUT_MS),
recv_t.recv_media(),
)
.await
@@ -849,19 +939,7 @@ impl CallEngine {
if *rx != codec_name { *rx = codec_name; }
}
if pkt.header.codec_id != current_codec {
let new_profile = match pkt.header.codec_id {
CodecId::Opus24k => QualityProfile::GOOD,
CodecId::Opus6k => QualityProfile::DEGRADED,
CodecId::Opus32k => QualityProfile::STUDIO_32K,
CodecId::Opus48k => QualityProfile::STUDIO_48K,
CodecId::Opus64k => QualityProfile::STUDIO_64K,
CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC,
CodecId::Codec2_3200 => QualityProfile {
codec: CodecId::Codec2_3200,
fec_ratio: 0.5, frame_duration_ms: 20, frames_per_block: 5,
},
other => QualityProfile { codec: other, ..QualityProfile::GOOD },
};
let new_profile = codec_to_profile(pkt.header.codec_id);
info!(from = ?current_codec, to = ?pkt.header.codec_id, "recv: switching decoder");
let _ = decoder.set_profile(new_profile);
current_profile = new_profile;
@@ -1015,7 +1093,7 @@ impl CallEngine {
}
// Heartbeat every 2s with decode+playout state
if heartbeat.elapsed() >= std::time::Duration::from_secs(2) {
if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) {
let fr = recv_fr.load(Ordering::Relaxed);
if wzp_codec::dred_verbose_logs() {
info!(
@@ -1124,60 +1202,14 @@ impl CallEngine {
});
// Signal task (presence + quality directives).
let sig_t = transport.clone();
let sig_r = running.clone();
let sig_p = participants.clone();
let sig_pending_profile = pending_profile.clone();
let event_cb = Arc::new(event_cb);
let sig_cb = event_cb.clone();
tokio::spawn(async move {
loop {
if !sig_r.load(Ordering::Relaxed) {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(200),
sig_t.recv_signal(),
)
.await
{
Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate {
participants: parts,
..
}))) => {
let mut seen = std::collections::HashSet::new();
let unique: Vec<ParticipantInfo> = parts
.into_iter()
.filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone())))
.map(|p| ParticipantInfo {
fingerprint: p.fingerprint,
alias: p.alias,
relay_label: p.relay_label,
})
.collect();
let count = unique.len();
*sig_p.lock().await = unique;
sig_cb("room-update", &format!("{count} participants"));
}
Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
}))) => {
let idx = profile_to_index(&recommended_profile);
info!(
codec = ?recommended_profile.codec,
reason = reason.as_deref().unwrap_or(""),
"relay quality directive: switching profile"
);
sig_pending_profile.store(idx, Ordering::Release);
}
Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break,
Ok(Err(_)) => break,
Err(_) => {}
}
}
});
tokio::spawn(run_signal_task(
transport.clone(),
running.clone(),
pending_profile.clone(),
participants.clone(),
event_cb.clone(),
));
Ok(Self {
running,
@@ -1349,19 +1381,7 @@ impl CallEngine {
let send_tx_codec = tx_codec.clone();
let send_pending_profile = pending_profile.clone();
tokio::spawn(async move {
let profile = resolve_quality(&send_quality);
let config = match profile {
Some(p) => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::from_profile(p)
},
None => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::default()
},
};
let config = build_call_config(&send_quality);
let mut frame_samples = (config.profile.frame_duration_ms as usize) * 48;
info!(codec = ?config.profile.codec, frame_samples, "send task starting");
*send_tx_codec.lock().await = format!("{:?}", config.profile.codec);
@@ -1372,14 +1392,13 @@ impl CallEngine {
// Continuous DRED tuning (same as Android send task).
let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec);
let mut frames_since_dred_poll: u32 = 0;
const DRED_POLL_INTERVAL: u32 = 25;
loop {
if !send_r.load(Ordering::Relaxed) {
break;
}
if capture_ring.available() < frame_samples {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
tokio::time::sleep(std::time::Duration::from_millis(CAPTURE_POLL_MS)).await;
continue;
}
capture_ring.read(&mut buf[..frame_samples]);
@@ -1470,7 +1489,7 @@ impl CallEngine {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(100),
std::time::Duration::from_millis(RECV_TIMEOUT_MS),
recv_t.recv_media(),
)
.await
@@ -1485,19 +1504,7 @@ impl CallEngine {
}
// Auto-switch decoder if incoming codec differs
if pkt.header.codec_id != current_codec {
let new_profile = match pkt.header.codec_id {
CodecId::Opus24k => QualityProfile::GOOD,
CodecId::Opus6k => QualityProfile::DEGRADED,
CodecId::Opus32k => QualityProfile::STUDIO_32K,
CodecId::Opus48k => QualityProfile::STUDIO_48K,
CodecId::Opus64k => QualityProfile::STUDIO_64K,
CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC,
CodecId::Codec2_3200 => QualityProfile {
codec: CodecId::Codec2_3200,
fec_ratio: 0.5, frame_duration_ms: 20, frames_per_block: 5,
},
other => QualityProfile { codec: other, ..QualityProfile::GOOD },
};
let new_profile = codec_to_profile(pkt.header.codec_id);
info!(from = ?current_codec, to = ?pkt.header.codec_id, "recv: switching decoder");
let _ = decoder.set_profile(new_profile);
current_profile = new_profile;
@@ -1560,60 +1567,14 @@ impl CallEngine {
});
// Signal task (presence + quality directives)
let sig_t = transport.clone();
let sig_r = running.clone();
let sig_p = participants.clone();
let sig_pending_profile = pending_profile.clone();
let event_cb = Arc::new(event_cb);
let sig_cb = event_cb.clone();
tokio::spawn(async move {
loop {
if !sig_r.load(Ordering::Relaxed) {
break;
}
match tokio::time::timeout(
std::time::Duration::from_millis(200),
sig_t.recv_signal(),
)
.await
{
Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate {
participants: parts,
..
}))) => {
let mut seen = std::collections::HashSet::new();
let unique: Vec<ParticipantInfo> = parts
.into_iter()
.filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone())))
.map(|p| ParticipantInfo {
fingerprint: p.fingerprint,
alias: p.alias,
relay_label: p.relay_label,
})
.collect();
let count = unique.len();
*sig_p.lock().await = unique;
sig_cb("room-update", &format!("{count} participants"));
}
Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
}))) => {
let idx = profile_to_index(&recommended_profile);
info!(
codec = ?recommended_profile.codec,
reason = reason.as_deref().unwrap_or(""),
"relay quality directive: switching profile"
);
sig_pending_profile.store(idx, Ordering::Release);
}
Ok(Ok(Some(_))) => {}
Ok(Ok(None)) => break,
Ok(Err(_)) => break,
Err(_) => {}
}
}
});
tokio::spawn(run_signal_task(
transport.clone(),
running.clone(),
pending_profile.clone(),
participants.clone(),
event_cb.clone(),
));
Ok(Self {
running,

132
docs/PRD-engine-dedup.md Normal file
View File

@@ -0,0 +1,132 @@
# PRD: Engine.rs Deduplication — Extract Shared Send/Recv Helpers
## Problem
`desktop/src-tauri/src/engine.rs` is 1,705 lines with two nearly identical `CallEngine::start()` implementations — one for Android (880 lines) and one for desktop (430 lines). ~350 lines are copy-pasted between them. Every change to the encode/decode/adaptive-quality pipeline requires editing both places, and they've already diverged in subtle ways (Android has extensive first-join diagnostics that desktop lacks).
## Scope
Extract the duplicated logic into shared helper functions. The Android and desktop paths should only differ in their audio I/O mechanism (Oboe ring via wzp-native vs CPAL capture_ring/playout_ring).
## What's Duplicated
| Block | Description | Lines (each) |
|-------|-------------|------|
| `build_call_config()` | Resolve quality string → CallConfig | 23 |
| Codec-to-profile match | Map CodecId → QualityProfile for decoder switch | 19 |
| Adaptive quality switch | Read AtomicU8, index_to_profile, set_profile, update frame_samples + dred_tuner | 15 |
| DRED tuner poll | Check frame counter, poll quinn stats, apply tuning | 15 |
| Quality report ingestion | Extract quality_report, feed to AdaptiveQualityController, store to AtomicU8 | 8 |
| Signal task | Accept signals, handle RoomUpdate/QualityDirective/Hangup | 48 |
| **Total** | | **~128 lines × 2 = 256 lines eliminated** |
## Implementation
### Phase 1: Top-Level Helper Functions
```rust
fn build_call_config(quality: &str) -> CallConfig {
let profile = resolve_quality(quality);
match profile {
Some(p) => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::from_profile(p)
},
None => CallConfig {
noise_suppression: false,
suppression_enabled: false,
..CallConfig::default()
},
}
}
fn codec_to_profile(codec: CodecId) -> QualityProfile {
match codec {
CodecId::Opus24k => QualityProfile::GOOD,
CodecId::Opus6k => QualityProfile::DEGRADED,
CodecId::Opus32k => QualityProfile::STUDIO_32K,
CodecId::Opus48k => QualityProfile::STUDIO_48K,
CodecId::Opus64k => QualityProfile::STUDIO_64K,
CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC,
CodecId::Codec2_3200 => QualityProfile {
codec: CodecId::Codec2_3200,
fec_ratio: 0.5,
frame_duration_ms: 20,
frames_per_block: 5,
},
other => QualityProfile { codec: other, ..QualityProfile::GOOD },
}
}
fn check_adaptive_switch(
pending: &AtomicU8,
encoder: &mut CallEncoder,
tuner: &mut wzp_proto::DredTuner,
frame_samples: &mut usize,
tx_codec: &tokio::sync::Mutex<String>,
) -> bool {
let p = pending.swap(PROFILE_NO_CHANGE, Ordering::Acquire);
if p == PROFILE_NO_CHANGE { return false; }
if let Some(new_profile) = index_to_profile(p) {
let new_fs = (new_profile.frame_duration_ms as usize) * 48;
if encoder.set_profile(new_profile).is_ok() {
*frame_samples = new_fs;
tuner.set_codec(new_profile.codec);
// Caller updates tx_codec display string
return true;
}
}
false
}
```
### Phase 2: Shared Signal Task
Extract the signal task into a standalone async function:
```rust
async fn run_signal_task(
transport: Arc<wzp_transport::QuinnTransport>,
running: Arc<AtomicBool>,
pending_profile: Arc<AtomicU8>,
participants: Arc<Mutex<Vec<ParticipantInfo>>>,
) {
loop {
if !running.load(Ordering::Relaxed) { break; }
match tokio::time::timeout(
Duration::from_millis(SIGNAL_TIMEOUT_MS),
transport.recv_signal(),
).await {
Ok(Ok(Some(msg))) => {
// Handle RoomUpdate, QualityDirective, Hangup...
}
_ => {}
}
}
}
```
### Phase 3: Shared DRED Poll + Quality Ingestion
These are small blocks but appear in both send and recv tasks. Extract as inline helpers or closures.
## Verification
1. `cargo check --workspace` — must compile
2. `cargo test -p wzp-proto -p wzp-relay -p wzp-client --lib` — must pass
3. Manual test: place a call Android↔Desktop, verify audio works in both directions
4. Verify adaptive quality still switches (set one side to auto, degrade network)
## Effort
- Phase 1: 1 hour (extract 3 functions, update 6 call sites)
- Phase 2: 30 min (extract signal task, update 2 spawn sites)
- Phase 3: 30 min (cleanup remaining small duplicates)
- Total: ~2 hours
## Not In Scope
- Audio I/O trait abstraction (Oboe vs CPAL) — different project, different risk profile
- Moving Android-specific diagnostics (first-join, PCM recorder) into a feature flag
- Splitting engine.rs into multiple files