feat: P3-T4 relay presence registry — gossip fingerprints across relay mesh

PresenceRegistry tracks who is connected where:
- register_local/unregister_local for directly connected users
- update_peer for fingerprints reported by peer relays
- lookup returns Local or Remote(addr)
- expire_stale removes entries older than timeout

Gossip via probe connections:
- New SignalMessage::PresenceUpdate { fingerprints, relay_addr }
- Probes send local fingerprints every 10s alongside Ping/Pong
- Receiving relay updates its remote presence table

HTTP API on metrics port:
- GET /presence — all known fingerprints + locations
- GET /presence/:fingerprint — single lookup
- GET /peers — peer relays + their connected users

Wired into relay main:
- Registry created at startup
- register_local after auth+handshake
- unregister_local on disconnect
- Passed to probe mesh and metrics server

Also marks FC-10 as DONE in integration tracker.

48 relay tests + 42 proto tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-29 17:36:55 +04:00
parent fd95167705
commit 464e95a4bd
9 changed files with 546 additions and 18 deletions

View File

@@ -156,14 +156,19 @@ impl SlidingWindow {
pub struct ProbeRunner {
config: ProbeConfig,
metrics: ProbeMetrics,
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
}
impl ProbeRunner {
/// Create a new probe runner, registering metrics with the given registry.
pub fn new(config: ProbeConfig, registry: &Registry) -> Self {
pub fn new(
config: ProbeConfig,
registry: &Registry,
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
) -> Self {
let target_str = config.target.to_string();
let metrics = ProbeMetrics::register(&target_str, registry);
Self { config, metrics }
Self { config, metrics, presence }
}
/// Run the probe forever. This function never returns under normal operation.
@@ -215,6 +220,8 @@ impl ProbeRunner {
let jitter_gauge = self.metrics.jitter_ms.clone();
let up_gauge = self.metrics.up.clone();
let recv_presence = self.presence.clone();
let recv_target = self.config.target;
let recv_handle = tokio::spawn(async move {
loop {
match recv_transport.recv_signal().await {
@@ -230,8 +237,17 @@ impl ProbeRunner {
loss_gauge.set(w.loss_pct());
jitter_gauge.set(w.jitter_ms());
}
Ok(Some(SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => {
if let Some(ref reg) = recv_presence {
// Parse the relay_addr; fall back to the connection target
let addr = relay_addr.parse().unwrap_or(recv_target);
let fps: std::collections::HashSet<String> = fingerprints.into_iter().collect();
let mut r = reg.lock().await;
r.update_peer(addr, fps);
}
}
Ok(Some(_)) => {
// Ignore non-Pong signals
// Ignore other signals
}
Ok(None) => {
info!("probe recv: connection closed");
@@ -247,8 +263,9 @@ impl ProbeRunner {
}
});
// Send ping loop
// Send ping loop (+ presence gossip every 10 pings)
let mut interval = tokio::time::interval(self.config.interval);
let mut ping_count: u64 = 0;
loop {
interval.tick().await;
@@ -275,6 +292,24 @@ impl ProbeRunner {
recv_handle.abort();
return Err(e.into());
}
// Send presence update every 10 pings (~10 seconds)
ping_count += 1;
if ping_count % 10 == 0 {
if let Some(ref reg) = self.presence {
let fps: Vec<String> = {
let r = reg.lock().await;
r.local_fingerprints().into_iter().collect()
};
let msg = SignalMessage::PresenceUpdate {
fingerprints: fps,
relay_addr: self.config.target.to_string(),
};
if let Err(e) = transport.send_signal(&msg).await {
warn!(target = %self.config.target, "presence update send error: {e}");
}
}
}
}
}
}
@@ -289,12 +324,16 @@ pub struct ProbeMesh {
impl ProbeMesh {
/// Create a new mesh coordinator, registering metrics for every target.
pub fn new(targets: Vec<SocketAddr>, registry: &Registry) -> Self {
pub fn new(
targets: Vec<SocketAddr>,
registry: &Registry,
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
) -> Self {
let runners = targets
.into_iter()
.map(|addr| {
let config = ProbeConfig::new(addr);
ProbeRunner::new(config, registry)
ProbeRunner::new(config, registry, presence.clone())
})
.collect();
Self { runners }
@@ -409,6 +448,7 @@ mod tests {
fn probe_metrics_register() {
let registry = Registry::new();
let _metrics = ProbeMetrics::register("127.0.0.1:4433", &registry);
// (ProbeRunner::new signature changed but this test only checks ProbeMetrics)
let encoder = prometheus::TextEncoder::new();
let families = registry.gather();
@@ -526,7 +566,7 @@ mod tests {
"127.0.0.2:4433".parse().unwrap(),
"127.0.0.3:4433".parse().unwrap(),
];
let mesh = ProbeMesh::new(targets, &registry);
let mesh = ProbeMesh::new(targets, &registry, None);
assert_eq!(mesh.target_count(), 3);
// Verify metrics were registered for each target
@@ -586,7 +626,7 @@ mod tests {
#[test]
fn mesh_zero_targets() {
let registry = Registry::new();
let mesh = ProbeMesh::new(vec![], &registry);
let mesh = ProbeMesh::new(vec![], &registry, None);
assert_eq!(mesh.target_count(), 0);
}
}