merge: fj/feat/android-voip-client — congestion fix, AEC toggle, debug logging
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m36s
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m36s
Merged 10 commits from Android branch: - Send task crash fix on QUIC congestion (continue instead of break) - AEC toggle + NoiseSuppressor on Android - Debug reporter for crash diagnostics - Mic mute crackling fix - Participant dedup in UI - Proper QUIC connection close on hangup - Null alias display fix - Tracing → Android logcat - Incident reports for send-task crash and playout ring desync Conflict resolved in room.rs: kept Android's improved debug logging (recv gap tracking, lock contention, forward latency, send errors) inside our media_task async block for parallel signal handling. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,7 +10,7 @@ use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
use wzp_proto::packet::TrunkFrame;
|
||||
use wzp_proto::MediaTransport;
|
||||
@@ -406,7 +406,7 @@ async fn run_participant_plain(
|
||||
) {
|
||||
let addr = transport.connection().remote_address();
|
||||
|
||||
// Media forwarding task
|
||||
// Media forwarding task (with debug logging from Android fixes)
|
||||
let media_room_mgr = room_mgr.clone();
|
||||
let media_room_name = room_name.clone();
|
||||
let media_transport = transport.clone();
|
||||
@@ -414,50 +414,102 @@ async fn run_participant_plain(
|
||||
let media_session_id = session_id.to_string();
|
||||
let media_task = async move {
|
||||
let mut packets_forwarded = 0u64;
|
||||
let mut last_recv_instant = std::time::Instant::now();
|
||||
let mut max_recv_gap_ms = 0u64;
|
||||
let mut max_forward_ms = 0u64;
|
||||
let mut send_errors = 0u64;
|
||||
let mut last_log_instant = std::time::Instant::now();
|
||||
|
||||
info!(
|
||||
room = %media_room_name,
|
||||
participant = participant_id,
|
||||
%addr,
|
||||
session = %media_session_id,
|
||||
"forwarding loop started (plain)"
|
||||
);
|
||||
|
||||
loop {
|
||||
let pkt = match media_transport.recv_media().await {
|
||||
Ok(Some(pkt)) => pkt,
|
||||
Ok(None) => {
|
||||
info!(%addr, participant = participant_id, "disconnected");
|
||||
info!(%addr, participant = participant_id, forwarded = packets_forwarded, "disconnected (stream ended)");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = e.to_string();
|
||||
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
|
||||
info!(%addr, participant = participant_id, "connection closed: {e}");
|
||||
info!(%addr, participant = participant_id, forwarded = packets_forwarded, "connection closed: {e}");
|
||||
} else {
|
||||
error!(%addr, participant = participant_id, "recv error: {e}");
|
||||
error!(%addr, participant = participant_id, forwarded = packets_forwarded, "recv error: {e}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
|
||||
last_recv_instant = std::time::Instant::now();
|
||||
if recv_gap_ms > max_recv_gap_ms {
|
||||
max_recv_gap_ms = recv_gap_ms;
|
||||
}
|
||||
if recv_gap_ms > 200 {
|
||||
warn!(
|
||||
room = %media_room_name,
|
||||
participant = participant_id,
|
||||
recv_gap_ms,
|
||||
seq = pkt.header.seq,
|
||||
"large recv gap"
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(ref report) = pkt.quality_report {
|
||||
media_metrics.update_session_quality(&media_session_id, report);
|
||||
}
|
||||
|
||||
let lock_start = std::time::Instant::now();
|
||||
let others = {
|
||||
let mgr = media_room_mgr.lock().await;
|
||||
mgr.others(&media_room_name, participant_id)
|
||||
};
|
||||
let lock_ms = lock_start.elapsed().as_millis() as u64;
|
||||
if lock_ms > 10 {
|
||||
warn!(room = %media_room_name, participant = participant_id, lock_ms, "slow room_mgr lock");
|
||||
}
|
||||
|
||||
let fwd_start = std::time::Instant::now();
|
||||
let pkt_bytes = pkt.payload.len() as u64;
|
||||
for other in &others {
|
||||
match other {
|
||||
ParticipantSender::Quic(t) => {
|
||||
let _ = t.send_media(&pkt).await;
|
||||
if let Err(e) = t.send_media(&pkt).await {
|
||||
send_errors += 1;
|
||||
if send_errors <= 5 || send_errors % 100 == 0 {
|
||||
warn!(
|
||||
room = %media_room_name,
|
||||
participant = participant_id,
|
||||
peer = %t.connection().remote_address(),
|
||||
total_send_errors = send_errors,
|
||||
"send_media error: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
ParticipantSender::WebSocket(_) => {
|
||||
let _ = other.send_raw(&pkt.payload).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
let fwd_ms = fwd_start.elapsed().as_millis() as u64;
|
||||
if fwd_ms > max_forward_ms { max_forward_ms = fwd_ms; }
|
||||
if fwd_ms > 50 {
|
||||
warn!(room = %media_room_name, participant = participant_id, fwd_ms, fan_out = others.len(), "slow forward");
|
||||
}
|
||||
|
||||
let fan_out = others.len() as u64;
|
||||
media_metrics.packets_forwarded.inc_by(fan_out);
|
||||
media_metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
|
||||
packets_forwarded += 1;
|
||||
if packets_forwarded % 500 == 0 {
|
||||
|
||||
if last_log_instant.elapsed() >= Duration::from_secs(5) {
|
||||
let room_size = {
|
||||
let mgr = media_room_mgr.lock().await;
|
||||
mgr.room_size(&media_room_name)
|
||||
@@ -466,9 +518,12 @@ async fn run_participant_plain(
|
||||
room = %media_room_name,
|
||||
participant = participant_id,
|
||||
forwarded = packets_forwarded,
|
||||
room_size,
|
||||
room_size, fan_out, max_recv_gap_ms, max_forward_ms, send_errors,
|
||||
"participant stats"
|
||||
);
|
||||
max_recv_gap_ms = 0;
|
||||
max_forward_ms = 0;
|
||||
last_log_instant = std::time::Instant::now();
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -533,6 +588,19 @@ async fn run_participant_trunked(
|
||||
|
||||
let addr = transport.connection().remote_address();
|
||||
let mut packets_forwarded = 0u64;
|
||||
let mut last_recv_instant = std::time::Instant::now();
|
||||
let mut max_recv_gap_ms = 0u64;
|
||||
let mut max_forward_ms = 0u64;
|
||||
let mut send_errors = 0u64;
|
||||
let mut last_log_instant = std::time::Instant::now();
|
||||
|
||||
info!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
%addr,
|
||||
session = session_id,
|
||||
"forwarding loop started (trunked)"
|
||||
);
|
||||
|
||||
// Per-peer TrunkedForwarders, keyed by the raw pointer of the peer
|
||||
// transport (stable for the Arc's lifetime). We use the remote address
|
||||
@@ -554,24 +622,50 @@ async fn run_participant_trunked(
|
||||
let pkt = match result {
|
||||
Ok(Some(pkt)) => pkt,
|
||||
Ok(None) => {
|
||||
info!(%addr, participant = participant_id, "disconnected");
|
||||
info!(%addr, participant = participant_id, forwarded = packets_forwarded, "disconnected (stream ended)");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(%addr, participant = participant_id, "recv error: {e}");
|
||||
error!(%addr, participant = participant_id, forwarded = packets_forwarded, "recv error: {e}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
|
||||
last_recv_instant = std::time::Instant::now();
|
||||
if recv_gap_ms > max_recv_gap_ms {
|
||||
max_recv_gap_ms = recv_gap_ms;
|
||||
}
|
||||
if recv_gap_ms > 200 {
|
||||
warn!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
recv_gap_ms,
|
||||
seq = pkt.header.seq,
|
||||
"large recv gap (trunked)"
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(ref report) = pkt.quality_report {
|
||||
metrics.update_session_quality(session_id, report);
|
||||
}
|
||||
|
||||
let lock_start = std::time::Instant::now();
|
||||
let others = {
|
||||
let mgr = room_mgr.lock().await;
|
||||
mgr.others(&room_name, participant_id)
|
||||
};
|
||||
let lock_ms = lock_start.elapsed().as_millis() as u64;
|
||||
if lock_ms > 10 {
|
||||
warn!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
lock_ms,
|
||||
"slow room_mgr lock (trunked)"
|
||||
);
|
||||
}
|
||||
|
||||
let fwd_start = std::time::Instant::now();
|
||||
let pkt_bytes = pkt.payload.len() as u64;
|
||||
for other in &others {
|
||||
match other {
|
||||
@@ -581,21 +675,44 @@ async fn run_participant_trunked(
|
||||
.entry(peer_addr)
|
||||
.or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes));
|
||||
if let Err(e) = fwd.send(&pkt).await {
|
||||
let _ = e;
|
||||
send_errors += 1;
|
||||
if send_errors <= 5 || send_errors % 100 == 0 {
|
||||
warn!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
peer = %peer_addr,
|
||||
total_send_errors = send_errors,
|
||||
"trunked send error: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
ParticipantSender::WebSocket(_) => {
|
||||
// WS clients bypass trunking — send raw payload directly
|
||||
let _ = other.send_raw(&pkt.payload).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
let fwd_ms = fwd_start.elapsed().as_millis() as u64;
|
||||
if fwd_ms > max_forward_ms {
|
||||
max_forward_ms = fwd_ms;
|
||||
}
|
||||
if fwd_ms > 50 {
|
||||
warn!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
fwd_ms,
|
||||
fan_out = others.len(),
|
||||
"slow forward (trunked)"
|
||||
);
|
||||
}
|
||||
|
||||
let fan_out = others.len() as u64;
|
||||
metrics.packets_forwarded.inc_by(fan_out);
|
||||
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
|
||||
packets_forwarded += 1;
|
||||
if packets_forwarded % 500 == 0 {
|
||||
|
||||
// Periodic stats every 5 seconds
|
||||
if last_log_instant.elapsed() >= Duration::from_secs(5) {
|
||||
let room_size = {
|
||||
let mgr = room_mgr.lock().await;
|
||||
mgr.room_size(&room_name)
|
||||
@@ -605,15 +722,30 @@ async fn run_participant_trunked(
|
||||
participant = participant_id,
|
||||
forwarded = packets_forwarded,
|
||||
room_size,
|
||||
fan_out,
|
||||
max_recv_gap_ms,
|
||||
max_forward_ms,
|
||||
send_errors,
|
||||
"participant stats (trunked)"
|
||||
);
|
||||
max_recv_gap_ms = 0;
|
||||
max_forward_ms = 0;
|
||||
last_log_instant = std::time::Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
_ = flush_interval.tick() => {
|
||||
for fwd in forwarders.values_mut() {
|
||||
if let Err(e) = fwd.flush().await {
|
||||
let _ = e;
|
||||
send_errors += 1;
|
||||
if send_errors <= 5 || send_errors % 100 == 0 {
|
||||
warn!(
|
||||
room = %room_name,
|
||||
participant = participant_id,
|
||||
total_send_errors = send_errors,
|
||||
"trunk flush error: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user