feat: P3-T5 route resolution — find relay path to any fingerprint
RouteResolver queries PresenceRegistry to determine how to reach a target:
- Route::Local — connected to this relay
- Route::DirectPeer(addr) — on a directly connected peer relay
- Route::Chain(addrs) — multi-hop (structure ready, single-hop for now)
- Route::NotFound — not in any known relay
Protocol: added SignalMessage::RouteQuery { fingerprint, ttl } and
RouteResponse { fingerprint, found, relay_chain } for peer-to-peer
route queries over probe connections.
HTTP API: GET /route/:fingerprint returns JSON with route type + chain.
Relay handles incoming RouteQuery on probe connections: looks up locally,
replies with RouteResponse. TTL decremented for future multi-hop forwarding.
55 relay tests + 42 proto tests passing (7 new route tests).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -105,6 +105,8 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
|
|||||||
SignalMessage::Transfer { .. } => CallSignalType::Transfer,
|
SignalMessage::Transfer { .. } => CallSignalType::Transfer,
|
||||||
SignalMessage::TransferAck => CallSignalType::Offer, // reuse
|
SignalMessage::TransferAck => CallSignalType::Offer, // reuse
|
||||||
SignalMessage::PresenceUpdate { .. } => CallSignalType::Offer, // reuse
|
SignalMessage::PresenceUpdate { .. } => CallSignalType::Offer, // reuse
|
||||||
|
SignalMessage::RouteQuery { .. } => CallSignalType::Offer, // reuse
|
||||||
|
SignalMessage::RouteResponse { .. } => CallSignalType::Offer, // reuse
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -601,6 +601,18 @@ pub enum SignalMessage {
|
|||||||
/// Address of the sending relay (e.g., "192.168.1.10:4433").
|
/// Address of the sending relay (e.g., "192.168.1.10:4433").
|
||||||
relay_addr: String,
|
relay_addr: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Ask a peer relay to look up a fingerprint in its registry.
|
||||||
|
RouteQuery {
|
||||||
|
fingerprint: String,
|
||||||
|
ttl: u8,
|
||||||
|
},
|
||||||
|
/// Response to a route query.
|
||||||
|
RouteResponse {
|
||||||
|
fingerprint: String,
|
||||||
|
found: bool,
|
||||||
|
relay_chain: Vec<String>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reasons for ending a call.
|
/// Reasons for ending a call.
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ pub mod pipeline;
|
|||||||
pub mod presence;
|
pub mod presence;
|
||||||
pub mod probe;
|
pub mod probe;
|
||||||
pub mod room;
|
pub mod room;
|
||||||
|
pub mod route;
|
||||||
pub mod session_mgr;
|
pub mod session_mgr;
|
||||||
pub mod trunk;
|
pub mod trunk;
|
||||||
|
|
||||||
|
|||||||
@@ -180,12 +180,16 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// Presence registry
|
// Presence registry
|
||||||
let presence = Arc::new(Mutex::new(PresenceRegistry::new()));
|
let presence = Arc::new(Mutex::new(PresenceRegistry::new()));
|
||||||
|
|
||||||
|
// Route resolver
|
||||||
|
let route_resolver = Arc::new(wzp_relay::route::RouteResolver::new(config.listen_addr));
|
||||||
|
|
||||||
// Prometheus metrics
|
// Prometheus metrics
|
||||||
let metrics = Arc::new(RelayMetrics::new());
|
let metrics = Arc::new(RelayMetrics::new());
|
||||||
if let Some(port) = config.metrics_port {
|
if let Some(port) = config.metrics_port {
|
||||||
let m = metrics.clone();
|
let m = metrics.clone();
|
||||||
let p = Some(presence.clone());
|
let p = Some(presence.clone());
|
||||||
tokio::spawn(wzp_relay::metrics::serve_metrics(port, m, p));
|
let rr = Some(route_resolver.clone());
|
||||||
|
tokio::spawn(wzp_relay::metrics::serve_metrics(port, m, p, rr));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate ephemeral relay identity for crypto handshake
|
// Generate ephemeral relay identity for crypto handshake
|
||||||
@@ -251,6 +255,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let metrics = metrics.clone();
|
let metrics = metrics.clone();
|
||||||
let trunking_enabled = config.trunking_enabled;
|
let trunking_enabled = config.trunking_enabled;
|
||||||
let presence = presence.clone();
|
let presence = presence.clone();
|
||||||
|
let route_resolver = route_resolver.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let addr = connection.remote_address();
|
let addr = connection.remote_address();
|
||||||
@@ -301,6 +306,39 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(Some(wzp_proto::SignalMessage::RouteQuery { fingerprint, ttl })) => {
|
||||||
|
// Look up the fingerprint in our local registry
|
||||||
|
let reg = presence.lock().await;
|
||||||
|
let route = route_resolver.resolve(®, &fingerprint);
|
||||||
|
drop(reg);
|
||||||
|
|
||||||
|
let (found, relay_chain) = match route {
|
||||||
|
wzp_relay::route::Route::Local => {
|
||||||
|
(true, vec![route_resolver.local_addr().to_string()])
|
||||||
|
}
|
||||||
|
wzp_relay::route::Route::DirectPeer(peer_addr) => {
|
||||||
|
(true, vec![route_resolver.local_addr().to_string(), peer_addr.to_string()])
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// Not found locally; if ttl > 0 we could forward
|
||||||
|
// to other peers (future multi-hop). For now, reply not found.
|
||||||
|
if ttl > 0 {
|
||||||
|
// TODO: forward RouteQuery to other peers with ttl-1
|
||||||
|
}
|
||||||
|
(false, vec![])
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let reply = wzp_proto::SignalMessage::RouteResponse {
|
||||||
|
fingerprint,
|
||||||
|
found,
|
||||||
|
relay_chain,
|
||||||
|
};
|
||||||
|
if let Err(e) = transport.send_signal(&reply).await {
|
||||||
|
error!(%addr, "route response send error: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Some(_)) => {
|
Ok(Some(_)) => {
|
||||||
// Ignore other signals on probe connections
|
// Ignore other signals on probe connections
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -201,18 +201,20 @@ impl RelayMetrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start an HTTP server serving GET /metrics, GET /mesh, and presence endpoints on the given port.
|
/// Start an HTTP server serving GET /metrics, GET /mesh, presence, and route endpoints on the given port.
|
||||||
pub async fn serve_metrics(
|
pub async fn serve_metrics(
|
||||||
port: u16,
|
port: u16,
|
||||||
metrics: Arc<RelayMetrics>,
|
metrics: Arc<RelayMetrics>,
|
||||||
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
|
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
|
||||||
|
route_resolver: Option<Arc<crate::route::RouteResolver>>,
|
||||||
) {
|
) {
|
||||||
use axum::{extract::Path, routing::get, Router};
|
use axum::{extract::Path, routing::get, Router};
|
||||||
|
|
||||||
let metrics_clone = metrics.clone();
|
let metrics_clone = metrics.clone();
|
||||||
let presence_all = presence.clone();
|
let presence_all = presence.clone();
|
||||||
let presence_lookup = presence.clone();
|
let presence_lookup = presence.clone();
|
||||||
let presence_peers = presence;
|
let presence_peers = presence.clone();
|
||||||
|
let presence_route = presence;
|
||||||
|
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route(
|
.route(
|
||||||
@@ -288,6 +290,32 @@ pub async fn serve_metrics(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/route/:fingerprint",
|
||||||
|
get(move |Path(fingerprint): Path<String>| {
|
||||||
|
let reg = presence_route.clone();
|
||||||
|
let resolver = route_resolver.clone();
|
||||||
|
async move {
|
||||||
|
match (reg, resolver) {
|
||||||
|
(Some(r), Some(res)) => {
|
||||||
|
let r = r.lock().await;
|
||||||
|
let route = res.resolve(&r, &fingerprint);
|
||||||
|
let json = res.route_json(&fingerprint, &route);
|
||||||
|
serde_json::to_string_pretty(&json)
|
||||||
|
.unwrap_or_else(|_| "{}".to_string())
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
serde_json::json!({
|
||||||
|
"fingerprint": fingerprint,
|
||||||
|
"route": "not_found",
|
||||||
|
"relay_chain": [],
|
||||||
|
})
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
|
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
|
|||||||
265
crates/wzp-relay/src/route.rs
Normal file
265
crates/wzp-relay/src/route.rs
Normal file
@@ -0,0 +1,265 @@
|
|||||||
|
//! Route resolution — given a target fingerprint, find the relay chain
|
||||||
|
//! needed to reach that user.
|
||||||
|
//!
|
||||||
|
//! Uses the [`PresenceRegistry`] as its data source. Currently supports
|
||||||
|
//! single-hop resolution (local or direct peer). The `resolve_multi_hop`
|
||||||
|
//! method has the signature for future multi-hop expansion but falls back
|
||||||
|
//! to single-hop for now.
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use crate::presence::{PresenceLocation, PresenceRegistry};
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Route type
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// The resolved route to a target fingerprint.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
|
||||||
|
pub enum Route {
|
||||||
|
/// Target is connected to this relay directly.
|
||||||
|
Local,
|
||||||
|
/// Target is on a directly connected peer relay.
|
||||||
|
DirectPeer(SocketAddr),
|
||||||
|
/// Target is reachable via a chain of relays (multi-hop).
|
||||||
|
Chain(Vec<SocketAddr>),
|
||||||
|
/// Target not found in any known relay.
|
||||||
|
NotFound,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Route {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Route::Local => write!(f, "local"),
|
||||||
|
Route::DirectPeer(addr) => write!(f, "direct_peer({})", addr),
|
||||||
|
Route::Chain(chain) => {
|
||||||
|
let addrs: Vec<String> = chain.iter().map(|a| a.to_string()).collect();
|
||||||
|
write!(f, "chain({})", addrs.join(" -> "))
|
||||||
|
}
|
||||||
|
Route::NotFound => write!(f, "not_found"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// RouteResolver
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// Resolves fingerprints to relay routes using the presence registry.
|
||||||
|
pub struct RouteResolver {
|
||||||
|
/// Our own relay address (how peers know us).
|
||||||
|
local_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RouteResolver {
|
||||||
|
/// Create a new route resolver for the relay at `local_addr`.
|
||||||
|
pub fn new(local_addr: SocketAddr) -> Self {
|
||||||
|
Self { local_addr }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Our local relay address.
|
||||||
|
pub fn local_addr(&self) -> SocketAddr {
|
||||||
|
self.local_addr
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Look up a fingerprint in the registry and return the route.
|
||||||
|
///
|
||||||
|
/// - If `registry.lookup()` returns `Local` -> `Route::Local`
|
||||||
|
/// - If returns `Remote(addr)` -> `Route::DirectPeer(addr)`
|
||||||
|
/// - If not found -> `Route::NotFound`
|
||||||
|
pub fn resolve(&self, registry: &PresenceRegistry, target_fingerprint: &str) -> Route {
|
||||||
|
match registry.lookup(target_fingerprint) {
|
||||||
|
Some(PresenceLocation::Local) => Route::Local,
|
||||||
|
Some(PresenceLocation::Remote(addr)) => Route::DirectPeer(addr),
|
||||||
|
None => Route::NotFound,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multi-hop route resolution (future expansion).
|
||||||
|
///
|
||||||
|
/// For now this is equivalent to `resolve()` — single-hop only.
|
||||||
|
/// When multi-hop is implemented, this will query peers transitively
|
||||||
|
/// up to `max_hops` relays deep, using `RouteQuery` / `RouteResponse`
|
||||||
|
/// signals over probe connections.
|
||||||
|
pub fn resolve_multi_hop(
|
||||||
|
&self,
|
||||||
|
registry: &PresenceRegistry,
|
||||||
|
target: &str,
|
||||||
|
_max_hops: usize,
|
||||||
|
) -> Route {
|
||||||
|
// Phase 1: single-hop only (same as resolve).
|
||||||
|
// Future: if resolve returns NotFound and max_hops > 0,
|
||||||
|
// send RouteQuery to each known peer with ttl = max_hops - 1,
|
||||||
|
// collect RouteResponse, and build a Chain.
|
||||||
|
self.resolve(registry, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a JSON-serializable route response for the HTTP API.
|
||||||
|
pub fn route_json(
|
||||||
|
&self,
|
||||||
|
fingerprint: &str,
|
||||||
|
route: &Route,
|
||||||
|
) -> serde_json::Value {
|
||||||
|
let (route_type, relay_chain) = match route {
|
||||||
|
Route::Local => ("local", vec![self.local_addr.to_string()]),
|
||||||
|
Route::DirectPeer(addr) => ("direct_peer", vec![self.local_addr.to_string(), addr.to_string()]),
|
||||||
|
Route::Chain(chain) => {
|
||||||
|
let mut addrs = vec![self.local_addr.to_string()];
|
||||||
|
addrs.extend(chain.iter().map(|a| a.to_string()));
|
||||||
|
("chain", addrs)
|
||||||
|
}
|
||||||
|
Route::NotFound => ("not_found", vec![]),
|
||||||
|
};
|
||||||
|
|
||||||
|
serde_json::json!({
|
||||||
|
"fingerprint": fingerprint,
|
||||||
|
"route": route_type,
|
||||||
|
"relay_chain": relay_chain,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
fn addr(s: &str) -> SocketAddr {
|
||||||
|
s.parse().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_resolver() -> RouteResolver {
|
||||||
|
RouteResolver::new(addr("10.0.0.1:4433"))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolve_local() {
|
||||||
|
let resolver = make_resolver();
|
||||||
|
let mut reg = PresenceRegistry::new();
|
||||||
|
reg.register_local("aabbccdd", Some("alice".into()), Some("room1".into()));
|
||||||
|
|
||||||
|
let route = resolver.resolve(®, "aabbccdd");
|
||||||
|
assert_eq!(route, Route::Local);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolve_direct_peer() {
|
||||||
|
let resolver = make_resolver();
|
||||||
|
let mut reg = PresenceRegistry::new();
|
||||||
|
let peer = addr("10.0.0.2:4433");
|
||||||
|
let mut fps = HashSet::new();
|
||||||
|
fps.insert("deadbeef".to_string());
|
||||||
|
reg.update_peer(peer, fps);
|
||||||
|
|
||||||
|
let route = resolver.resolve(®, "deadbeef");
|
||||||
|
assert_eq!(route, Route::DirectPeer(peer));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolve_not_found() {
|
||||||
|
let resolver = make_resolver();
|
||||||
|
let reg = PresenceRegistry::new();
|
||||||
|
|
||||||
|
let route = resolver.resolve(®, "unknown_fp");
|
||||||
|
assert_eq!(route, Route::NotFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolve_multi_hop_fallback() {
|
||||||
|
// multi-hop currently falls back to single-hop behavior
|
||||||
|
let resolver = make_resolver();
|
||||||
|
let mut reg = PresenceRegistry::new();
|
||||||
|
reg.register_local("local_fp", None, None);
|
||||||
|
|
||||||
|
let peer = addr("10.0.0.3:4433");
|
||||||
|
let mut fps = HashSet::new();
|
||||||
|
fps.insert("remote_fp".to_string());
|
||||||
|
reg.update_peer(peer, fps);
|
||||||
|
|
||||||
|
// Local lookup works via multi-hop
|
||||||
|
assert_eq!(resolver.resolve_multi_hop(®, "local_fp", 3), Route::Local);
|
||||||
|
// Remote lookup works via multi-hop
|
||||||
|
assert_eq!(
|
||||||
|
resolver.resolve_multi_hop(®, "remote_fp", 3),
|
||||||
|
Route::DirectPeer(peer)
|
||||||
|
);
|
||||||
|
// Not-found works via multi-hop
|
||||||
|
assert_eq!(
|
||||||
|
resolver.resolve_multi_hop(®, "nobody", 3),
|
||||||
|
Route::NotFound
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn route_query_signal_roundtrip() {
|
||||||
|
use wzp_proto::SignalMessage;
|
||||||
|
|
||||||
|
let query = SignalMessage::RouteQuery {
|
||||||
|
fingerprint: "aabbccdd".to_string(),
|
||||||
|
ttl: 3,
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&query).unwrap();
|
||||||
|
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
decoded,
|
||||||
|
SignalMessage::RouteQuery { ref fingerprint, ttl }
|
||||||
|
if fingerprint == "aabbccdd" && ttl == 3
|
||||||
|
));
|
||||||
|
|
||||||
|
let response = SignalMessage::RouteResponse {
|
||||||
|
fingerprint: "aabbccdd".to_string(),
|
||||||
|
found: true,
|
||||||
|
relay_chain: vec!["10.0.0.1:4433".to_string(), "10.0.0.2:4433".to_string()],
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&response).unwrap();
|
||||||
|
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
decoded,
|
||||||
|
SignalMessage::RouteResponse { ref fingerprint, found, ref relay_chain }
|
||||||
|
if fingerprint == "aabbccdd" && found && relay_chain.len() == 2
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn route_display() {
|
||||||
|
assert_eq!(Route::Local.to_string(), "local");
|
||||||
|
assert_eq!(
|
||||||
|
Route::DirectPeer(addr("10.0.0.2:4433")).to_string(),
|
||||||
|
"direct_peer(10.0.0.2:4433)"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Route::Chain(vec![addr("10.0.0.2:4433"), addr("10.0.0.3:4433")]).to_string(),
|
||||||
|
"chain(10.0.0.2:4433 -> 10.0.0.3:4433)"
|
||||||
|
);
|
||||||
|
assert_eq!(Route::NotFound.to_string(), "not_found");
|
||||||
|
|
||||||
|
// Debug is also useful
|
||||||
|
let debug = format!("{:?}", Route::Local);
|
||||||
|
assert!(debug.contains("Local"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn route_json_output() {
|
||||||
|
let resolver = make_resolver();
|
||||||
|
|
||||||
|
let json = resolver.route_json("fp1", &Route::Local);
|
||||||
|
assert_eq!(json["route"], "local");
|
||||||
|
assert_eq!(json["fingerprint"], "fp1");
|
||||||
|
assert_eq!(json["relay_chain"].as_array().unwrap().len(), 1);
|
||||||
|
|
||||||
|
let json = resolver.route_json("fp2", &Route::DirectPeer(addr("10.0.0.2:4433")));
|
||||||
|
assert_eq!(json["route"], "direct_peer");
|
||||||
|
assert_eq!(json["relay_chain"].as_array().unwrap().len(), 2);
|
||||||
|
|
||||||
|
let json = resolver.route_json("fp3", &Route::NotFound);
|
||||||
|
assert_eq!(json["route"], "not_found");
|
||||||
|
assert_eq!(json["relay_chain"].as_array().unwrap().len(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user