fix(android): playout Usage::Media + relay CallSetup advertises real IP
Three real bugs, one smoke-test session's worth of progress. 1. RELAY: wrong advertised addr in CallSetup The direct-call CallSetup computed `relay_addr = addr.ip()` where `addr = connection.remote_address()` — i.e. the CLIENT'S IP, not the relay's. So the relay was telling both parties "the call room is at the answerer's IP:4433", which meant each client dialed either the other client (no server listening) or themselves. Both endpoint.connect calls hung forever and the call never happened. Fix: compute the relay's own advertised IP once at startup. If the listen addr is 0.0.0.0, probe the primary outbound interface via the classic UDP-bind-and-connect(8.8.8.8:80) trick to discover the LAN IP the OS would use to reach external hosts. Thread the resulting advertised_addr_str into the CallSetup sender for both parties. 2. RELAY: accept loop serialized QUIC handshakes Previously the main accept loop called `wzp_transport::accept` which did both `endpoint.accept().await` AND `incoming.await` (the server- side QUIC handshake). A single slow handshake therefore blocked every subsequent client from being accepted. Unroll the helper here and move `incoming.await` into the per-connection spawned task, so every handshake runs in parallel. Also log "accept queue: new Incoming", "QUIC handshake complete", and "QUIC handshake failed" so we can tell immediately whether a client's packets are reaching the relay at all. 3. ANDROID: playout was routed to the silent in-call stream The Oboe playout stream was configured with Usage::VoiceCommunication, which routes to the Android in-call earpiece stream. That stream is silent unless the Activity has called AudioManager.setMode( IN_COMMUNICATION) and, even then, only the earpiece/BT headset get audio (not the loud speaker). Result: android→mac calls worked because mac had a normal media output, but mac→android calls were silent even though packets flowed through the relay just fine. Switch to Usage::Media + ContentType::Speech so Oboe routes to the loud speaker and uses the media volume slider. A later polish step will wire setMode + setSpeakerphoneOn from MainActivity.kt so we can go back to VoiceCommunication for AEC and proximity-sensor routing. Plus: heartbeat tracing every 2s in the send/recv tasks — frames_sent, last_rms, last_pkt_bytes, short_reads on the send side; decoded_frames, last_decode_n, last_written, decode_errs on the recv side. Will make the next "no sound" regression trivial to localize.
This commit is contained in:
@@ -195,6 +195,16 @@ int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Build playout stream
|
// Build playout stream
|
||||||
|
//
|
||||||
|
// Usage::Media (NOT VoiceCommunication) routes to the media audio
|
||||||
|
// stream which plays through the loud speaker and uses the media
|
||||||
|
// volume slider. VoiceCommunication routes to the in-call earpiece
|
||||||
|
// stream which is silent unless AudioManager.setMode(IN_COMMUNICATION)
|
||||||
|
// has been called from the Activity, and even then only the earpiece
|
||||||
|
// (or a bluetooth headset) gets audio by default. For a debug-friendly
|
||||||
|
// smoke test we want loud speaker by default. A future polish step
|
||||||
|
// will wire setMode + setSpeakerphoneOn from MainActivity.kt so we
|
||||||
|
// can switch back to VoiceCommunication (for AEC benefits etc).
|
||||||
oboe::AudioStreamBuilder playoutBuilder;
|
oboe::AudioStreamBuilder playoutBuilder;
|
||||||
playoutBuilder.setDirection(oboe::Direction::Output)
|
playoutBuilder.setDirection(oboe::Direction::Output)
|
||||||
->setPerformanceMode(oboe::PerformanceMode::LowLatency)
|
->setPerformanceMode(oboe::PerformanceMode::LowLatency)
|
||||||
@@ -203,7 +213,8 @@ int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) {
|
|||||||
->setChannelCount(config->channel_count)
|
->setChannelCount(config->channel_count)
|
||||||
->setSampleRate(config->sample_rate)
|
->setSampleRate(config->sample_rate)
|
||||||
->setFramesPerDataCallback(config->frames_per_burst)
|
->setFramesPerDataCallback(config->frames_per_burst)
|
||||||
->setUsage(oboe::Usage::VoiceCommunication)
|
->setUsage(oboe::Usage::Media)
|
||||||
|
->setContentType(oboe::ContentType::Speech)
|
||||||
->setDataCallback(&g_playout_cb);
|
->setDataCallback(&g_playout_cb);
|
||||||
|
|
||||||
result = playoutBuilder.openStream(g_playout_stream);
|
result = playoutBuilder.openStream(g_playout_stream);
|
||||||
|
|||||||
@@ -378,6 +378,31 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?;
|
let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?;
|
||||||
|
|
||||||
|
// Compute the IP address we should advertise in CallSetup for direct
|
||||||
|
// calls. If the relay is bound to a specific IP, use it as-is; if bound
|
||||||
|
// to 0.0.0.0, use the trick of "connect" a UDP socket to an arbitrary
|
||||||
|
// external address and read its local_addr — the OS binds to whichever
|
||||||
|
// local interface IP would route packets to that destination, which is
|
||||||
|
// the primary outbound interface. This is the same IP clients on the
|
||||||
|
// LAN use to reach us.
|
||||||
|
let advertised_ip: std::net::IpAddr = {
|
||||||
|
let listen_ip = config.listen_addr.ip();
|
||||||
|
if !listen_ip.is_unspecified() {
|
||||||
|
listen_ip
|
||||||
|
} else {
|
||||||
|
// Probe via a dummy "connected" UDP socket. Never actually sends.
|
||||||
|
match std::net::UdpSocket::bind("0.0.0.0:0")
|
||||||
|
.and_then(|s| { s.connect("8.8.8.8:80").map(|_| s) })
|
||||||
|
.and_then(|s| s.local_addr())
|
||||||
|
{
|
||||||
|
Ok(a) if !a.ip().is_loopback() => a.ip(),
|
||||||
|
_ => std::net::IpAddr::from([127u8, 0, 0, 1]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let advertised_addr_str = format!("{}:{}", advertised_ip, config.listen_addr.port());
|
||||||
|
info!(%advertised_addr_str, "relay advertised address for CallSetup");
|
||||||
|
|
||||||
// Forward mode
|
// Forward mode
|
||||||
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> =
|
let remote_transport: Option<Arc<wzp_transport::QuinnTransport>> =
|
||||||
if let Some(remote_addr) = config.remote_relay {
|
if let Some(remote_addr) = config.remote_relay {
|
||||||
@@ -475,9 +500,19 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
info!("Listening for connections...");
|
info!("Listening for connections...");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let connection = match wzp_transport::accept(&endpoint).await {
|
// Pull the next Incoming off the queue. Deliberately do NOT await
|
||||||
Ok(conn) => conn,
|
// the QUIC handshake here — move that into the per-connection
|
||||||
Err(e) => { error!("accept: {e}"); continue; }
|
// spawned task below. Previously we used wzp_transport::accept
|
||||||
|
// which did both, which meant a single slow handshake would block
|
||||||
|
// the entire accept loop and prevent ALL subsequent connections
|
||||||
|
// from being processed. Surfaced as direct-call hangs where the
|
||||||
|
// callee's call-* connection never completes its QUIC handshake.
|
||||||
|
let incoming = match endpoint.accept().await {
|
||||||
|
Some(inc) => inc,
|
||||||
|
None => {
|
||||||
|
error!("endpoint.accept() returned None — endpoint closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let remote_transport = remote_transport.clone();
|
let remote_transport = remote_transport.clone();
|
||||||
@@ -493,9 +528,22 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let federation_mgr = federation_mgr.clone();
|
let federation_mgr = federation_mgr.clone();
|
||||||
let signal_hub = signal_hub.clone();
|
let signal_hub = signal_hub.clone();
|
||||||
let call_registry = call_registry.clone();
|
let call_registry = call_registry.clone();
|
||||||
let listen_addr_str = config.listen_addr.to_string();
|
let advertised_addr_str = advertised_addr_str.clone();
|
||||||
|
|
||||||
|
let incoming_addr = incoming.remote_address();
|
||||||
|
info!(%incoming_addr, "accept queue: new Incoming, spawning handshake task");
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Drive the QUIC handshake inside the spawned task so that
|
||||||
|
// slow or hung handshakes never block the outer accept loop.
|
||||||
|
let connection = match incoming.await {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
error!(%incoming_addr, "QUIC handshake failed: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
info!(%incoming_addr, "QUIC handshake complete");
|
||||||
let addr = connection.remote_address();
|
let addr = connection.remote_address();
|
||||||
|
|
||||||
let room_name = connection
|
let room_name = connection
|
||||||
@@ -793,22 +841,18 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let _ = hub.send_to(&peer_fp, &msg).await;
|
let _ = hub.send_to(&peer_fp, &msg).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send CallSetup to both parties
|
// Send CallSetup to both parties.
|
||||||
// Use the address the client connected to (their remote addr
|
//
|
||||||
// is our perspective, but we need our listen addr).
|
// BUG FIX: the previous version of this used `addr.ip()`
|
||||||
// Replace 0.0.0.0 with the client's destination IP.
|
// which is `connection.remote_address()` — the CLIENT'S
|
||||||
let relay_addr_for_setup = if listen_addr_str.starts_with("0.0.0.0:") {
|
// IP, not the relay's. So CallSetup told both parties to
|
||||||
let port = &listen_addr_str[8..];
|
// dial the answerer's own IP, which meant the caller was
|
||||||
// Use the local IP from the client's connection
|
// sending QUIC Initials into the callee's client (no
|
||||||
let local_ip = addr.ip();
|
// server listening there) and the callee was sending to
|
||||||
if local_ip.is_loopback() {
|
// itself. In both cases endpoint.connect() hung forever.
|
||||||
format!("127.0.0.1:{port}")
|
//
|
||||||
} else {
|
// Use the relay's precomputed advertised address instead.
|
||||||
format!("{local_ip}:{port}")
|
let relay_addr_for_setup = advertised_addr_str.clone();
|
||||||
}
|
|
||||||
} else {
|
|
||||||
listen_addr_str.clone()
|
|
||||||
};
|
|
||||||
let setup = SignalMessage::CallSetup {
|
let setup = SignalMessage::CallSetup {
|
||||||
call_id: call_id.clone(),
|
call_id: call_id.clone(),
|
||||||
room: room.clone(),
|
room: room.clone(),
|
||||||
@@ -1153,4 +1197,5 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -224,6 +224,11 @@ impl CallEngine {
|
|||||||
encoder.set_aec_enabled(false);
|
encoder.set_aec_enabled(false);
|
||||||
let mut buf = vec![0i16; frame_samples];
|
let mut buf = vec![0i16; frame_samples];
|
||||||
|
|
||||||
|
let mut heartbeat = std::time::Instant::now();
|
||||||
|
let mut last_rms: u32 = 0;
|
||||||
|
let mut last_pkt_bytes: usize = 0;
|
||||||
|
let mut short_reads: u64 = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if !send_r.load(Ordering::Relaxed) {
|
if !send_r.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
@@ -234,6 +239,7 @@ impl CallEngine {
|
|||||||
// so in steady state this spins once per frame.
|
// so in steady state this spins once per frame.
|
||||||
let read = crate::wzp_native::audio_read_capture(&mut buf);
|
let read = crate::wzp_native::audio_read_capture(&mut buf);
|
||||||
if read < frame_samples {
|
if read < frame_samples {
|
||||||
|
short_reads += 1;
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -242,6 +248,7 @@ impl CallEngine {
|
|||||||
let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum();
|
let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum();
|
||||||
let rms = (sum_sq / buf.len() as f64).sqrt() as u32;
|
let rms = (sum_sq / buf.len() as f64).sqrt() as u32;
|
||||||
send_level.store(rms, Ordering::Relaxed);
|
send_level.store(rms, Ordering::Relaxed);
|
||||||
|
last_rms = rms;
|
||||||
|
|
||||||
if send_mic.load(Ordering::Relaxed) {
|
if send_mic.load(Ordering::Relaxed) {
|
||||||
buf.fill(0);
|
buf.fill(0);
|
||||||
@@ -249,6 +256,7 @@ impl CallEngine {
|
|||||||
match encoder.encode_frame(&buf) {
|
match encoder.encode_frame(&buf) {
|
||||||
Ok(pkts) => {
|
Ok(pkts) => {
|
||||||
for pkt in &pkts {
|
for pkt in &pkts {
|
||||||
|
last_pkt_bytes = pkt.payload.len();
|
||||||
if let Err(e) = send_t.send_media(pkt).await {
|
if let Err(e) = send_t.send_media(pkt).await {
|
||||||
send_drops.fetch_add(1, Ordering::Relaxed);
|
send_drops.fetch_add(1, Ordering::Relaxed);
|
||||||
if send_drops.load(Ordering::Relaxed) <= 3 {
|
if send_drops.load(Ordering::Relaxed) <= 3 {
|
||||||
@@ -260,6 +268,21 @@ impl CallEngine {
|
|||||||
}
|
}
|
||||||
Err(e) => error!("encode: {e}"),
|
Err(e) => error!("encode: {e}"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Heartbeat every 2s with capture+encode+send state
|
||||||
|
if heartbeat.elapsed() >= std::time::Duration::from_secs(2) {
|
||||||
|
let fs = send_fs.load(Ordering::Relaxed);
|
||||||
|
let drops = send_drops.load(Ordering::Relaxed);
|
||||||
|
info!(
|
||||||
|
frames_sent = fs,
|
||||||
|
last_rms,
|
||||||
|
last_pkt_bytes,
|
||||||
|
short_reads,
|
||||||
|
send_drops = drops,
|
||||||
|
"send heartbeat (android)"
|
||||||
|
);
|
||||||
|
heartbeat = std::time::Instant::now();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -275,6 +298,15 @@ impl CallEngine {
|
|||||||
let mut current_codec = initial_profile.codec;
|
let mut current_codec = initial_profile.codec;
|
||||||
let mut agc = wzp_codec::AutoGainControl::new();
|
let mut agc = wzp_codec::AutoGainControl::new();
|
||||||
let mut pcm = vec![0i16; FRAME_SAMPLES_40MS];
|
let mut pcm = vec![0i16; FRAME_SAMPLES_40MS];
|
||||||
|
info!(codec = ?current_codec, "recv task starting (android/oboe)");
|
||||||
|
|
||||||
|
let mut heartbeat = std::time::Instant::now();
|
||||||
|
let mut decoded_frames: u64 = 0;
|
||||||
|
let mut written_samples: u64 = 0;
|
||||||
|
let mut last_decode_n: usize = 0;
|
||||||
|
let mut last_written: usize = 0;
|
||||||
|
let mut decode_errs: u64 = 0;
|
||||||
|
let mut first_packet_logged = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if !recv_r.load(Ordering::Relaxed) {
|
if !recv_r.load(Ordering::Relaxed) {
|
||||||
@@ -287,6 +319,10 @@ impl CallEngine {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Ok(Some(pkt))) => {
|
Ok(Ok(Some(pkt))) => {
|
||||||
|
if !first_packet_logged {
|
||||||
|
info!(codec_id = ?pkt.header.codec_id, payload_bytes = pkt.payload.len(), is_repair = pkt.header.is_repair, "recv: first media packet received");
|
||||||
|
first_packet_logged = true;
|
||||||
|
}
|
||||||
if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
|
if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
|
||||||
{
|
{
|
||||||
let mut rx = recv_rx_codec.lock().await;
|
let mut rx = recv_rx_codec.lock().await;
|
||||||
@@ -311,10 +347,22 @@ impl CallEngine {
|
|||||||
let _ = decoder.set_profile(new_profile);
|
let _ = decoder.set_profile(new_profile);
|
||||||
current_codec = pkt.header.codec_id;
|
current_codec = pkt.header.codec_id;
|
||||||
}
|
}
|
||||||
if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) {
|
match decoder.decode(&pkt.payload, &mut pcm) {
|
||||||
agc.process_frame(&mut pcm[..n]);
|
Ok(n) => {
|
||||||
if !recv_spk.load(Ordering::Relaxed) {
|
last_decode_n = n;
|
||||||
crate::wzp_native::audio_write_playout(&pcm[..n]);
|
decoded_frames += 1;
|
||||||
|
agc.process_frame(&mut pcm[..n]);
|
||||||
|
if !recv_spk.load(Ordering::Relaxed) {
|
||||||
|
let w = crate::wzp_native::audio_write_playout(&pcm[..n]);
|
||||||
|
last_written = w;
|
||||||
|
written_samples = written_samples.saturating_add(w as u64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
decode_errs += 1;
|
||||||
|
if decode_errs <= 3 {
|
||||||
|
tracing::warn!("decode error: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user