From 464e95a4bd6a26361b4d9c6c898fafc4847b3e46 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sun, 29 Mar 2026 17:36:55 +0400 Subject: [PATCH] =?UTF-8?q?feat:=20P3-T4=20relay=20presence=20registry=20?= =?UTF-8?q?=E2=80=94=20gossip=20fingerprints=20across=20relay=20mesh?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PresenceRegistry tracks who is connected where: - register_local/unregister_local for directly connected users - update_peer for fingerprints reported by peer relays - lookup returns Local or Remote(addr) - expire_stale removes entries older than timeout Gossip via probe connections: - New SignalMessage::PresenceUpdate { fingerprints, relay_addr } - Probes send local fingerprints every 10s alongside Ping/Pong - Receiving relay updates its remote presence table HTTP API on metrics port: - GET /presence — all known fingerprints + locations - GET /presence/:fingerprint — single lookup - GET /peers — peer relays + their connected users Wired into relay main: - Registry created at startup - register_local after auth+handshake - unregister_local on disconnect - Passed to probe mesh and metrics server Also marks FC-10 as DONE in integration tracker. 48 relay tests + 42 proto tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 2 +- crates/wzp-client/src/featherchat.rs | 1 + crates/wzp-proto/src/packet.rs | 44 ++++ crates/wzp-relay/src/lib.rs | 1 + crates/wzp-relay/src/main.rs | 49 +++- crates/wzp-relay/src/metrics.rs | 74 +++++- crates/wzp-relay/src/presence.rs | 333 +++++++++++++++++++++++++++ crates/wzp-relay/src/probe.rs | 56 ++++- docs/INTEGRATION_TASKS.md | 4 +- 9 files changed, 546 insertions(+), 18 deletions(-) create mode 100644 crates/wzp-relay/src/presence.rs diff --git a/Cargo.lock b/Cargo.lock index ddf928b..24a5d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3507,7 +3507,7 @@ dependencies = [ [[package]] name = "warzone-protocol" -version = "0.0.21" +version = "0.0.38" dependencies = [ "base64", "bincode", diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index 46bdbaa..8253a48 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -104,6 +104,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::Unmute => CallSignalType::Unmute, SignalMessage::Transfer { .. } => CallSignalType::Transfer, SignalMessage::TransferAck => CallSignalType::Offer, // reuse + SignalMessage::PresenceUpdate { .. } => CallSignalType::Offer, // reuse } } diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index d58fe5e..c4fb260 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -591,6 +591,16 @@ pub enum SignalMessage { }, /// Acknowledge a transfer request. TransferAck, + + /// Presence update from a peer relay (gossip protocol). + /// Sent periodically over probe connections to share which fingerprints + /// are connected to the sending relay. + PresenceUpdate { + /// Fingerprints currently connected to the sending relay. + fingerprints: Vec, + /// Address of the sending relay (e.g., "192.168.1.10:4433"). + relay_addr: String, + }, } /// Reasons for ending a call. @@ -776,6 +786,40 @@ mod tests { assert!(matches!(decoded, SignalMessage::TransferAck)); } + #[test] + fn presence_update_signal_roundtrip() { + let msg = SignalMessage::PresenceUpdate { + fingerprints: vec!["aabb".to_string(), "ccdd".to_string()], + relay_addr: "10.0.0.1:4433".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let decoded: SignalMessage = serde_json::from_str(&json).unwrap(); + match decoded { + SignalMessage::PresenceUpdate { fingerprints, relay_addr } => { + assert_eq!(fingerprints.len(), 2); + assert!(fingerprints.contains(&"aabb".to_string())); + assert!(fingerprints.contains(&"ccdd".to_string())); + assert_eq!(relay_addr, "10.0.0.1:4433"); + } + _ => panic!("expected PresenceUpdate variant"), + } + + // Empty fingerprints list + let msg_empty = SignalMessage::PresenceUpdate { + fingerprints: vec![], + relay_addr: "10.0.0.2:4433".to_string(), + }; + let json = serde_json::to_string(&msg_empty).unwrap(); + let decoded: SignalMessage = serde_json::from_str(&json).unwrap(); + match decoded { + SignalMessage::PresenceUpdate { fingerprints, relay_addr } => { + assert!(fingerprints.is_empty()); + assert_eq!(relay_addr, "10.0.0.2:4433"); + } + _ => panic!("expected PresenceUpdate variant"), + } + } + #[test] fn fec_ratio_encode_decode() { let ratio = 0.5; diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index dfd998e..75c7dca 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -12,6 +12,7 @@ pub mod config; pub mod handshake; pub mod metrics; pub mod pipeline; +pub mod presence; pub mod probe; pub mod room; pub mod session_mgr; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index aabd1e9..8e7b981 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -19,6 +19,7 @@ use wzp_proto::MediaTransport; use wzp_relay::config::RelayConfig; use wzp_relay::metrics::RelayMetrics; use wzp_relay::pipeline::{PipelineConfig, RelayPipeline}; +use wzp_relay::presence::PresenceRegistry; use wzp_relay::room::{self, RoomManager}; use wzp_relay::session_mgr::SessionManager; @@ -176,11 +177,15 @@ async fn main() -> anyhow::Result<()> { .install_default() .expect("failed to install rustls crypto provider"); + // Presence registry + let presence = Arc::new(Mutex::new(PresenceRegistry::new())); + // Prometheus metrics let metrics = Arc::new(RelayMetrics::new()); if let Some(port) = config.metrics_port { let m = metrics.clone(); - tokio::spawn(wzp_relay::metrics::serve_metrics(port, m)); + let p = Some(presence.clone()); + tokio::spawn(wzp_relay::metrics::serve_metrics(port, m, p)); } // Generate ephemeral relay identity for crypto handshake @@ -214,6 +219,7 @@ async fn main() -> anyhow::Result<()> { let mesh = wzp_relay::probe::ProbeMesh::new( config.probe_targets.clone(), metrics.registry(), + Some(presence.clone()), ); info!( targets = mesh.target_count(), @@ -244,6 +250,7 @@ async fn main() -> anyhow::Result<()> { let relay_seed_bytes = relay_seed.0; let metrics = metrics.clone(); let trunking_enabled = config.trunking_enabled; + let presence = presence.clone(); tokio::spawn(async move { let addr = connection.remote_address(); @@ -259,9 +266,9 @@ async fn main() -> anyhow::Result<()> { let transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); // Probe connections use SNI "_probe" to identify themselves. - // They skip auth + handshake and just do Ping->Pong. + // They skip auth + handshake and just do Ping->Pong + presence gossip. if room_name == "_probe" { - info!(%addr, "probe connection detected, entering Ping/Pong responder"); + info!(%addr, "probe connection detected, entering Ping/Pong + presence responder"); loop { match transport.recv_signal().await { Ok(Some(wzp_proto::SignalMessage::Ping { timestamp_ms })) => { @@ -272,8 +279,30 @@ async fn main() -> anyhow::Result<()> { break; } } + Ok(Some(wzp_proto::SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => { + // A peer relay is telling us which fingerprints it has + let peer_addr: std::net::SocketAddr = relay_addr.parse().unwrap_or(addr); + let fps: std::collections::HashSet = fingerprints.into_iter().collect(); + { + let mut reg = presence.lock().await; + reg.update_peer(peer_addr, fps); + } + // Reply with our own local fingerprints + let local_fps: Vec = { + let reg = presence.lock().await; + reg.local_fingerprints().into_iter().collect() + }; + let reply = wzp_proto::SignalMessage::PresenceUpdate { + fingerprints: local_fps, + relay_addr: addr.to_string(), + }; + if let Err(e) = transport.send_signal(&reply).await { + error!(%addr, "presence reply send error: {e}"); + break; + } + } Ok(Some(_)) => { - // Ignore non-Ping signals on probe connections + // Ignore other signals on probe connections } Ok(None) => { info!(%addr, "probe connection closed"); @@ -352,6 +381,12 @@ async fn main() -> anyhow::Result<()> { } }; + // Register in presence registry + if let Some(ref fp) = authenticated_fp { + let mut reg = presence.lock().await; + reg.register_local(fp, None, Some(room_name.clone())); + } + info!(%addr, room = %room_name, "client joining"); if let Some(remote) = remote_transport { @@ -431,7 +466,11 @@ async fn main() -> anyhow::Result<()> { trunking_enabled, ).await; - // Participant disconnected — clean up per-session metrics + // Participant disconnected — clean up presence + per-session metrics + if let Some(ref fp) = authenticated_fp { + let mut reg = presence.lock().await; + reg.unregister_local(fp); + } metrics.remove_session_metrics(&session_id_str); metrics.active_sessions.dec(); { diff --git a/crates/wzp-relay/src/metrics.rs b/crates/wzp-relay/src/metrics.rs index fb7e728..f4e4738 100644 --- a/crates/wzp-relay/src/metrics.rs +++ b/crates/wzp-relay/src/metrics.rs @@ -201,11 +201,19 @@ impl RelayMetrics { } } -/// Start an HTTP server serving GET /metrics and GET /mesh on the given port. -pub async fn serve_metrics(port: u16, metrics: Arc) { - use axum::{routing::get, Router}; +/// Start an HTTP server serving GET /metrics, GET /mesh, and presence endpoints on the given port. +pub async fn serve_metrics( + port: u16, + metrics: Arc, + presence: Option>>, +) { + use axum::{extract::Path, routing::get, Router}; let metrics_clone = metrics.clone(); + let presence_all = presence.clone(); + let presence_lookup = presence.clone(); + let presence_peers = presence; + let app = Router::new() .route( "/metrics", @@ -220,6 +228,66 @@ pub async fn serve_metrics(port: u16, metrics: Arc) { let m = metrics_clone.clone(); async move { crate::probe::mesh_summary(m.registry()) } }), + ) + .route( + "/presence", + get(move || { + let reg = presence_all.clone(); + async move { + match reg { + Some(r) => { + let r = r.lock().await; + let entries: Vec = r.all_known().into_iter().map(|(fp, loc)| { + serde_json::json!({ "fingerprint": fp, "location": loc }) + }).collect(); + serde_json::to_string_pretty(&entries).unwrap_or_else(|_| "[]".to_string()) + } + None => "[]".to_string(), + } + } + }), + ) + .route( + "/presence/:fingerprint", + get(move |Path(fingerprint): Path| { + let reg = presence_lookup.clone(); + async move { + match reg { + Some(r) => { + let r = r.lock().await; + match r.lookup(&fingerprint) { + Some(loc) => serde_json::to_string_pretty( + &serde_json::json!({ "fingerprint": fingerprint, "location": loc }) + ).unwrap_or_else(|_| "{}".to_string()), + None => serde_json::json!({ "fingerprint": fingerprint, "location": null }).to_string(), + } + } + None => serde_json::json!({ "fingerprint": fingerprint, "location": null }).to_string(), + } + } + }), + ) + .route( + "/peers", + get(move || { + let reg = presence_peers.clone(); + async move { + match reg { + Some(r) => { + let r = r.lock().await; + let peers: Vec = r.peers().iter().map(|(addr, peer)| { + serde_json::json!({ + "addr": addr.to_string(), + "fingerprints": peer.fingerprints.iter().collect::>(), + "rtt_ms": peer.rtt_ms, + }) + }).collect(); + serde_json::to_string_pretty(&peers).unwrap_or_else(|_| "[]".to_string()) + } + None => "[]".to_string(), + } + } + }), ); let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); diff --git a/crates/wzp-relay/src/presence.rs b/crates/wzp-relay/src/presence.rs new file mode 100644 index 0000000..9b999f1 --- /dev/null +++ b/crates/wzp-relay/src/presence.rs @@ -0,0 +1,333 @@ +//! Presence registry — tracks which fingerprints are connected to this relay +//! and to peer relays (via gossip over probe connections). +//! +//! This enables route resolution: given a fingerprint, determine whether the +//! user is local, on a known peer relay, or unknown. + +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use serde::Serialize; + +// --------------------------------------------------------------------------- +// Data structures +// --------------------------------------------------------------------------- + +/// Where a fingerprint is connected. +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub enum PresenceLocation { + /// Connected directly to this relay. + Local, + /// Connected to a peer relay at the given address. + Remote(SocketAddr), +} + +/// Presence entry for a fingerprint connected directly to this relay. +#[derive(Clone, Debug)] +pub struct LocalPresence { + pub fingerprint: String, + pub alias: Option, + pub connected_at: Instant, + pub room: Option, +} + +/// Presence entry for a fingerprint reported by a peer relay. +#[derive(Clone, Debug)] +pub struct RemotePresence { + pub fingerprint: String, + pub relay_addr: SocketAddr, + pub last_seen: Instant, +} + +/// Known peer relay and its reported fingerprints. +#[derive(Clone, Debug)] +pub struct PeerRelay { + pub addr: SocketAddr, + pub fingerprints: HashSet, + pub last_update: Instant, + pub rtt_ms: Option, +} + +// --------------------------------------------------------------------------- +// Registry +// --------------------------------------------------------------------------- + +/// Central presence registry tracking local and remote fingerprints. +pub struct PresenceRegistry { + /// Fingerprints connected directly to THIS relay. + local: HashMap, + /// Fingerprints reported by peer relays (via gossip). + remote: HashMap, + /// Known peer relays and their status. + peers: HashMap, +} + +impl PresenceRegistry { + /// Create an empty registry. + pub fn new() -> Self { + Self { + local: HashMap::new(), + remote: HashMap::new(), + peers: HashMap::new(), + } + } + + /// Register a fingerprint as locally connected (called after auth + handshake). + pub fn register_local(&mut self, fingerprint: &str, alias: Option, room: Option) { + self.local.insert(fingerprint.to_string(), LocalPresence { + fingerprint: fingerprint.to_string(), + alias, + connected_at: Instant::now(), + room, + }); + } + + /// Unregister a locally connected fingerprint (called on disconnect). + pub fn unregister_local(&mut self, fingerprint: &str) { + self.local.remove(fingerprint); + } + + /// Update the fingerprints reported by a peer relay. + /// Replaces the previous set for that peer. + pub fn update_peer(&mut self, addr: SocketAddr, fingerprints: HashSet) { + let now = Instant::now(); + + // Remove old remote entries that belonged to this peer + self.remote.retain(|_, rp| rp.relay_addr != addr); + + // Insert new remote entries + for fp in &fingerprints { + self.remote.insert(fp.clone(), RemotePresence { + fingerprint: fp.clone(), + relay_addr: addr, + last_seen: now, + }); + } + + // Update the peer record + let peer = self.peers.entry(addr).or_insert_with(|| PeerRelay { + addr, + fingerprints: HashSet::new(), + last_update: now, + rtt_ms: None, + }); + peer.fingerprints = fingerprints; + peer.last_update = now; + } + + /// Look up where a fingerprint is connected. + /// Local presence takes priority over remote. + pub fn lookup(&self, fingerprint: &str) -> Option { + if self.local.contains_key(fingerprint) { + return Some(PresenceLocation::Local); + } + if let Some(rp) = self.remote.get(fingerprint) { + return Some(PresenceLocation::Remote(rp.relay_addr)); + } + None + } + + /// Return all fingerprints connected directly to this relay. + pub fn local_fingerprints(&self) -> HashSet { + self.local.keys().cloned().collect() + } + + /// Return a full dump of every known fingerprint and its location. + pub fn all_known(&self) -> Vec<(String, PresenceLocation)> { + let mut out = Vec::new(); + for fp in self.local.keys() { + out.push((fp.clone(), PresenceLocation::Local)); + } + for (fp, rp) in &self.remote { + // Skip if also local (local wins) + if !self.local.contains_key(fp) { + out.push((fp.clone(), PresenceLocation::Remote(rp.relay_addr))); + } + } + out + } + + /// Remove remote entries older than `timeout`. + pub fn expire_stale(&mut self, timeout: Duration) { + let cutoff = Instant::now() - timeout; + + // Expire remote presence entries + self.remote.retain(|_, rp| rp.last_seen > cutoff); + + // Expire peer relay records and their fingerprint sets + let stale_peers: Vec = self.peers + .iter() + .filter(|(_, p)| p.last_update <= cutoff) + .map(|(addr, _)| *addr) + .collect(); + + for addr in stale_peers { + self.peers.remove(&addr); + } + } + + /// Return a reference to the peer relay map (for HTTP API). + pub fn peers(&self) -> &HashMap { + &self.peers + } + + /// Return a reference to the local presence map (for HTTP API). + pub fn local_entries(&self) -> &HashMap { + &self.local + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + + fn addr(s: &str) -> SocketAddr { + s.parse().unwrap() + } + + #[test] + fn register_and_lookup_local() { + let mut reg = PresenceRegistry::new(); + reg.register_local("aabbccdd", Some("alice".into()), Some("room1".into())); + + assert_eq!(reg.lookup("aabbccdd"), Some(PresenceLocation::Local)); + // Unknown fingerprint returns None + assert_eq!(reg.lookup("00000000"), None); + } + + #[test] + fn unregister_removes() { + let mut reg = PresenceRegistry::new(); + reg.register_local("aabbccdd", None, None); + assert_eq!(reg.lookup("aabbccdd"), Some(PresenceLocation::Local)); + + reg.unregister_local("aabbccdd"); + assert_eq!(reg.lookup("aabbccdd"), None); + } + + #[test] + fn update_peer_and_lookup() { + let mut reg = PresenceRegistry::new(); + let peer = addr("10.0.0.2:4433"); + let mut fps = HashSet::new(); + fps.insert("deadbeef".to_string()); + fps.insert("cafebabe".to_string()); + + reg.update_peer(peer, fps); + + assert_eq!(reg.lookup("deadbeef"), Some(PresenceLocation::Remote(peer))); + assert_eq!(reg.lookup("cafebabe"), Some(PresenceLocation::Remote(peer))); + assert_eq!(reg.lookup("unknown"), None); + } + + #[test] + fn expire_stale_removes_old() { + let mut reg = PresenceRegistry::new(); + let peer = addr("10.0.0.3:4433"); + + let mut fps = HashSet::new(); + fps.insert("olduser".to_string()); + reg.update_peer(peer, fps); + + // Verify it's there + assert_eq!(reg.lookup("olduser"), Some(PresenceLocation::Remote(peer))); + + // Manually backdate the last_seen and last_update + if let Some(rp) = reg.remote.get_mut("olduser") { + rp.last_seen = Instant::now() - Duration::from_secs(120); + } + if let Some(p) = reg.peers.get_mut(&peer) { + p.last_update = Instant::now() - Duration::from_secs(120); + } + + // Expire with 60s timeout — should remove the 120s-old entries + reg.expire_stale(Duration::from_secs(60)); + + assert_eq!(reg.lookup("olduser"), None); + assert!(reg.peers.get(&peer).is_none()); + } + + #[test] + fn local_fingerprints_list() { + let mut reg = PresenceRegistry::new(); + reg.register_local("fp1", None, None); + reg.register_local("fp2", Some("bob".into()), Some("room-a".into())); + reg.register_local("fp3", None, None); + + let fps = reg.local_fingerprints(); + assert_eq!(fps.len(), 3); + assert!(fps.contains("fp1")); + assert!(fps.contains("fp2")); + assert!(fps.contains("fp3")); + } + + #[test] + fn all_known_includes_local_and_remote() { + let mut reg = PresenceRegistry::new(); + reg.register_local("local1", None, None); + + let peer = addr("10.0.0.5:4433"); + let mut fps = HashSet::new(); + fps.insert("remote1".to_string()); + reg.update_peer(peer, fps); + + let all = reg.all_known(); + assert_eq!(all.len(), 2); + + let local_entries: Vec<_> = all.iter() + .filter(|(_, loc)| *loc == PresenceLocation::Local) + .collect(); + assert_eq!(local_entries.len(), 1); + assert_eq!(local_entries[0].0, "local1"); + + let remote_entries: Vec<_> = all.iter() + .filter(|(_, loc)| matches!(loc, PresenceLocation::Remote(_))) + .collect(); + assert_eq!(remote_entries.len(), 1); + assert_eq!(remote_entries[0].0, "remote1"); + } + + #[test] + fn local_overrides_remote_in_lookup() { + let mut reg = PresenceRegistry::new(); + let peer = addr("10.0.0.6:4433"); + + // Register as remote first + let mut fps = HashSet::new(); + fps.insert("dupfp".to_string()); + reg.update_peer(peer, fps); + assert_eq!(reg.lookup("dupfp"), Some(PresenceLocation::Remote(peer))); + + // Now register locally — local should win + reg.register_local("dupfp", None, None); + assert_eq!(reg.lookup("dupfp"), Some(PresenceLocation::Local)); + } + + #[test] + fn update_peer_replaces_old_fingerprints() { + let mut reg = PresenceRegistry::new(); + let peer = addr("10.0.0.7:4433"); + + let mut fps1 = HashSet::new(); + fps1.insert("user_a".to_string()); + fps1.insert("user_b".to_string()); + reg.update_peer(peer, fps1); + + assert_eq!(reg.lookup("user_a"), Some(PresenceLocation::Remote(peer))); + assert_eq!(reg.lookup("user_b"), Some(PresenceLocation::Remote(peer))); + + // Update with only user_b — user_a should be gone + let mut fps2 = HashSet::new(); + fps2.insert("user_b".to_string()); + reg.update_peer(peer, fps2); + + assert_eq!(reg.lookup("user_a"), None); + assert_eq!(reg.lookup("user_b"), Some(PresenceLocation::Remote(peer))); + } +} diff --git a/crates/wzp-relay/src/probe.rs b/crates/wzp-relay/src/probe.rs index ab73f87..0693e11 100644 --- a/crates/wzp-relay/src/probe.rs +++ b/crates/wzp-relay/src/probe.rs @@ -156,14 +156,19 @@ impl SlidingWindow { pub struct ProbeRunner { config: ProbeConfig, metrics: ProbeMetrics, + presence: Option>>, } impl ProbeRunner { /// Create a new probe runner, registering metrics with the given registry. - pub fn new(config: ProbeConfig, registry: &Registry) -> Self { + pub fn new( + config: ProbeConfig, + registry: &Registry, + presence: Option>>, + ) -> Self { let target_str = config.target.to_string(); let metrics = ProbeMetrics::register(&target_str, registry); - Self { config, metrics } + Self { config, metrics, presence } } /// Run the probe forever. This function never returns under normal operation. @@ -215,6 +220,8 @@ impl ProbeRunner { let jitter_gauge = self.metrics.jitter_ms.clone(); let up_gauge = self.metrics.up.clone(); + let recv_presence = self.presence.clone(); + let recv_target = self.config.target; let recv_handle = tokio::spawn(async move { loop { match recv_transport.recv_signal().await { @@ -230,8 +237,17 @@ impl ProbeRunner { loss_gauge.set(w.loss_pct()); jitter_gauge.set(w.jitter_ms()); } + Ok(Some(SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => { + if let Some(ref reg) = recv_presence { + // Parse the relay_addr; fall back to the connection target + let addr = relay_addr.parse().unwrap_or(recv_target); + let fps: std::collections::HashSet = fingerprints.into_iter().collect(); + let mut r = reg.lock().await; + r.update_peer(addr, fps); + } + } Ok(Some(_)) => { - // Ignore non-Pong signals + // Ignore other signals } Ok(None) => { info!("probe recv: connection closed"); @@ -247,8 +263,9 @@ impl ProbeRunner { } }); - // Send ping loop + // Send ping loop (+ presence gossip every 10 pings) let mut interval = tokio::time::interval(self.config.interval); + let mut ping_count: u64 = 0; loop { interval.tick().await; @@ -275,6 +292,24 @@ impl ProbeRunner { recv_handle.abort(); return Err(e.into()); } + + // Send presence update every 10 pings (~10 seconds) + ping_count += 1; + if ping_count % 10 == 0 { + if let Some(ref reg) = self.presence { + let fps: Vec = { + let r = reg.lock().await; + r.local_fingerprints().into_iter().collect() + }; + let msg = SignalMessage::PresenceUpdate { + fingerprints: fps, + relay_addr: self.config.target.to_string(), + }; + if let Err(e) = transport.send_signal(&msg).await { + warn!(target = %self.config.target, "presence update send error: {e}"); + } + } + } } } } @@ -289,12 +324,16 @@ pub struct ProbeMesh { impl ProbeMesh { /// Create a new mesh coordinator, registering metrics for every target. - pub fn new(targets: Vec, registry: &Registry) -> Self { + pub fn new( + targets: Vec, + registry: &Registry, + presence: Option>>, + ) -> Self { let runners = targets .into_iter() .map(|addr| { let config = ProbeConfig::new(addr); - ProbeRunner::new(config, registry) + ProbeRunner::new(config, registry, presence.clone()) }) .collect(); Self { runners } @@ -409,6 +448,7 @@ mod tests { fn probe_metrics_register() { let registry = Registry::new(); let _metrics = ProbeMetrics::register("127.0.0.1:4433", ®istry); + // (ProbeRunner::new signature changed but this test only checks ProbeMetrics) let encoder = prometheus::TextEncoder::new(); let families = registry.gather(); @@ -526,7 +566,7 @@ mod tests { "127.0.0.2:4433".parse().unwrap(), "127.0.0.3:4433".parse().unwrap(), ]; - let mesh = ProbeMesh::new(targets, ®istry); + let mesh = ProbeMesh::new(targets, ®istry, None); assert_eq!(mesh.target_count(), 3); // Verify metrics were registered for each target @@ -586,7 +626,7 @@ mod tests { #[test] fn mesh_zero_targets() { let registry = Registry::new(); - let mesh = ProbeMesh::new(vec![], ®istry); + let mesh = ProbeMesh::new(vec![], ®istry, None); assert_eq!(mesh.target_count(), 0); } } diff --git a/docs/INTEGRATION_TASKS.md b/docs/INTEGRATION_TASKS.md index 0b0b2d7..fa0f5a5 100644 --- a/docs/INTEGRATION_TASKS.md +++ b/docs/INTEGRATION_TASKS.md @@ -77,7 +77,9 @@ Based on featherChat commit 65f6390 — FUTURE_TASKS.md with WZP integration ite ### WZP-FC-7. Missed call notifications — TODO (0.5d) ### WZP-FC-8. Cross-project identity verification — DONE (15 tests, 26dc848) ### WZP-FC-9. HKDF salt investigation — DONE (no mismatch) -### WZP-FC-10. Web bridge shared auth — TODO (1-2d) +### WZP-FC-10. Web bridge shared auth — DONE +- FC: GET /v1/wzp/relay-config, CORS layer, service token +- WZP: web bridge --auth-url validates browser tokens via FC ### FC-CRATE-1. Standalone warzone-protocol — DONE (v0.0.21, 4a4fa9f) ---