2 Commits

Author SHA1 Message Date
Siavash Sameni
921856eba9 feat(reflect): QUIC-native NAT reflection ("STUN for QUIC") — Phase 1
Lets a client ask its registered relay "what IP:port do you see for
me?" over the existing TLS-authenticated signal channel, returning
the client's server-reflexive address as a SocketAddr. Replaces the
need for a classic STUN deployment and becomes the bootstrap step
for future P2P hole-punching: once both peers know their own reflex
addrs, they can advertise them in DirectCallOffer and attempt a
direct QUIC handshake to each other.

Wire protocol (wzp-proto):
- SignalMessage::Reflect — unit variant, client -> relay
- SignalMessage::ReflectResponse { observed_addr: String } — relay -> client
- JSON-serde, appended at end of enum: zero ordinal concerns,
  backward compat with pre-Phase-1 relays by construction (older
  relays log "unexpected message" and drop; newer clients time out
  cleanly within 1s).

Relay handler (wzp-relay/src/main.rs, signal loop):
- New match arm next to Ping reuses the already-bound `addr` from
  connection.remote_address() and replies with observed_addr as a
  string. debug!-level log on success, warn!-level on send failure.

Client side (desktop/src-tauri/src/lib.rs):
- SignalState gains pending_reflect: Option<oneshot::Sender<SocketAddr>>.
- get_reflected_address Tauri command installs the oneshot before
  sending Reflect and awaits it with a 1s timeout; cleans up on
  every exit path (send failure, timeout, parse error).
- recv loop's new ReflectResponse arm fires the pending sender or
  emits a debug log for unsolicited responses — never crashes the
  loop on malformed input.
- Integrated into invoke_handler! alongside the other signal
  commands.

UI (desktop/index.html + src/main.ts):
- New "Network" section in settings panel with a "Detect" button
  that displays the reflected address or a categorized warning
  ("register first" / "relay does not support reflection" / error).

Tests (crates/wzp-relay/tests/reflect.rs — 3 new, all passing):
- reflect_happy_path: client on loopback gets back 127.0.0.1:<its own port>
- reflect_two_clients_distinct_ports: two concurrent clients see
  their own distinct ports, proving per-connection remote_address
- reflect_old_relay_times_out: mock relay that ignores Reflect —
  client times out between 1000-1200ms and does not hang

Also pre-existing test bit-rot unrelated to this PR — fixed so the
full workspace `cargo test` goes green:
- handshake_integration tests in wzp-client, wzp-relay and
  featherchat_compat in wzp-crypto all missed the `alias` field
  addition to CallOffer and the 3-arg form of perform_handshake
  plus 4-tuple return of accept_handshake. Updated to the current
  API surface.

Results:
  cargo test --workspace --exclude wzp-android: 386 passed
  cargo check --workspace: clean
  cargo clippy: no new warnings in touched files

Verification excludes wzp-android because it's dead code on this
branch (Tauri mobile uses wzp-native instead) and can't link -llog
on macOS host — unchanged status quo.

PRD: .taskmaster/docs/prd_reflect_over_quic.txt
Tasks: 39-46 all completed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 12:29:07 +04:00
Siavash Sameni
7e7968b2f9 diag(android-engine): first-join no-audio ordering instrumentation
Adds a single call_t0 = Instant::now() at the top of the Android
CallEngine::start path, threaded through send + recv tasks as
send_t0 / recv_t0, and tags the following milestones with
t_ms_since_call_start so we can build a clean side-by-side log of
first-call vs rejoin:

  1. QUIC connection established
  2. handshake complete
  3. wzp-native audio_start returned (+ how long audio_start itself took)
  4. send task spawned
  5. send: first full capture frame read (+ short_reads_before count)
  6. send: first non-zero capture RMS
  7. recv task spawned
  8. recv: first media packet received
  9. recv: first successful decode
 10. recv: first playout-ring write

Combined with the existing C++-side cb#0 logs in
crates/wzp-native/cpp/oboe_bridge.cpp ("capture cb#0", "playout
cb#0") this gives us full-pipeline ordering with no native-side
changes needed.

PRD: .taskmaster/docs/prd_android_first_join_no_audio.txt
Task: 32 (first task in the chain — diagnostics before any fix
attempts so we know which of the 5 suspect causes is real).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 10:00:20 +04:00
11 changed files with 666 additions and 16 deletions

View File

@@ -120,6 +120,12 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::CallRinging { .. } => CallSignalType::Ringing, SignalMessage::CallRinging { .. } => CallSignalType::Ringing,
SignalMessage::RegisterPresence { .. } SignalMessage::RegisterPresence { .. }
| SignalMessage::RegisterPresenceAck { .. } => CallSignalType::Offer, // relay-only | SignalMessage::RegisterPresenceAck { .. } => CallSignalType::Offer, // relay-only
// NAT reflection is a client↔relay control exchange that
// never crosses the featherChat bridge — if it ever reaches
// this mapper something is wrong, but we still have to give
// an answer. "Offer" is the generic catch-all.
SignalMessage::Reflect
| SignalMessage::ReflectResponse { .. } => CallSignalType::Offer, // control-plane
} }
} }

View File

@@ -83,12 +83,12 @@ async fn full_handshake_both_sides_derive_same_session() {
// Run client and relay handshakes concurrently. // Run client and relay handshakes concurrently.
let (client_result, relay_result) = tokio::join!( let (client_result, relay_result) = tokio::join!(
wzp_client::handshake::perform_handshake(client_transport_clone.as_ref(), &client_seed), wzp_client::handshake::perform_handshake(client_transport_clone.as_ref(), &client_seed, None),
wzp_relay::handshake::accept_handshake(relay_transport_clone.as_ref(), &relay_seed), wzp_relay::handshake::accept_handshake(relay_transport_clone.as_ref(), &relay_seed),
); );
let mut client_session = client_result.expect("client handshake should succeed"); let mut client_session = client_result.expect("client handshake should succeed");
let (mut relay_session, chosen_profile) = let (mut relay_session, chosen_profile, _caller_fp, _caller_alias) =
relay_result.expect("relay handshake should succeed"); relay_result.expect("relay handshake should succeed");
// Verify a profile was chosen. // Verify a profile was chosen.
@@ -151,6 +151,7 @@ async fn handshake_rejects_tampered_signature() {
ephemeral_pub, ephemeral_pub,
signature: bad_signature, signature: bad_signature,
supported_profiles: vec![wzp_proto::QualityProfile::GOOD], supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
alias: None,
}; };
client_transport_clone client_transport_clone
.send_signal(&offer) .send_signal(&offer)

View File

@@ -115,6 +115,7 @@ fn wzp_signal_serializes_into_fc_callsignal_payload() {
ephemeral_pub: [2u8; 32], ephemeral_pub: [2u8; 32],
signature: vec![3u8; 64], signature: vec![3u8; 64],
supported_profiles: vec![wzp_proto::QualityProfile::GOOD], supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
alias: None,
}; };
// Encode as featherChat CallSignal payload // Encode as featherChat CallSignal payload
@@ -280,6 +281,7 @@ fn all_signal_types_map_correctly() {
wzp_proto::SignalMessage::CallOffer { wzp_proto::SignalMessage::CallOffer {
identity_pub: [0; 32], ephemeral_pub: [0; 32], identity_pub: [0; 32], ephemeral_pub: [0; 32],
signature: vec![], supported_profiles: vec![], signature: vec![], supported_profiles: vec![],
alias: None,
}, },
"Offer", "Offer",
), ),

View File

@@ -770,6 +770,29 @@ pub enum SignalMessage {
CallRinging { CallRinging {
call_id: String, call_id: String,
}, },
// ── NAT reflection ("STUN for QUIC") ──────────────────────────────
/// Client → relay: "please tell me the source IP:port you see on
/// this connection". A QUIC-native replacement for classic STUN
/// that reuses the TLS-authenticated signal channel to the relay
/// instead of running a separate UDP reflection service on port
/// 3478. The relay answers with `ReflectResponse`.
///
/// No payload — the relay already knows which connection the
/// request arrived on, and `connection.remote_address()` gives it
/// the exact source address (post-NAT) as observed from the
/// server side of the TLS session.
Reflect,
/// Relay → client: response to `Reflect`. Carries the socket
/// address the relay observes as the client's source for this
/// QUIC connection in `SocketAddr::to_string()` form — "a.b.c.d:p"
/// for IPv4, "[::1]:p" for IPv6. Clients parse it with
/// `SocketAddr::from_str`.
ReflectResponse {
observed_addr: String,
},
} }
/// How the callee responds to a direct call. /// How the callee responds to a direct call.
@@ -908,6 +931,58 @@ mod tests {
assert_eq!(packet.quality_report, decoded.quality_report); assert_eq!(packet.quality_report, decoded.quality_report);
} }
#[test]
fn reflect_serialize_roundtrip() {
// Reflect is a unit variant — the client sends it with no
// payload and the relay answers with the observed source addr.
let req = SignalMessage::Reflect;
let json = serde_json::to_string(&req).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(decoded, SignalMessage::Reflect));
// ReflectResponse carries a string — exercise both IPv4 and
// IPv6 shapes because SocketAddr::to_string uses [::1]:port
// for v6 and the client side has to parse that back.
for addr in ["192.0.2.17:4433", "[2001:db8::1]:4433", "127.0.0.1:54321"] {
let resp = SignalMessage::ReflectResponse {
observed_addr: addr.to_string(),
};
let json = serde_json::to_string(&resp).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::ReflectResponse { observed_addr } => {
assert_eq!(observed_addr, addr);
// Must parse back to a SocketAddr cleanly.
let _parsed: std::net::SocketAddr = observed_addr.parse()
.expect("observed_addr must parse as SocketAddr");
}
_ => panic!("wrong variant after roundtrip"),
}
}
}
#[test]
fn reflect_backward_compat_with_existing_variants() {
// Adding Reflect/ReflectResponse at the end of the enum must
// not break JSON round-tripping of existing variants. Smoke-
// test a sample of the pre-existing ones.
let cases = vec![
SignalMessage::Ping { timestamp_ms: 12345 },
SignalMessage::Hold,
SignalMessage::Hangup { reason: HangupReason::Normal },
SignalMessage::CallRinging { call_id: "abcd".into() },
];
for m in cases {
let json = serde_json::to_string(&m).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
// Discriminant equality proves variant tag survived.
assert_eq!(
std::mem::discriminant(&m),
std::mem::discriminant(&decoded)
);
}
}
#[test] #[test]
fn hold_unhold_serialize() { fn hold_unhold_serialize() {
let hold = SignalMessage::Hold; let hold = SignalMessage::Hold;

View File

@@ -13,7 +13,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{error, info, warn}; use tracing::{debug, error, info, warn};
use wzp_proto::{MediaTransport, SignalMessage}; use wzp_proto::{MediaTransport, SignalMessage};
use wzp_relay::config::RelayConfig; use wzp_relay::config::RelayConfig;
@@ -892,6 +892,31 @@ async fn main() -> anyhow::Result<()> {
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await; let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
} }
// QUIC-native NAT reflection ("STUN for QUIC").
// The client asks "what source address do you
// see for me?" and we reply with whatever
// quinn reports as this connection's remote
// address — i.e. the post-NAT public address
// as observed from the server side of the TLS
// session. Used by the P2P path to learn the
// client's server-reflexive address without
// running a separate STUN server. No auth or
// rate-limit in Phase 1 — the client is
// already TLS-authenticated by the time it
// reaches this match arm.
SignalMessage::Reflect => {
let observed_addr = addr.to_string();
if let Err(e) = transport.send_signal(
&SignalMessage::ReflectResponse {
observed_addr: observed_addr.clone(),
},
).await {
warn!(%addr, error = %e, "reflect: failed to send response");
} else {
debug!(%addr, %observed_addr, "reflect: responded");
}
}
other => { other => {
warn!(%addr, "signal: unexpected message: {:?}", std::mem::discriminant(&other)); warn!(%addr, "signal: unexpected message: {:?}", std::mem::discriminant(&other));
} }

View File

@@ -63,11 +63,11 @@ async fn handshake_succeeds() {
accept_handshake(server_t.as_ref(), &callee_seed).await accept_handshake(server_t.as_ref(), &callee_seed).await
}); });
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed) let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
.await .await
.expect("perform_handshake should succeed"); .expect("perform_handshake should succeed");
let (callee_session, chosen_profile) = callee_handle let (callee_session, chosen_profile, _caller_fp, _caller_alias) = callee_handle
.await .await
.expect("join callee task") .expect("join callee task")
.expect("accept_handshake should succeed"); .expect("accept_handshake should succeed");
@@ -124,11 +124,11 @@ async fn handshake_verifies_identity() {
accept_handshake(server_t.as_ref(), &callee_seed).await accept_handshake(server_t.as_ref(), &callee_seed).await
}); });
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed) let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
.await .await
.expect("handshake must succeed even with different identities"); .expect("handshake must succeed even with different identities");
let (callee_session, _profile) = callee_handle let (callee_session, _profile, _caller_fp, _caller_alias) = callee_handle
.await .await
.expect("join") .expect("join")
.expect("accept_handshake must succeed"); .expect("accept_handshake must succeed");
@@ -183,7 +183,7 @@ async fn auth_then_handshake() {
}; };
// 2. Run the cryptographic handshake // 2. Run the cryptographic handshake
let (session, profile) = accept_handshake(server_t.as_ref(), &callee_seed) let (session, profile, _caller_fp, _caller_alias) = accept_handshake(server_t.as_ref(), &callee_seed)
.await .await
.expect("accept_handshake after auth"); .expect("accept_handshake after auth");
@@ -199,7 +199,7 @@ async fn auth_then_handshake() {
.await .await
.expect("send AuthToken"); .expect("send AuthToken");
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed) let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
.await .await
.expect("perform_handshake after auth"); .expect("perform_handshake after auth");
@@ -270,6 +270,7 @@ async fn handshake_rejects_bad_signature() {
ephemeral_pub, ephemeral_pub,
signature, signature,
supported_profiles: vec![wzp_proto::QualityProfile::GOOD], supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
alias: None,
}; };
client_transport client_transport

View File

@@ -0,0 +1,318 @@
//! Integration tests for the "STUN for QUIC" reflect protocol
//! (PRD: .taskmaster/docs/prd_reflect_over_quic.txt, Phase 1).
//!
//! We don't spin up the full relay binary — instead we exercise the
//! same wire-level request/response dance with a mock relay loop
//! that implements exactly the match arm added to
//! `wzp-relay/src/main.rs`. This isolates the protocol test from the
//! rest of the relay state (rooms, federation, call registry, ...).
//!
//! Three test cases:
//! 1. `reflect_happy_path` — client sends `Reflect`, mock relay
//! replies with `ReflectResponse { observed_addr }`, client
//! parses it back to a `SocketAddr` and confirms the IP is
//! `127.0.0.1` and the port matches its own bound port.
//! 2. `reflect_two_clients_distinct_ports` — two simultaneous
//! client connections on different ephemeral ports get back
//! different reflected ports, proving the relay uses
//! per-connection `remote_address` rather than a global.
//! 3. `reflect_old_relay_times_out` — mock relay that *doesn't*
//! handle `Reflect`; client side times out in the expected
//! window and does not hang.
//!
//! The third test uses a `tokio::time::timeout` wrapper directly
//! (the client-side `request_reflect` helper lives in
//! `desktop/src-tauri/src/lib.rs` which isn't a library we can
//! depend on from here, so we reproduce the timeout semantics
//! inline).
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_transport::{client_config, create_endpoint, server_config, QuinnTransport};
/// Spawn a minimal mock relay that loops over `recv_signal`,
/// matches on `Reflect`, and responds with `ReflectResponse` using
/// the remote_address observed for this connection. Mirrors the
/// match arm in `crates/wzp-relay/src/main.rs`.
async fn spawn_mock_relay_with_reflect(
server_transport: Arc<QuinnTransport>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
// Observed remote address at the time the connection was
// accepted. Stable for the life of the connection under quinn's
// normal operation. This is exactly what the real relay does.
let observed = server_transport.connection().remote_address();
loop {
match server_transport.recv_signal().await {
Ok(Some(SignalMessage::Reflect)) => {
let resp = SignalMessage::ReflectResponse {
observed_addr: observed.to_string(),
};
// If the send fails the client has gone; just exit.
if server_transport.send_signal(&resp).await.is_err() {
break;
}
}
Ok(Some(_other)) => {
// Ignore anything else — not relevant to this test.
}
Ok(None) => break,
Err(_e) => break,
}
}
})
}
/// Spawn a mock relay that intentionally DOES NOT handle Reflect.
/// Models a pre-Phase-1 relay — it keeps reading signal messages and
/// logs them to stderr, but never produces a `ReflectResponse`.
async fn spawn_mock_relay_without_reflect(
server_transport: Arc<QuinnTransport>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
match server_transport.recv_signal().await {
Ok(Some(_msg)) => {
// Deliberately do nothing. Old relay.
}
Ok(None) => break,
Err(_) => break,
}
}
})
}
/// Build an in-process QUIC client/server pair on loopback and
/// return (client_transport, server_transport, endpoints). The
/// endpoints tuple must be kept alive for the test duration.
///
/// `client_port_hint` of 0 means "let OS pick". Pass an explicit
/// port to pin the client's source port (useful for the
/// distinct-ports test).
async fn connected_pair_with_port(
_client_port_hint: u16,
) -> (Arc<QuinnTransport>, Arc<QuinnTransport>, (quinn::Endpoint, quinn::Endpoint)) {
let _ = rustls::crypto::ring::default_provider().install_default();
let (sc, _cert_der) = server_config();
let server_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
let server_ep = create_endpoint(server_addr, Some(sc)).expect("server endpoint");
let server_listen = server_ep.local_addr().expect("server local addr");
// Always bind the client to an ephemeral port — we'll read back
// the actual assigned port via `local_addr()` in the assertions.
let client_bind: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
let client_ep = create_endpoint(client_bind, None).expect("client endpoint");
let server_ep_clone = server_ep.clone();
let accept_fut = tokio::spawn(async move {
let conn = wzp_transport::accept(&server_ep_clone).await.expect("accept");
Arc::new(QuinnTransport::new(conn))
});
let client_conn =
wzp_transport::connect(&client_ep, server_listen, "localhost", client_config())
.await
.expect("connect");
let client_transport = Arc::new(QuinnTransport::new(client_conn));
let server_transport = accept_fut.await.expect("join accept task");
(client_transport, server_transport, (server_ep, client_ep))
}
// -----------------------------------------------------------------------
// Test 1: happy path — client learns its own port via Reflect
// -----------------------------------------------------------------------
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reflect_happy_path() {
let (client_transport, server_transport, (_server_ep, client_ep)) =
connected_pair_with_port(0).await;
// Grab the client's actual bound port so we can cross-check
// against the reflected response.
let client_port = client_ep
.local_addr()
.expect("client local addr")
.port();
assert_ne!(client_port, 0, "client must have a real bound port");
// Start the mock relay's reflect handler.
let _relay_handle = spawn_mock_relay_with_reflect(Arc::clone(&server_transport)).await;
// Client sends Reflect and awaits the response. The real
// request_reflect helper in desktop/src-tauri/src/lib.rs uses a
// oneshot channel driven off the spawned recv loop; here we just
// do it inline because there's no spawned loop yet in this test
// — this isolates the wire protocol from the client-side state
// machine.
client_transport
.send_signal(&SignalMessage::Reflect)
.await
.expect("send Reflect");
let resp = tokio::time::timeout(Duration::from_secs(2), client_transport.recv_signal())
.await
.expect("reflect response should arrive within 2s")
.expect("recv_signal ok")
.expect("some message");
let observed_addr = match resp {
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
other => panic!("expected ReflectResponse, got {:?}", std::mem::discriminant(&other)),
};
let parsed: SocketAddr = observed_addr
.parse()
.expect("ReflectResponse.observed_addr must parse as SocketAddr");
// The relay should see the client on 127.0.0.1 (loopback in the
// test harness) and on the client's bound ephemeral port.
assert_eq!(parsed.ip().to_string(), "127.0.0.1");
assert_eq!(
parsed.port(),
client_port,
"reflected port must match the client's local_addr port"
);
drop(client_transport);
drop(server_transport);
}
// -----------------------------------------------------------------------
// Test 2: two clients get DIFFERENT reflected ports
// -----------------------------------------------------------------------
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn reflect_two_clients_distinct_ports() {
let _ = rustls::crypto::ring::default_provider().install_default();
// Shared server: one endpoint, two incoming accepts.
let (sc, _cert_der) = server_config();
let server_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
let server_ep = create_endpoint(server_addr, Some(sc)).expect("server endpoint");
let server_listen = server_ep.local_addr().expect("server local addr");
// Accept two clients in parallel.
let server_ep_a = server_ep.clone();
let accept_a = tokio::spawn(async move {
let conn = wzp_transport::accept(&server_ep_a).await.expect("accept A");
Arc::new(QuinnTransport::new(conn))
});
let server_ep_b = server_ep.clone();
let accept_b = tokio::spawn(async move {
let conn = wzp_transport::accept(&server_ep_b).await.expect("accept B");
Arc::new(QuinnTransport::new(conn))
});
// Client A
let client_ep_a = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep A");
let conn_a =
wzp_transport::connect(&client_ep_a, server_listen, "localhost", client_config())
.await
.expect("connect A");
let client_a = Arc::new(QuinnTransport::new(conn_a));
let port_a = client_ep_a.local_addr().unwrap().port();
// Client B
let client_ep_b = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep B");
let conn_b =
wzp_transport::connect(&client_ep_b, server_listen, "localhost", client_config())
.await
.expect("connect B");
let client_b = Arc::new(QuinnTransport::new(conn_b));
let port_b = client_ep_b.local_addr().unwrap().port();
assert_ne!(
port_a, port_b,
"preconditions: OS must assign two clients different ephemeral ports"
);
let server_a = accept_a.await.expect("join A");
let server_b = accept_b.await.expect("join B");
// Spawn a reflect handler for each server-side transport.
let _relay_a = spawn_mock_relay_with_reflect(Arc::clone(&server_a)).await;
let _relay_b = spawn_mock_relay_with_reflect(Arc::clone(&server_b)).await;
// Each client requests reflect concurrently.
let reflect_for = |t: Arc<QuinnTransport>| async move {
t.send_signal(&SignalMessage::Reflect).await.expect("send");
let resp = tokio::time::timeout(Duration::from_secs(2), t.recv_signal())
.await
.expect("timeout")
.expect("ok")
.expect("some");
match resp {
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
_ => panic!("wrong variant"),
}
};
let (addr_a, addr_b) = tokio::join!(reflect_for(client_a.clone()), reflect_for(client_b.clone()));
let parsed_a: SocketAddr = addr_a.parse().unwrap();
let parsed_b: SocketAddr = addr_b.parse().unwrap();
assert_eq!(parsed_a.port(), port_a, "client A's reflected port");
assert_eq!(parsed_b.port(), port_b, "client B's reflected port");
assert_ne!(
parsed_a.port(),
parsed_b.port(),
"each client must see its own port, not a shared one"
);
drop(client_a);
drop(client_b);
drop(server_a);
drop(server_b);
}
// -----------------------------------------------------------------------
// Test 3: old relay never answers — client times out cleanly
// -----------------------------------------------------------------------
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reflect_old_relay_times_out() {
let (client_transport, server_transport, _endpoints) =
connected_pair_with_port(0).await;
// Mock relay that ignores Reflect — simulates a pre-Phase-1 build.
let _relay_handle =
spawn_mock_relay_without_reflect(Arc::clone(&server_transport)).await;
client_transport
.send_signal(&SignalMessage::Reflect)
.await
.expect("send Reflect");
// 1100ms ceiling matches the 1s timeout baked into
// get_reflected_address plus a tiny bit of slack. If this
// regression ever fires it probably means recv_signal blocked
// longer than expected and the Tauri command would hang the UI.
let start = std::time::Instant::now();
let result =
tokio::time::timeout(Duration::from_millis(1100), client_transport.recv_signal()).await;
let elapsed = start.elapsed();
assert!(
result.is_err(),
"recv_signal must time out when the relay ignores Reflect"
);
assert!(
elapsed >= Duration::from_millis(1000),
"timeout fired too early ({:?})",
elapsed
);
assert!(
elapsed < Duration::from_millis(1200),
"timeout fired too late ({:?}), client would feel unresponsive",
elapsed
);
drop(client_transport);
drop(server_transport);
}

View File

@@ -185,6 +185,18 @@
<span class="fp-display">~/.wzp/identity</span> <span class="fp-display">~/.wzp/identity</span>
</div> </div>
</div> </div>
<div class="settings-section">
<h3>Network</h3>
<div class="setting-row">
<span class="setting-label">Public address</span>
<span id="s-reflected-addr" class="fp-display">(not queried)</span>
<button id="s-reflect-btn" class="secondary-btn">Detect</button>
</div>
<small style="color:var(--text-dim);display:block;margin-top:4px">
Asks the registered relay to echo back the IP:port it sees for this
connection (QUIC-native NAT reflection, replaces STUN).
</small>
</div>
<div class="settings-section"> <div class="settings-section">
<h3>Recent Rooms</h3> <h3>Recent Rooms</h3>
<div id="s-recent-rooms" class="recent-rooms-list"></div> <div id="s-recent-rooms" class="recent-rooms-list"></div>

View File

@@ -302,7 +302,14 @@ impl CallEngine {
where where
F: Fn(&str, &str) + Send + Sync + 'static, F: Fn(&str, &str) + Send + Sync + 'static,
{ {
info!(%relay, %room, %alias, %quality, has_reuse = reuse_endpoint.is_some(), "CallEngine::start (android) invoked"); // Single "call epoch" timestamp threaded through send + recv tasks
// so every milestone log can carry t_ms_since_call_start. Used to
// diagnose the first-join no-audio regression by giving us a clean
// ordering between audio_start, first capture, first recv, first
// decode, first playout-ring write, and the C++ Oboe first-callback
// logs (which already exist in cpp/oboe_bridge.cpp).
let call_t0 = std::time::Instant::now();
info!(%relay, %room, %alias, %quality, has_reuse = reuse_endpoint.is_some(), t_ms = 0u128, "CallEngine::start (android) invoked");
let _ = rustls::crypto::ring::default_provider().install_default(); let _ = rustls::crypto::ring::default_provider().install_default();
let relay_addr: SocketAddr = relay.parse()?; let relay_addr: SocketAddr = relay.parse()?;
@@ -348,7 +355,7 @@ impl CallEngine {
return Err(anyhow::anyhow!("QUIC connect timeout (10s)")); return Err(anyhow::anyhow!("QUIC connect timeout (10s)"));
} }
}; };
info!("QUIC connection established, performing handshake"); info!(t_ms = call_t0.elapsed().as_millis(), "first-join diag: QUIC connection established, performing handshake");
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
let _session = wzp_client::handshake::perform_handshake( let _session = wzp_client::handshake::perform_handshake(
@@ -358,7 +365,7 @@ impl CallEngine {
) )
.await .await
.map_err(|e| { error!("perform_handshake failed: {e}"); e })?; .map_err(|e| { error!("perform_handshake failed: {e}"); e })?;
info!("connected to relay, handshake complete"); info!(t_ms = call_t0.elapsed().as_millis(), "first-join diag: connected to relay, handshake complete");
event_cb("connected", &format!("joined room {room}")); event_cb("connected", &format!("joined room {room}"));
// Oboe audio via the wzp-native cdylib that was dlopen'd at // Oboe audio via the wzp-native cdylib that was dlopen'd at
@@ -370,10 +377,21 @@ impl CallEngine {
"wzp-native not loaded — dlopen failed at startup" "wzp-native not loaded — dlopen failed at startup"
)); ));
} }
let t_pre_audio = call_t0.elapsed().as_millis();
if let Err(code) = crate::wzp_native::audio_start() { if let Err(code) = crate::wzp_native::audio_start() {
return Err(anyhow::anyhow!("wzp_native_audio_start failed: code {code}")); return Err(anyhow::anyhow!("wzp_native_audio_start failed: code {code}"));
} }
info!("wzp-native audio started"); // Diagnostic: how long did audio_start() take, and at what
// wall-clock offset from CallEngine::start did it complete?
// Compare to the C++ "playout cb#0" log timestamp in logcat to
// see whether the Oboe playout callback fires before or after
// the recv task starts pushing decoded frames.
let t_audio_start_done = call_t0.elapsed().as_millis();
info!(
t_ms = t_audio_start_done,
audio_start_ms = t_audio_start_done.saturating_sub(t_pre_audio),
"first-join diag: wzp-native audio started"
);
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let mic_muted = Arc::new(AtomicBool::new(false)); let mic_muted = Arc::new(AtomicBool::new(false));
@@ -394,6 +412,7 @@ impl CallEngine {
let send_drops = Arc::new(AtomicU64::new(0)); let send_drops = Arc::new(AtomicU64::new(0));
let send_quality = quality.clone(); let send_quality = quality.clone();
let send_tx_codec = tx_codec.clone(); let send_tx_codec = tx_codec.clone();
let send_t0 = call_t0;
tokio::spawn(async move { tokio::spawn(async move {
let profile = resolve_quality(&send_quality); let profile = resolve_quality(&send_quality);
let config = match profile { let config = match profile {
@@ -409,7 +428,7 @@ impl CallEngine {
}, },
}; };
let frame_samples = (config.profile.frame_duration_ms as usize) * 48; let frame_samples = (config.profile.frame_duration_ms as usize) * 48;
info!(codec = ?config.profile.codec, frame_samples, "send task starting (android/oboe)"); info!(codec = ?config.profile.codec, frame_samples, t_ms = send_t0.elapsed().as_millis(), "first-join diag: send task spawned (android/oboe)");
*send_tx_codec.lock().await = format!("{:?}", config.profile.codec); *send_tx_codec.lock().await = format!("{:?}", config.profile.codec);
let mut encoder = CallEncoder::new(&config); let mut encoder = CallEncoder::new(&config);
encoder.set_aec_enabled(false); encoder.set_aec_enabled(false);
@@ -419,6 +438,13 @@ impl CallEngine {
let mut last_rms: u32 = 0; let mut last_rms: u32 = 0;
let mut last_pkt_bytes: usize = 0; let mut last_pkt_bytes: usize = 0;
let mut short_reads: u64 = 0; let mut short_reads: u64 = 0;
// First-join diagnostic: latch the wall-clock offset of the
// first full-frame capture read and the first non-zero RMS
// reading separately. The gap between them tells us how long
// Oboe input took to actually start delivering real samples
// after returning a "started" status from audio_start.
let mut first_full_read_logged = false;
let mut first_nonzero_rms_logged = false;
loop { loop {
if !send_r.load(Ordering::Relaxed) { if !send_r.load(Ordering::Relaxed) {
@@ -434,12 +460,29 @@ impl CallEngine {
tokio::time::sleep(std::time::Duration::from_millis(5)).await; tokio::time::sleep(std::time::Duration::from_millis(5)).await;
continue; continue;
} }
if !first_full_read_logged {
info!(
t_ms = send_t0.elapsed().as_millis(),
short_reads_before = short_reads,
frame_samples,
"first-join diag: send first full capture frame read"
);
first_full_read_logged = true;
}
// RMS for UI meter // RMS for UI meter
let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum(); let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum();
let rms = (sum_sq / buf.len() as f64).sqrt() as u32; let rms = (sum_sq / buf.len() as f64).sqrt() as u32;
send_level.store(rms, Ordering::Relaxed); send_level.store(rms, Ordering::Relaxed);
last_rms = rms; last_rms = rms;
if !first_nonzero_rms_logged && rms > 0 {
info!(
t_ms = send_t0.elapsed().as_millis(),
rms,
"first-join diag: send first non-zero capture RMS"
);
first_nonzero_rms_logged = true;
}
if send_mic.load(Ordering::Relaxed) { if send_mic.load(Ordering::Relaxed) {
buf.fill(0); buf.fill(0);
@@ -483,6 +526,7 @@ impl CallEngine {
let recv_spk = spk_muted.clone(); let recv_spk = spk_muted.clone();
let recv_fr = frames_received.clone(); let recv_fr = frames_received.clone();
let recv_rx_codec = rx_codec.clone(); let recv_rx_codec = rx_codec.clone();
let recv_t0 = call_t0;
tokio::spawn(async move { tokio::spawn(async move {
let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD); let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD);
// Phase 3b/3c: use concrete AdaptiveDecoder (not Box<dyn // Phase 3b/3c: use concrete AdaptiveDecoder (not Box<dyn
@@ -497,7 +541,11 @@ impl CallEngine {
// Phase 3b/3c DRED reconstruction state — see DredRecvState // Phase 3b/3c DRED reconstruction state — see DredRecvState
// above for the full flow. // above for the full flow.
let mut dred_recv = DredRecvState::new(); let mut dred_recv = DredRecvState::new();
info!(codec = ?current_codec, "recv task starting (android/oboe)"); info!(codec = ?current_codec, t_ms = recv_t0.elapsed().as_millis(), "first-join diag: recv task spawned (android/oboe)");
// First-join diagnostic latches — see send task above for the
// sibling capture milestones.
let mut first_decode_logged = false;
let mut first_playout_write_logged = false;
// ─── Decoded-PCM recorder (debug) ──────────────────────────── // ─── Decoded-PCM recorder (debug) ────────────────────────────
// Dumps the first ~10 seconds of post-AGC PCM to a raw i16 LE // Dumps the first ~10 seconds of post-AGC PCM to a raw i16 LE
@@ -546,7 +594,13 @@ impl CallEngine {
{ {
Ok(Ok(Some(pkt))) => { Ok(Ok(Some(pkt))) => {
if !first_packet_logged { if !first_packet_logged {
info!(codec_id = ?pkt.header.codec_id, payload_bytes = pkt.payload.len(), is_repair = pkt.header.is_repair, "recv: first media packet received"); info!(
t_ms = recv_t0.elapsed().as_millis(),
codec_id = ?pkt.header.codec_id,
payload_bytes = pkt.payload.len(),
is_repair = pkt.header.is_repair,
"first-join diag: recv first media packet"
);
first_packet_logged = true; first_packet_logged = true;
} }
if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise { if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
@@ -613,6 +667,15 @@ impl CallEngine {
Ok(n) => { Ok(n) => {
last_decode_n = n; last_decode_n = n;
decoded_frames += 1; decoded_frames += 1;
if !first_decode_logged {
info!(
t_ms = recv_t0.elapsed().as_millis(),
n,
codec = ?current_codec,
"first-join diag: recv first successful decode"
);
first_decode_logged = true;
}
// Log sample range for the first few decoded frames and periodically // Log sample range for the first few decoded frames and periodically
if decoded_frames <= 3 || decoded_frames % 100 == 0 { if decoded_frames <= 3 || decoded_frames % 100 == 0 {
let slice = &pcm[..n]; let slice = &pcm[..n];
@@ -663,6 +726,15 @@ impl CallEngine {
if !recv_spk.load(Ordering::Relaxed) { if !recv_spk.load(Ordering::Relaxed) {
let w = crate::wzp_native::audio_write_playout(&pcm[..n]); let w = crate::wzp_native::audio_write_playout(&pcm[..n]);
if !first_playout_write_logged {
info!(
t_ms = recv_t0.elapsed().as_millis(),
n,
w,
"first-join diag: recv first playout-ring write"
);
first_playout_write_logged = true;
}
last_written = w; last_written = w;
written_samples = written_samples.saturating_add(w as u64); written_samples = written_samples.saturating_add(w as u64);
if w < n && decoded_frames <= 10 { if w < n && decoded_frames <= 10 {

View File

@@ -465,6 +465,14 @@ struct SignalState {
incoming_call_id: Option<String>, incoming_call_id: Option<String>,
incoming_caller_fp: Option<String>, incoming_caller_fp: Option<String>,
incoming_caller_alias: Option<String>, incoming_caller_alias: Option<String>,
/// Pending `ReflectResponse` channel. When the `get_reflected_address`
/// Tauri command fires, it drops a `oneshot::Sender<SocketAddr>` here
/// before sending a `SignalMessage::Reflect`. The spawned recv loop
/// picks the response off the next bi-stream and fires the sender.
/// If another Reflect request comes in while one is pending, we
/// replace the sender — the old receiver sees a `Cancelled` error
/// and the caller retries.
pending_reflect: Option<tokio::sync::oneshot::Sender<std::net::SocketAddr>>,
} }
#[tauri::command] #[tauri::command]
@@ -542,6 +550,39 @@ async fn register_signal(
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None; let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None;
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"})); let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
} }
Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => {
// "STUN for QUIC" response — the relay told us our
// own server-reflexive address. If a Tauri command
// is currently awaiting this, fire the oneshot;
// otherwise log and drop (unsolicited responses
// from a confused relay shouldn't crash the loop).
tracing::info!(%observed_addr, "signal: ReflectResponse");
match observed_addr.parse::<std::net::SocketAddr>() {
Ok(parsed) => {
let mut sig = signal_state.lock().await;
if let Some(tx) = sig.pending_reflect.take() {
// `send` returns Err(addr) only if the
// receiver was dropped (caller timed out
// or canceled). Either way, nothing to
// do — the value is gone.
let _ = tx.send(parsed);
} else {
tracing::debug!(%observed_addr, "reflect: unsolicited response (no pending sender)");
}
let _ = app_clone.emit(
"signal-event",
serde_json::json!({"type":"reflect","observed_addr":observed_addr}),
);
}
Err(e) => {
tracing::warn!(%observed_addr, error = %e, "reflect: relay returned unparseable addr");
// Treat unparseable response as a failed
// request so the caller doesn't hang.
let mut sig = signal_state.lock().await;
let _ = sig.pending_reflect.take();
}
}
}
Ok(Some(other)) => { Ok(Some(other)) => {
tracing::debug!(?other, "signal: unhandled message"); tracing::debug!(?other, "signal: unhandled message");
} }
@@ -615,6 +656,69 @@ async fn answer_call(
Ok(()) Ok(())
} }
/// "STUN for QUIC" — ask the relay what our own public address looks
/// like from its side of the TLS-authenticated signal connection.
///
/// Wire flow:
/// 1. We install a `oneshot::Sender` in `SignalState.pending_reflect`
/// (replacing any stale one — last request wins).
/// 2. We release the state lock and send `SignalMessage::Reflect`
/// over the existing transport. The relay opens a fresh bi-stream
/// on its side to respond, which the spawned recv loop picks up.
/// 3. The recv loop's `ReflectResponse` match arm takes the sender
/// back out and fires it with the parsed `SocketAddr`.
/// 4. We await the receiver with a 1s timeout so a non-reflecting
/// relay (pre-Phase-1 build) doesn't hang the UI forever.
///
/// Returns the addr as a string so it can cross the Tauri IPC
/// boundary unchanged — JS-side can display it directly or parse it
/// with `new URL(...)` / a regex if needed.
#[tauri::command]
async fn get_reflected_address(
state: tauri::State<'_, Arc<AppState>>,
) -> Result<String, String> {
use wzp_proto::SignalMessage;
let (tx, rx) = tokio::sync::oneshot::channel::<std::net::SocketAddr>();
let transport = {
let mut sig = state.signal.lock().await;
// Drop any older pending sender — we don't support more than
// one in-flight Reflect per connection. A prior request whose
// receiver has timed out will be cleaned up here automatically.
sig.pending_reflect = Some(tx);
sig.transport
.as_ref()
.ok_or_else(|| "not registered".to_string())?
.clone()
};
if let Err(e) = transport.send_signal(&SignalMessage::Reflect).await {
// Clean up the pending sender so the next attempt doesn't see
// a stale channel. Re-acquire the lock inline since we already
// released it above to release `transport` back to the caller.
let mut sig = state.signal.lock().await;
sig.pending_reflect = None;
return Err(format!("send Reflect: {e}"));
}
// 1s is plenty for a same-datacenter relay (< 50ms RTT) and also
// the ceiling for "something's wrong, tell the user" — any older
// relay will never reply at all. 1100ms in the integration test.
match tokio::time::timeout(std::time::Duration::from_millis(1000), rx).await {
Ok(Ok(addr)) => Ok(addr.to_string()),
Ok(Err(_canceled)) => {
// The recv loop dropped the sender (relay returned
// unparseable addr, or loop exited mid-request).
Err("reflect channel canceled (signal loop exited or parse error)".into())
}
Err(_elapsed) => {
// Timeout — strip the pending sender so the next attempt
// starts clean. Old (pre-Phase-1) relays will land here.
let mut sig = state.signal.lock().await;
sig.pending_reflect = None;
Err("reflect timeout (relay may not support reflection)".into())
}
}
}
#[tauri::command] #[tauri::command]
async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<serde_json::Value, String> { async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<serde_json::Value, String> {
let sig = state.signal.lock().await; let sig = state.signal.lock().await;
@@ -651,6 +755,7 @@ pub fn run() {
signal: Arc::new(Mutex::new(SignalState { signal: Arc::new(Mutex::new(SignalState {
transport: None, endpoint: None, fingerprint: String::new(), signal_status: "idle".into(), transport: None, endpoint: None, fingerprint: String::new(), signal_status: "idle".into(),
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None, incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
pending_reflect: None,
})), })),
}); });
@@ -700,6 +805,7 @@ pub fn run() {
ping_relay, get_identity, get_app_info, ping_relay, get_identity, get_app_info,
connect, disconnect, toggle_mic, toggle_speaker, get_status, connect, disconnect, toggle_mic, toggle_speaker, get_status,
register_signal, place_call, answer_call, get_signal_status, register_signal, place_call, answer_call, get_signal_status,
get_reflected_address,
deregister, deregister,
set_speakerphone, is_speakerphone_on, set_speakerphone, is_speakerphone_on,
get_call_history, get_recent_contacts, clear_call_history, get_call_history, get_recent_contacts, clear_call_history,

View File

@@ -83,6 +83,8 @@ const sRoom = document.getElementById("s-room") as HTMLInputElement;
const sAlias = document.getElementById("s-alias") as HTMLInputElement; const sAlias = document.getElementById("s-alias") as HTMLInputElement;
const sOsAec = document.getElementById("s-os-aec") as HTMLInputElement; const sOsAec = document.getElementById("s-os-aec") as HTMLInputElement;
const sDredDebug = document.getElementById("s-dred-debug") as HTMLInputElement; const sDredDebug = document.getElementById("s-dred-debug") as HTMLInputElement;
const sReflectedAddr = document.getElementById("s-reflected-addr") as HTMLSpanElement;
const sReflectBtn = document.getElementById("s-reflect-btn") as HTMLButtonElement;
const sAgc = document.getElementById("s-agc") as HTMLInputElement; const sAgc = document.getElementById("s-agc") as HTMLInputElement;
const sQuality = document.getElementById("s-quality") as HTMLInputElement; const sQuality = document.getElementById("s-quality") as HTMLInputElement;
const sQualityLabel = document.getElementById("s-quality-label")!; const sQualityLabel = document.getElementById("s-quality-label")!;
@@ -757,6 +759,36 @@ function renderSettingsRecentRooms(rooms: RecentRoom[]) {
settingsBtnHome.addEventListener("click", openSettings); settingsBtnHome.addEventListener("click", openSettings);
settingsBtnCall.addEventListener("click", openSettings); settingsBtnCall.addEventListener("click", openSettings);
// "STUN for QUIC" — ask the registered relay for our own public
// address. Requires register_signal to have been run first
// (otherwise the Rust side returns "not registered"). The button
// shows its working state inline so the user knows it's waiting on
// the relay rather than the network.
sReflectBtn.addEventListener("click", async () => {
sReflectedAddr.textContent = "querying...";
sReflectBtn.disabled = true;
try {
const addr = await invoke<string>("get_reflected_address");
sReflectedAddr.textContent = addr;
sReflectedAddr.style.color = "var(--green)";
} catch (e: any) {
// Two main failure modes surfaced via the error string:
// - "not registered" — user hasn't registered
// against a relay yet
// - "reflect timeout (relay may not support reflection)"
// — old relay, pre-Phase-1
const msg = String(e);
sReflectedAddr.textContent = msg.includes("not registered")
? "⚠ register first"
: msg.includes("timeout")
? "⚠ relay does not support reflection"
: `${msg}`;
sReflectedAddr.style.color = "var(--yellow)";
} finally {
sReflectBtn.disabled = false;
}
});
settingsClose.addEventListener("click", closeSettings); settingsClose.addEventListener("click", closeSettings);
settingsPanel.addEventListener("click", (e) => { if (e.target === settingsPanel) closeSettings(); }); settingsPanel.addEventListener("click", (e) => { if (e.target === settingsPanel) closeSettings(); });