Compare commits
2 Commits
578ff8cff4
...
921856eba9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
921856eba9 | ||
|
|
7e7968b2f9 |
@@ -120,6 +120,12 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
|
|||||||
SignalMessage::CallRinging { .. } => CallSignalType::Ringing,
|
SignalMessage::CallRinging { .. } => CallSignalType::Ringing,
|
||||||
SignalMessage::RegisterPresence { .. }
|
SignalMessage::RegisterPresence { .. }
|
||||||
| SignalMessage::RegisterPresenceAck { .. } => CallSignalType::Offer, // relay-only
|
| SignalMessage::RegisterPresenceAck { .. } => CallSignalType::Offer, // relay-only
|
||||||
|
// NAT reflection is a client↔relay control exchange that
|
||||||
|
// never crosses the featherChat bridge — if it ever reaches
|
||||||
|
// this mapper something is wrong, but we still have to give
|
||||||
|
// an answer. "Offer" is the generic catch-all.
|
||||||
|
SignalMessage::Reflect
|
||||||
|
| SignalMessage::ReflectResponse { .. } => CallSignalType::Offer, // control-plane
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -83,12 +83,12 @@ async fn full_handshake_both_sides_derive_same_session() {
|
|||||||
|
|
||||||
// Run client and relay handshakes concurrently.
|
// Run client and relay handshakes concurrently.
|
||||||
let (client_result, relay_result) = tokio::join!(
|
let (client_result, relay_result) = tokio::join!(
|
||||||
wzp_client::handshake::perform_handshake(client_transport_clone.as_ref(), &client_seed),
|
wzp_client::handshake::perform_handshake(client_transport_clone.as_ref(), &client_seed, None),
|
||||||
wzp_relay::handshake::accept_handshake(relay_transport_clone.as_ref(), &relay_seed),
|
wzp_relay::handshake::accept_handshake(relay_transport_clone.as_ref(), &relay_seed),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut client_session = client_result.expect("client handshake should succeed");
|
let mut client_session = client_result.expect("client handshake should succeed");
|
||||||
let (mut relay_session, chosen_profile) =
|
let (mut relay_session, chosen_profile, _caller_fp, _caller_alias) =
|
||||||
relay_result.expect("relay handshake should succeed");
|
relay_result.expect("relay handshake should succeed");
|
||||||
|
|
||||||
// Verify a profile was chosen.
|
// Verify a profile was chosen.
|
||||||
@@ -151,6 +151,7 @@ async fn handshake_rejects_tampered_signature() {
|
|||||||
ephemeral_pub,
|
ephemeral_pub,
|
||||||
signature: bad_signature,
|
signature: bad_signature,
|
||||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||||
|
alias: None,
|
||||||
};
|
};
|
||||||
client_transport_clone
|
client_transport_clone
|
||||||
.send_signal(&offer)
|
.send_signal(&offer)
|
||||||
|
|||||||
@@ -115,6 +115,7 @@ fn wzp_signal_serializes_into_fc_callsignal_payload() {
|
|||||||
ephemeral_pub: [2u8; 32],
|
ephemeral_pub: [2u8; 32],
|
||||||
signature: vec![3u8; 64],
|
signature: vec![3u8; 64],
|
||||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||||
|
alias: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Encode as featherChat CallSignal payload
|
// Encode as featherChat CallSignal payload
|
||||||
@@ -280,6 +281,7 @@ fn all_signal_types_map_correctly() {
|
|||||||
wzp_proto::SignalMessage::CallOffer {
|
wzp_proto::SignalMessage::CallOffer {
|
||||||
identity_pub: [0; 32], ephemeral_pub: [0; 32],
|
identity_pub: [0; 32], ephemeral_pub: [0; 32],
|
||||||
signature: vec![], supported_profiles: vec![],
|
signature: vec![], supported_profiles: vec![],
|
||||||
|
alias: None,
|
||||||
},
|
},
|
||||||
"Offer",
|
"Offer",
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -770,6 +770,29 @@ pub enum SignalMessage {
|
|||||||
CallRinging {
|
CallRinging {
|
||||||
call_id: String,
|
call_id: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// ── NAT reflection ("STUN for QUIC") ──────────────────────────────
|
||||||
|
|
||||||
|
/// Client → relay: "please tell me the source IP:port you see on
|
||||||
|
/// this connection". A QUIC-native replacement for classic STUN
|
||||||
|
/// that reuses the TLS-authenticated signal channel to the relay
|
||||||
|
/// instead of running a separate UDP reflection service on port
|
||||||
|
/// 3478. The relay answers with `ReflectResponse`.
|
||||||
|
///
|
||||||
|
/// No payload — the relay already knows which connection the
|
||||||
|
/// request arrived on, and `connection.remote_address()` gives it
|
||||||
|
/// the exact source address (post-NAT) as observed from the
|
||||||
|
/// server side of the TLS session.
|
||||||
|
Reflect,
|
||||||
|
|
||||||
|
/// Relay → client: response to `Reflect`. Carries the socket
|
||||||
|
/// address the relay observes as the client's source for this
|
||||||
|
/// QUIC connection in `SocketAddr::to_string()` form — "a.b.c.d:p"
|
||||||
|
/// for IPv4, "[::1]:p" for IPv6. Clients parse it with
|
||||||
|
/// `SocketAddr::from_str`.
|
||||||
|
ReflectResponse {
|
||||||
|
observed_addr: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// How the callee responds to a direct call.
|
/// How the callee responds to a direct call.
|
||||||
@@ -908,6 +931,58 @@ mod tests {
|
|||||||
assert_eq!(packet.quality_report, decoded.quality_report);
|
assert_eq!(packet.quality_report, decoded.quality_report);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reflect_serialize_roundtrip() {
|
||||||
|
// Reflect is a unit variant — the client sends it with no
|
||||||
|
// payload and the relay answers with the observed source addr.
|
||||||
|
let req = SignalMessage::Reflect;
|
||||||
|
let json = serde_json::to_string(&req).unwrap();
|
||||||
|
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
assert!(matches!(decoded, SignalMessage::Reflect));
|
||||||
|
|
||||||
|
// ReflectResponse carries a string — exercise both IPv4 and
|
||||||
|
// IPv6 shapes because SocketAddr::to_string uses [::1]:port
|
||||||
|
// for v6 and the client side has to parse that back.
|
||||||
|
for addr in ["192.0.2.17:4433", "[2001:db8::1]:4433", "127.0.0.1:54321"] {
|
||||||
|
let resp = SignalMessage::ReflectResponse {
|
||||||
|
observed_addr: addr.to_string(),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&resp).unwrap();
|
||||||
|
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
SignalMessage::ReflectResponse { observed_addr } => {
|
||||||
|
assert_eq!(observed_addr, addr);
|
||||||
|
// Must parse back to a SocketAddr cleanly.
|
||||||
|
let _parsed: std::net::SocketAddr = observed_addr.parse()
|
||||||
|
.expect("observed_addr must parse as SocketAddr");
|
||||||
|
}
|
||||||
|
_ => panic!("wrong variant after roundtrip"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reflect_backward_compat_with_existing_variants() {
|
||||||
|
// Adding Reflect/ReflectResponse at the end of the enum must
|
||||||
|
// not break JSON round-tripping of existing variants. Smoke-
|
||||||
|
// test a sample of the pre-existing ones.
|
||||||
|
let cases = vec![
|
||||||
|
SignalMessage::Ping { timestamp_ms: 12345 },
|
||||||
|
SignalMessage::Hold,
|
||||||
|
SignalMessage::Hangup { reason: HangupReason::Normal },
|
||||||
|
SignalMessage::CallRinging { call_id: "abcd".into() },
|
||||||
|
];
|
||||||
|
for m in cases {
|
||||||
|
let json = serde_json::to_string(&m).unwrap();
|
||||||
|
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
// Discriminant equality proves variant tag survived.
|
||||||
|
assert_eq!(
|
||||||
|
std::mem::discriminant(&m),
|
||||||
|
std::mem::discriminant(&decoded)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn hold_unhold_serialize() {
|
fn hold_unhold_serialize() {
|
||||||
let hold = SignalMessage::Hold;
|
let hold = SignalMessage::Hold;
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use wzp_proto::{MediaTransport, SignalMessage};
|
use wzp_proto::{MediaTransport, SignalMessage};
|
||||||
use wzp_relay::config::RelayConfig;
|
use wzp_relay::config::RelayConfig;
|
||||||
@@ -892,6 +892,31 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
|
let _ = transport.send_signal(&SignalMessage::Pong { timestamp_ms }).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QUIC-native NAT reflection ("STUN for QUIC").
|
||||||
|
// The client asks "what source address do you
|
||||||
|
// see for me?" and we reply with whatever
|
||||||
|
// quinn reports as this connection's remote
|
||||||
|
// address — i.e. the post-NAT public address
|
||||||
|
// as observed from the server side of the TLS
|
||||||
|
// session. Used by the P2P path to learn the
|
||||||
|
// client's server-reflexive address without
|
||||||
|
// running a separate STUN server. No auth or
|
||||||
|
// rate-limit in Phase 1 — the client is
|
||||||
|
// already TLS-authenticated by the time it
|
||||||
|
// reaches this match arm.
|
||||||
|
SignalMessage::Reflect => {
|
||||||
|
let observed_addr = addr.to_string();
|
||||||
|
if let Err(e) = transport.send_signal(
|
||||||
|
&SignalMessage::ReflectResponse {
|
||||||
|
observed_addr: observed_addr.clone(),
|
||||||
|
},
|
||||||
|
).await {
|
||||||
|
warn!(%addr, error = %e, "reflect: failed to send response");
|
||||||
|
} else {
|
||||||
|
debug!(%addr, %observed_addr, "reflect: responded");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
other => {
|
other => {
|
||||||
warn!(%addr, "signal: unexpected message: {:?}", std::mem::discriminant(&other));
|
warn!(%addr, "signal: unexpected message: {:?}", std::mem::discriminant(&other));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,11 +63,11 @@ async fn handshake_succeeds() {
|
|||||||
accept_handshake(server_t.as_ref(), &callee_seed).await
|
accept_handshake(server_t.as_ref(), &callee_seed).await
|
||||||
});
|
});
|
||||||
|
|
||||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed)
|
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
|
||||||
.await
|
.await
|
||||||
.expect("perform_handshake should succeed");
|
.expect("perform_handshake should succeed");
|
||||||
|
|
||||||
let (callee_session, chosen_profile) = callee_handle
|
let (callee_session, chosen_profile, _caller_fp, _caller_alias) = callee_handle
|
||||||
.await
|
.await
|
||||||
.expect("join callee task")
|
.expect("join callee task")
|
||||||
.expect("accept_handshake should succeed");
|
.expect("accept_handshake should succeed");
|
||||||
@@ -124,11 +124,11 @@ async fn handshake_verifies_identity() {
|
|||||||
accept_handshake(server_t.as_ref(), &callee_seed).await
|
accept_handshake(server_t.as_ref(), &callee_seed).await
|
||||||
});
|
});
|
||||||
|
|
||||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed)
|
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
|
||||||
.await
|
.await
|
||||||
.expect("handshake must succeed even with different identities");
|
.expect("handshake must succeed even with different identities");
|
||||||
|
|
||||||
let (callee_session, _profile) = callee_handle
|
let (callee_session, _profile, _caller_fp, _caller_alias) = callee_handle
|
||||||
.await
|
.await
|
||||||
.expect("join")
|
.expect("join")
|
||||||
.expect("accept_handshake must succeed");
|
.expect("accept_handshake must succeed");
|
||||||
@@ -183,7 +183,7 @@ async fn auth_then_handshake() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// 2. Run the cryptographic handshake
|
// 2. Run the cryptographic handshake
|
||||||
let (session, profile) = accept_handshake(server_t.as_ref(), &callee_seed)
|
let (session, profile, _caller_fp, _caller_alias) = accept_handshake(server_t.as_ref(), &callee_seed)
|
||||||
.await
|
.await
|
||||||
.expect("accept_handshake after auth");
|
.expect("accept_handshake after auth");
|
||||||
|
|
||||||
@@ -199,7 +199,7 @@ async fn auth_then_handshake() {
|
|||||||
.await
|
.await
|
||||||
.expect("send AuthToken");
|
.expect("send AuthToken");
|
||||||
|
|
||||||
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed)
|
let caller_session = perform_handshake(client_transport.as_ref(), &caller_seed, None)
|
||||||
.await
|
.await
|
||||||
.expect("perform_handshake after auth");
|
.expect("perform_handshake after auth");
|
||||||
|
|
||||||
@@ -270,6 +270,7 @@ async fn handshake_rejects_bad_signature() {
|
|||||||
ephemeral_pub,
|
ephemeral_pub,
|
||||||
signature,
|
signature,
|
||||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||||
|
alias: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
client_transport
|
client_transport
|
||||||
|
|||||||
318
crates/wzp-relay/tests/reflect.rs
Normal file
318
crates/wzp-relay/tests/reflect.rs
Normal file
@@ -0,0 +1,318 @@
|
|||||||
|
//! Integration tests for the "STUN for QUIC" reflect protocol
|
||||||
|
//! (PRD: .taskmaster/docs/prd_reflect_over_quic.txt, Phase 1).
|
||||||
|
//!
|
||||||
|
//! We don't spin up the full relay binary — instead we exercise the
|
||||||
|
//! same wire-level request/response dance with a mock relay loop
|
||||||
|
//! that implements exactly the match arm added to
|
||||||
|
//! `wzp-relay/src/main.rs`. This isolates the protocol test from the
|
||||||
|
//! rest of the relay state (rooms, federation, call registry, ...).
|
||||||
|
//!
|
||||||
|
//! Three test cases:
|
||||||
|
//! 1. `reflect_happy_path` — client sends `Reflect`, mock relay
|
||||||
|
//! replies with `ReflectResponse { observed_addr }`, client
|
||||||
|
//! parses it back to a `SocketAddr` and confirms the IP is
|
||||||
|
//! `127.0.0.1` and the port matches its own bound port.
|
||||||
|
//! 2. `reflect_two_clients_distinct_ports` — two simultaneous
|
||||||
|
//! client connections on different ephemeral ports get back
|
||||||
|
//! different reflected ports, proving the relay uses
|
||||||
|
//! per-connection `remote_address` rather than a global.
|
||||||
|
//! 3. `reflect_old_relay_times_out` — mock relay that *doesn't*
|
||||||
|
//! handle `Reflect`; client side times out in the expected
|
||||||
|
//! window and does not hang.
|
||||||
|
//!
|
||||||
|
//! The third test uses a `tokio::time::timeout` wrapper directly
|
||||||
|
//! (the client-side `request_reflect` helper lives in
|
||||||
|
//! `desktop/src-tauri/src/lib.rs` which isn't a library we can
|
||||||
|
//! depend on from here, so we reproduce the timeout semantics
|
||||||
|
//! inline).
|
||||||
|
|
||||||
|
use std::net::{Ipv4Addr, SocketAddr};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use wzp_proto::{MediaTransport, SignalMessage};
|
||||||
|
use wzp_transport::{client_config, create_endpoint, server_config, QuinnTransport};
|
||||||
|
|
||||||
|
/// Spawn a minimal mock relay that loops over `recv_signal`,
|
||||||
|
/// matches on `Reflect`, and responds with `ReflectResponse` using
|
||||||
|
/// the remote_address observed for this connection. Mirrors the
|
||||||
|
/// match arm in `crates/wzp-relay/src/main.rs`.
|
||||||
|
async fn spawn_mock_relay_with_reflect(
|
||||||
|
server_transport: Arc<QuinnTransport>,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Observed remote address at the time the connection was
|
||||||
|
// accepted. Stable for the life of the connection under quinn's
|
||||||
|
// normal operation. This is exactly what the real relay does.
|
||||||
|
let observed = server_transport.connection().remote_address();
|
||||||
|
loop {
|
||||||
|
match server_transport.recv_signal().await {
|
||||||
|
Ok(Some(SignalMessage::Reflect)) => {
|
||||||
|
let resp = SignalMessage::ReflectResponse {
|
||||||
|
observed_addr: observed.to_string(),
|
||||||
|
};
|
||||||
|
// If the send fails the client has gone; just exit.
|
||||||
|
if server_transport.send_signal(&resp).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(_other)) => {
|
||||||
|
// Ignore anything else — not relevant to this test.
|
||||||
|
}
|
||||||
|
Ok(None) => break,
|
||||||
|
Err(_e) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a mock relay that intentionally DOES NOT handle Reflect.
|
||||||
|
/// Models a pre-Phase-1 relay — it keeps reading signal messages and
|
||||||
|
/// logs them to stderr, but never produces a `ReflectResponse`.
|
||||||
|
async fn spawn_mock_relay_without_reflect(
|
||||||
|
server_transport: Arc<QuinnTransport>,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match server_transport.recv_signal().await {
|
||||||
|
Ok(Some(_msg)) => {
|
||||||
|
// Deliberately do nothing. Old relay.
|
||||||
|
}
|
||||||
|
Ok(None) => break,
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build an in-process QUIC client/server pair on loopback and
|
||||||
|
/// return (client_transport, server_transport, endpoints). The
|
||||||
|
/// endpoints tuple must be kept alive for the test duration.
|
||||||
|
///
|
||||||
|
/// `client_port_hint` of 0 means "let OS pick". Pass an explicit
|
||||||
|
/// port to pin the client's source port (useful for the
|
||||||
|
/// distinct-ports test).
|
||||||
|
async fn connected_pair_with_port(
|
||||||
|
_client_port_hint: u16,
|
||||||
|
) -> (Arc<QuinnTransport>, Arc<QuinnTransport>, (quinn::Endpoint, quinn::Endpoint)) {
|
||||||
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
|
||||||
|
let (sc, _cert_der) = server_config();
|
||||||
|
let server_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||||
|
let server_ep = create_endpoint(server_addr, Some(sc)).expect("server endpoint");
|
||||||
|
let server_listen = server_ep.local_addr().expect("server local addr");
|
||||||
|
|
||||||
|
// Always bind the client to an ephemeral port — we'll read back
|
||||||
|
// the actual assigned port via `local_addr()` in the assertions.
|
||||||
|
let client_bind: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||||
|
let client_ep = create_endpoint(client_bind, None).expect("client endpoint");
|
||||||
|
|
||||||
|
let server_ep_clone = server_ep.clone();
|
||||||
|
let accept_fut = tokio::spawn(async move {
|
||||||
|
let conn = wzp_transport::accept(&server_ep_clone).await.expect("accept");
|
||||||
|
Arc::new(QuinnTransport::new(conn))
|
||||||
|
});
|
||||||
|
|
||||||
|
let client_conn =
|
||||||
|
wzp_transport::connect(&client_ep, server_listen, "localhost", client_config())
|
||||||
|
.await
|
||||||
|
.expect("connect");
|
||||||
|
let client_transport = Arc::new(QuinnTransport::new(client_conn));
|
||||||
|
let server_transport = accept_fut.await.expect("join accept task");
|
||||||
|
|
||||||
|
(client_transport, server_transport, (server_ep, client_ep))
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------
|
||||||
|
// Test 1: happy path — client learns its own port via Reflect
|
||||||
|
// -----------------------------------------------------------------------
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn reflect_happy_path() {
|
||||||
|
let (client_transport, server_transport, (_server_ep, client_ep)) =
|
||||||
|
connected_pair_with_port(0).await;
|
||||||
|
|
||||||
|
// Grab the client's actual bound port so we can cross-check
|
||||||
|
// against the reflected response.
|
||||||
|
let client_port = client_ep
|
||||||
|
.local_addr()
|
||||||
|
.expect("client local addr")
|
||||||
|
.port();
|
||||||
|
assert_ne!(client_port, 0, "client must have a real bound port");
|
||||||
|
|
||||||
|
// Start the mock relay's reflect handler.
|
||||||
|
let _relay_handle = spawn_mock_relay_with_reflect(Arc::clone(&server_transport)).await;
|
||||||
|
|
||||||
|
// Client sends Reflect and awaits the response. The real
|
||||||
|
// request_reflect helper in desktop/src-tauri/src/lib.rs uses a
|
||||||
|
// oneshot channel driven off the spawned recv loop; here we just
|
||||||
|
// do it inline because there's no spawned loop yet in this test
|
||||||
|
// — this isolates the wire protocol from the client-side state
|
||||||
|
// machine.
|
||||||
|
client_transport
|
||||||
|
.send_signal(&SignalMessage::Reflect)
|
||||||
|
.await
|
||||||
|
.expect("send Reflect");
|
||||||
|
|
||||||
|
let resp = tokio::time::timeout(Duration::from_secs(2), client_transport.recv_signal())
|
||||||
|
.await
|
||||||
|
.expect("reflect response should arrive within 2s")
|
||||||
|
.expect("recv_signal ok")
|
||||||
|
.expect("some message");
|
||||||
|
|
||||||
|
let observed_addr = match resp {
|
||||||
|
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
|
||||||
|
other => panic!("expected ReflectResponse, got {:?}", std::mem::discriminant(&other)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let parsed: SocketAddr = observed_addr
|
||||||
|
.parse()
|
||||||
|
.expect("ReflectResponse.observed_addr must parse as SocketAddr");
|
||||||
|
|
||||||
|
// The relay should see the client on 127.0.0.1 (loopback in the
|
||||||
|
// test harness) and on the client's bound ephemeral port.
|
||||||
|
assert_eq!(parsed.ip().to_string(), "127.0.0.1");
|
||||||
|
assert_eq!(
|
||||||
|
parsed.port(),
|
||||||
|
client_port,
|
||||||
|
"reflected port must match the client's local_addr port"
|
||||||
|
);
|
||||||
|
|
||||||
|
drop(client_transport);
|
||||||
|
drop(server_transport);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------
|
||||||
|
// Test 2: two clients get DIFFERENT reflected ports
|
||||||
|
// -----------------------------------------------------------------------
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||||
|
async fn reflect_two_clients_distinct_ports() {
|
||||||
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
|
||||||
|
// Shared server: one endpoint, two incoming accepts.
|
||||||
|
let (sc, _cert_der) = server_config();
|
||||||
|
let server_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
|
||||||
|
let server_ep = create_endpoint(server_addr, Some(sc)).expect("server endpoint");
|
||||||
|
let server_listen = server_ep.local_addr().expect("server local addr");
|
||||||
|
|
||||||
|
// Accept two clients in parallel.
|
||||||
|
let server_ep_a = server_ep.clone();
|
||||||
|
let accept_a = tokio::spawn(async move {
|
||||||
|
let conn = wzp_transport::accept(&server_ep_a).await.expect("accept A");
|
||||||
|
Arc::new(QuinnTransport::new(conn))
|
||||||
|
});
|
||||||
|
let server_ep_b = server_ep.clone();
|
||||||
|
let accept_b = tokio::spawn(async move {
|
||||||
|
let conn = wzp_transport::accept(&server_ep_b).await.expect("accept B");
|
||||||
|
Arc::new(QuinnTransport::new(conn))
|
||||||
|
});
|
||||||
|
|
||||||
|
// Client A
|
||||||
|
let client_ep_a = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep A");
|
||||||
|
let conn_a =
|
||||||
|
wzp_transport::connect(&client_ep_a, server_listen, "localhost", client_config())
|
||||||
|
.await
|
||||||
|
.expect("connect A");
|
||||||
|
let client_a = Arc::new(QuinnTransport::new(conn_a));
|
||||||
|
let port_a = client_ep_a.local_addr().unwrap().port();
|
||||||
|
|
||||||
|
// Client B
|
||||||
|
let client_ep_b = create_endpoint((Ipv4Addr::LOCALHOST, 0).into(), None).expect("ep B");
|
||||||
|
let conn_b =
|
||||||
|
wzp_transport::connect(&client_ep_b, server_listen, "localhost", client_config())
|
||||||
|
.await
|
||||||
|
.expect("connect B");
|
||||||
|
let client_b = Arc::new(QuinnTransport::new(conn_b));
|
||||||
|
let port_b = client_ep_b.local_addr().unwrap().port();
|
||||||
|
|
||||||
|
assert_ne!(
|
||||||
|
port_a, port_b,
|
||||||
|
"preconditions: OS must assign two clients different ephemeral ports"
|
||||||
|
);
|
||||||
|
|
||||||
|
let server_a = accept_a.await.expect("join A");
|
||||||
|
let server_b = accept_b.await.expect("join B");
|
||||||
|
|
||||||
|
// Spawn a reflect handler for each server-side transport.
|
||||||
|
let _relay_a = spawn_mock_relay_with_reflect(Arc::clone(&server_a)).await;
|
||||||
|
let _relay_b = spawn_mock_relay_with_reflect(Arc::clone(&server_b)).await;
|
||||||
|
|
||||||
|
// Each client requests reflect concurrently.
|
||||||
|
let reflect_for = |t: Arc<QuinnTransport>| async move {
|
||||||
|
t.send_signal(&SignalMessage::Reflect).await.expect("send");
|
||||||
|
let resp = tokio::time::timeout(Duration::from_secs(2), t.recv_signal())
|
||||||
|
.await
|
||||||
|
.expect("timeout")
|
||||||
|
.expect("ok")
|
||||||
|
.expect("some");
|
||||||
|
match resp {
|
||||||
|
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
|
||||||
|
_ => panic!("wrong variant"),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (addr_a, addr_b) = tokio::join!(reflect_for(client_a.clone()), reflect_for(client_b.clone()));
|
||||||
|
|
||||||
|
let parsed_a: SocketAddr = addr_a.parse().unwrap();
|
||||||
|
let parsed_b: SocketAddr = addr_b.parse().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(parsed_a.port(), port_a, "client A's reflected port");
|
||||||
|
assert_eq!(parsed_b.port(), port_b, "client B's reflected port");
|
||||||
|
assert_ne!(
|
||||||
|
parsed_a.port(),
|
||||||
|
parsed_b.port(),
|
||||||
|
"each client must see its own port, not a shared one"
|
||||||
|
);
|
||||||
|
|
||||||
|
drop(client_a);
|
||||||
|
drop(client_b);
|
||||||
|
drop(server_a);
|
||||||
|
drop(server_b);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------
|
||||||
|
// Test 3: old relay never answers — client times out cleanly
|
||||||
|
// -----------------------------------------------------------------------
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn reflect_old_relay_times_out() {
|
||||||
|
let (client_transport, server_transport, _endpoints) =
|
||||||
|
connected_pair_with_port(0).await;
|
||||||
|
|
||||||
|
// Mock relay that ignores Reflect — simulates a pre-Phase-1 build.
|
||||||
|
let _relay_handle =
|
||||||
|
spawn_mock_relay_without_reflect(Arc::clone(&server_transport)).await;
|
||||||
|
|
||||||
|
client_transport
|
||||||
|
.send_signal(&SignalMessage::Reflect)
|
||||||
|
.await
|
||||||
|
.expect("send Reflect");
|
||||||
|
|
||||||
|
// 1100ms ceiling matches the 1s timeout baked into
|
||||||
|
// get_reflected_address plus a tiny bit of slack. If this
|
||||||
|
// regression ever fires it probably means recv_signal blocked
|
||||||
|
// longer than expected and the Tauri command would hang the UI.
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
let result =
|
||||||
|
tokio::time::timeout(Duration::from_millis(1100), client_transport.recv_signal()).await;
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.is_err(),
|
||||||
|
"recv_signal must time out when the relay ignores Reflect"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
elapsed >= Duration::from_millis(1000),
|
||||||
|
"timeout fired too early ({:?})",
|
||||||
|
elapsed
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
elapsed < Duration::from_millis(1200),
|
||||||
|
"timeout fired too late ({:?}), client would feel unresponsive",
|
||||||
|
elapsed
|
||||||
|
);
|
||||||
|
|
||||||
|
drop(client_transport);
|
||||||
|
drop(server_transport);
|
||||||
|
}
|
||||||
@@ -185,6 +185,18 @@
|
|||||||
<span class="fp-display">~/.wzp/identity</span>
|
<span class="fp-display">~/.wzp/identity</span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="settings-section">
|
||||||
|
<h3>Network</h3>
|
||||||
|
<div class="setting-row">
|
||||||
|
<span class="setting-label">Public address</span>
|
||||||
|
<span id="s-reflected-addr" class="fp-display">(not queried)</span>
|
||||||
|
<button id="s-reflect-btn" class="secondary-btn">Detect</button>
|
||||||
|
</div>
|
||||||
|
<small style="color:var(--text-dim);display:block;margin-top:4px">
|
||||||
|
Asks the registered relay to echo back the IP:port it sees for this
|
||||||
|
connection (QUIC-native NAT reflection, replaces STUN).
|
||||||
|
</small>
|
||||||
|
</div>
|
||||||
<div class="settings-section">
|
<div class="settings-section">
|
||||||
<h3>Recent Rooms</h3>
|
<h3>Recent Rooms</h3>
|
||||||
<div id="s-recent-rooms" class="recent-rooms-list"></div>
|
<div id="s-recent-rooms" class="recent-rooms-list"></div>
|
||||||
|
|||||||
@@ -302,7 +302,14 @@ impl CallEngine {
|
|||||||
where
|
where
|
||||||
F: Fn(&str, &str) + Send + Sync + 'static,
|
F: Fn(&str, &str) + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
info!(%relay, %room, %alias, %quality, has_reuse = reuse_endpoint.is_some(), "CallEngine::start (android) invoked");
|
// Single "call epoch" timestamp threaded through send + recv tasks
|
||||||
|
// so every milestone log can carry t_ms_since_call_start. Used to
|
||||||
|
// diagnose the first-join no-audio regression by giving us a clean
|
||||||
|
// ordering between audio_start, first capture, first recv, first
|
||||||
|
// decode, first playout-ring write, and the C++ Oboe first-callback
|
||||||
|
// logs (which already exist in cpp/oboe_bridge.cpp).
|
||||||
|
let call_t0 = std::time::Instant::now();
|
||||||
|
info!(%relay, %room, %alias, %quality, has_reuse = reuse_endpoint.is_some(), t_ms = 0u128, "CallEngine::start (android) invoked");
|
||||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
|
||||||
let relay_addr: SocketAddr = relay.parse()?;
|
let relay_addr: SocketAddr = relay.parse()?;
|
||||||
@@ -348,7 +355,7 @@ impl CallEngine {
|
|||||||
return Err(anyhow::anyhow!("QUIC connect timeout (10s)"));
|
return Err(anyhow::anyhow!("QUIC connect timeout (10s)"));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
info!("QUIC connection established, performing handshake");
|
info!(t_ms = call_t0.elapsed().as_millis(), "first-join diag: QUIC connection established, performing handshake");
|
||||||
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
|
||||||
|
|
||||||
let _session = wzp_client::handshake::perform_handshake(
|
let _session = wzp_client::handshake::perform_handshake(
|
||||||
@@ -358,7 +365,7 @@ impl CallEngine {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| { error!("perform_handshake failed: {e}"); e })?;
|
.map_err(|e| { error!("perform_handshake failed: {e}"); e })?;
|
||||||
info!("connected to relay, handshake complete");
|
info!(t_ms = call_t0.elapsed().as_millis(), "first-join diag: connected to relay, handshake complete");
|
||||||
event_cb("connected", &format!("joined room {room}"));
|
event_cb("connected", &format!("joined room {room}"));
|
||||||
|
|
||||||
// Oboe audio via the wzp-native cdylib that was dlopen'd at
|
// Oboe audio via the wzp-native cdylib that was dlopen'd at
|
||||||
@@ -370,10 +377,21 @@ impl CallEngine {
|
|||||||
"wzp-native not loaded — dlopen failed at startup"
|
"wzp-native not loaded — dlopen failed at startup"
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
let t_pre_audio = call_t0.elapsed().as_millis();
|
||||||
if let Err(code) = crate::wzp_native::audio_start() {
|
if let Err(code) = crate::wzp_native::audio_start() {
|
||||||
return Err(anyhow::anyhow!("wzp_native_audio_start failed: code {code}"));
|
return Err(anyhow::anyhow!("wzp_native_audio_start failed: code {code}"));
|
||||||
}
|
}
|
||||||
info!("wzp-native audio started");
|
// Diagnostic: how long did audio_start() take, and at what
|
||||||
|
// wall-clock offset from CallEngine::start did it complete?
|
||||||
|
// Compare to the C++ "playout cb#0" log timestamp in logcat to
|
||||||
|
// see whether the Oboe playout callback fires before or after
|
||||||
|
// the recv task starts pushing decoded frames.
|
||||||
|
let t_audio_start_done = call_t0.elapsed().as_millis();
|
||||||
|
info!(
|
||||||
|
t_ms = t_audio_start_done,
|
||||||
|
audio_start_ms = t_audio_start_done.saturating_sub(t_pre_audio),
|
||||||
|
"first-join diag: wzp-native audio started"
|
||||||
|
);
|
||||||
|
|
||||||
let running = Arc::new(AtomicBool::new(true));
|
let running = Arc::new(AtomicBool::new(true));
|
||||||
let mic_muted = Arc::new(AtomicBool::new(false));
|
let mic_muted = Arc::new(AtomicBool::new(false));
|
||||||
@@ -394,6 +412,7 @@ impl CallEngine {
|
|||||||
let send_drops = Arc::new(AtomicU64::new(0));
|
let send_drops = Arc::new(AtomicU64::new(0));
|
||||||
let send_quality = quality.clone();
|
let send_quality = quality.clone();
|
||||||
let send_tx_codec = tx_codec.clone();
|
let send_tx_codec = tx_codec.clone();
|
||||||
|
let send_t0 = call_t0;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let profile = resolve_quality(&send_quality);
|
let profile = resolve_quality(&send_quality);
|
||||||
let config = match profile {
|
let config = match profile {
|
||||||
@@ -409,7 +428,7 @@ impl CallEngine {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
let frame_samples = (config.profile.frame_duration_ms as usize) * 48;
|
let frame_samples = (config.profile.frame_duration_ms as usize) * 48;
|
||||||
info!(codec = ?config.profile.codec, frame_samples, "send task starting (android/oboe)");
|
info!(codec = ?config.profile.codec, frame_samples, t_ms = send_t0.elapsed().as_millis(), "first-join diag: send task spawned (android/oboe)");
|
||||||
*send_tx_codec.lock().await = format!("{:?}", config.profile.codec);
|
*send_tx_codec.lock().await = format!("{:?}", config.profile.codec);
|
||||||
let mut encoder = CallEncoder::new(&config);
|
let mut encoder = CallEncoder::new(&config);
|
||||||
encoder.set_aec_enabled(false);
|
encoder.set_aec_enabled(false);
|
||||||
@@ -419,6 +438,13 @@ impl CallEngine {
|
|||||||
let mut last_rms: u32 = 0;
|
let mut last_rms: u32 = 0;
|
||||||
let mut last_pkt_bytes: usize = 0;
|
let mut last_pkt_bytes: usize = 0;
|
||||||
let mut short_reads: u64 = 0;
|
let mut short_reads: u64 = 0;
|
||||||
|
// First-join diagnostic: latch the wall-clock offset of the
|
||||||
|
// first full-frame capture read and the first non-zero RMS
|
||||||
|
// reading separately. The gap between them tells us how long
|
||||||
|
// Oboe input took to actually start delivering real samples
|
||||||
|
// after returning a "started" status from audio_start.
|
||||||
|
let mut first_full_read_logged = false;
|
||||||
|
let mut first_nonzero_rms_logged = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if !send_r.load(Ordering::Relaxed) {
|
if !send_r.load(Ordering::Relaxed) {
|
||||||
@@ -434,12 +460,29 @@ impl CallEngine {
|
|||||||
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if !first_full_read_logged {
|
||||||
|
info!(
|
||||||
|
t_ms = send_t0.elapsed().as_millis(),
|
||||||
|
short_reads_before = short_reads,
|
||||||
|
frame_samples,
|
||||||
|
"first-join diag: send first full capture frame read"
|
||||||
|
);
|
||||||
|
first_full_read_logged = true;
|
||||||
|
}
|
||||||
|
|
||||||
// RMS for UI meter
|
// RMS for UI meter
|
||||||
let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum();
|
let sum_sq: f64 = buf.iter().map(|&s| (s as f64) * (s as f64)).sum();
|
||||||
let rms = (sum_sq / buf.len() as f64).sqrt() as u32;
|
let rms = (sum_sq / buf.len() as f64).sqrt() as u32;
|
||||||
send_level.store(rms, Ordering::Relaxed);
|
send_level.store(rms, Ordering::Relaxed);
|
||||||
last_rms = rms;
|
last_rms = rms;
|
||||||
|
if !first_nonzero_rms_logged && rms > 0 {
|
||||||
|
info!(
|
||||||
|
t_ms = send_t0.elapsed().as_millis(),
|
||||||
|
rms,
|
||||||
|
"first-join diag: send first non-zero capture RMS"
|
||||||
|
);
|
||||||
|
first_nonzero_rms_logged = true;
|
||||||
|
}
|
||||||
|
|
||||||
if send_mic.load(Ordering::Relaxed) {
|
if send_mic.load(Ordering::Relaxed) {
|
||||||
buf.fill(0);
|
buf.fill(0);
|
||||||
@@ -483,6 +526,7 @@ impl CallEngine {
|
|||||||
let recv_spk = spk_muted.clone();
|
let recv_spk = spk_muted.clone();
|
||||||
let recv_fr = frames_received.clone();
|
let recv_fr = frames_received.clone();
|
||||||
let recv_rx_codec = rx_codec.clone();
|
let recv_rx_codec = rx_codec.clone();
|
||||||
|
let recv_t0 = call_t0;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD);
|
let initial_profile = resolve_quality(&quality).unwrap_or(QualityProfile::GOOD);
|
||||||
// Phase 3b/3c: use concrete AdaptiveDecoder (not Box<dyn
|
// Phase 3b/3c: use concrete AdaptiveDecoder (not Box<dyn
|
||||||
@@ -497,7 +541,11 @@ impl CallEngine {
|
|||||||
// Phase 3b/3c DRED reconstruction state — see DredRecvState
|
// Phase 3b/3c DRED reconstruction state — see DredRecvState
|
||||||
// above for the full flow.
|
// above for the full flow.
|
||||||
let mut dred_recv = DredRecvState::new();
|
let mut dred_recv = DredRecvState::new();
|
||||||
info!(codec = ?current_codec, "recv task starting (android/oboe)");
|
info!(codec = ?current_codec, t_ms = recv_t0.elapsed().as_millis(), "first-join diag: recv task spawned (android/oboe)");
|
||||||
|
// First-join diagnostic latches — see send task above for the
|
||||||
|
// sibling capture milestones.
|
||||||
|
let mut first_decode_logged = false;
|
||||||
|
let mut first_playout_write_logged = false;
|
||||||
|
|
||||||
// ─── Decoded-PCM recorder (debug) ────────────────────────────
|
// ─── Decoded-PCM recorder (debug) ────────────────────────────
|
||||||
// Dumps the first ~10 seconds of post-AGC PCM to a raw i16 LE
|
// Dumps the first ~10 seconds of post-AGC PCM to a raw i16 LE
|
||||||
@@ -546,7 +594,13 @@ impl CallEngine {
|
|||||||
{
|
{
|
||||||
Ok(Ok(Some(pkt))) => {
|
Ok(Ok(Some(pkt))) => {
|
||||||
if !first_packet_logged {
|
if !first_packet_logged {
|
||||||
info!(codec_id = ?pkt.header.codec_id, payload_bytes = pkt.payload.len(), is_repair = pkt.header.is_repair, "recv: first media packet received");
|
info!(
|
||||||
|
t_ms = recv_t0.elapsed().as_millis(),
|
||||||
|
codec_id = ?pkt.header.codec_id,
|
||||||
|
payload_bytes = pkt.payload.len(),
|
||||||
|
is_repair = pkt.header.is_repair,
|
||||||
|
"first-join diag: recv first media packet"
|
||||||
|
);
|
||||||
first_packet_logged = true;
|
first_packet_logged = true;
|
||||||
}
|
}
|
||||||
if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
|
if !pkt.header.is_repair && pkt.header.codec_id != CodecId::ComfortNoise {
|
||||||
@@ -613,6 +667,15 @@ impl CallEngine {
|
|||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
last_decode_n = n;
|
last_decode_n = n;
|
||||||
decoded_frames += 1;
|
decoded_frames += 1;
|
||||||
|
if !first_decode_logged {
|
||||||
|
info!(
|
||||||
|
t_ms = recv_t0.elapsed().as_millis(),
|
||||||
|
n,
|
||||||
|
codec = ?current_codec,
|
||||||
|
"first-join diag: recv first successful decode"
|
||||||
|
);
|
||||||
|
first_decode_logged = true;
|
||||||
|
}
|
||||||
// Log sample range for the first few decoded frames and periodically
|
// Log sample range for the first few decoded frames and periodically
|
||||||
if decoded_frames <= 3 || decoded_frames % 100 == 0 {
|
if decoded_frames <= 3 || decoded_frames % 100 == 0 {
|
||||||
let slice = &pcm[..n];
|
let slice = &pcm[..n];
|
||||||
@@ -663,6 +726,15 @@ impl CallEngine {
|
|||||||
|
|
||||||
if !recv_spk.load(Ordering::Relaxed) {
|
if !recv_spk.load(Ordering::Relaxed) {
|
||||||
let w = crate::wzp_native::audio_write_playout(&pcm[..n]);
|
let w = crate::wzp_native::audio_write_playout(&pcm[..n]);
|
||||||
|
if !first_playout_write_logged {
|
||||||
|
info!(
|
||||||
|
t_ms = recv_t0.elapsed().as_millis(),
|
||||||
|
n,
|
||||||
|
w,
|
||||||
|
"first-join diag: recv first playout-ring write"
|
||||||
|
);
|
||||||
|
first_playout_write_logged = true;
|
||||||
|
}
|
||||||
last_written = w;
|
last_written = w;
|
||||||
written_samples = written_samples.saturating_add(w as u64);
|
written_samples = written_samples.saturating_add(w as u64);
|
||||||
if w < n && decoded_frames <= 10 {
|
if w < n && decoded_frames <= 10 {
|
||||||
|
|||||||
@@ -465,6 +465,14 @@ struct SignalState {
|
|||||||
incoming_call_id: Option<String>,
|
incoming_call_id: Option<String>,
|
||||||
incoming_caller_fp: Option<String>,
|
incoming_caller_fp: Option<String>,
|
||||||
incoming_caller_alias: Option<String>,
|
incoming_caller_alias: Option<String>,
|
||||||
|
/// Pending `ReflectResponse` channel. When the `get_reflected_address`
|
||||||
|
/// Tauri command fires, it drops a `oneshot::Sender<SocketAddr>` here
|
||||||
|
/// before sending a `SignalMessage::Reflect`. The spawned recv loop
|
||||||
|
/// picks the response off the next bi-stream and fires the sender.
|
||||||
|
/// If another Reflect request comes in while one is pending, we
|
||||||
|
/// replace the sender — the old receiver sees a `Cancelled` error
|
||||||
|
/// and the caller retries.
|
||||||
|
pending_reflect: Option<tokio::sync::oneshot::Sender<std::net::SocketAddr>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tauri::command]
|
#[tauri::command]
|
||||||
@@ -542,6 +550,39 @@ async fn register_signal(
|
|||||||
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None;
|
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None;
|
||||||
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
|
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
|
||||||
}
|
}
|
||||||
|
Ok(Some(SignalMessage::ReflectResponse { observed_addr })) => {
|
||||||
|
// "STUN for QUIC" response — the relay told us our
|
||||||
|
// own server-reflexive address. If a Tauri command
|
||||||
|
// is currently awaiting this, fire the oneshot;
|
||||||
|
// otherwise log and drop (unsolicited responses
|
||||||
|
// from a confused relay shouldn't crash the loop).
|
||||||
|
tracing::info!(%observed_addr, "signal: ReflectResponse");
|
||||||
|
match observed_addr.parse::<std::net::SocketAddr>() {
|
||||||
|
Ok(parsed) => {
|
||||||
|
let mut sig = signal_state.lock().await;
|
||||||
|
if let Some(tx) = sig.pending_reflect.take() {
|
||||||
|
// `send` returns Err(addr) only if the
|
||||||
|
// receiver was dropped (caller timed out
|
||||||
|
// or canceled). Either way, nothing to
|
||||||
|
// do — the value is gone.
|
||||||
|
let _ = tx.send(parsed);
|
||||||
|
} else {
|
||||||
|
tracing::debug!(%observed_addr, "reflect: unsolicited response (no pending sender)");
|
||||||
|
}
|
||||||
|
let _ = app_clone.emit(
|
||||||
|
"signal-event",
|
||||||
|
serde_json::json!({"type":"reflect","observed_addr":observed_addr}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(%observed_addr, error = %e, "reflect: relay returned unparseable addr");
|
||||||
|
// Treat unparseable response as a failed
|
||||||
|
// request so the caller doesn't hang.
|
||||||
|
let mut sig = signal_state.lock().await;
|
||||||
|
let _ = sig.pending_reflect.take();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Some(other)) => {
|
Ok(Some(other)) => {
|
||||||
tracing::debug!(?other, "signal: unhandled message");
|
tracing::debug!(?other, "signal: unhandled message");
|
||||||
}
|
}
|
||||||
@@ -615,6 +656,69 @@ async fn answer_call(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// "STUN for QUIC" — ask the relay what our own public address looks
|
||||||
|
/// like from its side of the TLS-authenticated signal connection.
|
||||||
|
///
|
||||||
|
/// Wire flow:
|
||||||
|
/// 1. We install a `oneshot::Sender` in `SignalState.pending_reflect`
|
||||||
|
/// (replacing any stale one — last request wins).
|
||||||
|
/// 2. We release the state lock and send `SignalMessage::Reflect`
|
||||||
|
/// over the existing transport. The relay opens a fresh bi-stream
|
||||||
|
/// on its side to respond, which the spawned recv loop picks up.
|
||||||
|
/// 3. The recv loop's `ReflectResponse` match arm takes the sender
|
||||||
|
/// back out and fires it with the parsed `SocketAddr`.
|
||||||
|
/// 4. We await the receiver with a 1s timeout so a non-reflecting
|
||||||
|
/// relay (pre-Phase-1 build) doesn't hang the UI forever.
|
||||||
|
///
|
||||||
|
/// Returns the addr as a string so it can cross the Tauri IPC
|
||||||
|
/// boundary unchanged — JS-side can display it directly or parse it
|
||||||
|
/// with `new URL(...)` / a regex if needed.
|
||||||
|
#[tauri::command]
|
||||||
|
async fn get_reflected_address(
|
||||||
|
state: tauri::State<'_, Arc<AppState>>,
|
||||||
|
) -> Result<String, String> {
|
||||||
|
use wzp_proto::SignalMessage;
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel::<std::net::SocketAddr>();
|
||||||
|
let transport = {
|
||||||
|
let mut sig = state.signal.lock().await;
|
||||||
|
// Drop any older pending sender — we don't support more than
|
||||||
|
// one in-flight Reflect per connection. A prior request whose
|
||||||
|
// receiver has timed out will be cleaned up here automatically.
|
||||||
|
sig.pending_reflect = Some(tx);
|
||||||
|
sig.transport
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| "not registered".to_string())?
|
||||||
|
.clone()
|
||||||
|
};
|
||||||
|
if let Err(e) = transport.send_signal(&SignalMessage::Reflect).await {
|
||||||
|
// Clean up the pending sender so the next attempt doesn't see
|
||||||
|
// a stale channel. Re-acquire the lock inline since we already
|
||||||
|
// released it above to release `transport` back to the caller.
|
||||||
|
let mut sig = state.signal.lock().await;
|
||||||
|
sig.pending_reflect = None;
|
||||||
|
return Err(format!("send Reflect: {e}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1s is plenty for a same-datacenter relay (< 50ms RTT) and also
|
||||||
|
// the ceiling for "something's wrong, tell the user" — any older
|
||||||
|
// relay will never reply at all. 1100ms in the integration test.
|
||||||
|
match tokio::time::timeout(std::time::Duration::from_millis(1000), rx).await {
|
||||||
|
Ok(Ok(addr)) => Ok(addr.to_string()),
|
||||||
|
Ok(Err(_canceled)) => {
|
||||||
|
// The recv loop dropped the sender (relay returned
|
||||||
|
// unparseable addr, or loop exited mid-request).
|
||||||
|
Err("reflect channel canceled (signal loop exited or parse error)".into())
|
||||||
|
}
|
||||||
|
Err(_elapsed) => {
|
||||||
|
// Timeout — strip the pending sender so the next attempt
|
||||||
|
// starts clean. Old (pre-Phase-1) relays will land here.
|
||||||
|
let mut sig = state.signal.lock().await;
|
||||||
|
sig.pending_reflect = None;
|
||||||
|
Err("reflect timeout (relay may not support reflection)".into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tauri::command]
|
#[tauri::command]
|
||||||
async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<serde_json::Value, String> {
|
async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<serde_json::Value, String> {
|
||||||
let sig = state.signal.lock().await;
|
let sig = state.signal.lock().await;
|
||||||
@@ -651,6 +755,7 @@ pub fn run() {
|
|||||||
signal: Arc::new(Mutex::new(SignalState {
|
signal: Arc::new(Mutex::new(SignalState {
|
||||||
transport: None, endpoint: None, fingerprint: String::new(), signal_status: "idle".into(),
|
transport: None, endpoint: None, fingerprint: String::new(), signal_status: "idle".into(),
|
||||||
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
|
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
|
||||||
|
pending_reflect: None,
|
||||||
})),
|
})),
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -700,6 +805,7 @@ pub fn run() {
|
|||||||
ping_relay, get_identity, get_app_info,
|
ping_relay, get_identity, get_app_info,
|
||||||
connect, disconnect, toggle_mic, toggle_speaker, get_status,
|
connect, disconnect, toggle_mic, toggle_speaker, get_status,
|
||||||
register_signal, place_call, answer_call, get_signal_status,
|
register_signal, place_call, answer_call, get_signal_status,
|
||||||
|
get_reflected_address,
|
||||||
deregister,
|
deregister,
|
||||||
set_speakerphone, is_speakerphone_on,
|
set_speakerphone, is_speakerphone_on,
|
||||||
get_call_history, get_recent_contacts, clear_call_history,
|
get_call_history, get_recent_contacts, clear_call_history,
|
||||||
|
|||||||
@@ -83,6 +83,8 @@ const sRoom = document.getElementById("s-room") as HTMLInputElement;
|
|||||||
const sAlias = document.getElementById("s-alias") as HTMLInputElement;
|
const sAlias = document.getElementById("s-alias") as HTMLInputElement;
|
||||||
const sOsAec = document.getElementById("s-os-aec") as HTMLInputElement;
|
const sOsAec = document.getElementById("s-os-aec") as HTMLInputElement;
|
||||||
const sDredDebug = document.getElementById("s-dred-debug") as HTMLInputElement;
|
const sDredDebug = document.getElementById("s-dred-debug") as HTMLInputElement;
|
||||||
|
const sReflectedAddr = document.getElementById("s-reflected-addr") as HTMLSpanElement;
|
||||||
|
const sReflectBtn = document.getElementById("s-reflect-btn") as HTMLButtonElement;
|
||||||
const sAgc = document.getElementById("s-agc") as HTMLInputElement;
|
const sAgc = document.getElementById("s-agc") as HTMLInputElement;
|
||||||
const sQuality = document.getElementById("s-quality") as HTMLInputElement;
|
const sQuality = document.getElementById("s-quality") as HTMLInputElement;
|
||||||
const sQualityLabel = document.getElementById("s-quality-label")!;
|
const sQualityLabel = document.getElementById("s-quality-label")!;
|
||||||
@@ -757,6 +759,36 @@ function renderSettingsRecentRooms(rooms: RecentRoom[]) {
|
|||||||
|
|
||||||
settingsBtnHome.addEventListener("click", openSettings);
|
settingsBtnHome.addEventListener("click", openSettings);
|
||||||
settingsBtnCall.addEventListener("click", openSettings);
|
settingsBtnCall.addEventListener("click", openSettings);
|
||||||
|
// "STUN for QUIC" — ask the registered relay for our own public
|
||||||
|
// address. Requires register_signal to have been run first
|
||||||
|
// (otherwise the Rust side returns "not registered"). The button
|
||||||
|
// shows its working state inline so the user knows it's waiting on
|
||||||
|
// the relay rather than the network.
|
||||||
|
sReflectBtn.addEventListener("click", async () => {
|
||||||
|
sReflectedAddr.textContent = "querying...";
|
||||||
|
sReflectBtn.disabled = true;
|
||||||
|
try {
|
||||||
|
const addr = await invoke<string>("get_reflected_address");
|
||||||
|
sReflectedAddr.textContent = addr;
|
||||||
|
sReflectedAddr.style.color = "var(--green)";
|
||||||
|
} catch (e: any) {
|
||||||
|
// Two main failure modes surfaced via the error string:
|
||||||
|
// - "not registered" — user hasn't registered
|
||||||
|
// against a relay yet
|
||||||
|
// - "reflect timeout (relay may not support reflection)"
|
||||||
|
// — old relay, pre-Phase-1
|
||||||
|
const msg = String(e);
|
||||||
|
sReflectedAddr.textContent = msg.includes("not registered")
|
||||||
|
? "⚠ register first"
|
||||||
|
: msg.includes("timeout")
|
||||||
|
? "⚠ relay does not support reflection"
|
||||||
|
: `⚠ ${msg}`;
|
||||||
|
sReflectedAddr.style.color = "var(--yellow)";
|
||||||
|
} finally {
|
||||||
|
sReflectBtn.disabled = false;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
settingsClose.addEventListener("click", closeSettings);
|
settingsClose.addEventListener("click", closeSettings);
|
||||||
settingsPanel.addEventListener("click", (e) => { if (e.target === settingsPanel) closeSettings(); });
|
settingsPanel.addEventListener("click", (e) => { if (e.target === settingsPanel) closeSettings(); });
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user