diff --git a/crates/wzp-native/cpp/oboe_bridge.cpp b/crates/wzp-native/cpp/oboe_bridge.cpp index d55e066..cf8b79f 100644 --- a/crates/wzp-native/cpp/oboe_bridge.cpp +++ b/crates/wzp-native/cpp/oboe_bridge.cpp @@ -195,6 +195,16 @@ int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) { } // Build playout stream + // + // Usage::Media (NOT VoiceCommunication) routes to the media audio + // stream which plays through the loud speaker and uses the media + // volume slider. VoiceCommunication routes to the in-call earpiece + // stream which is silent unless AudioManager.setMode(IN_COMMUNICATION) + // has been called from the Activity, and even then only the earpiece + // (or a bluetooth headset) gets audio by default. For a debug-friendly + // smoke test we want loud speaker by default. A future polish step + // will wire setMode + setSpeakerphoneOn from MainActivity.kt so we + // can switch back to VoiceCommunication (for AEC benefits etc). oboe::AudioStreamBuilder playoutBuilder; playoutBuilder.setDirection(oboe::Direction::Output) ->setPerformanceMode(oboe::PerformanceMode::LowLatency) @@ -203,7 +213,8 @@ int wzp_oboe_start(const WzpOboeConfig* config, const WzpOboeRings* rings) { ->setChannelCount(config->channel_count) ->setSampleRate(config->sample_rate) ->setFramesPerDataCallback(config->frames_per_burst) - ->setUsage(oboe::Usage::VoiceCommunication) + ->setUsage(oboe::Usage::Media) + ->setContentType(oboe::ContentType::Speech) ->setDataCallback(&g_playout_cb); result = playoutBuilder.openStream(g_playout_stream); diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 017c94e..3528c74 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -378,6 +378,31 @@ async fn main() -> anyhow::Result<()> { } let endpoint = wzp_transport::create_endpoint(config.listen_addr, Some(server_config))?; + // Compute the IP address we should advertise in CallSetup for direct + // calls. If the relay is bound to a specific IP, use it as-is; if bound + // to 0.0.0.0, use the trick of "connect" a UDP socket to an arbitrary + // external address and read its local_addr — the OS binds to whichever + // local interface IP would route packets to that destination, which is + // the primary outbound interface. This is the same IP clients on the + // LAN use to reach us. + let advertised_ip: std::net::IpAddr = { + let listen_ip = config.listen_addr.ip(); + if !listen_ip.is_unspecified() { + listen_ip + } else { + // Probe via a dummy "connected" UDP socket. Never actually sends. + match std::net::UdpSocket::bind("0.0.0.0:0") + .and_then(|s| { s.connect("8.8.8.8:80").map(|_| s) }) + .and_then(|s| s.local_addr()) + { + Ok(a) if !a.ip().is_loopback() => a.ip(), + _ => std::net::IpAddr::from([127u8, 0, 0, 1]), + } + } + }; + let advertised_addr_str = format!("{}:{}", advertised_ip, config.listen_addr.port()); + info!(%advertised_addr_str, "relay advertised address for CallSetup"); + // Forward mode let remote_transport: Option> = if let Some(remote_addr) = config.remote_relay { @@ -475,9 +500,19 @@ async fn main() -> anyhow::Result<()> { info!("Listening for connections..."); loop { - let connection = match wzp_transport::accept(&endpoint).await { - Ok(conn) => conn, - Err(e) => { error!("accept: {e}"); continue; } + // Pull the next Incoming off the queue. Deliberately do NOT await + // the QUIC handshake here — move that into the per-connection + // spawned task below. Previously we used wzp_transport::accept + // which did both, which meant a single slow handshake would block + // the entire accept loop and prevent ALL subsequent connections + // from being processed. Surfaced as direct-call hangs where the + // callee's call-* connection never completes its QUIC handshake. + let incoming = match endpoint.accept().await { + Some(inc) => inc, + None => { + error!("endpoint.accept() returned None — endpoint closed"); + break; + } }; let remote_transport = remote_transport.clone(); @@ -493,9 +528,22 @@ async fn main() -> anyhow::Result<()> { let federation_mgr = federation_mgr.clone(); let signal_hub = signal_hub.clone(); let call_registry = call_registry.clone(); - let listen_addr_str = config.listen_addr.to_string(); + let advertised_addr_str = advertised_addr_str.clone(); + + let incoming_addr = incoming.remote_address(); + info!(%incoming_addr, "accept queue: new Incoming, spawning handshake task"); tokio::spawn(async move { + // Drive the QUIC handshake inside the spawned task so that + // slow or hung handshakes never block the outer accept loop. + let connection = match incoming.await { + Ok(c) => c, + Err(e) => { + error!(%incoming_addr, "QUIC handshake failed: {e}"); + return; + } + }; + info!(%incoming_addr, "QUIC handshake complete"); let addr = connection.remote_address(); let room_name = connection @@ -793,22 +841,18 @@ async fn main() -> anyhow::Result<()> { let _ = hub.send_to(&peer_fp, &msg).await; } - // Send CallSetup to both parties - // Use the address the client connected to (their remote addr - // is our perspective, but we need our listen addr). - // Replace 0.0.0.0 with the client's destination IP. - let relay_addr_for_setup = if listen_addr_str.starts_with("0.0.0.0:") { - let port = &listen_addr_str[8..]; - // Use the local IP from the client's connection - let local_ip = addr.ip(); - if local_ip.is_loopback() { - format!("127.0.0.1:{port}") - } else { - format!("{local_ip}:{port}") - } - } else { - listen_addr_str.clone() - }; + // Send CallSetup to both parties. + // + // BUG FIX: the previous version of this used `addr.ip()` + // which is `connection.remote_address()` — the CLIENT'S + // IP, not the relay's. So CallSetup told both parties to + // dial the answerer's own IP, which meant the caller was + // sending QUIC Initials into the callee's client (no + // server listening there) and the callee was sending to + // itself. In both cases endpoint.connect() hung forever. + // + // Use the relay's precomputed advertised address instead. + let relay_addr_for_setup = advertised_addr_str.clone(); let setup = SignalMessage::CallSetup { call_id: call_id.clone(), room: room.clone(), @@ -1153,4 +1197,5 @@ async fn main() -> anyhow::Result<()> { } }); } + Ok(()) } diff --git a/desktop/src-tauri/src/engine.rs b/desktop/src-tauri/src/engine.rs index 9951275..64e91bb 100644 --- a/desktop/src-tauri/src/engine.rs +++ b/desktop/src-tauri/src/engine.rs @@ -224,6 +224,11 @@ impl CallEngine { encoder.set_aec_enabled(false); let mut buf = vec![0i16; frame_samples]; + let mut heartbeat = std::time::Instant::now(); + let mut last_rms: u32 = 0; + let mut last_pkt_bytes: usize = 0; + let mut short_reads: u64 = 0; + loop { if !send_r.load(Ordering::Relaxed) { break; @@ -234,6 +239,7 @@ impl CallEngine { // so in steady state this spins once per frame. let read = crate::wzp_native::audio_read_capture(&mut buf); if read < frame_samples { + short_reads += 1; tokio::time::sleep(std::time::Duration::from_millis(5)).await; continue; } @@ -242,6 +248,7 @@ impl CallEngine { let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum(); let rms = (sum_sq / buf.len() as f64).sqrt() as u32; send_level.store(rms, Ordering::Relaxed); + last_rms = rms; if send_mic.load(Ordering::Relaxed) { buf.fill(0); @@ -249,6 +256,7 @@ impl CallEngine { match encoder.encode_frame(&buf) { Ok(pkts) => { for pkt in &pkts { + last_pkt_bytes = pkt.payload.len(); if let Err(e) = send_t.send_media(pkt).await { send_drops.fetch_add(1, Ordering::Relaxed); if send_drops.load(Ordering::Relaxed) <= 3 { @@ -260,6 +268,21 @@ impl CallEngine { } Err(e) => error!("encode: {e}"), } + + // Heartbeat every 2s with capture+encode+send state + if heartbeat.elapsed() >= std::time::Duration::from_secs(2) { + let fs = send_fs.load(Ordering::Relaxed); + let drops = send_drops.load(Ordering::Relaxed); + info!( + frames_sent = fs, + last_rms, + last_pkt_bytes, + short_reads, + send_drops = drops, + "send heartbeat (android)" + ); + heartbeat = std::time::Instant::now(); + } } }); @@ -275,6 +298,15 @@ impl CallEngine { let mut current_codec = initial_profile.codec; let mut agc = wzp_codec::AutoGainControl::new(); let mut pcm = vec![0i16; FRAME_SAMPLES_40MS]; + info!(codec = ?current_codec, "recv task starting (android/oboe)"); + + let mut heartbeat = std::time::Instant::now(); + let mut decoded_frames: u64 = 0; + let mut written_samples: u64 = 0; + let mut last_decode_n: usize = 0; + let mut last_written: usize = 0; + let mut decode_errs: u64 = 0; + let mut first_packet_logged = false; loop { if !recv_r.load(Ordering::Relaxed) { @@ -287,6 +319,10 @@ impl CallEngine { .await { Ok(Ok(Some(pkt))) => { + if !first_packet_logged { + info!(codec_id = ?pkt.header.codec_id, payload_bytes = pkt.payload.len(), is_repair = pkt.header.is_repair, "recv: first media packet received"); + first_packet_logged = true; + } if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise { { let mut rx = recv_rx_codec.lock().await; @@ -311,10 +347,22 @@ impl CallEngine { let _ = decoder.set_profile(new_profile); current_codec = pkt.header.codec_id; } - if let Ok(n) = decoder.decode(&pkt.payload, &mut pcm) { - agc.process_frame(&mut pcm[..n]); - if !recv_spk.load(Ordering::Relaxed) { - crate::wzp_native::audio_write_playout(&pcm[..n]); + match decoder.decode(&pkt.payload, &mut pcm) { + Ok(n) => { + last_decode_n = n; + decoded_frames += 1; + agc.process_frame(&mut pcm[..n]); + if !recv_spk.load(Ordering::Relaxed) { + let w = crate::wzp_native::audio_write_playout(&pcm[..n]); + last_written = w; + written_samples = written_samples.saturating_add(w as u64); + } + } + Err(e) => { + decode_errs += 1; + if decode_errs <= 3 { + tracing::warn!("decode error: {e}"); + } } } }