feat(hole-punching): advertise peer reflexive addrs in DirectCall flow — Phase 3
Completes the signal-plane plumbing for P2P direct calling: both
peers now learn their own server-reflexive address (Phase 1
Reflect), include it in DirectCallOffer / DirectCallAnswer, and
the relay cross-wires them into each side's CallSetup so the
client knows the OTHER party's direct addr. Dual-path QUIC race
is scaffolded but deferred to Phase 3.5 — this commit ships the
full advertising layer so real-hardware testing can confirm the
addrs flow end-to-end before adding the concurrent-connect logic.
Wire protocol (wzp-proto/src/packet.rs):
- DirectCallOffer gains optional `caller_reflexive_addr`
- DirectCallAnswer gains optional `callee_reflexive_addr`
- CallSetup gains optional `peer_direct_addr`
- All #[serde(default, skip_serializing_if = "Option::is_none")] so
pre-Phase-3 peers and relays stay backward compatible by
construction — the new fields are elided from the JSON on the
wire when None, and older clients parse the JSON ignoring any
fields they don't know.
- 2 new roundtrip tests (Some + None cases, old-JSON parse-back).
Call registry (wzp-relay/src/call_registry.rs):
- DirectCall gains caller_reflexive_addr + callee_reflexive_addr.
- set_caller_reflexive_addr / set_callee_reflexive_addr setters.
- 2 new unit tests: stores and returns addrs, clearing works.
Relay cross-wiring (wzp-relay/src/main.rs):
- On DirectCallOffer: stash the caller's addr in the registry.
- On DirectCallAnswer: stash the callee's addr (only set by
AcceptTrusted answers — privacy-mode leaves it None).
- Send two different CallSetup messages: one to the caller with
peer_direct_addr=callee_addr, and one to the callee with
peer_direct_addr=caller_addr. The cross-wiring means each side
gets the OTHER party's direct addr, not its own.
- Logs `p2p_viable=true` when both sides advertised.
Client advertising (desktop/src-tauri/src/lib.rs):
- New `try_reflect_own_addr` helper that reuses the Phase 1
oneshot pattern WITHOUT holding state.signal.lock() across the
await (critical: the recv loop reacquires the same mutex to
fire the oneshot, so holding it would deadlock).
- `place_call` queries reflect first and includes the returned
addr in DirectCallOffer. Falls back to None on any failure —
call still proceeds via the relay path.
- `answer_call` queries reflect ONLY on AcceptTrusted so
AcceptGeneric keeps the callee's IP private by design. Reject
and AcceptGeneric both pass None.
- recv loop's CallSetup handler destructures and forwards
peer_direct_addr to the JS layer in the signal-event payload.
Client scaffolding for dual-path (desktop/src-tauri/src/lib.rs +
desktop/src/main.ts):
- `connect` Tauri command gets a new optional `peer_direct_addr`
argument. Currently LOGS the addr but still uses the relay
path for the media connection — Phase 3.5 will swap in a
tokio::select! race between direct dial + relay dial. Scaffolding
lands here so the JS wire is stable, real-hardware testing can
confirm advertising works end-to-end, and Phase 3.5 is a pure
Rust change with no JS touches.
- JS setup handler forwards `data.peer_direct_addr` to invoke.
Back-compat with the CLI client (crates/wzp-client/src/cli.rs):
- CLI test harness updated for the new fields — always passes
None for both reflex addrs (no hole-punching). Also destructures
peer_direct_addr: _ in its CallSetup handler.
Tests (8 new, all passing):
- wzp-proto: hole_punching_optional_fields_roundtrip,
hole_punching_backward_compat_old_json_parses
- wzp-relay call_registry: call_registry_stores_reflexive_addrs,
call_registry_clearing_reflex_addr_works
- wzp-relay integration: crates/wzp-relay/tests/hole_punching.rs
* both_peers_advertise_reflex_addrs_cross_wire_in_setup
* privacy_mode_answer_omits_callee_addr_from_setup
* pre_phase3_caller_leaves_both_setups_relay_only
* neither_peer_advertises_both_setups_are_relay_only
Full workspace test goes from 396 → 404 passing.
PRD: .taskmaster/docs/prd_hole_punching.txt
Tasks: 53-60 all completed (58 = scaffolding-only; 3.5 follow-up)
Next up: **Phase 3.5 — dual-path QUIC connect race**. With the
advertising layer live, this becomes a focused change: on
CallSetup-with-peer_direct_addr, start a server-capable dual
endpoint, and tokio::select! across (direct dial, relay dial,
inbound accept). Whichever QUIC handshake completes first wins,
the losers drop, 2s direct timeout falls back to relay.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -260,12 +260,28 @@ async fn connect(
|
||||
alias: String,
|
||||
os_aec: bool,
|
||||
quality: String,
|
||||
// Phase 3 hole-punching: peer's server-reflexive address as
|
||||
// cross-wired by the relay in CallSetup.peer_direct_addr. JS
|
||||
// passes it through when present. Currently LOGGED for
|
||||
// observability but not yet used to race a direct QUIC
|
||||
// handshake — that's the Phase 3.5 follow-up. Passing it
|
||||
// through now so real-hardware testing can confirm the
|
||||
// advertising layer is delivering the addrs end to end, and so
|
||||
// the JS → Rust wire is stable before we add the race logic.
|
||||
#[allow(non_snake_case)]
|
||||
peer_direct_addr: Option<String>,
|
||||
) -> Result<String, String> {
|
||||
let mut engine_lock = state.engine.lock().await;
|
||||
if engine_lock.is_some() {
|
||||
return Err("already connected".into());
|
||||
}
|
||||
|
||||
if let Some(ref addr) = peer_direct_addr {
|
||||
tracing::info!(%addr, %relay, %room, "connect: peer_direct_addr supplied — hole-punching candidate logged (Phase 3.5 will race direct vs relay here)");
|
||||
} else {
|
||||
tracing::info!(%relay, %room, "connect: no peer_direct_addr — relay-only path");
|
||||
}
|
||||
|
||||
// If we previously opened a quinn::Endpoint for the signaling connection
|
||||
// (direct-call path), reuse it so the media connection shares the same
|
||||
// UDP socket. This side-steps the Android issue where a second
|
||||
@@ -540,10 +556,32 @@ async fn register_signal(
|
||||
Ok(Some(SignalMessage::DirectCallAnswer { call_id, accept_mode, .. })) => {
|
||||
tracing::info!(%call_id, ?accept_mode, "signal: DirectCallAnswer (forwarded by relay)");
|
||||
}
|
||||
Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr })) => {
|
||||
tracing::info!(%call_id, %room, %relay_addr, "signal: CallSetup — emitting setup event to JS");
|
||||
let mut sig = signal_state.lock().await; sig.signal_status = "setup".into();
|
||||
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"setup","call_id":call_id,"room":room,"relay_addr":relay_addr}));
|
||||
Ok(Some(SignalMessage::CallSetup { call_id, room, relay_addr, peer_direct_addr })) => {
|
||||
// Phase 3: peer_direct_addr carries the OTHER party's
|
||||
// reflex addr when hole-punching is viable. Forwarded
|
||||
// to JS alongside the relay addr so the connect flow
|
||||
// can attempt a dual-path race. `null` when either
|
||||
// side didn't advertise (pre-Phase-3 peer, privacy
|
||||
// mode callee, or relay policy).
|
||||
tracing::info!(
|
||||
%call_id,
|
||||
%room,
|
||||
%relay_addr,
|
||||
peer_direct = ?peer_direct_addr,
|
||||
"signal: CallSetup — emitting setup event to JS"
|
||||
);
|
||||
let mut sig = signal_state.lock().await;
|
||||
sig.signal_status = "setup".into();
|
||||
let _ = app_clone.emit(
|
||||
"signal-event",
|
||||
serde_json::json!({
|
||||
"type": "setup",
|
||||
"call_id": call_id,
|
||||
"room": room,
|
||||
"relay_addr": relay_addr,
|
||||
"peer_direct_addr": peer_direct_addr,
|
||||
}),
|
||||
);
|
||||
}
|
||||
Ok(Some(SignalMessage::Hangup { reason })) => {
|
||||
tracing::info!(?reason, "signal: Hangup");
|
||||
@@ -609,15 +647,48 @@ async fn place_call(
|
||||
target_fp: String,
|
||||
) -> Result<(), String> {
|
||||
use wzp_proto::SignalMessage;
|
||||
|
||||
// Phase 3 hole-punching: query our own reflex addr BEFORE the
|
||||
// offer so we can advertise it. Best-effort — a failed reflect
|
||||
// (old relay, transient error) falls back to `None` which
|
||||
// means the callee's CallSetup will have peer_direct_addr=None
|
||||
// and the whole call goes through the relay path unchanged.
|
||||
//
|
||||
// Critical: this call does its own state.signal.lock() usage and
|
||||
// MUST NOT be wrapped in an outer lock, or the recv loop's
|
||||
// ReflectResponse handler will deadlock on the same mutex.
|
||||
let state_inner: Arc<AppState> = (*state).clone();
|
||||
let own_reflex = try_reflect_own_addr(&state_inner).await.ok().flatten();
|
||||
if let Some(ref a) = own_reflex {
|
||||
tracing::info!(%a, "place_call: learned own reflex addr for hole-punching advertisement");
|
||||
} else {
|
||||
tracing::info!("place_call: no reflex addr available, falling back to relay-only");
|
||||
}
|
||||
|
||||
let sig = state.signal.lock().await;
|
||||
let transport = sig.transport.as_ref().ok_or("not registered")?;
|
||||
let call_id = format!("{:016x}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
|
||||
tracing::info!(%call_id, %target_fp, "place_call: sending DirectCallOffer");
|
||||
transport.send_signal(&SignalMessage::DirectCallOffer {
|
||||
caller_fingerprint: sig.fingerprint.clone(), caller_alias: None, target_fingerprint: target_fp.clone(),
|
||||
call_id: call_id.clone(), identity_pub: [0u8; 32], ephemeral_pub: [0u8; 32], signature: vec![],
|
||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||
}).await.map_err(|e| format!("{e}"))?;
|
||||
let call_id = format!(
|
||||
"{:016x}",
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos()
|
||||
);
|
||||
tracing::info!(%call_id, %target_fp, reflex = ?own_reflex, "place_call: sending DirectCallOffer");
|
||||
transport
|
||||
.send_signal(&SignalMessage::DirectCallOffer {
|
||||
caller_fingerprint: sig.fingerprint.clone(),
|
||||
caller_alias: None,
|
||||
target_fingerprint: target_fp.clone(),
|
||||
call_id: call_id.clone(),
|
||||
identity_pub: [0u8; 32],
|
||||
ephemeral_pub: [0u8; 32],
|
||||
signature: vec![],
|
||||
supported_profiles: vec![wzp_proto::QualityProfile::GOOD],
|
||||
caller_reflexive_addr: own_reflex,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| format!("{e}"))?;
|
||||
history::log(call_id, target_fp, None, history::CallDirection::Placed);
|
||||
let _ = app.emit("history-changed", ());
|
||||
Ok(())
|
||||
@@ -631,31 +702,109 @@ async fn answer_call(
|
||||
mode: i32,
|
||||
) -> Result<(), String> {
|
||||
use wzp_proto::SignalMessage;
|
||||
let accept_mode = match mode {
|
||||
0 => wzp_proto::CallAcceptMode::Reject,
|
||||
1 => wzp_proto::CallAcceptMode::AcceptTrusted,
|
||||
_ => wzp_proto::CallAcceptMode::AcceptGeneric,
|
||||
};
|
||||
|
||||
// Phase 3 hole-punching: only AcceptTrusted reveals our reflex
|
||||
// addr. Privacy-mode (AcceptGeneric) and Reject explicitly do
|
||||
// NOT — leaking the callee's IP back to the caller in those
|
||||
// modes would defeat the entire point of AcceptGeneric.
|
||||
//
|
||||
// Like place_call, we MUST NOT hold state.signal.lock() across
|
||||
// the reflect await or the recv loop's ReflectResponse handler
|
||||
// will deadlock on the same mutex.
|
||||
let own_reflex = if accept_mode == wzp_proto::CallAcceptMode::AcceptTrusted {
|
||||
let state_inner: Arc<AppState> = (*state).clone();
|
||||
let r = try_reflect_own_addr(&state_inner).await.ok().flatten();
|
||||
if let Some(ref a) = r {
|
||||
tracing::info!(%call_id, %a, "answer_call: learned own reflex addr for AcceptTrusted");
|
||||
} else {
|
||||
tracing::info!(%call_id, "answer_call: no reflex addr for AcceptTrusted, falling back to relay-only");
|
||||
}
|
||||
r
|
||||
} else {
|
||||
// Reject / AcceptGeneric: keep the IP private.
|
||||
None
|
||||
};
|
||||
|
||||
let sig = state.signal.lock().await;
|
||||
let transport = sig.transport.as_ref().ok_or_else(|| {
|
||||
tracing::warn!("answer_call: not registered (no transport)");
|
||||
"not registered".to_string()
|
||||
})?;
|
||||
let accept_mode = match mode { 0 => wzp_proto::CallAcceptMode::Reject, 1 => wzp_proto::CallAcceptMode::AcceptTrusted, _ => wzp_proto::CallAcceptMode::AcceptGeneric };
|
||||
tracing::info!(%call_id, ?accept_mode, "answer_call: sending DirectCallAnswer");
|
||||
transport.send_signal(&SignalMessage::DirectCallAnswer {
|
||||
call_id: call_id.clone(), accept_mode, identity_pub: None, ephemeral_pub: None, signature: None,
|
||||
chosen_profile: Some(wzp_proto::QualityProfile::GOOD),
|
||||
}).await.map_err(|e| {
|
||||
tracing::error!(%call_id, error = %e, "answer_call: send_signal failed");
|
||||
format!("{e}")
|
||||
})?;
|
||||
tracing::info!(%call_id, ?accept_mode, reflex = ?own_reflex, "answer_call: sending DirectCallAnswer");
|
||||
transport
|
||||
.send_signal(&SignalMessage::DirectCallAnswer {
|
||||
call_id: call_id.clone(),
|
||||
accept_mode,
|
||||
identity_pub: None,
|
||||
ephemeral_pub: None,
|
||||
signature: None,
|
||||
chosen_profile: Some(wzp_proto::QualityProfile::GOOD),
|
||||
callee_reflexive_addr: own_reflex,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!(%call_id, error = %e, "answer_call: send_signal failed");
|
||||
format!("{e}")
|
||||
})?;
|
||||
tracing::info!(%call_id, "answer_call: DirectCallAnswer sent successfully");
|
||||
// Upgrade the pending "Missed" entry to "Received" if the user
|
||||
// accepted (mode != Reject). Mode 0 = Reject → leave as Missed.
|
||||
if mode != 0 {
|
||||
if history::mark_received_if_pending(&call_id) {
|
||||
let _ = app.emit("history-changed", ());
|
||||
}
|
||||
if mode != 0 && history::mark_received_if_pending(&call_id) {
|
||||
let _ = app.emit("history-changed", ());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Internal reflect helper shared by `get_reflected_address` and the
|
||||
/// hole-punching path in `place_call` / `answer_call`.
|
||||
///
|
||||
/// Must be called WITHOUT holding `state.signal.lock()` — the recv
|
||||
/// loop acquires the same lock to fire the oneshot, so holding it
|
||||
/// across the await would deadlock.
|
||||
///
|
||||
/// Returns `Ok(Some(addr))` on success, `Ok(None)` if reflect is
|
||||
/// unsupported / timed out / transport failed (caller should
|
||||
/// gracefully continue with a relay-only path), or `Err` on
|
||||
/// "not registered" which is a hard precondition failure.
|
||||
async fn try_reflect_own_addr(
|
||||
state: &Arc<AppState>,
|
||||
) -> Result<Option<String>, String> {
|
||||
use wzp_proto::SignalMessage;
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<std::net::SocketAddr>();
|
||||
let transport = {
|
||||
let mut sig = state.signal.lock().await;
|
||||
sig.pending_reflect = Some(tx);
|
||||
sig.transport
|
||||
.as_ref()
|
||||
.ok_or_else(|| "not registered".to_string())?
|
||||
.clone()
|
||||
};
|
||||
if let Err(e) = transport.send_signal(&SignalMessage::Reflect).await {
|
||||
let mut sig = state.signal.lock().await;
|
||||
sig.pending_reflect = None;
|
||||
tracing::warn!(error = %e, "try_reflect_own_addr: send_signal failed, continuing without reflex addr");
|
||||
return Ok(None);
|
||||
}
|
||||
match tokio::time::timeout(std::time::Duration::from_millis(1000), rx).await {
|
||||
Ok(Ok(addr)) => Ok(Some(addr.to_string())),
|
||||
Ok(Err(_canceled)) => {
|
||||
tracing::warn!("try_reflect_own_addr: oneshot canceled");
|
||||
Ok(None)
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
let mut sig = state.signal.lock().await;
|
||||
sig.pending_reflect = None;
|
||||
tracing::warn!("try_reflect_own_addr: 1s timeout (pre-Phase-1 relay?)");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// "STUN for QUIC" — ask the relay what our own public address looks
|
||||
/// like from its side of the TLS-authenticated signal connection.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user