Teaches the relay pair to route direct-call signaling across an
existing federation link. Alice on Relay A can now place a direct
call to Bob on Relay B if A and B are federation peers — the
wire protocol, call registry, and signal dispatch all learn to
track and route the cross-relay flow.
Phase 3.5's dual-path QUIC race then carries the media directly
peer-to-peer using the advertised reflex addrs, with zero
changes needed on the client side.
## Wire protocol (wzp-proto)
New `SignalMessage::FederatedSignalForward { inner, origin_relay_fp }`
envelope variant, appended at end of enum — JSON serde is
name-tagged so pre-Phase-4 relays just log "unknown variant" and
drop it. 2 new roundtrip tests (any-inner nesting + single
DirectCallOffer case).
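
A minimal sketch of the envelope nesting, using a trimmed-down stand-in for the real enum. The `Hangup` payload and the helpers `wrap_for_federation` / `unwrap_forward` are hypothetical and exist only for illustration; serde tagging is left out so the sketch needs nothing beyond std.

```rust
// Hypothetical, reduced stand-in for wzp-proto's SignalMessage.
#[derive(Debug, Clone, PartialEq)]
enum SignalMessage {
    Hangup { call_id: String },
    // Appended last so the existing variants are untouched.
    FederatedSignalForward {
        inner: Box<SignalMessage>,
        origin_relay_fp: String,
    },
}

/// Wrap any signal message for transport over a federation link.
fn wrap_for_federation(inner: SignalMessage, origin_relay_fp: &str) -> SignalMessage {
    SignalMessage::FederatedSignalForward {
        inner: Box::new(inner),
        origin_relay_fp: origin_relay_fp.to_string(),
    }
}

/// Split an envelope into (inner, origin); None for non-envelope messages.
fn unwrap_forward(msg: SignalMessage) -> Option<(SignalMessage, String)> {
    match msg {
        SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => {
            Some((*inner, origin_relay_fp))
        }
        _ => None,
    }
}
```

Boxing `inner` is what lets the enum contain itself; without the indirection the type would have infinite size.
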
## Call registry (wzp-relay)
`DirectCall.peer_relay_fp: Option<String>` — federation TLS fp
of the peer relay that forwarded the offer/answer for this call.
`None` on local calls, `Some` on cross-relay. Used by the answer
path to route the reply back through the same federation link
instead of trying (and failing) to deliver via local signal_hub.
New `set_peer_relay_fp` setter + 1 new unit test.
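
The registry change can be pictured roughly as follows. This is a reduced sketch, not the real `CallRegistry`: the field set is cut down, the `bool` return of the setter and the `is_cross_relay` helper are assumptions added for illustration.

```rust
use std::collections::HashMap;

// Hypothetical, reduced DirectCall; the real struct carries more fields.
#[derive(Debug, Default)]
struct DirectCall {
    caller_fingerprint: String,
    callee_fingerprint: String,
    /// None for same-relay calls, Some(fp) once the peer relay is known.
    peer_relay_fp: Option<String>,
}

#[derive(Default)]
struct CallRegistry {
    calls: HashMap<String, DirectCall>,
}

impl CallRegistry {
    fn create_call(&mut self, call_id: &str, caller: &str, callee: &str) {
        let call = DirectCall {
            caller_fingerprint: caller.to_string(),
            callee_fingerprint: callee.to_string(),
            peer_relay_fp: None,
        };
        self.calls.insert(call_id.to_string(), call);
    }

    /// Returns false when the call_id is unknown (assumed behaviour).
    fn set_peer_relay_fp(&mut self, call_id: &str, fp: Option<String>) -> bool {
        match self.calls.get_mut(call_id) {
            Some(call) => {
                call.peer_relay_fp = fp;
                true
            }
            None => false,
        }
    }

    /// True when the answer must go back over a federation link.
    fn is_cross_relay(&self, call_id: &str) -> bool {
        self.calls
            .get(call_id)
            .map_or(false, |c| c.peer_relay_fp.is_some())
    }
}
```
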
## FederationManager (wzp-relay)
Three new methods:
- `local_tls_fp()` — exposes the relay's own federation TLS fp
so main.rs can build `origin_relay_fp` fields.
- `broadcast_signal(msg) -> usize` — fan out any signal message
(in practice `FederatedSignalForward`) to every active peer
link, returning the reach count. Used when Relay A doesn't
know which peer has the target fingerprint.
- `send_signal_to_peer(fp, msg)` — targeted send for the reply
path where the registry already knows which peer relay to
hit.
Plus a new `cross_relay_signal_tx: Mutex<Option<Sender<...>>>`
field that `set_cross_relay_tx()` wires at startup so the
federation `handle_signal` can push unwrapped inner messages
into the main signal dispatcher.
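
To illustrate the difference between the broadcast and targeted paths, here is a sketch that models each peer link as an in-process `std::sync::mpsc` channel keyed by the peer's TLS fp. The struct layout, the `String` message type, and the `bool` return of `send_signal_to_peer` are assumptions; the real manager sends over federation links.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

// Hypothetical model: one channel per active peer link.
struct FederationManager {
    local_fp: String,
    peers: HashMap<String, Sender<String>>,
}

impl FederationManager {
    /// The relay's own federation TLS fingerprint.
    fn local_tls_fp(&self) -> &str {
        &self.local_fp
    }

    /// Fan out to every peer link; returns how many sends succeeded.
    fn broadcast_signal(&self, msg: &str) -> usize {
        self.peers
            .values()
            .filter(|tx| tx.send(msg.to_string()).is_ok())
            .count()
    }

    /// Targeted send; false if the peer is unknown or its link is closed.
    fn send_signal_to_peer(&self, fp: &str, msg: &str) -> bool {
        self.peers
            .get(fp)
            .map_or(false, |tx| tx.send(msg.to_string()).is_ok())
    }
}
```

The reach count matters to the caller: a return of zero is what triggers the legacy Hangup path in the main loop.
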
## Federation handle_signal (wzp-relay)
New match arm for `FederatedSignalForward`:
- Loop prevention: drops forwards whose `origin_relay_fp` equals
this relay's own fp (prevents A→B→A echo loops without needing
TTL yet).
- Otherwise pulls the inner message out and pushes it through
`cross_relay_signal_tx` so the main loop's dispatcher task
handles it as if it had arrived locally.
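
The two branches of that match arm can be sketched as a small function. `handle_forward`, its parameters, and the `(inner, origin)` tuple of `String`s are hypothetical simplifications; a std channel stands in for `cross_relay_signal_tx`.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical handler for a received forward.
/// Returns true if the inner message was handed to the dispatcher.
fn handle_forward(
    local_fp: &str,
    origin_relay_fp: &str,
    inner: String,
    cross_relay_tx: &Sender<(String, String)>,
) -> bool {
    // Loop prevention: a forward stamped with our own fp has echoed back.
    if origin_relay_fp == local_fp {
        return false;
    }
    // Otherwise hand the unwrapped inner message to the main dispatcher,
    // keeping the origin so the registry can record the peer relay.
    cross_relay_tx
        .send((inner, origin_relay_fp.to_string()))
        .is_ok()
}
```

Comparing against the origin fp is enough while only single-hop links exist; a multi-hop topology would need a TTL or a seen-set in addition.
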
## Main signal loop (wzp-relay)
### DirectCallOffer when target not local
Before falling through to Hangup, try the federation path:
- Wrap the offer in `FederatedSignalForward` with
`origin_relay_fp = this relay's tls_fp`
- `fm.broadcast_signal(forward)` — returns peer count
- If any peers reached, stash the call in local registry with
`caller_reflexive_addr` set, `peer_relay_fp` still None
(broadcast — the answer-side will identify itself when it
replies)
- Send `CallRinging` to caller immediately for UX feedback
- Only if no federation or no peers → legacy Hangup path
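
The fallthrough decision above reduces to a check on the broadcast reach count. In this sketch `OfferOutcome` and `route_non_local_offer` are hypothetical names, and `None` is assumed to mean that federation is not configured.

```rust
/// What the relay tells the caller when the target is not registered locally.
#[derive(Debug, PartialEq)]
enum OfferOutcome {
    /// At least one federation peer received the forward.
    Ringing { peers_reached: usize },
    /// No federation configured, or the broadcast reached nobody.
    Hangup,
}

/// `reach` is None when federation is disabled, otherwise the number
/// of peer links the broadcast reached.
fn route_non_local_offer(reach: Option<usize>) -> OfferOutcome {
    match reach {
        Some(n) if n > 0 => OfferOutcome::Ringing { peers_reached: n },
        _ => OfferOutcome::Hangup,
    }
}
```
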
### DirectCallAnswer when peer is remote
- Registry lookup now reads both `peer_fingerprint` and
`peer_relay_fp` in one acquisition
- If `peer_relay_fp.is_some()`:
* Reject → forward a `Hangup` over federation via
`send_signal_to_peer` instead of local signal_hub
* Accept → wrap the raw answer in `FederatedSignalForward`,
route to the specific origin peer, then emit the LOCAL
CallSetup to our callee with `peer_direct_addr =
caller_reflexive_addr` (caller is remote; this side only
has the callee)
- If `peer_relay_fp.is_none()` → existing Phase 3 same-relay
path with both CallSetups (caller + callee)
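
The three answer paths can be summarised as a routing decision on `peer_relay_fp` and the accept/reject outcome. `AnswerRoute` and `route_answer` are hypothetical names for this sketch, and the accept mode is collapsed to a `bool`.

```rust
#[derive(Debug, PartialEq)]
enum AnswerRoute {
    /// Reject on a cross-relay call: Hangup goes to the origin peer relay.
    FederatedHangup { peer_relay_fp: String },
    /// Accept on a cross-relay call: forward the answer to the origin
    /// peer and emit only the callee's CallSetup locally.
    FederatedAccept { peer_relay_fp: String },
    /// Same-relay call: existing Phase 3 path with both CallSetups.
    Local,
}

fn route_answer(peer_relay_fp: Option<&str>, accepted: bool) -> AnswerRoute {
    match (peer_relay_fp, accepted) {
        (Some(fp), false) => AnswerRoute::FederatedHangup {
            peer_relay_fp: fp.to_string(),
        },
        (Some(fp), true) => AnswerRoute::FederatedAccept {
            peer_relay_fp: fp.to_string(),
        },
        (None, _) => AnswerRoute::Local,
    }
}
```
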
### Cross-relay signal dispatcher task
New long-running task reading `(inner, origin_relay_fp)` from
`cross_relay_rx`. In Phase 4 MVP handles:
- `DirectCallOffer` — if target is local, create the call in
the registry with `peer_relay_fp = origin_relay_fp`, stash
caller addr, deliver offer to local callee. If target isn't
local, drop (no multi-hop in Phase 4 MVP).
- `DirectCallAnswer` — look up local caller by call_id, stash
callee addr, forward raw answer to local caller via
signal_hub, emit local CallSetup with `peer_direct_addr =
callee_reflexive_addr` (callee is remote; this side only
has the caller).
- `CallRinging` — best-effort forward to local caller for UX.
- `Hangup` — logged for now; Phase 4.1 will target by call_id.
## Integration tests
`crates/wzp-relay/tests/cross_relay_direct_call.rs` — 3 tests
that reproduce the main.rs cross-relay dispatcher logic inline
and assert the invariants without spinning up real binaries:
1. `cross_relay_offer_forwards_and_stashes_peer_relay_fp` —
Relay A gets Alice's offer, broadcasts. Relay B's dispatcher
creates the call with `peer_relay_fp = relay_a_tls_fp`.
2. `cross_relay_answer_crosswires_peer_direct_addrs` — full
round trip; both CallSetups (one on each relay) carry the
OTHER party's reflex addr.
3. `cross_relay_loop_prevention_drops_self_sourced_forward` —
explicit loop-prevention check.
Full workspace test count goes from 413 to 419 passing (2 proto
roundtrip + 1 registry unit + 3 integration). Clippy clean
on touched files.
## Non-goals (deferred to Phase 4.1+)
- Relay-mediated media fallback across federation — if P2P
direct fails (symmetric NAT on either side), the call errors
out with "no media path". Making the existing federation
media pipeline carry ephemeral call-<id> rooms is the Phase
4.1 lift.
- Multi-hop federation (A → B → C). Phase 4 MVP supports a
direct federation link between A and B only.
- Fingerprint → peer-relay routing gossip.
PRD: .taskmaster/docs/prd_phase4_cross_relay_p2p.txt
Tasks: 70-78 all completed
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
312 lines
12 KiB
Rust
//! Phase 4 integration test for cross-relay direct calling
//! (PRD: .taskmaster/docs/prd_phase4_cross_relay_p2p.txt).
//!
//! Drives the call-registry cross-wiring + a simulated federation
//! forward without spinning up actual relay binaries. The real
//! main-loop and dispatcher code are exercised end-to-end in
//! `reflect.rs` / `hole_punching.rs` already; this file focuses on
//! the *new* invariants Phase 4 adds:
//!
//! 1. When Relay A forwards a DirectCallOffer, its local registry
//!    stashes caller_reflexive_addr and leaves peer_relay_fp
//!    unset (broadcast, answer-side will identify itself).
//! 2. When Relay B's cross-relay dispatcher receives the forward,
//!    its local registry stores the call with
//!    peer_relay_fp = Some(relay_a_tls_fp).
//! 3. When Relay B processes the local callee's answer, it sees
//!    peer_relay_fp.is_some() and MUST NOT deliver the answer via
//!    local signal_hub; instead it routes through federation.
//! 4. When Relay A receives the forwarded answer via its
//!    cross-relay dispatcher, it stashes callee_reflexive_addr
//!    and emits a CallSetup to its local caller with
//!    peer_direct_addr = callee_addr.
//! 5. Final state: Alice's CallSetup carries Bob's reflex addr,
//!    Bob's CallSetup carries Alice's reflex addr, cross-wired
//!    through two relays + a federation link.

use wzp_proto::{CallAcceptMode, SignalMessage};
use wzp_relay::call_registry::CallRegistry;

// ────────────────────────────────────────────────────────────────
// Simulated dispatch helpers: these reproduce the exact logic
// in main.rs without the tokio + federation boilerplate.
// ────────────────────────────────────────────────────────────────

const RELAY_A_TLS_FP: &str = "relay-A-tls-fingerprint";
const RELAY_B_TLS_FP: &str = "relay-B-tls-fingerprint";
const ALICE_ADDR: &str = "192.0.2.1:4433";
const BOB_ADDR: &str = "198.51.100.9:4433";
const RELAY_A_ADDR: &str = "203.0.113.5:4433";
const RELAY_B_ADDR: &str = "203.0.113.10:4433";

/// Builds the DirectCallOffer that Alice's place_call sends.
fn alice_offer(call_id: &str) -> SignalMessage {
    SignalMessage::DirectCallOffer {
        caller_fingerprint: "alice".into(),
        caller_alias: None,
        target_fingerprint: "bob".into(),
        call_id: call_id.into(),
        identity_pub: [0; 32],
        ephemeral_pub: [0; 32],
        signature: vec![],
        supported_profiles: vec![],
        caller_reflexive_addr: Some(ALICE_ADDR.into()),
    }
}

/// Relay A receives Alice's offer. Target Bob is not local.
/// Relay A wraps + broadcasts over federation, stashes the call
/// locally with peer_relay_fp = None (broadcast: the answer side
/// identifies itself).
fn relay_a_handle_offer(reg_a: &mut CallRegistry, offer: &SignalMessage) -> SignalMessage {
    match offer {
        SignalMessage::DirectCallOffer {
            caller_fingerprint,
            target_fingerprint,
            call_id,
            caller_reflexive_addr,
            ..
        } => {
            reg_a.create_call(
                call_id.clone(),
                caller_fingerprint.clone(),
                target_fingerprint.clone(),
            );
            reg_a.set_caller_reflexive_addr(call_id, caller_reflexive_addr.clone());
            // peer_relay_fp stays None: we don't know which peer
            // will respond yet.
        }
        _ => panic!("not an offer"),
    }
    // Build the federation envelope the main loop would
    // broadcast.
    SignalMessage::FederatedSignalForward {
        inner: Box::new(offer.clone()),
        origin_relay_fp: RELAY_A_TLS_FP.into(),
    }
}

/// Relay B receives a FederatedSignalForward(DirectCallOffer).
/// This is the cross-relay dispatcher task code in main.rs,
/// reproduced here for the test.
fn relay_b_handle_forwarded_offer(reg_b: &mut CallRegistry, forward: &SignalMessage) {
    let (inner, origin_relay_fp) = match forward {
        SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => {
            (inner.as_ref().clone(), origin_relay_fp.clone())
        }
        _ => panic!("not a forward"),
    };
    // Loop-prevention: drop self-sourced.
    assert_ne!(origin_relay_fp, RELAY_B_TLS_FP);

    let SignalMessage::DirectCallOffer {
        caller_fingerprint,
        target_fingerprint,
        call_id,
        caller_reflexive_addr,
        ..
    } = inner
    else {
        panic!("inner was not DirectCallOffer");
    };

    // Simulated: target is local to B (Bob is registered here).
    reg_b.create_call(
        call_id.clone(),
        caller_fingerprint,
        target_fingerprint,
    );
    reg_b.set_caller_reflexive_addr(&call_id, caller_reflexive_addr);
    reg_b.set_peer_relay_fp(&call_id, Some(origin_relay_fp));
}

/// Bob's answer: AcceptTrusted with his reflex addr.
fn bob_answer(call_id: &str) -> SignalMessage {
    SignalMessage::DirectCallAnswer {
        call_id: call_id.into(),
        accept_mode: CallAcceptMode::AcceptTrusted,
        identity_pub: None,
        ephemeral_pub: None,
        signature: None,
        chosen_profile: None,
        callee_reflexive_addr: Some(BOB_ADDR.into()),
    }
}

/// Relay B handles the LOCAL callee's answer. If peer_relay_fp
/// is Some, wrap the answer in a FederatedSignalForward + emit the
/// local CallSetup to Bob. Returns the (forward_envelope,
/// bob_call_setup) pair.
fn relay_b_handle_local_answer(
    reg_b: &mut CallRegistry,
    answer: &SignalMessage,
) -> (SignalMessage, SignalMessage) {
    let (call_id, mode, callee_addr) = match answer {
        SignalMessage::DirectCallAnswer {
            call_id,
            accept_mode,
            callee_reflexive_addr,
            ..
        } => (call_id.clone(), *accept_mode, callee_reflexive_addr.clone()),
        _ => panic!("not an answer"),
    };
    // Stash callee addr + activate.
    reg_b.set_active(&call_id, mode, format!("call-{call_id}"));
    reg_b.set_callee_reflexive_addr(&call_id, callee_addr);
    let call = reg_b.get(&call_id).unwrap();
    let caller_addr = call.caller_reflexive_addr.clone();
    let callee_addr = call.callee_reflexive_addr.clone();
    assert!(
        call.peer_relay_fp.is_some(),
        "Relay B must know this call is cross-relay"
    );

    // Forward the answer back over federation.
    let forward = SignalMessage::FederatedSignalForward {
        inner: Box::new(answer.clone()),
        origin_relay_fp: RELAY_B_TLS_FP.into(),
    };

    // Local CallSetup for Bob: peer_direct_addr = Alice's addr.
    let setup_for_bob = SignalMessage::CallSetup {
        call_id: call_id.clone(),
        room: format!("call-{call_id}"),
        relay_addr: RELAY_B_ADDR.into(),
        peer_direct_addr: caller_addr,
    };
    // Bob's own addr is not needed for his CallSetup.
    let _ = callee_addr;
    (forward, setup_for_bob)
}

/// Relay A's cross-relay dispatcher receives the forwarded answer.
/// It stashes the callee addr, forwards the raw answer to local
/// Alice, and emits a CallSetup with peer_direct_addr = Bob's addr.
fn relay_a_handle_forwarded_answer(
    reg_a: &mut CallRegistry,
    forward: &SignalMessage,
) -> SignalMessage {
    let (inner, origin_relay_fp) = match forward {
        SignalMessage::FederatedSignalForward { inner, origin_relay_fp } => {
            (inner.as_ref().clone(), origin_relay_fp.clone())
        }
        _ => panic!("not a forward"),
    };
    assert_ne!(origin_relay_fp, RELAY_A_TLS_FP);

    let SignalMessage::DirectCallAnswer {
        call_id,
        accept_mode,
        callee_reflexive_addr,
        ..
    } = inner
    else {
        panic!("inner was not DirectCallAnswer");
    };
    assert_eq!(accept_mode, CallAcceptMode::AcceptTrusted);

    reg_a.set_active(&call_id, accept_mode, format!("call-{call_id}"));
    reg_a.set_callee_reflexive_addr(&call_id, callee_reflexive_addr.clone());

    // Alice's CallSetup: peer_direct_addr = Bob's addr.
    SignalMessage::CallSetup {
        call_id: call_id.clone(),
        room: format!("call-{call_id}"),
        relay_addr: RELAY_A_ADDR.into(),
        peer_direct_addr: callee_reflexive_addr,
    }
}

// ────────────────────────────────────────────────────────────────
// Tests
// ────────────────────────────────────────────────────────────────

#[test]
fn cross_relay_offer_forwards_and_stashes_peer_relay_fp() {
    let mut reg_a = CallRegistry::new();
    let mut reg_b = CallRegistry::new();

    let offer = alice_offer("c-xrelay-1");
    let forward = relay_a_handle_offer(&mut reg_a, &offer);

    // Relay A's local view: call exists, caller addr stashed,
    // peer_relay_fp still None (broadcast: the answer identifies
    // the peer).
    let call_a = reg_a.get("c-xrelay-1").unwrap();
    assert_eq!(call_a.caller_fingerprint, "alice");
    assert_eq!(call_a.callee_fingerprint, "bob");
    assert_eq!(call_a.caller_reflexive_addr.as_deref(), Some(ALICE_ADDR));
    assert!(call_a.peer_relay_fp.is_none());

    // Relay B dispatches the forward: creates the call locally
    // and stashes peer_relay_fp = Relay A.
    relay_b_handle_forwarded_offer(&mut reg_b, &forward);
    let call_b = reg_b.get("c-xrelay-1").unwrap();
    assert_eq!(call_b.caller_fingerprint, "alice");
    assert_eq!(call_b.callee_fingerprint, "bob");
    assert_eq!(call_b.caller_reflexive_addr.as_deref(), Some(ALICE_ADDR));
    assert_eq!(call_b.peer_relay_fp.as_deref(), Some(RELAY_A_TLS_FP));
}

#[test]
fn cross_relay_answer_crosswires_peer_direct_addrs() {
    let mut reg_a = CallRegistry::new();
    let mut reg_b = CallRegistry::new();

    // Full round trip: offer → forward → dispatch → answer →
    // forward back → dispatch → both CallSetups.
    let offer = alice_offer("c-xrelay-2");
    let offer_forward = relay_a_handle_offer(&mut reg_a, &offer);
    relay_b_handle_forwarded_offer(&mut reg_b, &offer_forward);

    // Bob answers on Relay B.
    let answer = bob_answer("c-xrelay-2");
    let (answer_forward, setup_for_bob) =
        relay_b_handle_local_answer(&mut reg_b, &answer);

    // Bob's CallSetup carries Alice's addr.
    match setup_for_bob {
        SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => {
            assert_eq!(peer_direct_addr.as_deref(), Some(ALICE_ADDR));
            assert_eq!(relay_addr, RELAY_B_ADDR);
        }
        _ => panic!("wrong variant"),
    }

    // Alice's dispatcher receives the forwarded answer and builds
    // her CallSetup.
    let setup_for_alice = relay_a_handle_forwarded_answer(&mut reg_a, &answer_forward);
    match setup_for_alice {
        SignalMessage::CallSetup { peer_direct_addr, relay_addr, .. } => {
            assert_eq!(peer_direct_addr.as_deref(), Some(BOB_ADDR));
            assert_eq!(relay_addr, RELAY_A_ADDR);
        }
        _ => panic!("wrong variant"),
    }

    // Both registries agree on caller + callee reflex addrs after
    // the full round-trip.
    for reg in [&reg_a, &reg_b] {
        let c = reg.get("c-xrelay-2").unwrap();
        assert_eq!(c.caller_reflexive_addr.as_deref(), Some(ALICE_ADDR));
        assert_eq!(c.callee_reflexive_addr.as_deref(), Some(BOB_ADDR));
    }
}

#[test]
fn cross_relay_loop_prevention_drops_self_sourced_forward() {
    // A FederatedSignalForward that circles back to the origin
    // relay should be dropped before it hits the call registry.
    let forward = SignalMessage::FederatedSignalForward {
        inner: Box::new(alice_offer("c-loop")),
        origin_relay_fp: RELAY_B_TLS_FP.into(),
    };
    // The dispatcher in main.rs calls this explicit check before
    // doing any work. Reproduce it inline.
    let origin = match &forward {
        SignalMessage::FederatedSignalForward { origin_relay_fp, .. } => origin_relay_fp.clone(),
        _ => unreachable!(),
    };
    // Relay B sees origin == its own fp → drop.
    assert_eq!(origin, RELAY_B_TLS_FP, "loop-prevention triggers on self-fp");
}