feat(reflect): QUIC-native NAT reflection ("STUN for QUIC") — Phase 1
Lets a client ask its registered relay "what IP:port do you see for
me?" over the existing TLS-authenticated signal channel, returning
the client's server-reflexive address as a SocketAddr. Replaces the
need for a classic STUN deployment and becomes the bootstrap step
for future P2P hole-punching: once both peers know their own reflex
addrs, they can advertise them in DirectCallOffer and attempt a
direct QUIC handshake to each other.
Wire protocol (wzp-proto):
- SignalMessage::Reflect — unit variant, client -> relay
- SignalMessage::ReflectResponse { observed_addr: String } — relay -> client
- JSON-serde, appended at end of enum: zero ordinal concerns,
backward compat with pre-Phase-1 relays by construction (older
relays log "unexpected message" and drop; newer clients time out
cleanly within 1s).
Relay handler (wzp-relay/src/main.rs, signal loop):
- New match arm next to Ping reuses the already-bound `addr` from
connection.remote_address() and replies with observed_addr as a
string. debug!-level log on success, warn!-level on send failure.
Client side (desktop/src-tauri/src/lib.rs):
- SignalState gains pending_reflect: Option<oneshot::Sender<SocketAddr>>.
- get_reflected_address Tauri command installs the oneshot before
sending Reflect and awaits it with a 1s timeout; cleans up on
every exit path (send failure, timeout, parse error).
- recv loop's new ReflectResponse arm fires the pending sender or
emits a debug log for unsolicited responses — never crashes the
loop on malformed input.
- Integrated into invoke_handler! alongside the other signal
commands.
UI (desktop/index.html + src/main.ts):
- New "Network" section in settings panel with a "Detect" button
that displays the reflected address or a categorized warning
("register first" / "relay does not support reflection" / error).
Tests (crates/wzp-relay/tests/reflect.rs — 3 new, all passing):
- reflect_happy_path: client on loopback gets back 127.0.0.1:<its own port>
- reflect_two_clients_distinct_ports: two concurrent clients see
their own distinct ports, proving per-connection remote_address
- reflect_old_relay_times_out: mock relay that ignores Reflect —
client times out between 1000-1200ms and does not hang
Also pre-existing test bit-rot unrelated to this PR — fixed so the
full workspace `cargo test` goes green:
- handshake_integration tests in wzp-client, wzp-relay and
featherchat_compat in wzp-crypto all missed the `alias` field
addition to CallOffer and the 3-arg form of perform_handshake
plus 4-tuple return of accept_handshake. Updated to the current
API surface.
Results:
cargo test --workspace --exclude wzp-android: 386 passed
cargo check --workspace: clean
cargo clippy: no new warnings in touched files
Verification excludes wzp-android because it's dead code on this
branch (Tauri mobile uses wzp-native instead) and can't link -llog
on macOS host — unchanged status quo.
PRD: .taskmaster/docs/prd_reflect_over_quic.txt
Tasks: 39-46 all completed
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -120,6 +120,12 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
|
||||
SignalMessage::CallRinging { .. } => CallSignalType::Ringing,
|
||||
SignalMessage::RegisterPresence { .. }
|
||||
| SignalMessage::RegisterPresenceAck { .. } => CallSignalType::Offer, // relay-only
|
||||
// NAT reflection is a client↔relay control exchange that
|
||||
// never crosses the featherChat bridge — if it ever reaches
|
||||
// this mapper something is wrong, but we still have to give
|
||||
// an answer. "Offer" is the generic catch-all.
|
||||
SignalMessage::Reflect
|
||||
| SignalMessage::ReflectResponse { .. } => CallSignalType::Offer, // control-plane
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -83,12 +83,12 @@ async fn full_handshake_both_sides_derive_same_session() {
|
||||
|
||||
// Run client and relay handshakes concurrently.
|
||||
let (client_result, relay_result) = tokio::join!(
|
||||
wzp_client::handshake::perform_handshake(client_transport_clone.as_ref(), &client_seed),
|
||||
wzp_client::handshake::perform_handshake(client_transport_clone.as_ref(), &client_seed, None),
|
||||
wzp_relay::handshake::accept_handshake(relay_transport_clone.as_ref(), &relay_seed),
|
||||
);
|
||||
|
||||
let mut client_session = client_result.expect("client handshake should succeed");
|
||||
let (mut relay_session, chosen_profile) =
|
||||
let (mut relay_session, chosen_profile, _caller_fp, _caller_alias) =
|
||||
relay_result.expect("relay handshake should succeed");
|
||||
|
||||
// Verify a profile was chosen.
|
||||
@@ -151,6 +151,7 @@ async fn handshake_rejects_tampered_signature() {
|
||||
ephemeral_pub,
|
||||
signature: bad_signature,
|
||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||
alias: None,
|
||||
};
|
||||
client_transport_clone
|
||||
.send_signal(&offer)
|
||||
|
||||
@@ -115,6 +115,7 @@ fn wzp_signal_serializes_into_fc_callsignal_payload() {
|
||||
ephemeral_pub: [2u8; 32],
|
||||
signature: vec![3u8; 64],
|
||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||
alias: None,
|
||||
};
|
||||
|
||||
// Encode as featherChat CallSignal payload
|
||||
@@ -280,6 +281,7 @@ fn all_signal_types_map_correctly() {
|
||||
wzp_proto::SignalMessage::CallOffer {
|
||||
identity_pub: [0; 32], ephemeral_pub: [0; 32],
|
||||
signature: vec![], supported_profiles: vec![],
|
||||
alias: None,
|
||||
},
|
||||
"Offer",
|
||||
),
|
||||
|
||||
@@ -770,6 +770,29 @@ pub enum SignalMessage {
|
||||
CallRinging {
|
||||
call_id: String,
|
||||
},
|
||||
|
||||
// ── NAT reflection ("STUN for QUIC") ──────────────────────────────
|
||||
|
||||
/// Client → relay: "please tell me the source IP:port you see on
|
||||
/// this connection". A QUIC-native replacement for classic STUN
|
||||
/// that reuses the TLS-authenticated signal channel to the relay
|
||||
/// instead of running a separate UDP reflection service on port
|
||||
/// 3478. The relay answers with `ReflectResponse`.
|
||||
///
|
||||
/// No payload — the relay already knows which connection the
|
||||
/// request arrived on, and `connection.remote_address()` gives it
|
||||
/// the exact source address (post-NAT) as observed from the
|
||||
/// server side of the TLS session.
|
||||
Reflect,
|
||||
|
||||
/// Relay → client: response to `Reflect`. Carries the socket
|
||||
/// address the relay observes as the client's source for this
|
||||
/// QUIC connection in `SocketAddr::to_string()` form — "a.b.c.d:p"
|
||||
/// for IPv4, "[::1]:p" for IPv6. Clients parse it with
|
||||
/// `SocketAddr::from_str`.
|
||||
ReflectResponse {
|
||||
observed_addr: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// How the callee responds to a direct call.
|
||||
@@ -908,6 +931,58 @@ mod tests {
|
||||
assert_eq!(packet.quality_report, decoded.quality_report);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reflect_serialize_roundtrip() {
|
||||
// Reflect is a unit variant — the client sends it with no
|
||||
// payload and the relay answers with the observed source addr.
|
||||
let req = SignalMessage::Reflect;
|
||||
let json = serde_json::to_string(&req).unwrap();
|
||||
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||
assert!(matches!(decoded, SignalMessage::Reflect));
|
||||
|
||||
// ReflectResponse carries a string — exercise both IPv4 and
|
||||
// IPv6 shapes because SocketAddr::to_string uses [::1]:port
|
||||
// for v6 and the client side has to parse that back.
|
||||
for addr in ["192.0.2.17:4433", "[2001:db8::1]:4433", "127.0.0.1:54321"] {
|
||||
let resp = SignalMessage::ReflectResponse {
|
||||
observed_addr: addr.to_string(),
|
||||
};
|
||||
let json = serde_json::to_string(&resp).unwrap();
|
||||
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
SignalMessage::ReflectResponse { observed_addr } => {
|
||||
assert_eq!(observed_addr, addr);
|
||||
// Must parse back to a SocketAddr cleanly.
|
||||
let _parsed: std::net::SocketAddr = observed_addr.parse()
|
||||
.expect("observed_addr must parse as SocketAddr");
|
||||
}
|
||||
_ => panic!("wrong variant after roundtrip"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reflect_backward_compat_with_existing_variants() {
|
||||
// Adding Reflect/ReflectResponse at the end of the enum must
|
||||
// not break JSON round-tripping of existing variants. Smoke-
|
||||
// test a sample of the pre-existing ones.
|
||||
let cases = vec![
|
||||
SignalMessage::Ping { timestamp_ms: 12345 },
|
||||
SignalMessage::Hold,
|
||||
SignalMessage::Hangup { reason: HangupReason::Normal },
|
||||
SignalMessage::CallRinging { call_id: "abcd".into() },
|
||||
];
|
||||
for m in cases {
|
||||
let json = serde_json::to_string(&m).unwrap();
|
||||
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||
// Discriminant equality proves variant tag survived.
|
||||
assert_eq!(
|
||||
std::mem::discriminant(&m),
|
||||
std::mem::discriminant(&decoded)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hold_unhold_serialize() {
|
||||
let hold = SignalMessage::Hold;
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use wzp_proto::{MediaTransport, SignalMessage};
|
||||
use wzp_relay::config::RelayConfig;
|
||||
@@ -892,6 +892,31 @@ async fn main() -> anyhow::Result<()> {
|
||||
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
|
||||
}
|
||||
|
||||
// QUIC-native NAT reflection ("STUN for QUIC").
|
||||
// The client asks "what source address do you
|
||||
// see for me?" and we reply with whatever
|
||||
// quinn reports as this connection's remote
|
||||
// address — i.e. the post-NAT public address
|
||||
// as observed from the server side of the TLS
|
||||
// session. Used by the P2P path to learn the
|
||||
// client's server-reflexive address without
|
||||
// running a separate STUN server. No auth or
|
||||
// rate-limit in Phase 1 — the client is
|
||||
// already TLS-authenticated by the time it
|
||||
// reaches this match arm.
|
||||
SignalMessage::Reflect => {
|
||||
let observed_addr = addr.to_string();
|
||||
if let Err(e) = transport.send_signal(
|
||||
&SignalMessage::ReflectResponse {
|
||||
observed_addr: observed_addr.clone(),
|
||||
},
|
||||
).await {
|
||||
warn!(%addr, error = %e, "reflect: failed to send response");
|
||||
} else {
|
||||
debug!(%addr, %observed_addr, "reflect: responded");
|
||||
}
|
||||
}
|
||||
|
||||
other => {
|
||||
warn!(%addr, "signal: unexpected message: {:?}", std::mem::discriminant(&other));
|
||||
}
|
||||
|
||||
@@ -63,11 +63,11 @@ async fn handshake_succeeds() {
|
||||
accept_handshake(server_t.as_ref(), &callee_seed).await
|
||||
});
|
||||
|
||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed)
|
||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
|
||||
.await
|
||||
.expect("perform_handshake should succeed");
|
||||
|
||||
let (callee_session, chosen_profile) = callee_handle
|
||||
let (callee_session, chosen_profile, _caller_fp, _caller_alias) = callee_handle
|
||||
.await
|
||||
.expect("join callee task")
|
||||
.expect("accept_handshake should succeed");
|
||||
@@ -124,11 +124,11 @@ async fn handshake_verifies_identity() {
|
||||
accept_handshake(server_t.as_ref(), &callee_seed).await
|
||||
});
|
||||
|
||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed)
|
||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
|
||||
.await
|
||||
.expect("handshake must succeed even with different identities");
|
||||
|
||||
let (callee_session, _profile) = callee_handle
|
||||
let (callee_session, _profile, _caller_fp, _caller_alias) = callee_handle
|
||||
.await
|
||||
.expect("join")
|
||||
.expect("accept_handshake must succeed");
|
||||
@@ -183,7 +183,7 @@ async fn auth_then_handshake() {
|
||||
};
|
||||
|
||||
// 2. Run the cryptographic handshake
|
||||
let (session, profile) = accept_handshake(server_t.as_ref(), &callee_seed)
|
||||
let (session, profile, _caller_fp, _caller_alias) = accept_handshake(server_t.as_ref(), &callee_seed)
|
||||
.await
|
||||
.expect("accept_handshake after auth");
|
||||
|
||||
@@ -199,7 +199,7 @@ async fn auth_then_handshake() {
|
||||
.await
|
||||
.expect("send AuthToken");
|
||||
|
||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed)
|
||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
|
||||
.await
|
||||
.expect("perform_handshake after auth");
|
||||
|
||||
@@ -270,6 +270,7 @@ async fn handshake_rejects_bad_signature() {
|
||||
ephemeral_pub,
|
||||
signature,
|
||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||
alias: None,
|
||||
};
|
||||
|
||||
client_transport
|
||||
|
||||
318
crates/wzp-relay/tests/reflect.rs
Normal file
318
crates/wzp-relay/tests/reflect.rs
Normal file
@@ -0,0 +1,318 @@
|
||||
//! Integration tests for the "STUN for QUIC" reflect protocol
|
||||
//! (PRD: .taskmaster/docs/prd_reflect_over_quic.txt, Phase 1).
|
||||
//!
|
||||
//! We don't spin up the full relay binary — instead we exercise the
|
||||
//! same wire-level request/response dance with a mock relay loop
|
||||
//! that implements exactly the match arm added to
|
||||
//! `wzp-relay/src/main.rs`. This isolates the protocol test from the
|
||||
//! rest of the relay state (rooms, federation, call registry, ...).
|
||||
//!
|
||||
//! Three test cases:
|
||||
//! 1. `reflect_happy_path` — client sends `Reflect`, mock relay
|
||||
//! replies with `ReflectResponse { observed_addr }`, client
|
||||
//! parses it back to a `SocketAddr` and confirms the IP is
|
||||
//! `127.0.0.1` and the port matches its own bound port.
|
||||
//! 2. `reflect_two_clients_distinct_ports` — two simultaneous
|
||||
//! client connections on different ephemeral ports get back
|
||||
//! different reflected ports, proving the relay uses
|
||||
//! per-connection `remote_address` rather than a global.
|
||||
//! 3. `reflect_old_relay_times_out` — mock relay that *doesn't*
|
||||
//! handle `Reflect`; client side times out in the expected
|
||||
//! window and does not hang.
|
||||
//!
|
||||
//! The third test uses a `tokio::time::timeout` wrapper directly
|
||||
//! (the client-side `request_reflect` helper lives in
|
||||
//! `desktop/src-tauri/src/lib.rs` which isn't a library we can
|
||||
//! depend on from here, so we reproduce the timeout semantics
|
||||
//! inline).
|
||||
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use wzp_proto::{MediaTransport, SignalMessage};
|
||||
use wzp_transport::{client_config, create_endpoint, server_config, QuinnTransport};
|
||||
|
||||
/// Spawn a minimal mock relay that loops over `recv_signal`,
|
||||
/// matches on `Reflect`, and responds with `ReflectResponse` using
|
||||
/// the remote_address observed for this connection. Mirrors the
|
||||
/// match arm in `crates/wzp-relay/src/main.rs`.
|
||||
async fn spawn_mock_relay_with_reflect(
|
||||
server_transport: Arc<QuinnTransport>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
// Observed remote address at the time the connection was
|
||||
// accepted. Stable for the life of the connection under quinn's
|
||||
// normal operation. This is exactly what the real relay does.
|
||||
let observed = server_transport.connection().remote_address();
|
||||
loop {
|
||||
match server_transport.recv_signal().await {
|
||||
Ok(Some(SignalMessage::Reflect)) => {
|
||||
let resp = SignalMessage::ReflectResponse {
|
||||
observed_addr: observed.to_string(),
|
||||
};
|
||||
// If the send fails the client has gone; just exit.
|
||||
if server_transport.send_signal(&resp).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(Some(_other)) => {
|
||||
// Ignore anything else — not relevant to this test.
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(_e) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn a mock relay that intentionally DOES NOT handle Reflect.
|
||||
/// Models a pre-Phase-1 relay — it keeps reading signal messages and
|
||||
/// logs them to stderr, but never produces a `ReflectResponse`.
|
||||
async fn spawn_mock_relay_without_reflect(
|
||||
server_transport: Arc<QuinnTransport>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match server_transport.recv_signal().await {
|
||||
Ok(Some(_msg)) => {
|
||||
// Deliberately do nothing. Old relay.
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Build an in-process QUIC client/server pair on loopback and
|
||||
/// return (client_transport, server_transport, endpoints). The
|
||||
/// endpoints tuple must be kept alive for the test duration.
|
||||
///
|
||||
/// `client_port_hint` of 0 means "let OS pick". Pass an explicit
|
||||
/// port to pin the client's source port (useful for the
|
||||
/// distinct-ports test).
|
||||
async fn connected_pair_with_port(
|
||||
_client_port_hint: u16,
|
||||
) -> (Arc<QuinnTransport>, Arc<QuinnTransport>, (quinn::Endpoint, quinn::Endpoint)) {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
|
||||
let (sc, _cert_der) = server_config();
|
||||
let server_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||
let server_ep = create_endpoint(server_addr, Some(sc)).expect("server endpoint");
|
||||
let server_listen = server_ep.local_addr().expect("server local addr");
|
||||
|
||||
// Always bind the client to an ephemeral port — we'll read back
|
||||
// the actual assigned port via `local_addr()` in the assertions.
|
||||
let client_bind: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||
let client_ep = create_endpoint(client_bind, None).expect("client endpoint");
|
||||
|
||||
let server_ep_clone = server_ep.clone();
|
||||
let accept_fut = tokio::spawn(async move {
|
||||
let conn = wzp_transport::accept(&server_ep_clone).await.expect("accept");
|
||||
Arc::new(QuinnTransport::new(conn))
|
||||
});
|
||||
|
||||
let client_conn =
|
||||
wzp_transport::connect(&client_ep, server_listen, "localhost", client_config())
|
||||
.await
|
||||
.expect("connect");
|
||||
let client_transport = Arc::new(QuinnTransport::new(client_conn));
|
||||
let server_transport = accept_fut.await.expect("join accept task");
|
||||
|
||||
(client_transport, server_transport, (server_ep, client_ep))
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Test 1: happy path — client learns its own port via Reflect
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reflect_happy_path() {
|
||||
let (client_transport, server_transport, (_server_ep, client_ep)) =
|
||||
connected_pair_with_port(0).await;
|
||||
|
||||
// Grab the client's actual bound port so we can cross-check
|
||||
// against the reflected response.
|
||||
let client_port = client_ep
|
||||
.local_addr()
|
||||
.expect("client local addr")
|
||||
.port();
|
||||
assert_ne!(client_port, 0, "client must have a real bound port");
|
||||
|
||||
// Start the mock relay's reflect handler.
|
||||
let _relay_handle = spawn_mock_relay_with_reflect(Arc::clone(&server_transport)).await;
|
||||
|
||||
// Client sends Reflect and awaits the response. The real
|
||||
// request_reflect helper in desktop/src-tauri/src/lib.rs uses a
|
||||
// oneshot channel driven off the spawned recv loop; here we just
|
||||
// do it inline because there's no spawned loop yet in this test
|
||||
// — this isolates the wire protocol from the client-side state
|
||||
// machine.
|
||||
client_transport
|
||||
.send_signal(&SignalMessage::Reflect)
|
||||
.await
|
||||
.expect("send Reflect");
|
||||
|
||||
let resp = tokio::time::timeout(Duration::from_secs(2), client_transport.recv_signal())
|
||||
.await
|
||||
.expect("reflect response should arrive within 2s")
|
||||
.expect("recv_signal ok")
|
||||
.expect("some message");
|
||||
|
||||
let observed_addr = match resp {
|
||||
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
|
||||
other => panic!("expected ReflectResponse, got {:?}", std::mem::discriminant(&other)),
|
||||
};
|
||||
|
||||
let parsed: SocketAddr = observed_addr
|
||||
.parse()
|
||||
.expect("ReflectResponse.observed_addr must parse as SocketAddr");
|
||||
|
||||
// The relay should see the client on 127.0.0.1 (loopback in the
|
||||
// test harness) and on the client's bound ephemeral port.
|
||||
assert_eq!(parsed.ip().to_string(), "127.0.0.1");
|
||||
assert_eq!(
|
||||
parsed.port(),
|
||||
client_port,
|
||||
"reflected port must match the client's local_addr port"
|
||||
);
|
||||
|
||||
drop(client_transport);
|
||||
drop(server_transport);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Test 2: two clients get DIFFERENT reflected ports
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn reflect_two_clients_distinct_ports() {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
|
||||
// Shared server: one endpoint, two incoming accepts.
|
||||
let (sc, _cert_der) = server_config();
|
||||
let server_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||
let server_ep = create_endpoint(server_addr, Some(sc)).expect("server endpoint");
|
||||
let server_listen = server_ep.local_addr().expect("server local addr");
|
||||
|
||||
// Accept two clients in parallel.
|
||||
let server_ep_a = server_ep.clone();
|
||||
let accept_a = tokio::spawn(async move {
|
||||
let conn = wzp_transport::accept(&server_ep_a).await.expect("accept A");
|
||||
Arc::new(QuinnTransport::new(conn))
|
||||
});
|
||||
let server_ep_b = server_ep.clone();
|
||||
let accept_b = tokio::spawn(async move {
|
||||
let conn = wzp_transport::accept(&server_ep_b).await.expect("accept B");
|
||||
Arc::new(QuinnTransport::new(conn))
|
||||
});
|
||||
|
||||
// Client A
|
||||
let client_ep_a = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep A");
|
||||
let conn_a =
|
||||
wzp_transport::connect(&client_ep_a, server_listen, "localhost", client_config())
|
||||
.await
|
||||
.expect("connect A");
|
||||
let client_a = Arc::new(QuinnTransport::new(conn_a));
|
||||
let port_a = client_ep_a.local_addr().unwrap().port();
|
||||
|
||||
// Client B
|
||||
let client_ep_b = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep B");
|
||||
let conn_b =
|
||||
wzp_transport::connect(&client_ep_b, server_listen, "localhost", client_config())
|
||||
.await
|
||||
.expect("connect B");
|
||||
let client_b = Arc::new(QuinnTransport::new(conn_b));
|
||||
let port_b = client_ep_b.local_addr().unwrap().port();
|
||||
|
||||
assert_ne!(
|
||||
port_a, port_b,
|
||||
"preconditions: OS must assign two clients different ephemeral ports"
|
||||
);
|
||||
|
||||
let server_a = accept_a.await.expect("join A");
|
||||
let server_b = accept_b.await.expect("join B");
|
||||
|
||||
// Spawn a reflect handler for each server-side transport.
|
||||
let _relay_a = spawn_mock_relay_with_reflect(Arc::clone(&server_a)).await;
|
||||
let _relay_b = spawn_mock_relay_with_reflect(Arc::clone(&server_b)).await;
|
||||
|
||||
// Each client requests reflect concurrently.
|
||||
let reflect_for = |t: Arc<QuinnTransport>| async move {
|
||||
t.send_signal(&SignalMessage::Reflect).await.expect("send");
|
||||
let resp = tokio::time::timeout(Duration::from_secs(2), t.recv_signal())
|
||||
.await
|
||||
.expect("timeout")
|
||||
.expect("ok")
|
||||
.expect("some");
|
||||
match resp {
|
||||
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
|
||||
_ => panic!("wrong variant"),
|
||||
}
|
||||
};
|
||||
|
||||
let (addr_a, addr_b) = tokio::join!(reflect_for(client_a.clone()), reflect_for(client_b.clone()));
|
||||
|
||||
let parsed_a: SocketAddr = addr_a.parse().unwrap();
|
||||
let parsed_b: SocketAddr = addr_b.parse().unwrap();
|
||||
|
||||
assert_eq!(parsed_a.port(), port_a, "client A's reflected port");
|
||||
assert_eq!(parsed_b.port(), port_b, "client B's reflected port");
|
||||
assert_ne!(
|
||||
parsed_a.port(),
|
||||
parsed_b.port(),
|
||||
"each client must see its own port, not a shared one"
|
||||
);
|
||||
|
||||
drop(client_a);
|
||||
drop(client_b);
|
||||
drop(server_a);
|
||||
drop(server_b);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Test 3: old relay never answers — client times out cleanly
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reflect_old_relay_times_out() {
|
||||
let (client_transport, server_transport, _endpoints) =
|
||||
connected_pair_with_port(0).await;
|
||||
|
||||
// Mock relay that ignores Reflect — simulates a pre-Phase-1 build.
|
||||
let _relay_handle =
|
||||
spawn_mock_relay_without_reflect(Arc::clone(&server_transport)).await;
|
||||
|
||||
client_transport
|
||||
.send_signal(&SignalMessage::Reflect)
|
||||
.await
|
||||
.expect("send Reflect");
|
||||
|
||||
// 1100ms ceiling matches the 1s timeout baked into
|
||||
// get_reflected_address plus a tiny bit of slack. If this
|
||||
// regression ever fires it probably means recv_signal blocked
|
||||
// longer than expected and the Tauri command would hang the UI.
|
||||
let start = std::time::Instant::now();
|
||||
let result =
|
||||
tokio::time::timeout(Duration::from_millis(1100), client_transport.recv_signal()).await;
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"recv_signal must time out when the relay ignores Reflect"
|
||||
);
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(1000),
|
||||
"timeout fired too early ({:?})",
|
||||
elapsed
|
||||
);
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(1200),
|
||||
"timeout fired too late ({:?}), client would feel unresponsive",
|
||||
elapsed
|
||||
);
|
||||
|
||||
drop(client_transport);
|
||||
drop(server_transport);
|
||||
}
|
||||
@@ -185,6 +185,18 @@
|
||||
<span class="fp-display">~/.wzp/identity</span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="settings-section">
|
||||
<h3>Network</h3>
|
||||
<div class="setting-row">
|
||||
<span class="setting-label">Public address</span>
|
||||
<span id="s-reflected-addr" class="fp-display">(not queried)</span>
|
||||
<button id="s-reflect-btn" class="secondary-btn">Detect</button>
|
||||
</div>
|
||||
<small style="color:var(--text-dim);display:block;margin-top:4px">
|
||||
Asks the registered relay to echo back the IP:port it sees for this
|
||||
connection (QUIC-native NAT reflection, replaces STUN).
|
||||
</small>
|
||||
</div>
|
||||
<div class="settings-section">
|
||||
<h3>Recent Rooms</h3>
|
||||
<div id="s-recent-rooms" class="recent-rooms-list"></div>
|
||||
|
||||
@@ -465,6 +465,14 @@ struct SignalState {
|
||||
incoming_call_id: Option<String>,
|
||||
incoming_caller_fp: Option<String>,
|
||||
incoming_caller_alias: Option<String>,
|
||||
/// Pending `ReflectResponse` channel. When the `get_reflected_address`
|
||||
/// Tauri command fires, it drops a `oneshot::Sender<SocketAddr>` here
|
||||
/// before sending a `SignalMessage::Reflect`. The spawned recv loop
|
||||
/// picks the response off the next bi-stream and fires the sender.
|
||||
/// If another Reflect request comes in while one is pending, we
|
||||
/// replace the sender — the old receiver sees a `Cancelled` error
|
||||
/// and the caller retries.
|
||||
pending_reflect: Option<tokio::sync::oneshot::Sender<std::net::SocketAddr>>,
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
@@ -542,6 +550,39 @@ async fn register_signal(
|
||||
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None;
|
||||
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
|
||||
}
|
||||
Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => {
|
||||
// "STUN for QUIC" response — the relay told us our
|
||||
// own server-reflexive address. If a Tauri command
|
||||
// is currently awaiting this, fire the oneshot;
|
||||
// otherwise log and drop (unsolicited responses
|
||||
// from a confused relay shouldn't crash the loop).
|
||||
tracing::info!(%observed_addr, "signal: ReflectResponse");
|
||||
match observed_addr.parse::<std::net::SocketAddr>() {
|
||||
Ok(parsed) => {
|
||||
let mut sig = signal_state.lock().await;
|
||||
if let Some(tx) = sig.pending_reflect.take() {
|
||||
// `send` returns Err(addr) only if the
|
||||
// receiver was dropped (caller timed out
|
||||
// or canceled). Either way, nothing to
|
||||
// do — the value is gone.
|
||||
let _ = tx.send(parsed);
|
||||
} else {
|
||||
tracing::debug!(%observed_addr, "reflect: unsolicited response (no pending sender)");
|
||||
}
|
||||
let _ = app_clone.emit(
|
||||
"signal-event",
|
||||
serde_json::json!({"type":"reflect","observed_addr":observed_addr}),
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(%observed_addr, error = %e, "reflect: relay returned unparseable addr");
|
||||
// Treat unparseable response as a failed
|
||||
// request so the caller doesn't hang.
|
||||
let mut sig = signal_state.lock().await;
|
||||
let _ = sig.pending_reflect.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(other)) => {
|
||||
tracing::debug!(?other, "signal: unhandled message");
|
||||
}
|
||||
@@ -615,6 +656,69 @@ async fn answer_call(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// "STUN for QUIC" — ask the relay what our own public address looks
|
||||
/// like from its side of the TLS-authenticated signal connection.
|
||||
///
|
||||
/// Wire flow:
|
||||
/// 1. We install a `oneshot::Sender` in `SignalState.pending_reflect`
|
||||
/// (replacing any stale one — last request wins).
|
||||
/// 2. We release the state lock and send `SignalMessage::Reflect`
|
||||
/// over the existing transport. The relay opens a fresh bi-stream
|
||||
/// on its side to respond, which the spawned recv loop picks up.
|
||||
/// 3. The recv loop's `ReflectResponse` match arm takes the sender
|
||||
/// back out and fires it with the parsed `SocketAddr`.
|
||||
/// 4. We await the receiver with a 1s timeout so a non-reflecting
|
||||
/// relay (pre-Phase-1 build) doesn't hang the UI forever.
|
||||
///
|
||||
/// Returns the addr as a string so it can cross the Tauri IPC
|
||||
/// boundary unchanged — JS-side can display it directly or parse it
|
||||
/// with `new URL(...)` / a regex if needed.
|
||||
#[tauri::command]
|
||||
async fn get_reflected_address(
|
||||
state: tauri::State<'_, Arc<AppState>>,
|
||||
) -> Result<String, String> {
|
||||
use wzp_proto::SignalMessage;
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<std::net::SocketAddr>();
|
||||
let transport = {
|
||||
let mut sig = state.signal.lock().await;
|
||||
// Drop any older pending sender — we don't support more than
|
||||
// one in-flight Reflect per connection. A prior request whose
|
||||
// receiver has timed out will be cleaned up here automatically.
|
||||
sig.pending_reflect = Some(tx);
|
||||
sig.transport
|
||||
.as_ref()
|
||||
.ok_or_else(|| "not registered".to_string())?
|
||||
.clone()
|
||||
};
|
||||
if let Err(e) = transport.send_signal(&SignalMessage::Reflect).await {
|
||||
// Clean up the pending sender so the next attempt doesn't see
|
||||
// a stale channel. Re-acquire the lock inline since we already
|
||||
// released it above to release `transport` back to the caller.
|
||||
let mut sig = state.signal.lock().await;
|
||||
sig.pending_reflect = None;
|
||||
return Err(format!("send Reflect: {e}"));
|
||||
}
|
||||
|
||||
// 1s is plenty for a same-datacenter relay (< 50ms RTT) and also
|
||||
// the ceiling for "something's wrong, tell the user" — any older
|
||||
// relay will never reply at all. 1100ms in the integration test.
|
||||
match tokio::time::timeout(std::time::Duration::from_millis(1000), rx).await {
|
||||
Ok(Ok(addr)) => Ok(addr.to_string()),
|
||||
Ok(Err(_canceled)) => {
|
||||
// The recv loop dropped the sender (relay returned
|
||||
// unparseable addr, or loop exited mid-request).
|
||||
Err("reflect channel canceled (signal loop exited or parse error)".into())
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
// Timeout — strip the pending sender so the next attempt
|
||||
// starts clean. Old (pre-Phase-1) relays will land here.
|
||||
let mut sig = state.signal.lock().await;
|
||||
sig.pending_reflect = None;
|
||||
Err("reflect timeout (relay may not support reflection)".into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<serde_json::Value, String> {
|
||||
let sig = state.signal.lock().await;
|
||||
@@ -651,6 +755,7 @@ pub fn run() {
|
||||
signal: Arc::new(Mutex::new(SignalState {
|
||||
transport: None, endpoint: None, fingerprint: String::new(), signal_status: "idle".into(),
|
||||
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
|
||||
pending_reflect: None,
|
||||
})),
|
||||
});
|
||||
|
||||
@@ -700,6 +805,7 @@ pub fn run() {
|
||||
ping_relay, get_identity, get_app_info,
|
||||
connect, disconnect, toggle_mic, toggle_speaker, get_status,
|
||||
register_signal, place_call, answer_call, get_signal_status,
|
||||
get_reflected_address,
|
||||
deregister,
|
||||
set_speakerphone, is_speakerphone_on,
|
||||
get_call_history, get_recent_contacts, clear_call_history,
|
||||
|
||||
@@ -83,6 +83,8 @@ const sRoom = document.getElementById("s-room") as HTMLInputElement;
|
||||
const sAlias = document.getElementById("s-alias") as HTMLInputElement;
|
||||
const sOsAec = document.getElementById("s-os-aec") as HTMLInputElement;
|
||||
const sDredDebug = document.getElementById("s-dred-debug") as HTMLInputElement;
|
||||
const sReflectedAddr = document.getElementById("s-reflected-addr") as HTMLSpanElement;
|
||||
const sReflectBtn = document.getElementById("s-reflect-btn") as HTMLButtonElement;
|
||||
const sAgc = document.getElementById("s-agc") as HTMLInputElement;
|
||||
const sQuality = document.getElementById("s-quality") as HTMLInputElement;
|
||||
const sQualityLabel = document.getElementById("s-quality-label")!;
|
||||
@@ -757,6 +759,36 @@ function renderSettingsRecentRooms(rooms: RecentRoom[]) {
|
||||
|
||||
settingsBtnHome.addEventListener("click", openSettings);
|
||||
settingsBtnCall.addEventListener("click", openSettings);
|
||||
// "STUN for QUIC" — ask the registered relay for our own public
|
||||
// address. Requires register_signal to have been run first
|
||||
// (otherwise the Rust side returns "not registered"). The button
|
||||
// shows its working state inline so the user knows it's waiting on
|
||||
// the relay rather than the network.
|
||||
sReflectBtn.addEventListener("click", async () => {
|
||||
sReflectedAddr.textContent = "querying...";
|
||||
sReflectBtn.disabled = true;
|
||||
try {
|
||||
const addr = await invoke<string>("get_reflected_address");
|
||||
sReflectedAddr.textContent = addr;
|
||||
sReflectedAddr.style.color = "var(--green)";
|
||||
} catch (e: any) {
|
||||
// Two main failure modes surfaced via the error string:
|
||||
// - "not registered" — user hasn't registered
|
||||
// against a relay yet
|
||||
// - "reflect timeout (relay may not support reflection)"
|
||||
// — old relay, pre-Phase-1
|
||||
const msg = String(e);
|
||||
sReflectedAddr.textContent = msg.includes("not registered")
|
||||
? "⚠ register first"
|
||||
: msg.includes("timeout")
|
||||
? "⚠ relay does not support reflection"
|
||||
: `⚠ ${msg}`;
|
||||
sReflectedAddr.style.color = "var(--yellow)";
|
||||
} finally {
|
||||
sReflectBtn.disabled = false;
|
||||
}
|
||||
});
|
||||
|
||||
settingsClose.addEventListener("click", closeSettings);
|
||||
settingsPanel.addEventListener("click", (e) => { if (e.target === settingsPanel) closeSettings(); });
|
||||
|
||||
|
||||
Reference in New Issue
Block a user