From e595fe6591c34651c70c9e8b8d138df46aaf2261 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sun, 29 Mar 2026 18:45:36 +0400 Subject: [PATCH] =?UTF-8?q?feat:=20P3-T6=20per-session=20forwarding=20?= =?UTF-8?q?=E2=80=94=20relay=20links=20for=20hop-by-hop=20media?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RelayLink: QUIC connection to peer relay (SNI "_relay") for forwarding specific sessions. Methods: connect, forward, add/remove_session, is_idle. RelayLinkManager: manages connections to multiple peers. - get_or_connect: lazy connection establishment - forward_to: send media packet to specific peer - register/unregister_session: track which sessions use which links - Auto-closes idle links on session unregister Protocol: added SignalMessage::SessionForward { session_id, target_fingerprint, source_relay } and SessionForwardAck { session_id, room_name } for relay-link session setup signaling. Building block for P3-T7 (call setup over mesh) which wires route resolution + relay links + handshake into a complete flow. 62 relay tests + 42 proto tests passing (7 new relay_link tests). Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/wzp-client/src/featherchat.rs | 2 + crates/wzp-proto/src/packet.rs | 15 + crates/wzp-relay/src/lib.rs | 1 + crates/wzp-relay/src/relay_link.rs | 483 +++++++++++++++++++++++++++ 4 files changed, 501 insertions(+) create mode 100644 crates/wzp-relay/src/relay_link.rs diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index 7c6e7f6..e100df9 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -107,6 +107,8 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::PresenceUpdate { .. } => CallSignalType::Offer, // reuse SignalMessage::RouteQuery { .. } => CallSignalType::Offer, // reuse SignalMessage::RouteResponse { .. } => CallSignalType::Offer, // reuse + SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse + SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse } } diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index 2af389e..3a27376 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -613,6 +613,21 @@ pub enum SignalMessage { found: bool, relay_chain: Vec, }, + + /// Request to set up a forwarding session for a specific fingerprint. + /// Sent over a relay link (`_relay` SNI) to ask the peer relay to + /// create a room and forward media for the given session. + SessionForward { + session_id: String, + target_fingerprint: String, + source_relay: String, + }, + /// Confirm that the forwarding session has been set up on the peer relay. + /// The `room_name` tells the source relay which room to address media to. + SessionForwardAck { + session_id: String, + room_name: String, + }, } /// Reasons for ending a call. diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index e8782ed..1416cc8 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -14,6 +14,7 @@ pub mod metrics; pub mod pipeline; pub mod presence; pub mod probe; +pub mod relay_link; pub mod room; pub mod route; pub mod session_mgr; diff --git a/crates/wzp-relay/src/relay_link.rs b/crates/wzp-relay/src/relay_link.rs new file mode 100644 index 0000000..3b55f19 --- /dev/null +++ b/crates/wzp-relay/src/relay_link.rs @@ -0,0 +1,483 @@ +//! Per-session relay forwarding — connect to a peer relay and forward only +//! specific sessions' media packets there. +//! +//! This is the building block for relay chaining (multi-hop calls). Instead +//! of forwarding ALL traffic to a single hardcoded relay (forward mode) or +//! to everyone in a room (SFU mode), a `RelayLink` represents a QUIC +//! connection to one peer relay used for forwarding a specific set of +//! sessions. +//! +//! `RelayLinkManager` tracks all active relay links and their session +//! assignments, providing get-or-connect semantics and idle cleanup. + +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::sync::Arc; + +use tracing::{debug, info, warn}; + +use wzp_proto::MediaPacket; +use wzp_proto::MediaTransport; + +/// A connection to a peer relay for forwarding specific sessions. +/// +/// Each `RelayLink` holds a QUIC transport to one peer relay and tracks +/// which session IDs are being forwarded through it. When all sessions +/// are removed the link is considered idle and can be cleaned up. +pub struct RelayLink { + target_addr: SocketAddr, + /// The underlying QUIC transport. `None` only in unit-test stubs where + /// no real connection is established. + transport: Option>, + active_sessions: HashSet, +} + +impl RelayLink { + /// Connect to a peer relay at `target`. + /// + /// Uses the `"_relay"` SNI to signal that this is a relay-to-relay + /// connection (similar to `"_probe"` for health checks). The peer + /// should skip normal client auth/handshake for relay-SNI connections. + pub async fn connect(target: SocketAddr) -> Result { + // Create a client-only endpoint on an OS-assigned port. + let endpoint = wzp_transport::create_endpoint( + "0.0.0.0:0".parse().unwrap(), + None, + )?; + + let client_cfg = wzp_transport::client_config(); + let conn = wzp_transport::connect(&endpoint, target, "_relay", client_cfg).await?; + let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); + + info!(%target, "relay link established"); + + Ok(Self { + target_addr: target, + transport: Some(transport), + active_sessions: HashSet::new(), + }) + } + + /// Create a `RelayLink` from an existing transport (useful when the + /// connection was established through other means). + pub fn from_transport( + target_addr: SocketAddr, + transport: Arc, + ) -> Self { + Self { + target_addr, + transport: Some(transport), + active_sessions: HashSet::new(), + } + } + + /// Create a stub `RelayLink` with no transport — for unit tests that + /// only exercise session-tracking / management logic. + #[cfg(test)] + fn stub(target_addr: SocketAddr) -> Self { + Self { + target_addr, + transport: None, + active_sessions: HashSet::new(), + } + } + + /// Forward a media packet to this peer relay. + pub async fn forward(&self, pkt: &MediaPacket) -> Result<(), anyhow::Error> { + match &self.transport { + Some(t) => t + .send_media(pkt) + .await + .map_err(|e| anyhow::anyhow!("relay link forward to {}: {e}", self.target_addr)), + None => Err(anyhow::anyhow!( + "relay link to {} has no transport (stub)", + self.target_addr + )), + } + } + + /// The address of the peer relay this link connects to. + pub fn target_addr(&self) -> SocketAddr { + self.target_addr + } + + /// A reference to the underlying QUIC transport (if connected). + pub fn transport(&self) -> Option<&Arc> { + self.transport.as_ref() + } + + /// Add a session to be forwarded through this link. + pub fn add_session(&mut self, session_id: &str) { + if self.active_sessions.insert(session_id.to_string()) { + debug!( + target_relay = %self.target_addr, + session = session_id, + count = self.active_sessions.len(), + "session added to relay link" + ); + } + } + + /// Remove a session from this link. + pub fn remove_session(&mut self, session_id: &str) { + if self.active_sessions.remove(session_id) { + debug!( + target_relay = %self.target_addr, + session = session_id, + count = self.active_sessions.len(), + "session removed from relay link" + ); + } + } + + /// Check if this link is forwarding any sessions. + pub fn is_idle(&self) -> bool { + self.active_sessions.is_empty() + } + + /// Number of sessions being forwarded through this link. + pub fn session_count(&self) -> usize { + self.active_sessions.len() + } + + /// Check if a specific session is being forwarded through this link. + pub fn has_session(&self, session_id: &str) -> bool { + self.active_sessions.contains(session_id) + } + + /// Close the underlying QUIC connection (no-op if no transport). + pub async fn close(&self) { + info!(target_relay = %self.target_addr, "closing relay link"); + if let Some(ref t) = self.transport { + let _ = t.close().await; + } + } +} + +// --------------------------------------------------------------------------- +// RelayLinkManager +// --------------------------------------------------------------------------- + +/// Manages connections to multiple peer relays for per-session forwarding. +/// +/// Each peer relay gets at most one `RelayLink`. Sessions are registered +/// on specific links, and idle links (no sessions) can be cleaned up. +pub struct RelayLinkManager { + links: HashMap, +} + +impl RelayLinkManager { + /// Create an empty link manager. + pub fn new() -> Self { + Self { + links: HashMap::new(), + } + } + + /// Get or create a link to a peer relay. + /// + /// If a link already exists it is returned. Otherwise a new QUIC + /// connection is established using `RelayLink::connect`. + pub async fn get_or_connect( + &mut self, + target: SocketAddr, + ) -> Result<&RelayLink, anyhow::Error> { + if !self.links.contains_key(&target) { + let link = RelayLink::connect(target).await?; + self.links.insert(target, link); + } + Ok(self.links.get(&target).unwrap()) + } + + /// Get a mutable reference to an existing link (if any). + pub fn get_mut(&mut self, target: &SocketAddr) -> Option<&mut RelayLink> { + self.links.get_mut(target) + } + + /// Get a reference to an existing link (if any). + pub fn get(&self, target: &SocketAddr) -> Option<&RelayLink> { + self.links.get(target) + } + + /// Forward a packet for a specific session to the appropriate relay. + /// + /// The link must already exist (created via `get_or_connect`). + pub async fn forward_to( + &self, + target: SocketAddr, + pkt: &MediaPacket, + ) -> Result<(), anyhow::Error> { + match self.links.get(&target) { + Some(link) => link.forward(pkt).await, + None => Err(anyhow::anyhow!( + "no relay link to {target} — call get_or_connect first" + )), + } + } + + /// Register a session on a specific link. + /// + /// The link must already exist. If it does not, a warning is logged + /// and the registration is silently skipped. + pub fn register_session(&mut self, target: SocketAddr, session_id: &str) { + match self.links.get_mut(&target) { + Some(link) => link.add_session(session_id), + None => { + warn!( + %target, + session = session_id, + "cannot register session — no link to target" + ); + } + } + } + + /// Unregister a session. If the link becomes idle, close and remove it. + pub async fn unregister_session(&mut self, target: SocketAddr, session_id: &str) { + let should_remove = if let Some(link) = self.links.get_mut(&target) { + link.remove_session(session_id); + if link.is_idle() { + link.close().await; + true + } else { + false + } + } else { + false + }; + + if should_remove { + self.links.remove(&target); + info!(%target, "idle relay link removed"); + } + } + + /// Close all links and clear the manager. + pub async fn close_all(&mut self) { + for (addr, link) in self.links.drain() { + info!(%addr, "closing relay link (shutdown)"); + link.close().await; + } + } + + /// Number of active links. + pub fn link_count(&self) -> usize { + self.links.len() + } + + /// Total number of sessions being forwarded across all links. + pub fn session_count(&self) -> usize { + self.links.values().map(|l| l.session_count()).sum() + } + + /// Insert a pre-built relay link (for testing or manual setup). + pub fn insert(&mut self, link: RelayLink) { + self.links.insert(link.target_addr(), link); + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(s: &str) -> SocketAddr { + s.parse().unwrap() + } + + // ---------- RelayLink session tracking ---------- + + #[test] + fn link_manager_tracks_sessions() { + let mut mgr = RelayLinkManager::new(); + let target1 = addr("10.0.0.2:4433"); + + let mut link = RelayLink::stub(target1); + link.add_session("session-aaa"); + link.add_session("session-bbb"); + mgr.insert(link); + + assert_eq!(mgr.link_count(), 1); + assert_eq!(mgr.session_count(), 2); + + // Register another session on the same link + mgr.register_session(target1, "session-ccc"); + assert_eq!(mgr.session_count(), 3); + + // Verify individual link + let link_ref = mgr.get(&target1).unwrap(); + assert!(link_ref.has_session("session-aaa")); + assert!(link_ref.has_session("session-bbb")); + assert!(link_ref.has_session("session-ccc")); + assert!(!link_ref.has_session("unknown")); + } + + #[test] + fn link_manager_idle_detection() { + let mut link = RelayLink::stub(addr("10.0.0.3:4433")); + + // Empty link is idle + assert!(link.is_idle()); + assert_eq!(link.session_count(), 0); + + // Add a session — no longer idle + link.add_session("sess-1"); + assert!(!link.is_idle()); + assert_eq!(link.session_count(), 1); + + // Remove it — idle again + link.remove_session("sess-1"); + assert!(link.is_idle()); + assert_eq!(link.session_count(), 0); + } + + #[test] + fn session_forward_signal_roundtrip() { + use wzp_proto::SignalMessage; + + // SessionForward roundtrip + let msg = SignalMessage::SessionForward { + session_id: "abcd1234".to_string(), + target_fingerprint: "deadbeef".to_string(), + source_relay: "10.0.0.1:4433".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let decoded: SignalMessage = serde_json::from_str(&json).unwrap(); + match decoded { + SignalMessage::SessionForward { + session_id, + target_fingerprint, + source_relay, + } => { + assert_eq!(session_id, "abcd1234"); + assert_eq!(target_fingerprint, "deadbeef"); + assert_eq!(source_relay, "10.0.0.1:4433"); + } + _ => panic!("expected SessionForward variant"), + } + + // SessionForwardAck roundtrip + let ack = SignalMessage::SessionForwardAck { + session_id: "abcd1234".to_string(), + room_name: "relay-room-42".to_string(), + }; + let json = serde_json::to_string(&ack).unwrap(); + let decoded: SignalMessage = serde_json::from_str(&json).unwrap(); + match decoded { + SignalMessage::SessionForwardAck { + session_id, + room_name, + } => { + assert_eq!(session_id, "abcd1234"); + assert_eq!(room_name, "relay-room-42"); + } + _ => panic!("expected SessionForwardAck variant"), + } + } + + #[test] + fn link_manager_multi_target() { + let mut mgr = RelayLinkManager::new(); + let target_a = addr("10.0.0.2:4433"); + let target_b = addr("10.0.0.3:4433"); + let target_c = addr("10.0.0.4:4433"); + + for (target, sessions) in [ + (target_a, vec!["s1", "s2"]), + (target_b, vec!["s3"]), + (target_c, vec!["s4", "s5", "s6"]), + ] { + let mut link = RelayLink::stub(target); + for s in sessions { + link.add_session(s); + } + mgr.insert(link); + } + + assert_eq!(mgr.link_count(), 3); + assert_eq!(mgr.session_count(), 6); // 2 + 1 + 3 + + assert_eq!(mgr.get(&target_a).unwrap().session_count(), 2); + assert_eq!(mgr.get(&target_b).unwrap().session_count(), 1); + assert_eq!(mgr.get(&target_c).unwrap().session_count(), 3); + } + + #[test] + fn link_manager_cleanup() { + let mut mgr = RelayLinkManager::new(); + let target = addr("10.0.0.5:4433"); + + let mut link = RelayLink::stub(target); + link.add_session("s1"); + link.add_session("s2"); + link.add_session("s3"); + mgr.insert(link); + + assert_eq!(mgr.link_count(), 1); + assert_eq!(mgr.session_count(), 3); + + // Remove sessions one by one via the manager's mutable access. + // We cannot call the async unregister_session with stub links here, + // so we exercise the synchronous management path directly. + { + let link = mgr.get_mut(&target).unwrap(); + link.remove_session("s1"); + assert!(!link.is_idle()); + link.remove_session("s2"); + assert!(!link.is_idle()); + link.remove_session("s3"); + assert!(link.is_idle()); + } + + // All sessions removed — link is idle + assert_eq!(mgr.session_count(), 0); + assert!(mgr.get(&target).unwrap().is_idle()); + + // Simulate what unregister_session does: remove the idle link + mgr.links.remove(&target); + assert_eq!(mgr.link_count(), 0); + } + + #[test] + fn register_session_on_nonexistent_link_is_noop() { + let mut mgr = RelayLinkManager::new(); + // Should not panic, just warn + mgr.register_session(addr("10.0.0.99:4433"), "orphan-session"); + assert_eq!(mgr.link_count(), 0); + assert_eq!(mgr.session_count(), 0); + } + + #[test] + fn forward_to_nonexistent_link_errors() { + let mgr = RelayLinkManager::new(); + let target = addr("10.0.0.99:4433"); + + let pkt = MediaPacket { + header: wzp_proto::packet::MediaHeader { + version: 0, + is_repair: false, + codec_id: wzp_proto::CodecId::Opus16k, + has_quality_report: false, + fec_ratio_encoded: 0, + seq: 1, + timestamp: 100, + fec_block: 0, + fec_symbol: 0, + reserved: 0, + csrc_count: 0, + }, + payload: bytes::Bytes::from_static(b"test"), + quality_report: None, + }; + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let result = rt.block_on(mgr.forward_to(target, &pkt)); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("no relay link")); + } +}