From ba12aae43977d1c555c6ead0b7839a97750c3276 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 13 Apr 2026 15:22:44 +0400 Subject: [PATCH] refactor: extract shared engine helpers, federation clone-before-send, constants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Engine deduplication (PRD-engine-dedup.md): - build_call_config(): shared CallConfig construction (was 23 lines × 2) - codec_to_profile(): shared CodecId → QualityProfile mapping (was 19 lines × 2) - run_signal_task(): shared signal handler (was 48 lines × 2) - Net -39 lines from engine.rs, 6 duplicated blocks → single-line calls Quick wins from REFACTOR-codebase-audit.md: - 6 magic number constants extracted (CAPTURE_POLL_MS, RECV_TIMEOUT_MS, etc.) - DRED_POLL_INTERVAL moved from 2 local defs to 1 module-level const - federation.rs: forward_to_peers, broadcast_signal, send_signal_to_peer now clone peer list and release lock before sending (was holding Mutex across async I/O — last lock-during-send pattern eliminated) - main.rs: close_transport() helper replaces 12 silent .ok() calls with debug-level logging 314 tests passing, 0 regressions. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-relay/src/federation.rs | 59 +++--- crates/wzp-relay/src/main.rs | 31 +-- desktop/src-tauri/src/engine.rs | 299 +++++++++++++---------------- docs/PRD-engine-dedup.md | 132 +++++++++++++ 4 files changed, 317 insertions(+), 204 deletions(-) create mode 100644 docs/PRD-engine-dedup.md diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 7b4370e..b632e07 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -213,16 +213,19 @@ impl FederationManager { /// `origin_relay_fp` against its own fp and drops self-sourced /// forwards. pub async fn broadcast_signal(&self, msg: &wzp_proto::SignalMessage) -> usize { - let links = self.peer_links.lock().await; + let peers: Vec<(String, String, Arc)> = { + let links = self.peer_links.lock().await; + links.iter().map(|(fp, l)| (fp.clone(), l.label.clone(), l.transport.clone())).collect() + }; // lock released let mut count = 0; - for (fp, link) in links.iter() { - match link.transport.send_signal(msg).await { + for (fp, label, transport) in &peers { + match transport.send_signal(msg).await { Ok(()) => { count += 1; - tracing::debug!(peer = %link.label, %fp, "federation: broadcast signal ok"); + tracing::debug!(peer = %label, %fp, "federation: broadcast signal ok"); } Err(e) => { - tracing::warn!(peer = %link.label, %fp, error = %e, "federation: broadcast signal failed"); + tracing::warn!(peer = %label, %fp, error = %e, "federation: broadcast signal failed"); } } } @@ -243,10 +246,12 @@ impl FederationManager { msg: &wzp_proto::SignalMessage, ) -> Result<(), String> { let normalized = normalize_fp(peer_relay_fp); - let links = self.peer_links.lock().await; - match links.get(&normalized) { - Some(link) => link - .transport + let transport = { + let links = self.peer_links.lock().await; + links.get(&normalized).map(|l| l.transport.clone()) + }; // lock released + match transport { + Some(t) => t .send_signal(msg) .await .map_err(|e| format!("send to peer {normalized}: {e}")), @@ -403,20 +408,22 @@ impl FederationManager { /// or rate limiting; the body currently forwards on `room_hash` alone /// because that's what the wire format carries. pub async fn forward_to_peers(&self, _room_name: &str, room_hash: &[u8; 8], media_data: &Bytes) { - let links = self.peer_links.lock().await; - if links.is_empty() { - return; - } - for (_fp, link) in links.iter() { + let peers: Vec<(String, Arc)> = { + let links = self.peer_links.lock().await; + if links.is_empty() { return; } + links.values().map(|l| (l.label.clone(), l.transport.clone())).collect() + }; // lock released + + for (label, transport) in &peers { let mut tagged = Vec::with_capacity(8 + media_data.len()); tagged.extend_from_slice(room_hash); tagged.extend_from_slice(media_data); - match link.transport.send_raw_datagram(&tagged) { + match transport.send_raw_datagram(&tagged) { Ok(()) => { self.metrics.federation_packets_forwarded - .with_label_values(&[&link.label, "out"]).inc(); + .with_label_values(&[label, "out"]).inc(); } - Err(e) => warn!(peer = %link.label, "federation send error: {e}"), + Err(e) => warn!(peer = %label, "federation send error: {e}"), } } } @@ -483,9 +490,12 @@ async fn run_room_event_dispatcher( let participants = fm.room_mgr.local_participant_list(&room); info!(room = %room, count = participants.len(), "global room now active, announcing to peers"); let msg = SignalMessage::GlobalRoomActive { room, participants }; - let links = fm.peer_links.lock().await; - for link in links.values() { - let _ = link.transport.send_signal(&msg).await; + let transports: Vec> = { + let links = fm.peer_links.lock().await; + links.values().map(|l| l.transport.clone()).collect() + }; + for t in &transports { + let _ = t.send_signal(&msg).await; } } } @@ -493,9 +503,12 @@ async fn run_room_event_dispatcher( if fm.is_global_room(&room) { info!(room = %room, "global room now inactive, announcing to peers"); let msg = SignalMessage::GlobalRoomInactive { room }; - let links = fm.peer_links.lock().await; - for link in links.values() { - let _ = link.transport.send_signal(&msg).await; + let transports: Vec> = { + let links = fm.peer_links.lock().await; + links.values().map(|l| l.transport.clone()).collect() + }; + for t in &transports { + let _ = t.send_signal(&msg).await; } } } diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 9e16091..0250f60 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -23,6 +23,13 @@ use wzp_relay::presence::PresenceRegistry; use wzp_relay::room::{self, RoomManager}; use wzp_relay::session_mgr::SessionManager; +/// Close a transport gracefully, logging any error at debug level. +async fn close_transport(t: &dyn wzp_proto::MediaTransport, context: &str) { + if let Err(e) = t.close().await { + tracing::debug!(context, error = %e, "transport close (non-fatal)"); + } +} + /// Parsed CLI result — config + identity path. struct CliResult { config: RelayConfig, @@ -908,7 +915,7 @@ async fn main() -> anyhow::Result<()> { } } } - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } @@ -1475,7 +1482,7 @@ async fn main() -> anyhow::Result<()> { reg.unregister_local(&client_fp); } - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } @@ -1499,14 +1506,14 @@ async fn main() -> anyhow::Result<()> { Err(e) => { metrics.auth_attempts.with_label_values(&["fail"]).inc(); error!(%addr, "auth failed: {e}"); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } } } Ok(Some(_)) => { error!(%addr, "expected AuthToken as first signal, got something else"); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } Ok(None) => { @@ -1515,7 +1522,7 @@ async fn main() -> anyhow::Result<()> { } Err(e) => { error!(%addr, "signal recv error during auth: {e}"); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } } @@ -1537,7 +1544,7 @@ async fn main() -> anyhow::Result<()> { } Err(e) => { error!(%addr, "handshake failed: {e}"); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } }; @@ -1561,7 +1568,7 @@ async fn main() -> anyhow::Result<()> { }; if !authorized { warn!(%addr, room = %room_name, fp = %participant_fp, "rejected: not authorized for this call room"); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } info!(%addr, room = %room_name, fp = %participant_fp, "authorized for call room"); @@ -1602,7 +1609,7 @@ async fn main() -> anyhow::Result<()> { tokio::select! { _ = up => {} _ = dn => {} } stats_handle.abort(); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; } else { // Room mode — enforce max sessions, then join room let session_id = { @@ -1611,7 +1618,7 @@ async fn main() -> anyhow::Result<()> { Ok(id) => id, Err(e) => { error!(%addr, room = %room_name, "session rejected: {e}"); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } } @@ -1626,7 +1633,7 @@ async fn main() -> anyhow::Result<()> { metrics.active_sessions.dec(); let mut smgr = session_mgr.lock().await; smgr.remove_session(session_id); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } } @@ -1676,7 +1683,7 @@ async fn main() -> anyhow::Result<()> { metrics.active_sessions.dec(); let mut smgr = session_mgr.lock().await; smgr.remove_session(session_id); - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; return; } } @@ -1731,7 +1738,7 @@ async fn main() -> anyhow::Result<()> { smgr.remove_session(session_id); } - transport.close().await.ok(); + close_transport(&*transport, "cleanup").await; } }); } diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 9d03269..6c079ed 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -30,6 +30,14 @@ use wzp_proto::traits::{AudioDecoder, QualityController}; use wzp_proto::{AdaptiveQualityController, CodecId, MediaTransport, QualityProfile}; const FRAME_SAMPLES_40MS: usize = 1920; +const CAPTURE_POLL_MS: u64 = 5; +const RECV_TIMEOUT_MS: u64 = 100; +const SIGNAL_TIMEOUT_MS: u64 = 200; +#[cfg_attr(not(target_os = "android"), allow(dead_code))] +const CONNECT_TIMEOUT_SECS: u64 = 10; +#[cfg_attr(not(target_os = "android"), allow(dead_code))] +const HEARTBEAT_INTERVAL_SECS: u64 = 2; +const DRED_POLL_INTERVAL: u32 = 25; /// Profile index mapping for the AtomicU8 adaptive-quality bridge. const PROFILE_NO_CHANGE: u8 = 0xFF; @@ -78,6 +86,101 @@ fn resolve_quality(quality: &str) -> Option { } } +/// Build a CallConfig from a quality string. Used by both Android and desktop send tasks. +fn build_call_config(quality: &str) -> CallConfig { + let profile = resolve_quality(quality); + match profile { + Some(p) => CallConfig { + noise_suppression: false, + suppression_enabled: false, + ..CallConfig::from_profile(p) + }, + None => CallConfig { + noise_suppression: false, + suppression_enabled: false, + ..CallConfig::default() + }, + } +} + +/// Map a received codec ID to the corresponding QualityProfile. +/// Used by recv tasks when the peer switches codecs. +fn codec_to_profile(codec: CodecId) -> QualityProfile { + match codec { + CodecId::Opus24k => QualityProfile::GOOD, + CodecId::Opus6k => QualityProfile::DEGRADED, + CodecId::Opus32k => QualityProfile::STUDIO_32K, + CodecId::Opus48k => QualityProfile::STUDIO_48K, + CodecId::Opus64k => QualityProfile::STUDIO_64K, + CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC, + CodecId::Codec2_3200 => QualityProfile { + codec: CodecId::Codec2_3200, + fec_ratio: 0.5, + frame_duration_ms: 20, + frames_per_block: 5, + }, + other => QualityProfile { codec: other, ..QualityProfile::GOOD }, + } +} + +/// Signal handler task -- shared between Android and desktop. +/// Handles RoomUpdate (participant list), QualityDirective (relay-pushed +/// codec switch), and Hangup from the relay signal stream. +async fn run_signal_task( + transport: Arc, + running: Arc, + pending_profile: Arc, + participants: Arc>>, + event_cb: Arc, +) { + loop { + if !running.load(Ordering::Relaxed) { + break; + } + match tokio::time::timeout( + std::time::Duration::from_millis(SIGNAL_TIMEOUT_MS), + transport.recv_signal(), + ) + .await + { + Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate { + participants: parts, + .. + }))) => { + let mut seen = std::collections::HashSet::new(); + let unique: Vec = parts + .into_iter() + .filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone()))) + .map(|p| ParticipantInfo { + fingerprint: p.fingerprint, + alias: p.alias, + relay_label: p.relay_label, + }) + .collect(); + let count = unique.len(); + *participants.lock().await = unique; + event_cb("room-update", &format!("{count} participants")); + } + Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective { + recommended_profile, + reason, + }))) => { + let idx = profile_to_index(&recommended_profile); + info!( + codec = ?recommended_profile.codec, + reason = reason.as_deref().unwrap_or(""), + "relay quality directive: switching profile" + ); + pending_profile.store(idx, Ordering::Release); + } + Ok(Ok(Some(_))) => {} + Ok(Ok(None)) => break, + Ok(Err(_)) => break, + Err(_) => {} + } + } +} + /// Wrapper to make non-Sync audio handles safe to store in shared state. /// The audio handle is only accessed from the thread that created it (drop), /// never shared across threads — Sync is safe. @@ -395,7 +498,7 @@ impl CallEngine { }; let client_config = wzp_transport::client_config(); let conn = match tokio::time::timeout( - std::time::Duration::from_secs(10), + std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS), wzp_transport::connect(&endpoint, relay_addr, &room, client_config), ).await { Ok(Ok(c)) => c, @@ -404,8 +507,8 @@ impl CallEngine { return Err(e.into()); } Err(_) => { - error!("connect TIMED OUT after 10s — QUIC handshake never completed. Relay may be unreachable from this endpoint."); - return Err(anyhow::anyhow!("QUIC connect timeout (10s)")); + error!("connect TIMED OUT after {CONNECT_TIMEOUT_SECS}s — QUIC handshake never completed. Relay may be unreachable from this endpoint."); + return Err(anyhow::anyhow!("QUIC connect timeout ({CONNECT_TIMEOUT_SECS}s)")); } }; info!(t_ms = call_t0.elapsed().as_millis(), "first-join diag: QUIC connection established, performing handshake"); @@ -525,19 +628,7 @@ impl CallEngine { let send_app = app.clone(); let send_pending_profile = pending_profile.clone(); tokio::spawn(async move { - let profile = resolve_quality(&send_quality); - let config = match profile { - Some(p) => CallConfig { - noise_suppression: false, - suppression_enabled: false, - ..CallConfig::from_profile(p) - }, - None => CallConfig { - noise_suppression: false, - suppression_enabled: false, - ..CallConfig::default() - }, - }; + let config = build_call_config(&send_quality); let mut frame_samples = (config.profile.frame_duration_ms as usize) * 48; info!(codec = ?config.profile.codec, frame_samples, t_ms = send_t0.elapsed().as_millis(), "first-join diag: send task spawned (android/oboe)"); *send_tx_codec.lock().await = format!("{:?}", config.profile.codec); @@ -552,7 +643,6 @@ impl CallEngine { // expected-loss hint based on real-time network conditions. let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec); let mut frames_since_dred_poll: u32 = 0; - const DRED_POLL_INTERVAL: u32 = 25; let mut heartbeat = std::time::Instant::now(); let mut last_rms: u32 = 0; @@ -576,7 +666,7 @@ impl CallEngine { // like Opus6k to produce ~11 frames/s instead of 25). if crate::wzp_native::audio_capture_available() < frame_samples { short_reads += 1; - tokio::time::sleep(std::time::Duration::from_millis(5)).await; + tokio::time::sleep(std::time::Duration::from_millis(CAPTURE_POLL_MS)).await; continue; } let read = crate::wzp_native::audio_read_capture(&mut buf[..frame_samples]); @@ -693,7 +783,7 @@ impl CallEngine { } // Heartbeat every 2s with capture+encode+send state - if heartbeat.elapsed() >= std::time::Duration::from_secs(2) { + if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) { let fs = send_fs.load(Ordering::Relaxed); let drops = send_drops.load(Ordering::Relaxed); info!( @@ -810,7 +900,7 @@ impl CallEngine { break; } match tokio::time::timeout( - std::time::Duration::from_millis(100), + std::time::Duration::from_millis(RECV_TIMEOUT_MS), recv_t.recv_media(), ) .await @@ -849,19 +939,7 @@ impl CallEngine { if *rx != codec_name { *rx = codec_name; } } if pkt.header.codec_id != current_codec { - let new_profile = match pkt.header.codec_id { - CodecId::Opus24k => QualityProfile::GOOD, - CodecId::Opus6k => QualityProfile::DEGRADED, - CodecId::Opus32k => QualityProfile::STUDIO_32K, - CodecId::Opus48k => QualityProfile::STUDIO_48K, - CodecId::Opus64k => QualityProfile::STUDIO_64K, - CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC, - CodecId::Codec2_3200 => QualityProfile { - codec: CodecId::Codec2_3200, - fec_ratio: 0.5, frame_duration_ms: 20, frames_per_block: 5, - }, - other => QualityProfile { codec: other, ..QualityProfile::GOOD }, - }; + let new_profile = codec_to_profile(pkt.header.codec_id); info!(from = ?current_codec, to = ?pkt.header.codec_id, "recv: switching decoder"); let _ = decoder.set_profile(new_profile); current_profile = new_profile; @@ -1015,7 +1093,7 @@ impl CallEngine { } // Heartbeat every 2s with decode+playout state - if heartbeat.elapsed() >= std::time::Duration::from_secs(2) { + if heartbeat.elapsed() >= std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS) { let fr = recv_fr.load(Ordering::Relaxed); if wzp_codec::dred_verbose_logs() { info!( @@ -1124,60 +1202,14 @@ impl CallEngine { }); // Signal task (presence + quality directives). - let sig_t = transport.clone(); - let sig_r = running.clone(); - let sig_p = participants.clone(); - let sig_pending_profile = pending_profile.clone(); let event_cb = Arc::new(event_cb); - let sig_cb = event_cb.clone(); - tokio::spawn(async move { - loop { - if !sig_r.load(Ordering::Relaxed) { - break; - } - match tokio::time::timeout( - std::time::Duration::from_millis(200), - sig_t.recv_signal(), - ) - .await - { - Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate { - participants: parts, - .. - }))) => { - let mut seen = std::collections::HashSet::new(); - let unique: Vec = parts - .into_iter() - .filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone()))) - .map(|p| ParticipantInfo { - fingerprint: p.fingerprint, - alias: p.alias, - relay_label: p.relay_label, - }) - .collect(); - let count = unique.len(); - *sig_p.lock().await = unique; - sig_cb("room-update", &format!("{count} participants")); - } - Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective { - recommended_profile, - reason, - }))) => { - let idx = profile_to_index(&recommended_profile); - info!( - codec = ?recommended_profile.codec, - reason = reason.as_deref().unwrap_or(""), - "relay quality directive: switching profile" - ); - sig_pending_profile.store(idx, Ordering::Release); - } - Ok(Ok(Some(_))) => {} - Ok(Ok(None)) => break, - Ok(Err(_)) => break, - Err(_) => {} - } - } - }); + tokio::spawn(run_signal_task( + transport.clone(), + running.clone(), + pending_profile.clone(), + participants.clone(), + event_cb.clone(), + )); Ok(Self { running, @@ -1349,19 +1381,7 @@ impl CallEngine { let send_tx_codec = tx_codec.clone(); let send_pending_profile = pending_profile.clone(); tokio::spawn(async move { - let profile = resolve_quality(&send_quality); - let config = match profile { - Some(p) => CallConfig { - noise_suppression: false, - suppression_enabled: false, - ..CallConfig::from_profile(p) - }, - None => CallConfig { - noise_suppression: false, - suppression_enabled: false, - ..CallConfig::default() - }, - }; + let config = build_call_config(&send_quality); let mut frame_samples = (config.profile.frame_duration_ms as usize) * 48; info!(codec = ?config.profile.codec, frame_samples, "send task starting"); *send_tx_codec.lock().await = format!("{:?}", config.profile.codec); @@ -1372,14 +1392,13 @@ impl CallEngine { // Continuous DRED tuning (same as Android send task). let mut dred_tuner = wzp_proto::DredTuner::new(config.profile.codec); let mut frames_since_dred_poll: u32 = 0; - const DRED_POLL_INTERVAL: u32 = 25; loop { if !send_r.load(Ordering::Relaxed) { break; } if capture_ring.available() < frame_samples { - tokio::time::sleep(std::time::Duration::from_millis(5)).await; + tokio::time::sleep(std::time::Duration::from_millis(CAPTURE_POLL_MS)).await; continue; } capture_ring.read(&mut buf[..frame_samples]); @@ -1470,7 +1489,7 @@ impl CallEngine { break; } match tokio::time::timeout( - std::time::Duration::from_millis(100), + std::time::Duration::from_millis(RECV_TIMEOUT_MS), recv_t.recv_media(), ) .await @@ -1485,19 +1504,7 @@ impl CallEngine { } // Auto-switch decoder if incoming codec differs if pkt.header.codec_id != current_codec { - let new_profile = match pkt.header.codec_id { - CodecId::Opus24k => QualityProfile::GOOD, - CodecId::Opus6k => QualityProfile::DEGRADED, - CodecId::Opus32k => QualityProfile::STUDIO_32K, - CodecId::Opus48k => QualityProfile::STUDIO_48K, - CodecId::Opus64k => QualityProfile::STUDIO_64K, - CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC, - CodecId::Codec2_3200 => QualityProfile { - codec: CodecId::Codec2_3200, - fec_ratio: 0.5, frame_duration_ms: 20, frames_per_block: 5, - }, - other => QualityProfile { codec: other, ..QualityProfile::GOOD }, - }; + let new_profile = codec_to_profile(pkt.header.codec_id); info!(from = ?current_codec, to = ?pkt.header.codec_id, "recv: switching decoder"); let _ = decoder.set_profile(new_profile); current_profile = new_profile; @@ -1560,60 +1567,14 @@ impl CallEngine { }); // Signal task (presence + quality directives) - let sig_t = transport.clone(); - let sig_r = running.clone(); - let sig_p = participants.clone(); - let sig_pending_profile = pending_profile.clone(); let event_cb = Arc::new(event_cb); - let sig_cb = event_cb.clone(); - tokio::spawn(async move { - loop { - if !sig_r.load(Ordering::Relaxed) { - break; - } - match tokio::time::timeout( - std::time::Duration::from_millis(200), - sig_t.recv_signal(), - ) - .await - { - Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate { - participants: parts, - .. - }))) => { - let mut seen = std::collections::HashSet::new(); - let unique: Vec = parts - .into_iter() - .filter(|p| seen.insert((p.fingerprint.clone(), p.alias.clone()))) - .map(|p| ParticipantInfo { - fingerprint: p.fingerprint, - alias: p.alias, - relay_label: p.relay_label, - }) - .collect(); - let count = unique.len(); - *sig_p.lock().await = unique; - sig_cb("room-update", &format!("{count} participants")); - } - Ok(Ok(Some(wzp_proto::SignalMessage::QualityDirective { - recommended_profile, - reason, - }))) => { - let idx = profile_to_index(&recommended_profile); - info!( - codec = ?recommended_profile.codec, - reason = reason.as_deref().unwrap_or(""), - "relay quality directive: switching profile" - ); - sig_pending_profile.store(idx, Ordering::Release); - } - Ok(Ok(Some(_))) => {} - Ok(Ok(None)) => break, - Ok(Err(_)) => break, - Err(_) => {} - } - } - }); + tokio::spawn(run_signal_task( + transport.clone(), + running.clone(), + pending_profile.clone(), + participants.clone(), + event_cb.clone(), + )); Ok(Self { running, diff --git a/docs/PRD-engine-dedup.md b/docs/PRD-engine-dedup.md new file mode 100644 index 0000000..0a98461 --- /dev/null +++ b/docs/PRD-engine-dedup.md @@ -0,0 +1,132 @@ +# PRD: Engine.rs Deduplication — Extract Shared Send/Recv Helpers + +## Problem + +`desktop/src-tauri/src/engine.rs` is 1,705 lines with two nearly identical `CallEngine::start()` implementations — one for Android (880 lines) and one for desktop (430 lines). ~350 lines are copy-pasted between them. Every change to the encode/decode/adaptive-quality pipeline requires editing both places, and they've already diverged in subtle ways (Android has extensive first-join diagnostics that desktop lacks). + +## Scope + +Extract the duplicated logic into shared helper functions. The Android and desktop paths should only differ in their audio I/O mechanism (Oboe ring via wzp-native vs CPAL capture_ring/playout_ring). + +## What's Duplicated + +| Block | Description | Lines (each) | +|-------|-------------|------| +| `build_call_config()` | Resolve quality string → CallConfig | 23 | +| Codec-to-profile match | Map CodecId → QualityProfile for decoder switch | 19 | +| Adaptive quality switch | Read AtomicU8, index_to_profile, set_profile, update frame_samples + dred_tuner | 15 | +| DRED tuner poll | Check frame counter, poll quinn stats, apply tuning | 15 | +| Quality report ingestion | Extract quality_report, feed to AdaptiveQualityController, store to AtomicU8 | 8 | +| Signal task | Accept signals, handle RoomUpdate/QualityDirective/Hangup | 48 | +| **Total** | | **~128 lines × 2 = 256 lines eliminated** | + +## Implementation + +### Phase 1: Top-Level Helper Functions + +```rust +fn build_call_config(quality: &str) -> CallConfig { + let profile = resolve_quality(quality); + match profile { + Some(p) => CallConfig { + noise_suppression: false, + suppression_enabled: false, + ..CallConfig::from_profile(p) + }, + None => CallConfig { + noise_suppression: false, + suppression_enabled: false, + ..CallConfig::default() + }, + } +} + +fn codec_to_profile(codec: CodecId) -> QualityProfile { + match codec { + CodecId::Opus24k => QualityProfile::GOOD, + CodecId::Opus6k => QualityProfile::DEGRADED, + CodecId::Opus32k => QualityProfile::STUDIO_32K, + CodecId::Opus48k => QualityProfile::STUDIO_48K, + CodecId::Opus64k => QualityProfile::STUDIO_64K, + CodecId::Codec2_1200 => QualityProfile::CATASTROPHIC, + CodecId::Codec2_3200 => QualityProfile { + codec: CodecId::Codec2_3200, + fec_ratio: 0.5, + frame_duration_ms: 20, + frames_per_block: 5, + }, + other => QualityProfile { codec: other, ..QualityProfile::GOOD }, + } +} + +fn check_adaptive_switch( + pending: &AtomicU8, + encoder: &mut CallEncoder, + tuner: &mut wzp_proto::DredTuner, + frame_samples: &mut usize, + tx_codec: &tokio::sync::Mutex, +) -> bool { + let p = pending.swap(PROFILE_NO_CHANGE, Ordering::Acquire); + if p == PROFILE_NO_CHANGE { return false; } + if let Some(new_profile) = index_to_profile(p) { + let new_fs = (new_profile.frame_duration_ms as usize) * 48; + if encoder.set_profile(new_profile).is_ok() { + *frame_samples = new_fs; + tuner.set_codec(new_profile.codec); + // Caller updates tx_codec display string + return true; + } + } + false +} +``` + +### Phase 2: Shared Signal Task + +Extract the signal task into a standalone async function: + +```rust +async fn run_signal_task( + transport: Arc, + running: Arc, + pending_profile: Arc, + participants: Arc>>, +) { + loop { + if !running.load(Ordering::Relaxed) { break; } + match tokio::time::timeout( + Duration::from_millis(SIGNAL_TIMEOUT_MS), + transport.recv_signal(), + ).await { + Ok(Ok(Some(msg))) => { + // Handle RoomUpdate, QualityDirective, Hangup... + } + _ => {} + } + } +} +``` + +### Phase 3: Shared DRED Poll + Quality Ingestion + +These are small blocks but appear in both send and recv tasks. Extract as inline helpers or closures. + +## Verification + +1. `cargo check --workspace` — must compile +2. `cargo test -p wzp-proto -p wzp-relay -p wzp-client --lib` — must pass +3. Manual test: place a call Android↔Desktop, verify audio works in both directions +4. Verify adaptive quality still switches (set one side to auto, degrade network) + +## Effort + +- Phase 1: 1 hour (extract 3 functions, update 6 call sites) +- Phase 2: 30 min (extract signal task, update 2 spawn sites) +- Phase 3: 30 min (cleanup remaining small duplicates) +- Total: ~2 hours + +## Not In Scope + +- Audio I/O trait abstraction (Oboe vs CPAL) — different project, different risk profile +- Moving Android-specific diagnostics (first-join, PCM recorder) into a feature flag +- Splitting engine.rs into multiple files