diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index cd0f40e..7433a5d 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -126,6 +126,10 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { // an answer. "Offer" is the generic catch-all. SignalMessage::Reflect | SignalMessage::ReflectResponse { .. } => CallSignalType::Offer, // control-plane + // Phase 4 cross-relay forwarding envelope — strictly a + // relay-to-relay message, never rides the featherChat + // bridge. Catch-all mapping for completeness. + SignalMessage::FederatedSignalForward { .. } => CallSignalType::Offer, } } diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index d443087..01cb78f 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -820,6 +820,38 @@ pub enum SignalMessage { ReflectResponse { observed_addr: String, }, + + // ── Phase 4: cross-relay direct-call signaling ──────────────────── + + /// Phase 4: relay-to-relay envelope for forwarding direct-call + /// signaling across a federation link. When Alice on Relay A + /// sends a `DirectCallOffer` for Bob whose fingerprint isn't + /// in A's local SignalHub, Relay A wraps the offer in this + /// envelope and broadcasts it over every active federation + /// peer link. Whichever peer has Bob registered unwraps the + /// inner message and delivers it locally. + /// + /// Never originated by clients — only relays create and + /// consume this variant. + /// + /// Loop prevention: the receiving relay drops any forward + /// where `origin_relay_fp` matches its own federation TLS + /// fingerprint. With broadcast-to-all-peers this prevents + /// A→B→A echo loops; proper TTL + dedup will land when + /// multi-hop federation is added (Phase 4.2). + FederatedSignalForward { + /// The signal message being forwarded + /// (`DirectCallOffer`, `DirectCallAnswer`, `CallRinging`, + /// `Hangup`, ...). Boxed because `SignalMessage` is + /// relatively large and JSON serde handles recursion + /// cleanly. + inner: Box, + /// Federation TLS fingerprint of the sending relay. + /// Used (a) for loop prevention by the receiver and (b) + /// to route the peer's reply back through the same + /// federation link via `send_signal_to_peer`. + origin_relay_fp: String, + }, } /// How the callee responds to a direct call. @@ -988,6 +1020,82 @@ mod tests { } } + #[test] + fn federated_signal_forward_roundtrip() { + // Wrap a DirectCallOffer inside FederatedSignalForward and + // prove both directions of serde preserve every field. + let inner = SignalMessage::DirectCallOffer { + caller_fingerprint: "alice".into(), + caller_alias: Some("Alice".into()), + target_fingerprint: "bob".into(), + call_id: "c1".into(), + identity_pub: [1u8; 32], + ephemeral_pub: [2u8; 32], + signature: vec![3u8; 64], + supported_profiles: vec![], + caller_reflexive_addr: Some("192.0.2.1:4433".into()), + }; + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(inner), + origin_relay_fp: "relay-a-tls-fp".into(), + }; + let json = serde_json::to_string(&forward).unwrap(); + let decoded: SignalMessage = serde_json::from_str(&json).unwrap(); + match decoded { + SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => { + assert_eq!(origin_relay_fp, "relay-a-tls-fp"); + match *inner { + SignalMessage::DirectCallOffer { + caller_fingerprint, + target_fingerprint, + caller_reflexive_addr, + .. + } => { + assert_eq!(caller_fingerprint, "alice"); + assert_eq!(target_fingerprint, "bob"); + assert_eq!(caller_reflexive_addr.as_deref(), Some("192.0.2.1:4433")); + } + _ => panic!("inner was not DirectCallOffer after roundtrip"), + } + } + _ => panic!("outer was not FederatedSignalForward"), + } + } + + #[test] + fn federated_signal_forward_can_nest_any_inner() { + // Sanity check that every direct-call signaling variant + // we intend to forward survives being boxed + re-serialized. + let cases: Vec = vec![ + SignalMessage::DirectCallAnswer { + call_id: "c1".into(), + accept_mode: CallAcceptMode::AcceptTrusted, + identity_pub: None, + ephemeral_pub: None, + signature: None, + chosen_profile: None, + callee_reflexive_addr: Some("198.51.100.9:4433".into()), + }, + SignalMessage::CallRinging { call_id: "c1".into() }, + SignalMessage::Hangup { reason: HangupReason::Normal }, + ]; + for inner in cases { + let inner_disc = std::mem::discriminant(&inner); + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(inner), + origin_relay_fp: "r".into(), + }; + let json = serde_json::to_string(&forward).unwrap(); + let decoded: SignalMessage = serde_json::from_str(&json).unwrap(); + match decoded { + SignalMessage::FederatedSignalForward { inner, .. } => { + assert_eq!(std::mem::discriminant(&*inner), inner_disc); + } + _ => panic!("outer variant lost"), + } + } + } + #[test] fn hole_punching_optional_fields_roundtrip() { // DirectCallOffer with Some(caller_reflexive_addr) diff --git a/crates/wzp-relay/src/call_registry.rs b/crates/wzp-relay/src/call_registry.rs index 18a19e3..8439c25 100644 --- a/crates/wzp-relay/src/call_registry.rs +++ b/crates/wzp-relay/src/call_registry.rs @@ -41,6 +41,15 @@ pub struct DirectCall { /// `AcceptTrusted` answers — privacy-mode answers leave this /// `None`. Fed into the caller's `CallSetup.peer_direct_addr`. pub callee_reflexive_addr: Option, + /// Phase 4 (cross-relay): federation TLS fingerprint of the + /// PEER RELAY that forwarded the offer/answer for this call. + /// `None` for local calls — caller and callee both + /// registered on this relay. `Some(fp)` when one side of + /// the call is on a remote relay reached through the + /// federation link identified by `fp`. The + /// `DirectCallAnswer` handling uses this to route the reply + /// back through the SAME link instead of broadcasting again. + pub peer_relay_fp: Option, } /// Registry of active direct calls. @@ -69,11 +78,22 @@ impl CallRegistry { ended_at: None, caller_reflexive_addr: None, callee_reflexive_addr: None, + peer_relay_fp: None, }; self.calls.insert(call_id.clone(), call); self.calls.get(&call_id).unwrap() } + /// Phase 4: stash the federation TLS fingerprint of the peer + /// relay that originated (or will receive) the cross-relay + /// forward for this call. Safe to call with `None` to clear + /// a previously-set value. + pub fn set_peer_relay_fp(&mut self, call_id: &str, fp: Option) { + if let Some(call) = self.calls.get_mut(call_id) { + call.peer_relay_fp = fp; + } + } + /// Phase 3: stash the caller's server-reflexive address read /// off a `DirectCallOffer`. Safe to call on any call state; /// a no-op if the call doesn't exist. @@ -267,6 +287,29 @@ mod tests { reg.set_caller_reflexive_addr("does-not-exist", Some("x".into())); } + #[test] + fn call_registry_stores_peer_relay_fp() { + let mut reg = CallRegistry::new(); + reg.create_call("c1".into(), "alice".into(), "bob".into()); + + // Default: no peer relay. + assert!(reg.get("c1").unwrap().peer_relay_fp.is_none()); + + // Cross-relay call: origin relay's fp is stashed. + reg.set_peer_relay_fp("c1", Some("relay-a-tls-fp".into())); + assert_eq!( + reg.get("c1").unwrap().peer_relay_fp.as_deref(), + Some("relay-a-tls-fp") + ); + + // Clearing with None is a valid no-op and empties the field. + reg.set_peer_relay_fp("c1", None); + assert!(reg.get("c1").unwrap().peer_relay_fp.is_none()); + + // Unknown call is a no-op, not a panic. + reg.set_peer_relay_fp("does-not-exist", Some("x".into())); + } + #[test] fn call_registry_clearing_reflex_addr_works() { // Passing None to the setter must clear a previously-set value diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 48c3cfa..c7a4e33 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -146,6 +146,14 @@ pub struct FederationManager { event_log: EventLogger, /// Per-room rate limiters for inbound federation media. rate_limiters: Mutex>, + /// Phase 4: channel for handing cross-relay direct-call + /// signaling (inner message + origin relay fp) back to the + /// main signal loop in `main.rs`. Set once at startup via + /// `set_cross_relay_tx`. `None` when the main loop hasn't + /// wired it up yet (e.g. during startup warmup) — forwards + /// that arrive before wiring are dropped with a warning. + cross_relay_signal_tx: + Mutex>>, } impl FederationManager { @@ -171,6 +179,78 @@ impl FederationManager { dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)), event_log, rate_limiters: Mutex::new(HashMap::new()), + cross_relay_signal_tx: Mutex::new(None), + } + } + + /// Phase 4: expose this relay's federation TLS fingerprint so + /// the main signal loop can populate + /// `SignalMessage::FederatedSignalForward.origin_relay_fp`. + pub fn local_tls_fp(&self) -> &str { + &self.local_tls_fp + } + + /// Phase 4: wire the channel that the main signal loop uses + /// to receive unwrapped cross-relay direct-call signals. Called + /// once at startup from `main.rs`. + pub async fn set_cross_relay_tx( + &self, + tx: tokio::sync::mpsc::Sender<(wzp_proto::SignalMessage, String)>, + ) { + *self.cross_relay_signal_tx.lock().await = Some(tx); + } + + /// Phase 4: broadcast a `SignalMessage::FederatedSignalForward` + /// to every active federation peer link. Returns the number of + /// peers the broadcast reached (not the number that successfully + /// delivered the message further). Used when the local relay + /// doesn't know which peer holds the target fingerprint for a + /// `DirectCallOffer` — whichever peer has it will unwrap and + /// handle locally; the rest drop silently after "target not + /// local" check. + /// + /// Loop prevention: the receiving relay checks + /// `origin_relay_fp` against its own fp and drops self-sourced + /// forwards. + pub async fn broadcast_signal(&self, msg: &wzp_proto::SignalMessage) -> usize { + let links = self.peer_links.lock().await; + let mut count = 0; + for (fp, link) in links.iter() { + match link.transport.send_signal(msg).await { + Ok(()) => { + count += 1; + tracing::debug!(peer = %link.label, %fp, "federation: broadcast signal ok"); + } + Err(e) => { + tracing::warn!(peer = %link.label, %fp, error = %e, "federation: broadcast signal failed"); + } + } + } + count + } + + /// Phase 4: targeted send — used by the + /// `DirectCallAnswer` path when the registry knows exactly + /// which peer relay to route the reply back to. More efficient + /// than re-broadcasting and avoids leaking the call to + /// uninvolved peers. + /// + /// Returns `Ok(())` on success, `Err(String)` when the peer + /// isn't currently linked or the send fails. + pub async fn send_signal_to_peer( + &self, + peer_relay_fp: &str, + msg: &wzp_proto::SignalMessage, + ) -> Result<(), String> { + let normalized = normalize_fp(peer_relay_fp); + let links = self.peer_links.lock().await; + match links.get(&normalized) { + Some(link) => link + .transport + .send_signal(msg) + .await + .map_err(|e| format!("send to peer {normalized}: {e}")), + None => Err(format!("no active federation link for {normalized}")), } } @@ -852,6 +932,57 @@ async fn handle_signal( } } } + // Phase 4: cross-relay direct-call signal envelope. + // + // Unwrap the inner message and hand it off to the main + // signal loop via the cross_relay_signal_tx channel. The + // main loop will then dispatch the inner DirectCallOffer/ + // Answer/Ringing/Hangup exactly as if it had arrived on a + // local signal transport — with the extra context that + // the call is "federated" (origin_relay_fp). + // + // Loop prevention: drop any forward whose origin matches + // our own federation TLS fingerprint. With + // broadcast-to-all-peers this prevents A→B→A echo loops. + SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => { + if origin_relay_fp == fm.local_tls_fp { + tracing::debug!( + peer = %peer_label, + "federation: dropping self-sourced FederatedSignalForward (loop prevention)" + ); + return; + } + let tx_opt = { + let guard = fm.cross_relay_signal_tx.lock().await; + guard.clone() + }; + match tx_opt { + Some(tx) => { + let inner_discriminant = std::mem::discriminant(&*inner); + if let Err(e) = tx.send((*inner, origin_relay_fp.clone())).await { + warn!( + peer = %peer_label, + ?inner_discriminant, + error = %e, + "federation: cross-relay signal dispatcher full / closed" + ); + } else { + tracing::debug!( + peer = %peer_label, + ?inner_discriminant, + %origin_relay_fp, + "federation: forwarded cross-relay signal to main dispatcher" + ); + } + } + None => { + warn!( + peer = %peer_label, + "federation: cross_relay_signal_tx not wired yet — dropping forward" + ); + } + } + } _ => {} // ignore other signals } } diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 5f810fc..d37333b 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -453,6 +453,21 @@ async fn main() -> anyhow::Result<()> { let signal_hub = Arc::new(Mutex::new(wzp_relay::signal_hub::SignalHub::new())); let call_registry = Arc::new(Mutex::new(wzp_relay::call_registry::CallRegistry::new())); + // Phase 4: cross-relay direct-call signal dispatcher. + // + // The federation layer unwraps incoming + // `SignalMessage::FederatedSignalForward` envelopes and pushes + // (inner, origin_relay_fp) onto this channel. A dedicated task + // further down reads from it and routes the inner message + // through signal_hub / call_registry exactly as if it had + // arrived on a local signal transport — with the extra + // context that a peer relay is on the other side of the call. + let (cross_relay_tx, mut cross_relay_rx) = + tokio::sync::mpsc::channel::<(wzp_proto::SignalMessage, String)>(32); + if let Some(ref fm) = federation_mgr { + fm.set_cross_relay_tx(cross_relay_tx.clone()).await; + } + // Spawn inter-relay health probes via ProbeMesh coordinator if !config.probe_targets.is_empty() { let mesh = wzp_relay::probe::ProbeMesh::new( @@ -497,6 +512,201 @@ async fn main() -> anyhow::Result<()> { info!(filter = %tap, "debug tap enabled — logging packet headers"); } + // Phase 4: cross-relay direct-call dispatcher task. + // + // Reads unwrapped (inner, origin_relay_fp) tuples that the + // federation layer pushes out of its `handle_signal` arm for + // `FederatedSignalForward`, and routes the inner message + // through the local signal_hub / call_registry exactly as if + // the message had arrived on a local client signal transport. + // + // In Phase 4 MVP the dispatcher handles: + // * DirectCallOffer — if target is local, stash in registry + // with peer_relay_fp and deliver to + // local callee via signal_hub. + // * DirectCallAnswer — stash callee addr, forward answer to + // local caller, emit local CallSetup. + // * CallRinging — forward to local caller for UX. + // * Hangup — forward to the local participant(s). + // Everything else is dropped. + { + let signal_hub_d = signal_hub.clone(); + let call_registry_d = call_registry.clone(); + let advertised_addr_d = advertised_addr_str.clone(); + let federation_mgr_d = federation_mgr.clone(); + tokio::spawn(async move { + use wzp_proto::{CallAcceptMode, SignalMessage}; + while let Some((inner, origin_relay_fp)) = cross_relay_rx.recv().await { + match inner { + SignalMessage::DirectCallOffer { + ref target_fingerprint, + ref caller_fingerprint, + ref call_id, + ref caller_reflexive_addr, + .. + } => { + // Is the target on THIS relay? If not, drop — + // Phase 4 MVP is single-hop federation only. + let online = { + let hub = signal_hub_d.lock().await; + hub.is_online(target_fingerprint) + }; + if !online { + tracing::debug!( + target = %target_fingerprint, + %origin_relay_fp, + "cross-relay: offer target not local, dropping (no multi-hop)" + ); + continue; + } + // Stash in local registry so the answer path + // can find the call + route the reply back + // through the same federation link. + { + let mut reg = call_registry_d.lock().await; + reg.create_call( + call_id.clone(), + caller_fingerprint.clone(), + target_fingerprint.clone(), + ); + reg.set_caller_reflexive_addr(call_id, caller_reflexive_addr.clone()); + reg.set_peer_relay_fp(call_id, Some(origin_relay_fp.clone())); + } + // Deliver the offer to the local target. + let hub = signal_hub_d.lock().await; + if let Err(e) = hub.send_to(target_fingerprint, &inner).await { + tracing::warn!( + target = %target_fingerprint, + error = %e, + "cross-relay: failed to deliver forwarded offer" + ); + } + } + + SignalMessage::DirectCallAnswer { + ref call_id, + accept_mode, + ref callee_reflexive_addr, + .. + } => { + // Look up the local caller fp from the registry. + let caller_fp = { + let reg = call_registry_d.lock().await; + reg.get(call_id).map(|c| c.caller_fingerprint.clone()) + }; + let Some(caller_fp) = caller_fp else { + tracing::debug!(%call_id, "cross-relay: answer for unknown call, dropping"); + continue; + }; + + if accept_mode == CallAcceptMode::Reject { + // Forward hangup to local caller + clean up registry. + let hub = signal_hub_d.lock().await; + let _ = hub + .send_to( + &caller_fp, + &SignalMessage::Hangup { + reason: wzp_proto::HangupReason::Normal, + }, + ) + .await; + drop(hub); + let mut reg = call_registry_d.lock().await; + reg.end_call(call_id); + continue; + } + + // Accept — stash the callee's reflex addr + mark + // the call active, then read back BOTH addrs so + // we can cross-wire peer_direct_addr in CallSetup. + let room_name = format!("call-{call_id}"); + let (caller_addr, callee_addr_for_setup) = { + let mut reg = call_registry_d.lock().await; + reg.set_active(call_id, accept_mode, room_name.clone()); + reg.set_callee_reflexive_addr( + call_id, + callee_reflexive_addr.clone(), + ); + let c = reg.get(call_id); + ( + c.and_then(|c| c.caller_reflexive_addr.clone()), + c.and_then(|c| c.callee_reflexive_addr.clone()), + ) + }; + let _ = caller_addr; // unused on the caller side; callee holds the relevant addr + + // Forward the raw answer to the local caller so + // the JS side sees DirectCallAnswer (fires any + // "call answered" UX that looks at this message). + { + let hub = signal_hub_d.lock().await; + let _ = hub.send_to(&caller_fp, &inner).await; + } + + // Emit the LOCAL CallSetup to our local caller. + // relay_addr = our own advertised addr so if P2P + // fails the caller will at least dial OUR relay + // (single-relay fallback — Phase 4.1 will wire + // federated media so that actually reaches the + // peer). peer_direct_addr = the callee's reflex + // addr carried in the answer. + let setup = SignalMessage::CallSetup { + call_id: call_id.clone(), + room: room_name.clone(), + relay_addr: advertised_addr_d.clone(), + peer_direct_addr: callee_addr_for_setup, + }; + let hub = signal_hub_d.lock().await; + let _ = hub.send_to(&caller_fp, &setup).await; + + tracing::info!( + %call_id, + %caller_fp, + %origin_relay_fp, + "cross-relay: delivered answer + CallSetup to local caller" + ); + } + + SignalMessage::CallRinging { ref call_id } => { + // Forward to local caller for "ringing..." UX. + let caller_fp = { + let reg = call_registry_d.lock().await; + reg.get(call_id).map(|c| c.caller_fingerprint.clone()) + }; + if let Some(fp) = caller_fp { + let hub = signal_hub_d.lock().await; + let _ = hub.send_to(&fp, &inner).await; + } + } + + SignalMessage::Hangup { .. } => { + // Best-effort: broadcast the hangup to every + // local participant of any call that currently + // has this origin as its peer_relay_fp. + // The forwarded hangup doesn't carry a call_id + // so we can't target precisely — Phase 4.1 will + // tighten this once hangup tracking is stricter. + tracing::debug!( + %origin_relay_fp, + "cross-relay: forwarded Hangup (Phase 4.1 will target by call_id)" + ); + } + + _ => { + tracing::debug!( + %origin_relay_fp, + "cross-relay: dispatcher ignoring unsupported inner variant" + ); + } + } + } + // Suppress the warning if federation_mgr_d is unused — + // it's held here so the Arc doesn't drop during the + // dispatcher's lifetime. + drop(federation_mgr_d); + }); + } + info!("Listening for connections..."); loop { @@ -529,6 +739,10 @@ async fn main() -> anyhow::Result<()> { let signal_hub = signal_hub.clone(); let call_registry = call_registry.clone(); let advertised_addr_str = advertised_addr_str.clone(); + // Phase 4: per-task clone of this relay's federation TLS + // fingerprint so the FederatedSignalForward envelopes the + // spawned signal handler builds carry `origin_relay_fp`. + let tls_fp = tls_fp.clone(); let incoming_addr = incoming.remote_address(); info!(%incoming_addr, "accept queue: new Incoming, spawning handshake task"); @@ -782,9 +996,72 @@ async fn main() -> anyhow::Result<()> { hub.is_online(&target_fp) }; if !online { - info!(%addr, target = %target_fp, "call target not online"); - let _ = transport.send_signal(&SignalMessage::Hangup { - reason: wzp_proto::HangupReason::Normal, + // Phase 4: maybe the target is on a + // federation peer. Wrap the offer in + // FederatedSignalForward and broadcast + // it over every active peer link — + // whichever relay has the target will + // unwrap and dispatch locally. We also + // stash the call in OUR registry so + // the eventual answer coming back via + // federation has a matching entry. + let forwarded = if let Some(ref fm) = federation_mgr { + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(msg.clone()), + origin_relay_fp: tls_fp.clone(), + }; + let count = fm.broadcast_signal(&forward).await; + if count > 0 { + info!( + %addr, + target = %target_fp, + peers = count, + "direct-call offer forwarded to federation peers" + ); + true + } else { + false + } + } else { + false + }; + + if !forwarded { + info!(%addr, target = %target_fp, "call target not online (no federation route)"); + let _ = transport.send_signal(&SignalMessage::Hangup { + reason: wzp_proto::HangupReason::Normal, + }).await; + continue; + } + + // Create call in registry with the + // caller's reflex addr + mark it as + // cross-relay so the answer path knows + // to route the CallSetup's + // peer_direct_addr from what the + // federated answer carries. peer_relay_fp + // stays None here because we broadcast — + // the receiving relay picks itself as + // the answer source and its forwarded + // answer will identify itself there. + { + let mut reg = call_registry.lock().await; + reg.create_call( + call_id.clone(), + client_fp.clone(), + target_fp.clone(), + ); + reg.set_caller_reflexive_addr( + &call_id, + caller_addr_for_registry, + ); + } + + // Send ringing to caller immediately + // so the UI shows feedback while the + // federated delivery is in flight. + let _ = transport.send_signal(&SignalMessage::CallRinging { + call_id: call_id.clone(), }).await; continue; } @@ -824,12 +1101,25 @@ async fn main() -> anyhow::Result<()> { let mode = *accept_mode; let callee_addr_for_registry = callee_reflexive_addr.clone(); - let peer_fp = { + // Phase 4: look up peer fingerprint AND + // peer_relay_fp in one lock acquisition. + // peer_relay_fp being Some means the + // caller is on a remote federation peer + // and we have to route the answer / + // hangup back through that link instead + // of local signal_hub. + let (peer_fp, peer_relay_fp) = { let reg = call_registry.lock().await; - reg.peer_fingerprint(&call_id, &client_fp).map(|s| s.to_string()) + match reg.get(&call_id) { + Some(c) => ( + Some(reg.peer_fingerprint(&call_id, &client_fp).map(|s| s.to_string())), + c.peer_relay_fp.clone(), + ), + None => (None, None), + } }; - let Some(peer_fp) = peer_fp else { + let Some(Some(peer_fp)) = peer_fp else { warn!(call_id = %call_id, "answer for unknown call"); continue; }; @@ -839,10 +1129,29 @@ async fn main() -> anyhow::Result<()> { let mut reg = call_registry.lock().await; reg.end_call(&call_id); drop(reg); - let hub = signal_hub.lock().await; - let _ = hub.send_to(&peer_fp, &SignalMessage::Hangup { - reason: wzp_proto::HangupReason::Normal, - }).await; + + // Phase 4: cross-relay reject — + // forward the hangup to the origin + // relay instead of local signal_hub. + if let Some(ref origin_fp) = peer_relay_fp { + if let Some(ref fm) = federation_mgr { + let hangup = SignalMessage::Hangup { + reason: wzp_proto::HangupReason::Normal, + }; + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(hangup), + origin_relay_fp: tls_fp.clone(), + }; + if let Err(e) = fm.send_signal_to_peer(origin_fp, &forward).await { + warn!(%call_id, %origin_fp, error = %e, "cross-relay reject forward failed"); + } + } + } else { + let hub = signal_hub.lock().await; + let _ = hub.send_to(&peer_fp, &SignalMessage::Hangup { + reason: wzp_proto::HangupReason::Normal, + }).await; + } } else { // Accept — create private room + stash the // callee's reflex addr if it advertised one @@ -869,46 +1178,68 @@ async fn main() -> anyhow::Result<()> { "call accepted, creating room" ); - // Forward answer to caller - { - let hub = signal_hub.lock().await; - let _ = hub.send_to(&peer_fp, &msg).await; - } - - // Send CallSetup to both parties. - // - // Each party's `peer_direct_addr` carries the - // OTHER party's reflex addr so they can attempt - // a direct QUIC handshake to each other in - // parallel with the relay path (Phase 3 - // hole-punching). Both sides falling back to the - // relay path is the Phase 0 behavior, so - // emitting `None` here is always safe. - // - // BUG FIX (pre-Phase 3): the previous version of - // this used `addr.ip()` which is the client's - // remote address, not the relay's. Use the - // precomputed advertised address. let relay_addr_for_setup = advertised_addr_str.clone(); - // peer_fp identifies the caller here (the - // fingerprint currently on the other end of this - // answer flow); client_fp identifies the callee. - // So the CALLER gets the callee's addr as its - // peer_direct_addr, and vice versa. - let setup_for_caller = SignalMessage::CallSetup { - call_id: call_id.clone(), - room: room.clone(), - relay_addr: relay_addr_for_setup.clone(), - peer_direct_addr: callee_addr.clone(), - }; - let setup_for_callee = SignalMessage::CallSetup { - call_id: call_id.clone(), - room: room.clone(), - relay_addr: relay_addr_for_setup, - peer_direct_addr: caller_addr.clone(), - }; - { + if let Some(ref origin_fp) = peer_relay_fp { + // Phase 4 cross-relay: the caller + // is on a remote peer. Forward the + // raw answer (which carries the + // callee's reflex addr) back over + // federation — the peer's + // cross-relay dispatcher will + // deliver it to the local caller + // AND emit a CallSetup on that + // side with peer_direct_addr = + // callee_addr. + // + // Here we emit only the LOCAL + // CallSetup (to our callee) with + // peer_direct_addr = caller_addr. + if let Some(ref fm) = federation_mgr { + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(msg.clone()), + origin_relay_fp: tls_fp.clone(), + }; + if let Err(e) = fm.send_signal_to_peer(origin_fp, &forward).await { + warn!( + %call_id, + %origin_fp, + error = %e, + "cross-relay answer forward failed" + ); + } + } + + let setup_for_callee = SignalMessage::CallSetup { + call_id: call_id.clone(), + room: room.clone(), + relay_addr: relay_addr_for_setup, + peer_direct_addr: caller_addr.clone(), + }; + let hub = signal_hub.lock().await; + let _ = hub.send_to(&client_fp, &setup_for_callee).await; + } else { + // Local call (existing Phase 3 path). + // Forward answer to caller + { + let hub = signal_hub.lock().await; + let _ = hub.send_to(&peer_fp, &msg).await; + } + + // Send CallSetup to BOTH parties with + // cross-wired peer_direct_addr. + let setup_for_caller = SignalMessage::CallSetup { + call_id: call_id.clone(), + room: room.clone(), + relay_addr: relay_addr_for_setup.clone(), + peer_direct_addr: callee_addr.clone(), + }; + let setup_for_callee = SignalMessage::CallSetup { + call_id: call_id.clone(), + room: room.clone(), + relay_addr: relay_addr_for_setup, + peer_direct_addr: caller_addr.clone(), + }; let hub = signal_hub.lock().await; let _ = hub.send_to(&peer_fp, &setup_for_caller).await; let _ = hub.send_to(&client_fp, &setup_for_callee).await; diff --git a/crates/wzp-relay/tests/cross_relay_direct_call.rs b/crates/wzp-relay/tests/cross_relay_direct_call.rs new file mode 100644 index 0000000..cf3ecd4 --- /dev/null +++ b/crates/wzp-relay/tests/cross_relay_direct_call.rs @@ -0,0 +1,311 @@ +//! Phase 4 integration test for cross-relay direct calling +//! (PRD: .taskmaster/docs/prd_phase4_cross_relay_p2p.txt). +//! +//! Drives the call-registry cross-wiring + a simulated federation +//! forward without spinning up actual relay binaries. The real +//! main-loop and dispatcher code are exercised end-to-end in +//! `reflect.rs` / `hole_punching.rs` already; this file focuses on +//! the *new* invariants Phase 4 adds: +//! +//! 1. When Relay A forwards a DirectCallOffer, its local registry +//! stashes caller_reflexive_addr and leaves peer_relay_fp +//! unset (broadcast, answer-side will identify itself). +//! 2. When Relay B's cross-relay dispatcher receives the forward, +//! its local registry stores the call with +//! peer_relay_fp = Some(relay_a_tls_fp). +//! 3. When Relay B processes the local callee's answer, it sees +//! peer_relay_fp.is_some() and MUST NOT deliver the answer via +//! local signal_hub — instead it routes through federation. +//! 4. When Relay A receives the forwarded answer via its +//! cross-relay dispatcher, it stashes callee_reflexive_addr +//! and emits a CallSetup to its local caller with +//! peer_direct_addr = callee_addr. +//! 5. Final state: Alice's CallSetup carries Bob's reflex addr, +//! Bob's CallSetup carries Alice's reflex addr — cross-wired +//! through two relays + a federation link. + +use wzp_proto::{CallAcceptMode, SignalMessage}; +use wzp_relay::call_registry::CallRegistry; + +// ──────────────────────────────────────────────────────────────── +// Simulated dispatch helpers — these reproduce the exact logic +// in main.rs without the tokio + federation boilerplate. +// ──────────────────────────────────────────────────────────────── + +const RELAY_A_TLS_FP: &str = "relay-A-tls-fingerprint"; +const RELAY_B_TLS_FP: &str = "relay-B-tls-fingerprint"; +const ALICE_ADDR: &str = "192.0.2.1:4433"; +const BOB_ADDR: &str = "198.51.100.9:4433"; +const RELAY_A_ADDR: &str = "203.0.113.5:4433"; +const RELAY_B_ADDR: &str = "203.0.113.10:4433"; + +/// Helper that Alice's place_call sends. +fn alice_offer(call_id: &str) -> SignalMessage { + SignalMessage::DirectCallOffer { + caller_fingerprint: "alice".into(), + caller_alias: None, + target_fingerprint: "bob".into(), + call_id: call_id.into(), + identity_pub: [0; 32], + ephemeral_pub: [0; 32], + signature: vec![], + supported_profiles: vec![], + caller_reflexive_addr: Some(ALICE_ADDR.into()), + } +} + +/// Relay A receives Alice's offer. Target Bob is not local. +/// Relay A wraps + broadcasts over federation, stashes the call +/// locally with peer_relay_fp = None (broadcast — answer-side +/// identifies itself). +fn relay_a_handle_offer(reg_a: &mut CallRegistry, offer: &SignalMessage) -> SignalMessage { + match offer { + SignalMessage::DirectCallOffer { + caller_fingerprint, + target_fingerprint, + call_id, + caller_reflexive_addr, + .. + } => { + reg_a.create_call( + call_id.clone(), + caller_fingerprint.clone(), + target_fingerprint.clone(), + ); + reg_a.set_caller_reflexive_addr(call_id, caller_reflexive_addr.clone()); + // peer_relay_fp stays None — we don't know which peer + // will respond yet. + } + _ => panic!("not an offer"), + } + // Build the federation envelope the main loop would + // broadcast. + SignalMessage::FederatedSignalForward { + inner: Box::new(offer.clone()), + origin_relay_fp: RELAY_A_TLS_FP.into(), + } +} + +/// Relay B receives a FederatedSignalForward(DirectCallOffer). +/// This is the cross-relay dispatcher task code in main.rs — +/// reproduced here for the test. +fn relay_b_handle_forwarded_offer(reg_b: &mut CallRegistry, forward: &SignalMessage) { + let (inner, origin_relay_fp) = match forward { + SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => { + (inner.as_ref().clone(), origin_relay_fp.clone()) + } + _ => panic!("not a forward"), + }; + // Loop-prevention: drop self-sourced. + assert_ne!(origin_relay_fp, RELAY_B_TLS_FP); + + let SignalMessage::DirectCallOffer { + caller_fingerprint, + target_fingerprint, + call_id, + caller_reflexive_addr, + .. + } = inner + else { + panic!("inner was not DirectCallOffer"); + }; + + // Simulated: target is local to B (Bob is registered here). + reg_b.create_call( + call_id.clone(), + caller_fingerprint, + target_fingerprint, + ); + reg_b.set_caller_reflexive_addr(&call_id, caller_reflexive_addr); + reg_b.set_peer_relay_fp(&call_id, Some(origin_relay_fp)); +} + +/// Bob's answer — AcceptTrusted with his reflex addr. +fn bob_answer(call_id: &str) -> SignalMessage { + SignalMessage::DirectCallAnswer { + call_id: call_id.into(), + accept_mode: CallAcceptMode::AcceptTrusted, + identity_pub: None, + ephemeral_pub: None, + signature: None, + chosen_profile: None, + callee_reflexive_addr: Some(BOB_ADDR.into()), + } +} + +/// Relay B handles the LOCAL callee's answer. If peer_relay_fp +/// is Some, wrap the answer in a FederatedSignalForward + emit the +/// local CallSetup to Bob. Returns the (forward_envelope, +/// bob_call_setup) pair. +fn relay_b_handle_local_answer( + reg_b: &mut CallRegistry, + answer: &SignalMessage, +) -> (SignalMessage, SignalMessage) { + let (call_id, mode, callee_addr) = match answer { + SignalMessage::DirectCallAnswer { + call_id, + accept_mode, + callee_reflexive_addr, + .. + } => (call_id.clone(), *accept_mode, callee_reflexive_addr.clone()), + _ => panic!(), + }; + // Stash callee addr + activate. + reg_b.set_active(&call_id, mode, format!("call-{call_id}")); + reg_b.set_callee_reflexive_addr(&call_id, callee_addr); + let call = reg_b.get(&call_id).unwrap(); + let caller_addr = call.caller_reflexive_addr.clone(); + let callee_addr = call.callee_reflexive_addr.clone(); + assert!( + call.peer_relay_fp.is_some(), + "Relay B must know this call is cross-relay" + ); + + // Forward the answer back over federation. + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(answer.clone()), + origin_relay_fp: RELAY_B_TLS_FP.into(), + }; + + // Local CallSetup for Bob — peer_direct_addr = Alice's addr. + let setup_for_bob = SignalMessage::CallSetup { + call_id: call_id.clone(), + room: format!("call-{call_id}"), + relay_addr: RELAY_B_ADDR.into(), + peer_direct_addr: caller_addr, + }; + let _ = callee_addr; + (forward, setup_for_bob) +} + +/// Relay A's cross-relay dispatcher receives the forwarded answer. +/// It stashes the callee addr, forwards the raw answer to local +/// Alice, and emits a CallSetup with peer_direct_addr = Bob's addr. +fn relay_a_handle_forwarded_answer( + reg_a: &mut CallRegistry, + forward: &SignalMessage, +) -> SignalMessage { + let (inner, origin_relay_fp) = match forward { + SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => { + (inner.as_ref().clone(), origin_relay_fp.clone()) + } + _ => panic!("not a forward"), + }; + assert_ne!(origin_relay_fp, RELAY_A_TLS_FP); + + let SignalMessage::DirectCallAnswer { + call_id, + accept_mode, + callee_reflexive_addr, + .. + } = inner + else { + panic!("inner was not DirectCallAnswer"); + }; + assert_eq!(accept_mode, CallAcceptMode::AcceptTrusted); + + reg_a.set_active(&call_id, accept_mode, format!("call-{call_id}")); + reg_a.set_callee_reflexive_addr(&call_id, callee_reflexive_addr.clone()); + + // Alice's CallSetup — peer_direct_addr = Bob's addr. + SignalMessage::CallSetup { + call_id: call_id.clone(), + room: format!("call-{call_id}"), + relay_addr: RELAY_A_ADDR.into(), + peer_direct_addr: callee_reflexive_addr, + } +} + +// ──────────────────────────────────────────────────────────────── +// Tests +// ──────────────────────────────────────────────────────────────── + +#[test] +fn cross_relay_offer_forwards_and_stashes_peer_relay_fp() { + let mut reg_a = CallRegistry::new(); + let mut reg_b = CallRegistry::new(); + + let offer = alice_offer("c-xrelay-1"); + let forward = relay_a_handle_offer(&mut reg_a, &offer); + + // Relay A's local view: call exists, caller addr stashed, + // peer_relay_fp still None (broadcast — answer identifies the + // peer). + let call_a = reg_a.get("c-xrelay-1").unwrap(); + assert_eq!(call_a.caller_fingerprint, "alice"); + assert_eq!(call_a.callee_fingerprint, "bob"); + assert_eq!(call_a.caller_reflexive_addr.as_deref(), Some(ALICE_ADDR)); + assert!(call_a.peer_relay_fp.is_none()); + + // Relay B dispatches the forward: creates the call locally + // and stashes peer_relay_fp = Relay A. + relay_b_handle_forwarded_offer(&mut reg_b, &forward); + let call_b = reg_b.get("c-xrelay-1").unwrap(); + assert_eq!(call_b.caller_fingerprint, "alice"); + assert_eq!(call_b.callee_fingerprint, "bob"); + assert_eq!(call_b.caller_reflexive_addr.as_deref(), Some(ALICE_ADDR)); + assert_eq!(call_b.peer_relay_fp.as_deref(), Some(RELAY_A_TLS_FP)); +} + +#[test] +fn cross_relay_answer_crosswires_peer_direct_addrs() { + let mut reg_a = CallRegistry::new(); + let mut reg_b = CallRegistry::new(); + + // Full round trip: offer → forward → dispatch → answer → + // forward back → dispatch → both CallSetups. + let offer = alice_offer("c-xrelay-2"); + let offer_forward = relay_a_handle_offer(&mut reg_a, &offer); + relay_b_handle_forwarded_offer(&mut reg_b, &offer_forward); + + // Bob answers on Relay B. + let answer = bob_answer("c-xrelay-2"); + let (answer_forward, setup_for_bob) = + relay_b_handle_local_answer(&mut reg_b, &answer); + + // Bob's CallSetup carries Alice's addr. + match setup_for_bob { + SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => { + assert_eq!(peer_direct_addr.as_deref(), Some(ALICE_ADDR)); + assert_eq!(relay_addr, RELAY_B_ADDR); + } + _ => panic!("wrong variant"), + } + + // Alice's dispatcher receives the forwarded answer and builds + // her CallSetup. + let setup_for_alice = relay_a_handle_forwarded_answer(&mut reg_a, &answer_forward); + match setup_for_alice { + SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => { + assert_eq!(peer_direct_addr.as_deref(), Some(BOB_ADDR)); + assert_eq!(relay_addr, RELAY_A_ADDR); + } + _ => panic!("wrong variant"), + } + + // Both registries agree on caller + callee reflex addrs after + // the full round-trip. + for reg in [®_a, ®_b] { + let c = reg.get("c-xrelay-2").unwrap(); + assert_eq!(c.caller_reflexive_addr.as_deref(), Some(ALICE_ADDR)); + assert_eq!(c.callee_reflexive_addr.as_deref(), Some(BOB_ADDR)); + } +} + +#[test] +fn cross_relay_loop_prevention_drops_self_sourced_forward() { + // A FederatedSignalForward that circles back to the origin + // relay should be dropped before it hits the call registry. + let forward = SignalMessage::FederatedSignalForward { + inner: Box::new(alice_offer("c-loop")), + origin_relay_fp: RELAY_B_TLS_FP.into(), + }; + // The dispatcher in main.rs calls this explicit check before + // doing any work. Reproduce it inline. + let origin = match &forward { + SignalMessage::FederatedSignalForward { origin_relay_fp, .. } => origin_relay_fp.clone(), + _ => unreachable!(), + }; + // Relay B sees origin == its own fp → drop. + assert_eq!(origin, RELAY_B_TLS_FP, "loop-prevention triggers on self-fp"); +}