diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index 5d6ac42..937ce5a 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -450,8 +450,7 @@ async fn run_silence(transport: Arc) -> anyhow::R } total_bytes += pkt.payload.len() as u64; if let Err(e) = transport.send_media(pkt).await { - error!("send error: {e}"); - break; + warn!("send_media error (dropping packet): {e}"); } } if (i + 1) % 50 == 0 { @@ -536,8 +535,7 @@ async fn run_file_mode( total_source += 1; } if let Err(e) = send_transport.send_media(pkt).await { - error!("send error: {e}"); - return; + warn!("send_media error (dropping packet): {e}"); } } if (frame_idx + 1) % 250 == 0 { @@ -824,6 +822,9 @@ async fn run_live( let mut capture_buf = vec![0i16; FRAME_SAMPLES]; let mut farend_buf = vec![0i16; FRAME_SAMPLES]; let mut frames_sent: u64 = 0; + let mut frames_dropped: u64 = 0; + let mut send_errors: u64 = 0; + let mut last_send_err = std::time::Instant::now(); let mut polls: u64 = 0; let mut last_diag = std::time::Instant::now(); @@ -873,13 +874,23 @@ async fn run_live( }; let encode_us = t0.elapsed().as_micros(); + let mut dropped = false; for pkt in &packets { if let Err(e) = send_transport.send_media(pkt).await { - error!("send error: {e}"); - return; + send_errors += 1; + frames_dropped += 1; + dropped = true; + if send_errors <= 3 || last_send_err.elapsed().as_secs() >= 1 { + warn!(send_errors, frames_dropped, + "send_media error (dropping packet): {e}"); + last_send_err = std::time::Instant::now(); + } } } + if !dropped { + send_errors = 0; // reset on success + } frames_sent += 1; if frames_sent <= 5 || frames_sent % 500 == 0 { info!(frames_sent, encode_us, pkts = packets.len(), "send progress"); @@ -904,6 +915,7 @@ async fn run_live( async move { let mut packets_received: u64 = 0; + let mut recv_errors: u64 = 0; let mut timeouts: u64 = 0; // For direct playout: raw Opus decoder + AGC let mut opus_dec = if direct_playout { @@ -972,8 +984,15 @@ async fn run_live( break; } Ok(Err(e)) => { - error!("recv error: {e}"); - break; + let msg = e.to_string(); + if msg.contains("closed") || msg.contains("reset") { + error!("recv fatal: {e}"); + break; + } + recv_errors += 1; + if recv_errors <= 3 { + warn!("recv error (continuing): {e}"); + } } Err(_) => { timeouts += 1; diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 5f25d93..8d4292d 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -171,6 +171,7 @@ impl CallEngine { let send_mic = mic_muted.clone(); let send_fs = frames_sent.clone(); let send_level = audio_level.clone(); + let send_drops = Arc::new(AtomicU64::new(0)); tokio::spawn(async move { let config = CallConfig { noise_suppression: false, @@ -205,8 +206,11 @@ impl CallEngine { Ok(pkts) => { for pkt in &pkts { if let Err(e) = send_t.send_media(pkt).await { - error!("send: {e}"); - return; + // Transient congestion (Blocked) — drop packet, keep going + send_drops.fetch_add(1, Ordering::Relaxed); + if send_drops.load(Ordering::Relaxed) <= 3 { + tracing::warn!("send_media error (dropping packet): {e}"); + } } } send_fs.fetch_add(1, Ordering::Relaxed); @@ -249,8 +253,12 @@ impl CallEngine { } Ok(Ok(None)) => break, Ok(Err(e)) => { - error!("recv: {e}"); - break; + let msg = e.to_string(); + if msg.contains("closed") || msg.contains("reset") { + error!("recv fatal: {e}"); + break; + } + // Transient error — continue } Err(_) => {} }