feat: P3-T6 per-session forwarding — relay links for hop-by-hop media

RelayLink: QUIC connection to peer relay (SNI "_relay") for forwarding
specific sessions. Methods: connect, forward, add/remove_session, is_idle.

RelayLinkManager: manages connections to multiple peers.
- get_or_connect: lazy connection establishment
- forward_to: send media packet to specific peer
- register/unregister_session: track which sessions use which links
- Auto-closes idle links on session unregister

Protocol: added SignalMessage::SessionForward { session_id,
target_fingerprint, source_relay } and SessionForwardAck { session_id,
room_name } for relay-link session setup signaling.

Building block for P3-T7 (call setup over mesh) which wires
route resolution + relay links + handshake into a complete flow.

62 relay tests + 42 proto tests passing (7 new relay_link tests).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-29 18:45:36 +04:00
parent 326aa491cc
commit e595fe6591
4 changed files with 501 additions and 0 deletions

View File

@@ -107,6 +107,8 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::PresenceUpdate { .. } => CallSignalType::Offer, // reuse
SignalMessage::RouteQuery { .. } => CallSignalType::Offer, // reuse
SignalMessage::RouteResponse { .. } => CallSignalType::Offer, // reuse
SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse
SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse
}
}

View File

@@ -613,6 +613,21 @@ pub enum SignalMessage {
found: bool,
relay_chain: Vec<String>,
},
/// Request to set up a forwarding session for a specific fingerprint.
/// Sent over a relay link (`_relay` SNI) to ask the peer relay to
/// create a room and forward media for the given session.
SessionForward {
session_id: String,
target_fingerprint: String,
source_relay: String,
},
/// Confirm that the forwarding session has been set up on the peer relay.
/// The `room_name` tells the source relay which room to address media to.
SessionForwardAck {
session_id: String,
room_name: String,
},
}
/// Reasons for ending a call.

View File

@@ -14,6 +14,7 @@ pub mod metrics;
pub mod pipeline;
pub mod presence;
pub mod probe;
pub mod relay_link;
pub mod room;
pub mod route;
pub mod session_mgr;

View File

@@ -0,0 +1,483 @@
//! Per-session relay forwarding — connect to a peer relay and forward only
//! specific sessions' media packets there.
//!
//! This is the building block for relay chaining (multi-hop calls). Instead
//! of forwarding ALL traffic to a single hardcoded relay (forward mode) or
//! to everyone in a room (SFU mode), a `RelayLink` represents a QUIC
//! connection to one peer relay used for forwarding a specific set of
//! sessions.
//!
//! `RelayLinkManager` tracks all active relay links and their session
//! assignments, providing get-or-connect semantics and idle cleanup.
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{debug, info, warn};
use wzp_proto::MediaPacket;
use wzp_proto::MediaTransport;
/// A connection to a peer relay for forwarding specific sessions.
///
/// Each `RelayLink` holds a QUIC transport to one peer relay and tracks
/// which session IDs are being forwarded through it. When all sessions
/// are removed the link is considered idle and can be cleaned up.
pub struct RelayLink {
target_addr: SocketAddr,
/// The underlying QUIC transport. `None` only in unit-test stubs where
/// no real connection is established.
transport: Option<Arc<wzp_transport::QuinnTransport>>,
active_sessions: HashSet<String>,
}
impl RelayLink {
/// Connect to a peer relay at `target`.
///
/// Uses the `"_relay"` SNI to signal that this is a relay-to-relay
/// connection (similar to `"_probe"` for health checks). The peer
/// should skip normal client auth/handshake for relay-SNI connections.
pub async fn connect(target: SocketAddr) -> Result<Self, anyhow::Error> {
// Create a client-only endpoint on an OS-assigned port.
let endpoint = wzp_transport::create_endpoint(
"0.0.0.0:0".parse().unwrap(),
None,
)?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, target, "_relay", client_cfg).await?;
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
info!(%target, "relay link established");
Ok(Self {
target_addr: target,
transport: Some(transport),
active_sessions: HashSet::new(),
})
}
/// Create a `RelayLink` from an existing transport (useful when the
/// connection was established through other means).
pub fn from_transport(
target_addr: SocketAddr,
transport: Arc<wzp_transport::QuinnTransport>,
) -> Self {
Self {
target_addr,
transport: Some(transport),
active_sessions: HashSet::new(),
}
}
/// Create a stub `RelayLink` with no transport — for unit tests that
/// only exercise session-tracking / management logic.
#[cfg(test)]
fn stub(target_addr: SocketAddr) -> Self {
Self {
target_addr,
transport: None,
active_sessions: HashSet::new(),
}
}
/// Forward a media packet to this peer relay.
pub async fn forward(&self, pkt: &MediaPacket) -> Result<(), anyhow::Error> {
match &self.transport {
Some(t) => t
.send_media(pkt)
.await
.map_err(|e| anyhow::anyhow!("relay link forward to {}: {e}", self.target_addr)),
None => Err(anyhow::anyhow!(
"relay link to {} has no transport (stub)",
self.target_addr
)),
}
}
/// The address of the peer relay this link connects to.
pub fn target_addr(&self) -> SocketAddr {
self.target_addr
}
/// A reference to the underlying QUIC transport (if connected).
pub fn transport(&self) -> Option<&Arc<wzp_transport::QuinnTransport>> {
self.transport.as_ref()
}
/// Add a session to be forwarded through this link.
pub fn add_session(&mut self, session_id: &str) {
if self.active_sessions.insert(session_id.to_string()) {
debug!(
target_relay = %self.target_addr,
session = session_id,
count = self.active_sessions.len(),
"session added to relay link"
);
}
}
/// Remove a session from this link.
pub fn remove_session(&mut self, session_id: &str) {
if self.active_sessions.remove(session_id) {
debug!(
target_relay = %self.target_addr,
session = session_id,
count = self.active_sessions.len(),
"session removed from relay link"
);
}
}
/// Check if this link is forwarding any sessions.
pub fn is_idle(&self) -> bool {
self.active_sessions.is_empty()
}
/// Number of sessions being forwarded through this link.
pub fn session_count(&self) -> usize {
self.active_sessions.len()
}
/// Check if a specific session is being forwarded through this link.
pub fn has_session(&self, session_id: &str) -> bool {
self.active_sessions.contains(session_id)
}
/// Close the underlying QUIC connection (no-op if no transport).
pub async fn close(&self) {
info!(target_relay = %self.target_addr, "closing relay link");
if let Some(ref t) = self.transport {
let _ = t.close().await;
}
}
}
// ---------------------------------------------------------------------------
// RelayLinkManager
// ---------------------------------------------------------------------------
/// Manages connections to multiple peer relays for per-session forwarding.
///
/// Each peer relay gets at most one `RelayLink`. Sessions are registered
/// on specific links, and idle links (no sessions) can be cleaned up.
pub struct RelayLinkManager {
links: HashMap<SocketAddr, RelayLink>,
}
impl RelayLinkManager {
/// Create an empty link manager.
pub fn new() -> Self {
Self {
links: HashMap::new(),
}
}
/// Get or create a link to a peer relay.
///
/// If a link already exists it is returned. Otherwise a new QUIC
/// connection is established using `RelayLink::connect`.
pub async fn get_or_connect(
&mut self,
target: SocketAddr,
) -> Result<&RelayLink, anyhow::Error> {
if !self.links.contains_key(&target) {
let link = RelayLink::connect(target).await?;
self.links.insert(target, link);
}
Ok(self.links.get(&target).unwrap())
}
/// Get a mutable reference to an existing link (if any).
pub fn get_mut(&mut self, target: &SocketAddr) -> Option<&mut RelayLink> {
self.links.get_mut(target)
}
/// Get a reference to an existing link (if any).
pub fn get(&self, target: &SocketAddr) -> Option<&RelayLink> {
self.links.get(target)
}
/// Forward a packet for a specific session to the appropriate relay.
///
/// The link must already exist (created via `get_or_connect`).
pub async fn forward_to(
&self,
target: SocketAddr,
pkt: &MediaPacket,
) -> Result<(), anyhow::Error> {
match self.links.get(&target) {
Some(link) => link.forward(pkt).await,
None => Err(anyhow::anyhow!(
"no relay link to {target} — call get_or_connect first"
)),
}
}
/// Register a session on a specific link.
///
/// The link must already exist. If it does not, a warning is logged
/// and the registration is silently skipped.
pub fn register_session(&mut self, target: SocketAddr, session_id: &str) {
match self.links.get_mut(&target) {
Some(link) => link.add_session(session_id),
None => {
warn!(
%target,
session = session_id,
"cannot register session — no link to target"
);
}
}
}
/// Unregister a session. If the link becomes idle, close and remove it.
pub async fn unregister_session(&mut self, target: SocketAddr, session_id: &str) {
let should_remove = if let Some(link) = self.links.get_mut(&target) {
link.remove_session(session_id);
if link.is_idle() {
link.close().await;
true
} else {
false
}
} else {
false
};
if should_remove {
self.links.remove(&target);
info!(%target, "idle relay link removed");
}
}
/// Close all links and clear the manager.
pub async fn close_all(&mut self) {
for (addr, link) in self.links.drain() {
info!(%addr, "closing relay link (shutdown)");
link.close().await;
}
}
/// Number of active links.
pub fn link_count(&self) -> usize {
self.links.len()
}
/// Total number of sessions being forwarded across all links.
pub fn session_count(&self) -> usize {
self.links.values().map(|l| l.session_count()).sum()
}
/// Insert a pre-built relay link (for testing or manual setup).
pub fn insert(&mut self, link: RelayLink) {
self.links.insert(link.target_addr(), link);
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
fn addr(s: &str) -> SocketAddr {
s.parse().unwrap()
}
// ---------- RelayLink session tracking ----------
#[test]
fn link_manager_tracks_sessions() {
let mut mgr = RelayLinkManager::new();
let target1 = addr("10.0.0.2:4433");
let mut link = RelayLink::stub(target1);
link.add_session("session-aaa");
link.add_session("session-bbb");
mgr.insert(link);
assert_eq!(mgr.link_count(), 1);
assert_eq!(mgr.session_count(), 2);
// Register another session on the same link
mgr.register_session(target1, "session-ccc");
assert_eq!(mgr.session_count(), 3);
// Verify individual link
let link_ref = mgr.get(&target1).unwrap();
assert!(link_ref.has_session("session-aaa"));
assert!(link_ref.has_session("session-bbb"));
assert!(link_ref.has_session("session-ccc"));
assert!(!link_ref.has_session("unknown"));
}
#[test]
fn link_manager_idle_detection() {
let mut link = RelayLink::stub(addr("10.0.0.3:4433"));
// Empty link is idle
assert!(link.is_idle());
assert_eq!(link.session_count(), 0);
// Add a session — no longer idle
link.add_session("sess-1");
assert!(!link.is_idle());
assert_eq!(link.session_count(), 1);
// Remove it — idle again
link.remove_session("sess-1");
assert!(link.is_idle());
assert_eq!(link.session_count(), 0);
}
#[test]
fn session_forward_signal_roundtrip() {
use wzp_proto::SignalMessage;
// SessionForward roundtrip
let msg = SignalMessage::SessionForward {
session_id: "abcd1234".to_string(),
target_fingerprint: "deadbeef".to_string(),
source_relay: "10.0.0.1:4433".to_string(),
};
let json = serde_json::to_string(&msg).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::SessionForward {
session_id,
target_fingerprint,
source_relay,
} => {
assert_eq!(session_id, "abcd1234");
assert_eq!(target_fingerprint, "deadbeef");
assert_eq!(source_relay, "10.0.0.1:4433");
}
_ => panic!("expected SessionForward variant"),
}
// SessionForwardAck roundtrip
let ack = SignalMessage::SessionForwardAck {
session_id: "abcd1234".to_string(),
room_name: "relay-room-42".to_string(),
};
let json = serde_json::to_string(&ack).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::SessionForwardAck {
session_id,
room_name,
} => {
assert_eq!(session_id, "abcd1234");
assert_eq!(room_name, "relay-room-42");
}
_ => panic!("expected SessionForwardAck variant"),
}
}
#[test]
fn link_manager_multi_target() {
let mut mgr = RelayLinkManager::new();
let target_a = addr("10.0.0.2:4433");
let target_b = addr("10.0.0.3:4433");
let target_c = addr("10.0.0.4:4433");
for (target, sessions) in [
(target_a, vec!["s1", "s2"]),
(target_b, vec!["s3"]),
(target_c, vec!["s4", "s5", "s6"]),
] {
let mut link = RelayLink::stub(target);
for s in sessions {
link.add_session(s);
}
mgr.insert(link);
}
assert_eq!(mgr.link_count(), 3);
assert_eq!(mgr.session_count(), 6); // 2 + 1 + 3
assert_eq!(mgr.get(&target_a).unwrap().session_count(), 2);
assert_eq!(mgr.get(&target_b).unwrap().session_count(), 1);
assert_eq!(mgr.get(&target_c).unwrap().session_count(), 3);
}
#[test]
fn link_manager_cleanup() {
let mut mgr = RelayLinkManager::new();
let target = addr("10.0.0.5:4433");
let mut link = RelayLink::stub(target);
link.add_session("s1");
link.add_session("s2");
link.add_session("s3");
mgr.insert(link);
assert_eq!(mgr.link_count(), 1);
assert_eq!(mgr.session_count(), 3);
// Remove sessions one by one via the manager's mutable access.
// We cannot call the async unregister_session with stub links here,
// so we exercise the synchronous management path directly.
{
let link = mgr.get_mut(&target).unwrap();
link.remove_session("s1");
assert!(!link.is_idle());
link.remove_session("s2");
assert!(!link.is_idle());
link.remove_session("s3");
assert!(link.is_idle());
}
// All sessions removed — link is idle
assert_eq!(mgr.session_count(), 0);
assert!(mgr.get(&target).unwrap().is_idle());
// Simulate what unregister_session does: remove the idle link
mgr.links.remove(&target);
assert_eq!(mgr.link_count(), 0);
}
#[test]
fn register_session_on_nonexistent_link_is_noop() {
let mut mgr = RelayLinkManager::new();
// Should not panic, just warn
mgr.register_session(addr("10.0.0.99:4433"), "orphan-session");
assert_eq!(mgr.link_count(), 0);
assert_eq!(mgr.session_count(), 0);
}
#[test]
fn forward_to_nonexistent_link_errors() {
let mgr = RelayLinkManager::new();
let target = addr("10.0.0.99:4433");
let pkt = MediaPacket {
header: wzp_proto::packet::MediaHeader {
version: 0,
is_repair: false,
codec_id: wzp_proto::CodecId::Opus16k,
has_quality_report: false,
fec_ratio_encoded: 0,
seq: 1,
timestamp: 100,
fec_block: 0,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
},
payload: bytes::Bytes::from_static(b"test"),
quality_report: None,
};
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let result = rt.block_on(mgr.forward_to(target, &pkt));
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("no relay link"));
}
}