From 80d5bd762866432a3700ba97dd3f98394e9894e2 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Mon, 6 Apr 2026 11:48:20 +0400 Subject: [PATCH] =?UTF-8?q?fix:=20survive=20QUIC=20congestion=20=E2=80=94?= =?UTF-8?q?=20drop=20packets=20instead=20of=20killing=20send=20task?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit send_datagram() returns Err(Blocked) when the QUIC congestion window is full. This is transient — the window reopens once ACKs arrive. Previously, all send paths treated this as fatal (break/return), which killed the send task and cascaded via tokio::select! to kill the entire call. Now: log warning, drop the packet, continue. Brief audio glitch (20-100ms) instead of complete call death. FEC on the receiver side recovers most dropped packets. Fixed in: - CLI run_live send task (continue + error counter) - CLI run_file_mode send paths (2 locations) - Desktop engine send task Also hardened recv tasks: transient errors (non-closed/reset) are survived instead of causing exit. Matches the fix applied to Android client (engine.rs). Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/cli.rs | 35 +++++++++++++++++++++++++-------- desktop/src-tauri/src/engine.rs | 16 +++++++++++---- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index 5d6ac42..937ce5a 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -450,8 +450,7 @@ async fn run_silence(transport: Arc) -> anyhow::R } total_bytes += pkt.payload.len() as u64; if let Err(e) = transport.send_media(pkt).await { - error!("send error: {e}"); - break; + warn!("send_media error (dropping packet): {e}"); } } if (i + 1) % 50 == 0 { @@ -536,8 +535,7 @@ async fn run_file_mode( total_source += 1; } if let Err(e) = send_transport.send_media(pkt).await { - error!("send error: {e}"); - return; + warn!("send_media error (dropping packet): {e}"); } } if (frame_idx + 1) % 250 == 0 { @@ -824,6 +822,9 @@ async fn run_live( let mut capture_buf = vec![0i16; FRAME_SAMPLES]; let mut farend_buf = vec![0i16; FRAME_SAMPLES]; let mut frames_sent: u64 = 0; + let mut frames_dropped: u64 = 0; + let mut send_errors: u64 = 0; + let mut last_send_err = std::time::Instant::now(); let mut polls: u64 = 0; let mut last_diag = std::time::Instant::now(); @@ -873,13 +874,23 @@ async fn run_live( }; let encode_us = t0.elapsed().as_micros(); + let mut dropped = false; for pkt in &packets { if let Err(e) = send_transport.send_media(pkt).await { - error!("send error: {e}"); - return; + send_errors += 1; + frames_dropped += 1; + dropped = true; + if send_errors <= 3 || last_send_err.elapsed().as_secs() >= 1 { + warn!(send_errors, frames_dropped, + "send_media error (dropping packet): {e}"); + last_send_err = std::time::Instant::now(); + } } } + if !dropped { + send_errors = 0; // reset on success + } frames_sent += 1; if frames_sent <= 5 || frames_sent % 500 == 0 { info!(frames_sent, encode_us, pkts = packets.len(), "send progress"); @@ -904,6 +915,7 @@ async fn run_live( async move { let mut packets_received: u64 = 0; + let mut recv_errors: u64 = 0; let mut timeouts: u64 = 0; // For direct playout: raw Opus decoder + AGC let mut opus_dec = if direct_playout { @@ -972,8 +984,15 @@ async fn run_live( break; } Ok(Err(e)) => { - error!("recv error: {e}"); - break; + let msg = e.to_string(); + if msg.contains("closed") || msg.contains("reset") { + error!("recv fatal: {e}"); + break; + } + recv_errors += 1; + if recv_errors <= 3 { + warn!("recv error (continuing): {e}"); + } } Err(_) => { timeouts += 1; diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 5f25d93..8d4292d 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -171,6 +171,7 @@ impl CallEngine { let send_mic = mic_muted.clone(); let send_fs = frames_sent.clone(); let send_level = audio_level.clone(); + let send_drops = Arc::new(AtomicU64::new(0)); tokio::spawn(async move { let config = CallConfig { noise_suppression: false, @@ -205,8 +206,11 @@ impl CallEngine { Ok(pkts) => { for pkt in &pkts { if let Err(e) = send_t.send_media(pkt).await { - error!("send: {e}"); - return; + // Transient congestion (Blocked) — drop packet, keep going + send_drops.fetch_add(1, Ordering::Relaxed); + if send_drops.load(Ordering::Relaxed) <= 3 { + tracing::warn!("send_media error (dropping packet): {e}"); + } } } send_fs.fetch_add(1, Ordering::Relaxed); @@ -249,8 +253,12 @@ impl CallEngine { } Ok(Ok(None)) => break, Ok(Err(e)) => { - error!("recv: {e}"); - break; + let msg = e.to_string(); + if msg.contains("closed") || msg.contains("reset") { + error!("recv fatal: {e}"); + break; + } + // Transient error — continue } Err(_) => {} }