fix: survive QUIC congestion — drop packets instead of killing send task
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m14s
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m14s
send_datagram() returns Err(Blocked) when the QUIC congestion window is full. This is transient — the window reopens once ACKs arrive. Previously, all send paths treated this as fatal (break/return), which killed the send task and cascaded via tokio::select! to kill the entire call. Now: log warning, drop the packet, continue. Brief audio glitch (20-100ms) instead of complete call death. FEC on the receiver side recovers most dropped packets. Fixed in: - CLI run_live send task (continue + error counter) - CLI run_file_mode send paths (2 locations) - Desktop engine send task Also hardened recv tasks: transient errors (non-closed/reset) are survived instead of causing exit. Matches the fix applied to Android client (engine.rs). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -450,8 +450,7 @@ async fn run_silence(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::R
|
|||||||
}
|
}
|
||||||
total_bytes += pkt.payload.len() as u64;
|
total_bytes += pkt.payload.len() as u64;
|
||||||
if let Err(e) = transport.send_media(pkt).await {
|
if let Err(e) = transport.send_media(pkt).await {
|
||||||
error!("send error: {e}");
|
warn!("send_media error (dropping packet): {e}");
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (i + 1) % 50 == 0 {
|
if (i + 1) % 50 == 0 {
|
||||||
@@ -536,8 +535,7 @@ async fn run_file_mode(
|
|||||||
total_source += 1;
|
total_source += 1;
|
||||||
}
|
}
|
||||||
if let Err(e) = send_transport.send_media(pkt).await {
|
if let Err(e) = send_transport.send_media(pkt).await {
|
||||||
error!("send error: {e}");
|
warn!("send_media error (dropping packet): {e}");
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (frame_idx + 1) % 250 == 0 {
|
if (frame_idx + 1) % 250 == 0 {
|
||||||
@@ -824,6 +822,9 @@ async fn run_live(
|
|||||||
let mut capture_buf = vec![0i16; FRAME_SAMPLES];
|
let mut capture_buf = vec![0i16; FRAME_SAMPLES];
|
||||||
let mut farend_buf = vec![0i16; FRAME_SAMPLES];
|
let mut farend_buf = vec![0i16; FRAME_SAMPLES];
|
||||||
let mut frames_sent: u64 = 0;
|
let mut frames_sent: u64 = 0;
|
||||||
|
let mut frames_dropped: u64 = 0;
|
||||||
|
let mut send_errors: u64 = 0;
|
||||||
|
let mut last_send_err = std::time::Instant::now();
|
||||||
let mut polls: u64 = 0;
|
let mut polls: u64 = 0;
|
||||||
let mut last_diag = std::time::Instant::now();
|
let mut last_diag = std::time::Instant::now();
|
||||||
|
|
||||||
@@ -873,13 +874,23 @@ async fn run_live(
|
|||||||
};
|
};
|
||||||
let encode_us = t0.elapsed().as_micros();
|
let encode_us = t0.elapsed().as_micros();
|
||||||
|
|
||||||
|
let mut dropped = false;
|
||||||
for pkt in &packets {
|
for pkt in &packets {
|
||||||
if let Err(e) = send_transport.send_media(pkt).await {
|
if let Err(e) = send_transport.send_media(pkt).await {
|
||||||
error!("send error: {e}");
|
send_errors += 1;
|
||||||
return;
|
frames_dropped += 1;
|
||||||
|
dropped = true;
|
||||||
|
if send_errors <= 3 || last_send_err.elapsed().as_secs() >= 1 {
|
||||||
|
warn!(send_errors, frames_dropped,
|
||||||
|
"send_media error (dropping packet): {e}");
|
||||||
|
last_send_err = std::time::Instant::now();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !dropped {
|
||||||
|
send_errors = 0; // reset on success
|
||||||
|
}
|
||||||
frames_sent += 1;
|
frames_sent += 1;
|
||||||
if frames_sent <= 5 || frames_sent % 500 == 0 {
|
if frames_sent <= 5 || frames_sent % 500 == 0 {
|
||||||
info!(frames_sent, encode_us, pkts = packets.len(), "send progress");
|
info!(frames_sent, encode_us, pkts = packets.len(), "send progress");
|
||||||
@@ -904,6 +915,7 @@ async fn run_live(
|
|||||||
|
|
||||||
async move {
|
async move {
|
||||||
let mut packets_received: u64 = 0;
|
let mut packets_received: u64 = 0;
|
||||||
|
let mut recv_errors: u64 = 0;
|
||||||
let mut timeouts: u64 = 0;
|
let mut timeouts: u64 = 0;
|
||||||
// For direct playout: raw Opus decoder + AGC
|
// For direct playout: raw Opus decoder + AGC
|
||||||
let mut opus_dec = if direct_playout {
|
let mut opus_dec = if direct_playout {
|
||||||
@@ -972,8 +984,15 @@ async fn run_live(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
error!("recv error: {e}");
|
let msg = e.to_string();
|
||||||
break;
|
if msg.contains("closed") || msg.contains("reset") {
|
||||||
|
error!("recv fatal: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
recv_errors += 1;
|
||||||
|
if recv_errors <= 3 {
|
||||||
|
warn!("recv error (continuing): {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
timeouts += 1;
|
timeouts += 1;
|
||||||
|
|||||||
@@ -171,6 +171,7 @@ impl CallEngine {
|
|||||||
let send_mic = mic_muted.clone();
|
let send_mic = mic_muted.clone();
|
||||||
let send_fs = frames_sent.clone();
|
let send_fs = frames_sent.clone();
|
||||||
let send_level = audio_level.clone();
|
let send_level = audio_level.clone();
|
||||||
|
let send_drops = Arc::new(AtomicU64::new(0));
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let config = CallConfig {
|
let config = CallConfig {
|
||||||
noise_suppression: false,
|
noise_suppression: false,
|
||||||
@@ -205,8 +206,11 @@ impl CallEngine {
|
|||||||
Ok(pkts) => {
|
Ok(pkts) => {
|
||||||
for pkt in &pkts {
|
for pkt in &pkts {
|
||||||
if let Err(e) = send_t.send_media(pkt).await {
|
if let Err(e) = send_t.send_media(pkt).await {
|
||||||
error!("send: {e}");
|
// Transient congestion (Blocked) — drop packet, keep going
|
||||||
return;
|
send_drops.fetch_add(1, Ordering::Relaxed);
|
||||||
|
if send_drops.load(Ordering::Relaxed) <= 3 {
|
||||||
|
tracing::warn!("send_media error (dropping packet): {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
send_fs.fetch_add(1, Ordering::Relaxed);
|
send_fs.fetch_add(1, Ordering::Relaxed);
|
||||||
@@ -249,8 +253,12 @@ impl CallEngine {
|
|||||||
}
|
}
|
||||||
Ok(Ok(None)) => break,
|
Ok(Ok(None)) => break,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
error!("recv: {e}");
|
let msg = e.to_string();
|
||||||
break;
|
if msg.contains("closed") || msg.contains("reset") {
|
||||||
|
error!("recv fatal: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Transient error — continue
|
||||||
}
|
}
|
||||||
Err(_) => {}
|
Err(_) => {}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user