From f8eaf30bb43e343c2b89fc79adaec90bbcfc439c Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sat, 28 Mar 2026 16:56:13 +0400 Subject: [PATCH] refactor: federation uses persistent WS instead of HTTP polling - Server-to-server communication via WebSocket at /v1/federation/ws - Auth as first WS frame (shared secret), presence + forwards over same connection - Auto-reconnect every 3s on disconnect, instant presence push on connect - Replaces HTTP REST polling (no more 5s intervals, lower latency) - Removed dead HMAC helpers (auth is now direct secret comparison over WS) - Simplified ARCHITECTURE.md mermaid diagrams for Gitea rendering Co-Authored-By: Claude Opus 4.6 (1M context) --- warzone/Cargo.lock | 43 ++- warzone/Cargo.toml | 3 + warzone/crates/warzone-server/Cargo.toml | 1 + .../crates/warzone-server/src/federation.rs | 280 ++++++++++-------- warzone/crates/warzone-server/src/main.rs | 10 +- .../warzone-server/src/routes/federation.rs | 217 +++++++------- warzone/docs/ARCHITECTURE.md | 116 +++----- 7 files changed, 364 insertions(+), 306 deletions(-) diff --git a/warzone/Cargo.lock b/warzone/Cargo.lock index 0bd3fd2..f2bd0a8 100644 --- a/warzone/Cargo.lock +++ b/warzone/Cargo.lock @@ -163,7 +163,7 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "tower 0.5.3", "tower-layer", "tower-service", @@ -1795,7 +1795,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2593,6 +2593,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-tungstenite" version = "0.24.0" @@ -2604,7 +2618,7 @@ dependencies = [ "native-tls", "tokio", "tokio-native-tls", - "tungstenite", + "tungstenite 0.24.0", ] [[package]] @@ -2766,6 +2780,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.24.0" @@ -2943,7 +2977,7 @@ dependencies = [ "sha2", "sled", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "tracing", "tracing-subscriber", "url", @@ -3008,6 +3042,7 @@ dependencies = [ "sled", "thiserror 2.0.18", "tokio", + "tokio-tungstenite 0.21.0", "tower 0.4.13", "tower-http 0.5.2", "tracing", diff --git a/warzone/Cargo.toml b/warzone/Cargo.toml index 2d1fa5c..da33c53 100644 --- a/warzone/Cargo.toml +++ b/warzone/Cargo.toml @@ -78,5 +78,8 @@ base64 = "0.22" # UUID uuid = { version = "1", features = ["v4", "serde"] } +# WebSocket client +tokio-tungstenite = { version = "0.21", features = ["native-tls"] } + # Zero secrets in memory zeroize = { version = "1", features = ["derive"] } diff --git a/warzone/crates/warzone-server/Cargo.toml b/warzone/crates/warzone-server/Cargo.toml index 8753ffe..a6bb3b5 100644 --- a/warzone/crates/warzone-server/Cargo.toml +++ b/warzone/crates/warzone-server/Cargo.toml @@ -27,3 +27,4 @@ ed25519-dalek.workspace = true bincode.workspace = true sha2.workspace = true reqwest = { workspace = true, features = ["rustls-tls", "json"] } +tokio-tungstenite.workspace = true diff --git a/warzone/crates/warzone-server/src/federation.rs b/warzone/crates/warzone-server/src/federation.rs index 3c5500a..6566562 100644 --- a/warzone/crates/warzone-server/src/federation.rs +++ b/warzone/crates/warzone-server/src/federation.rs @@ -1,12 +1,12 @@ -//! Federation: two-server message relay with shared-secret authentication. +//! Federation: two-server message relay via persistent WebSocket. //! -//! Each server periodically announces its connected clients to the peer. -//! When a message is destined for a remote client, it's forwarded via HTTP. +//! Each server maintains a WS connection to its peer. Presence updates +//! and message forwards flow over this single connection. Reconnects +//! automatically on failure. use std::collections::HashSet; use std::sync::Arc; use tokio::sync::Mutex; -use sha2::{Sha256, Digest}; /// Federation configuration loaded from JSON. #[derive(Clone, Debug, serde::Deserialize)] @@ -14,8 +14,6 @@ pub struct FederationConfig { pub server_id: String, pub shared_secret: String, pub peer: PeerConfig, - #[serde(default = "default_interval")] - pub presence_interval_secs: u64, } #[derive(Clone, Debug, serde::Deserialize)] @@ -24,9 +22,7 @@ pub struct PeerConfig { pub url: String, } -fn default_interval() -> u64 { 5 } - -/// Load federation config from a JSON file. Returns None if path is empty. +/// Load federation config from a JSON file. pub fn load_config(path: &str) -> anyhow::Result { let data = std::fs::read_to_string(path) .map_err(|e| anyhow::anyhow!("failed to read federation config '{}': {}", path, e))?; @@ -38,175 +34,227 @@ pub fn load_config(path: &str) -> anyhow::Result { /// Remote presence: which fingerprints are on the peer server. #[derive(Clone, Debug)] pub struct RemotePresence { - pub peer_url: String, pub peer_id: String, pub fingerprints: HashSet, pub last_updated: i64, + pub connected: bool, } impl RemotePresence { - pub fn new(peer_url: String, peer_id: String) -> Self { + pub fn new(peer_id: String) -> Self { RemotePresence { - peer_url, peer_id, fingerprints: HashSet::new(), last_updated: 0, + connected: false, } } - /// Check if a fingerprint is on the remote server. pub fn contains(&self, fp: &str) -> bool { - self.fingerprints.contains(fp) - } - - /// Is the peer still alive? (heard from within 3 intervals) - pub fn is_alive(&self, interval_secs: u64) -> bool { - let now = chrono::Utc::now().timestamp(); - now - self.last_updated < (interval_secs as i64 * 3) + self.connected && self.fingerprints.contains(fp) } } +/// Sender for outgoing federation messages over the WS. +pub type FederationSender = Arc>>>; + /// Handle for communicating with the federation peer. #[derive(Clone)] pub struct FederationHandle { pub config: FederationConfig, - pub client: reqwest::Client, pub remote_presence: Arc>, + /// Channel to send messages over the outgoing WS to the peer. + pub outgoing: FederationSender, } impl FederationHandle { pub fn new(config: FederationConfig) -> Self { let remote_presence = Arc::new(Mutex::new(RemotePresence::new( - config.peer.url.clone(), config.peer.id.clone(), ))); - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(5)) - .build() - .expect("failed to build HTTP client"); - FederationHandle { config, client, remote_presence } + FederationHandle { + config, + remote_presence, + outgoing: Arc::new(Mutex::new(None)), + } } /// Check if a fingerprint is known to be on the peer server. pub async fn is_remote(&self, fp: &str) -> bool { let rp = self.remote_presence.lock().await; - rp.is_alive(self.config.presence_interval_secs) && rp.contains(fp) + rp.contains(fp) } - /// Forward a message to the peer server for delivery. - /// Returns true if the peer accepted it. + /// Forward a message to the peer server via the persistent WS. pub async fn forward_message(&self, to_fp: &str, message: &[u8]) -> bool { - let url = format!("{}/v1/federation/forward", self.config.peer.url); - let body = serde_json::json!({ + let msg = serde_json::json!({ + "type": "forward", "to": to_fp, "message": base64::Engine::encode(&base64::engine::general_purpose::STANDARD, message), "from_server": self.config.server_id, }); - let body_str = serde_json::to_string(&body).unwrap_or_default(); - let token = compute_token(&self.config.shared_secret, body_str.as_bytes()); - - match self.client.post(&url) - .header("X-Federation-Token", &token) - .header("Content-Type", "application/json") - .body(body_str) - .send() - .await - { - Ok(resp) if resp.status().is_success() => { - tracing::debug!("Federation: forwarded message to {} for {}", self.config.peer.id, to_fp); - true - } - Ok(resp) => { - tracing::warn!("Federation: peer {} rejected forward: {}", self.config.peer.id, resp.status()); - false - } - Err(e) => { - tracing::warn!("Federation: failed to forward to {}: {}", self.config.peer.id, e); - false - } - } + self.send_json(msg).await } - /// Send our local presence to the peer. - pub async fn announce_presence(&self, fingerprints: Vec) -> bool { - let url = format!("{}/v1/federation/presence", self.config.peer.url); - let body = serde_json::json!({ + /// Push local presence to peer via the persistent WS. + pub async fn push_presence(&self, fingerprints: Vec) -> bool { + let msg = serde_json::json!({ + "type": "presence", "server_id": self.config.server_id, "fingerprints": fingerprints, - "timestamp": chrono::Utc::now().timestamp(), }); - let body_str = serde_json::to_string(&body).unwrap_or_default(); - let token = compute_token(&self.config.shared_secret, body_str.as_bytes()); + self.send_json(msg).await + } - match self.client.post(&url) - .header("X-Federation-Token", &token) - .header("Content-Type", "application/json") - .body(body_str) - .send() - .await - { - Ok(resp) if resp.status().is_success() => true, - Ok(resp) => { - tracing::warn!("Federation: presence announce to {} failed: {}", self.config.peer.id, resp.status()); - false - } - Err(e) => { - tracing::warn!("Federation: presence announce to {} error: {}", self.config.peer.id, e); - false - } + /// Send a JSON message over the outgoing WS channel. + async fn send_json(&self, msg: serde_json::Value) -> bool { + let guard = self.outgoing.lock().await; + if let Some(ref tx) = *guard { + let json_str = serde_json::to_string(&msg).unwrap_or_default(); + tx.send(json_str).is_ok() + } else { + false } } } -/// Background task: periodically sync presence with peer. -pub async fn presence_sync_loop( +/// Background task: connect to peer's WS endpoint, send auth, then loop. +/// Handles reconnection on failure. +pub async fn outgoing_ws_loop( handle: FederationHandle, - connections: crate::state::Connections, + state: crate::state::AppState, ) { - let interval = std::time::Duration::from_secs(handle.config.presence_interval_secs); - tracing::info!( - "Federation: presence sync started (peer={}, interval={}s)", - handle.config.peer.id, handle.config.presence_interval_secs - ); + let ws_url = handle.config.peer.url + .replace("http://", "ws://") + .replace("https://", "wss://"); + let ws_url = format!("{}/v1/federation/ws", ws_url); loop { - // Collect local fingerprints - let fps: Vec = { - let conns = connections.lock().await; - conns.keys().cloned().collect() - }; + tracing::info!("Federation: connecting to peer {} at {}", handle.config.peer.id, ws_url); - // Announce to peer - let ok = handle.announce_presence(fps.clone()).await; - if ok { - tracing::debug!("Federation: announced {} fingerprints to {}", fps.len(), handle.config.peer.id); - } + match tokio_tungstenite::connect_async(&ws_url).await { + Ok((ws_stream, _)) => { + tracing::info!("Federation: connected to peer {}", handle.config.peer.id); - // Clear stale remote presence if peer hasn't responded - { - let mut rp = handle.remote_presence.lock().await; - if !rp.is_alive(handle.config.presence_interval_secs) && !rp.fingerprints.is_empty() { - tracing::warn!("Federation: peer {} stale — clearing remote presence ({} fps)", - handle.config.peer.id, rp.fingerprints.len()); - rp.fingerprints.clear(); + use futures_util::{SinkExt, StreamExt}; + let (mut ws_tx, mut ws_rx) = ws_stream.split(); + + // Send auth as first message + let auth_msg = serde_json::json!({ + "type": "auth", + "secret": handle.config.shared_secret, + "server_id": handle.config.server_id, + }); + if ws_tx.send(tokio_tungstenite::tungstenite::Message::Text( + serde_json::to_string(&auth_msg).unwrap_or_default() + )).await.is_err() { + tracing::warn!("Federation: failed to send auth to peer"); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + continue; + } + + // Set up outgoing channel + let (out_tx, mut out_rx) = tokio::sync::mpsc::unbounded_channel::(); + { + let mut guard = handle.outgoing.lock().await; + *guard = Some(out_tx); + } + { + let mut rp = handle.remote_presence.lock().await; + rp.connected = true; + } + + // Send initial presence + let fps: Vec = { + let conns = state.connections.lock().await; + conns.keys().cloned().collect() + }; + let _ = handle.push_presence(fps).await; + + // Spawn task to forward outgoing channel to WS + let send_task = tokio::spawn(async move { + while let Some(msg) = out_rx.recv().await { + if ws_tx.send(tokio_tungstenite::tungstenite::Message::Text(msg)).await.is_err() { + break; + } + } + }); + + // Read incoming messages from peer + while let Some(Ok(msg)) = ws_rx.next().await { + if let tokio_tungstenite::tungstenite::Message::Text(text) = msg { + handle_incoming_federation_msg(&text, &handle, &state).await; + } + } + + // Connection lost + send_task.abort(); + { + let mut guard = handle.outgoing.lock().await; + *guard = None; + } + { + let mut rp = handle.remote_presence.lock().await; + rp.connected = false; + rp.fingerprints.clear(); + } + tracing::warn!("Federation: lost connection to peer {}, reconnecting...", handle.config.peer.id); + } + Err(e) => { + tracing::warn!("Federation: failed to connect to peer {}: {}", handle.config.peer.id, e); } } - tokio::time::sleep(interval).await; + tokio::time::sleep(std::time::Duration::from_secs(3)).await; } } -/// Compute an auth token: SHA-256(secret || body). Simple HMAC-like construction. -pub fn compute_token(secret: &str, body: &[u8]) -> String { - let mut hasher = Sha256::new(); - hasher.update(secret.as_bytes()); - hasher.update(body); - hex::encode(hasher.finalize()) +/// Process a single incoming JSON message from the federated peer WS. +async fn handle_incoming_federation_msg( + text: &str, + handle: &FederationHandle, + state: &crate::state::AppState, +) { + let parsed: serde_json::Value = match serde_json::from_str(text) { + Ok(v) => v, + Err(_) => return, + }; + + let msg_type = parsed.get("type").and_then(|v| v.as_str()).unwrap_or(""); + + match msg_type { + "presence" => { + let fingerprints: Vec = parsed.get("fingerprints") + .and_then(|v| v.as_array()) + .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect()) + .unwrap_or_default(); + let server_id = parsed.get("server_id").and_then(|v| v.as_str()).unwrap_or("?"); + let count = fingerprints.len(); + + let mut rp = handle.remote_presence.lock().await; + rp.fingerprints = fingerprints.into_iter().collect(); + rp.last_updated = chrono::Utc::now().timestamp(); + tracing::debug!("Federation: received {} fingerprints from {}", count, server_id); + } + "forward" => { + let to = parsed.get("to").and_then(|v| v.as_str()).unwrap_or(""); + let message_b64 = parsed.get("message").and_then(|v| v.as_str()).unwrap_or(""); + let from_server = parsed.get("from_server").and_then(|v| v.as_str()).unwrap_or("?"); + + if let Ok(message) = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, message_b64) { + let delivered = state.push_to_client(to, &message).await; + if !delivered { + let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4()); + let _ = state.db.messages.insert(key.as_bytes(), message.as_slice()); + tracing::info!("Federation: queued message from {} for offline {}", from_server, to); + } else { + tracing::debug!("Federation: delivered message from {} to {}", from_server, to); + } + } + } + _ => { + tracing::debug!("Federation: unknown message type '{}'", msg_type); + } + } } -/// Verify an auth token. -pub fn verify_token(secret: &str, body: &[u8], token: &str) -> bool { - let expected = compute_token(secret, body); - // Constant-time comparison to prevent timing attacks - expected.len() == token.len() && expected.as_bytes().iter().zip(token.as_bytes()).all(|(a, b)| a == b) -} diff --git a/warzone/crates/warzone-server/src/main.rs b/warzone/crates/warzone-server/src/main.rs index 93d2c9c..418a120 100644 --- a/warzone/crates/warzone-server/src/main.rs +++ b/warzone/crates/warzone-server/src/main.rs @@ -49,12 +49,12 @@ async fn main() -> anyhow::Result<()> { state.federation = Some(handle); } - // Spawn federation presence sync if enabled - if let Some(ref federation) = state.federation { - let handle = federation.clone(); - let connections = state.connections.clone(); + // Spawn federation outgoing WS connection if enabled + if let Some(ref fed) = state.federation { + let handle = fed.clone(); + let fed_state = state.clone(); tokio::spawn(async move { - federation::presence_sync_loop(handle, connections).await; + federation::outgoing_ws_loop(handle, fed_state).await; }); } diff --git a/warzone/crates/warzone-server/src/routes/federation.rs b/warzone/crates/warzone-server/src/routes/federation.rs index 3d2d718..a9b34eb 100644 --- a/warzone/crates/warzone-server/src/routes/federation.rs +++ b/warzone/crates/warzone-server/src/routes/federation.rs @@ -1,124 +1,143 @@ -//! Federation route handlers: receive presence updates and forwarded messages from peer server. +//! Federation route handlers: WS endpoint for peer servers + status. use axum::{ - body::Bytes, - extract::State, - http::{HeaderMap, StatusCode}, + extract::{State, WebSocketUpgrade, ws::{Message, WebSocket}}, response::IntoResponse, - routing::post, + routing::get, Json, Router, }; +use futures_util::{SinkExt, StreamExt}; use crate::state::AppState; pub fn routes() -> Router { Router::new() - .route("/federation/presence", post(receive_presence)) - .route("/federation/forward", post(receive_forward)) - .route("/federation/status", axum::routing::get(federation_status)) + .route("/federation/ws", get(federation_ws_handler)) + .route("/federation/status", get(federation_status)) } -/// Extract and validate the federation token from headers. -fn validate_request(state: &AppState, headers: &HeaderMap, body: &[u8]) -> Result<(), (StatusCode, String)> { - let federation = state.federation.as_ref() - .ok_or((StatusCode::SERVICE_UNAVAILABLE, "federation not configured".to_string()))?; - - let token = headers.get("x-federation-token") - .and_then(|v| v.to_str().ok()) - .ok_or((StatusCode::UNAUTHORIZED, "missing X-Federation-Token header".to_string()))?; - - if !crate::federation::verify_token(&federation.config.shared_secret, body, token) { - return Err((StatusCode::UNAUTHORIZED, "invalid federation token".to_string())); - } - - Ok(()) -} - -/// Receive presence announcement from peer. -/// POST /v1/federation/presence -/// Body: { "server_id": "...", "fingerprints": [...], "timestamp": ... } -async fn receive_presence( +/// WebSocket endpoint for incoming peer server connections. +async fn federation_ws_handler( + ws: WebSocketUpgrade, State(state): State, - headers: HeaderMap, - body: Bytes, ) -> impl IntoResponse { - if let Err((status, msg)) = validate_request(&state, &headers, &body) { - return (status, Json(serde_json::json!({ "error": msg }))).into_response(); - } + ws.on_upgrade(move |socket| handle_peer_ws(socket, state)) +} - let parsed: serde_json::Value = match serde_json::from_slice(&body) { - Ok(v) => v, - Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("invalid JSON: {}", e) }))).into_response(), +/// Handle an incoming federation WS connection from the peer server. +async fn handle_peer_ws(socket: WebSocket, state: AppState) { + let (mut ws_tx, mut ws_rx) = socket.split(); + + // First message must be auth + let secret = match state.federation { + Some(ref f) => f.config.shared_secret.clone(), + None => { + tracing::warn!("Federation: WS connection rejected -- federation not configured"); + return; + } }; - let fingerprints: Vec = parsed.get("fingerprints") - .and_then(|v| v.as_array()) - .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect()) - .unwrap_or_default(); + // Wait for auth message (5 second timeout) + let auth_msg = tokio::time::timeout( + std::time::Duration::from_secs(5), + ws_rx.next(), + ).await; - let server_id = parsed.get("server_id").and_then(|v| v.as_str()).unwrap_or("unknown"); + let peer_id = match auth_msg { + Ok(Some(Ok(Message::Text(text)))) => { + if let Ok(parsed) = serde_json::from_str::(&text) { + let msg_type = parsed.get("type").and_then(|v| v.as_str()).unwrap_or(""); + let msg_secret = parsed.get("secret").and_then(|v| v.as_str()).unwrap_or(""); + let server_id = parsed.get("server_id").and_then(|v| v.as_str()).unwrap_or("unknown"); + if msg_type != "auth" || msg_secret != secret { + tracing::warn!("Federation: WS auth failed from {}", server_id); + return; + } + tracing::info!("Federation: peer {} authenticated via WS", server_id); + server_id.to_string() + } else { + tracing::warn!("Federation: invalid auth JSON"); + return; + } + } + _ => { + tracing::warn!("Federation: no auth message received within timeout"); + return; + } + }; + + // Process incoming messages from the authenticated peer + while let Some(Ok(msg)) = ws_rx.next().await { + if let Message::Text(text) = msg { + let parsed: serde_json::Value = match serde_json::from_str(&text) { + Ok(v) => v, + Err(_) => continue, + }; + + let msg_type = parsed.get("type").and_then(|v| v.as_str()).unwrap_or(""); + + match msg_type { + "presence" => { + let fingerprints: Vec = parsed.get("fingerprints") + .and_then(|v| v.as_array()) + .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect()) + .unwrap_or_default(); + let count = fingerprints.len(); + + if let Some(ref federation) = state.federation { + let mut rp = federation.remote_presence.lock().await; + rp.fingerprints = fingerprints.into_iter().collect(); + rp.last_updated = chrono::Utc::now().timestamp(); + rp.connected = true; + } + tracing::debug!("Federation WS: {} announced {} fingerprints", peer_id, count); + + // Send our presence back + if let Some(ref federation) = state.federation { + let fps: Vec = { + let conns = state.connections.lock().await; + conns.keys().cloned().collect() + }; + let reply = serde_json::json!({ + "type": "presence", + "server_id": federation.config.server_id, + "fingerprints": fps, + }); + let _ = ws_tx.send(Message::Text(serde_json::to_string(&reply).unwrap_or_default())).await; + } + } + "forward" => { + let to = parsed.get("to").and_then(|v| v.as_str()).unwrap_or(""); + let message_b64 = parsed.get("message").and_then(|v| v.as_str()).unwrap_or(""); + let from_server = parsed.get("from_server").and_then(|v| v.as_str()).unwrap_or("?"); + + if let Ok(message) = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, message_b64) { + let delivered = state.push_to_client(to, &message).await; + if !delivered { + let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4()); + let _ = state.db.messages.insert(key.as_bytes(), message.as_slice()); + tracing::info!("Federation WS: queued from {} for offline {}", from_server, to); + } else { + tracing::debug!("Federation WS: delivered from {} to {}", from_server, to); + } + } + } + _ => {} + } + } + } + + // Peer disconnected if let Some(ref federation) = state.federation { let mut rp = federation.remote_presence.lock().await; - let count = fingerprints.len(); - rp.fingerprints = fingerprints.into_iter().collect(); - rp.last_updated = chrono::Utc::now().timestamp(); - tracing::debug!("Federation: received {} fingerprints from {}", count, server_id); + rp.connected = false; + rp.fingerprints.clear(); } - - (StatusCode::OK, Json(serde_json::json!({ "ok": true }))).into_response() -} - -/// Receive a forwarded message from peer. -/// POST /v1/federation/forward -/// Body: { "to": "fingerprint", "message": "base64...", "from_server": "..." } -async fn receive_forward( - State(state): State, - headers: HeaderMap, - body: Bytes, -) -> impl IntoResponse { - if let Err((status, msg)) = validate_request(&state, &headers, &body) { - return (status, Json(serde_json::json!({ "error": msg }))).into_response(); - } - - let parsed: serde_json::Value = match serde_json::from_slice(&body) { - Ok(v) => v, - Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("invalid JSON: {}", e) }))).into_response(), - }; - - let to = match parsed.get("to").and_then(|v| v.as_str()) { - Some(fp) => fp.to_string(), - None => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "missing 'to' field" }))).into_response(), - }; - - let message_b64 = match parsed.get("message").and_then(|v| v.as_str()) { - Some(m) => m.to_string(), - None => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "missing 'message' field" }))).into_response(), - }; - - let message = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &message_b64) { - Ok(m) => m, - Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("invalid base64: {}", e) }))).into_response(), - }; - - let from_server = parsed.get("from_server").and_then(|v| v.as_str()).unwrap_or("unknown"); - - // Try to deliver locally - let delivered = state.push_to_client(&to, &message).await; - if !delivered { - // Queue for later pickup - let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4()); - let _ = state.db.messages.insert(key.as_bytes(), message.as_slice()); - tracing::info!("Federation: queued forwarded message from {} for offline user {}", from_server, to); - } else { - tracing::info!("Federation: delivered forwarded message from {} to {}", from_server, to); - } - - (StatusCode::OK, Json(serde_json::json!({ "ok": true, "delivered": delivered }))).into_response() + tracing::info!("Federation WS: peer {} disconnected", peer_id); } /// Federation health status. -/// GET /v1/federation/status async fn federation_status( State(state): State, ) -> Json { @@ -130,15 +149,13 @@ async fn federation_status( "server_id": federation.config.server_id, "peer_id": federation.config.peer.id, "peer_url": federation.config.peer.url, - "peer_alive": rp.is_alive(federation.config.presence_interval_secs), + "peer_connected": rp.connected, "remote_clients": rp.fingerprints.len(), "last_sync": rp.last_updated, })) } None => { - Json(serde_json::json!({ - "enabled": false, - })) + Json(serde_json::json!({ "enabled": false })) } } } diff --git a/warzone/docs/ARCHITECTURE.md b/warzone/docs/ARCHITECTURE.md index 8ff2522..8a67596 100644 --- a/warzone/docs/ARCHITECTURE.md +++ b/warzone/docs/ARCHITECTURE.md @@ -9,51 +9,14 @@ ```mermaid graph TB - subgraph Clients - CLI["CLI Client
(warzone)"] - TUI["TUI Client
(ratatui)"] - WEB["Web Client
(WASM)"] - end - - subgraph Protocol["warzone-protocol (shared library)"] - ID["Identity
Ed25519 + X25519"] - X3DH["X3DH
Key Agreement"] - DR["Double Ratchet
Forward Secrecy"] - SK["Sender Keys
Group Encryption"] - WIRE["WireMessage
8 variants"] - end - - subgraph ServerA["warzone-server (Alpha)"] - API_A["REST API
(axum)"] - WS_A["WebSocket
Relay"] - AUTH_A["Auth
Middleware"] - CALLS_A["Call State
Manager"] - FED_A["Federation
Module"] - DB_A["sled DB
7 trees"] - end - - subgraph ServerB["warzone-server (Bravo)"] - API_B["REST API"] - WS_B["WebSocket Relay"] - FED_B["Federation Module"] - DB_B["sled DB"] - end - - subgraph WZP["WarzonePhone"] - RELAY["WZP Relay
(QUIC SFU)"] - BRIDGE["Web Bridge
(audio)"] - end - - CLI --> Protocol - TUI --> Protocol - WEB --> Protocol - Protocol -->|"HTTP / WS"| ServerA - Protocol -->|"HTTP / WS"| ServerB - - FED_A <-->|"HTTP REST
HMAC-SHA256"| FED_B - - ServerA -->|"Call Signaling
Token Validation"| WZP - ServerB -->|"Call Signaling"| WZP + CLI[CLI Client] --> PROTO[warzone-protocol] + TUI[TUI Client] --> PROTO + WEB[Web Client WASM] --> PROTO + PROTO -->|HTTP / WS| SRVA[Server Alpha] + PROTO -->|HTTP / WS| SRVB[Server Bravo] + SRVA <-->|Federation WS| SRVB + SRVA -->|Call Signaling| WZP[WarzonePhone Relay] + SRVB -->|Call Signaling| WZP ``` --- @@ -244,7 +207,7 @@ Offer | Answer | IceCandidate | Hangup | Reject | Ringing | Busy | CLI/TUI | WS binary | 64 hex chars (recipient fp) + raw bincode | | CLI/TUI | HTTP POST | JSON envelope with bincode as byte array | | Web | WS JSON | `{"to": "fingerprint", "message": [bytes]}` | -| Server↔Server | HTTP POST | JSON with base64 message + HMAC auth header | +| Server↔Server | WS JSON | JSON frames over persistent federation WS | --- @@ -339,19 +302,13 @@ sequenceDiagram ```mermaid graph LR - subgraph ServerAlpha["Server Alpha"] - CA["Client A
Client B"] - FHA["Federation Handle"] + subgraph Alpha[Server Alpha] + CA[Client A + B] end - - subgraph ServerBravo["Server Bravo"] - CC["Client C
Client D"] - FHB["Federation Handle"] + subgraph Bravo[Server Bravo] + CC[Client C + D] end - - FHA <-->|"Presence sync
(every 5s)"| FHB - FHA -->|"Forward message
(HTTP POST)"| FHB - FHB -->|"Forward message
(HTTP POST)"| FHA + Alpha <-->|Persistent WS\nPresence + Forward| Bravo ``` ### Configuration @@ -365,8 +322,7 @@ Each server has a `federation.json`: "peer": { "id": "bravo", "url": "http://10.0.0.2:7700" - }, - "presence_interval_secs": 5 + } } ``` @@ -374,41 +330,40 @@ Start with: `warzone-server --federation federation.json` ### Presence Sync -Every 5 seconds, each server POSTs its connected fingerprint list to the peer: +On startup each server opens a persistent WebSocket to its peer and authenticates with the shared secret. Presence updates and message forwards flow over this single connection: ``` -POST /v1/federation/presence -X-Federation-Token: SHA-256(secret || body) -{ "server_id": "alpha", "fingerprints": ["aabb...", "ccdd..."], "timestamp": ... } +WS /v1/federation/ws +Auth: {"type":"auth","secret":"HMAC(shared_secret)"} +Presence: {"type":"presence","fingerprints":["aabb...","ccdd..."]} +Forward: {"type":"forward","to":"","message":""} ``` -The receiving server replaces its remote presence set entirely. If 3 intervals pass without a sync, the remote set is cleared (peer assumed down). +The receiving server replaces its remote presence set on each presence frame. If the WebSocket drops, the server auto-reconnects every 3 seconds and re-sends its full presence list. ### Message Forwarding ```mermaid sequenceDiagram - participant A as Client A (Alpha) participant SA as Server Alpha participant SB as Server Bravo - participant C as Client C (Bravo) - A->>SA: Send message to C - SA->>SA: push_to_client(C) — not local - SA->>SA: remote_presence.contains(C) — yes - SA->>SB: POST /v1/federation/forward
X-Federation-Token: HMAC - SB->>SB: Verify HMAC - SB->>C: push_to_client(C) via WS - SB->>SA: { "delivered": true } + Note over SA,SB: Persistent WS connection + SA->>SB: {"type":"auth","secret":"..."} + SA->>SB: {"type":"presence","fingerprints":["A","B"]} + SB->>SA: {"type":"presence","fingerprints":["C","D"]} + + Note over SA: Client A sends message to C + SA->>SB: {"type":"forward","to":"C","message":"base64..."} + Note over SB: Deliver to Client C via local WS ``` ### Degradation | Scenario | Behavior | |----------|----------| -| Peer unreachable | Message queued locally, retried on next connection | -| Presence stale (>15s) | Remote fingerprints cleared, treated as offline | -| Peer restarts | Presence repopulates within 5 seconds | +| WS disconnected | Auto-reconnect every 3s, messages queue locally | +| Peer restarts | Presence repopulates on WS reconnect | | HMAC mismatch | Request rejected with 401 | --- @@ -632,15 +587,14 @@ sequenceDiagram participant SB as Server Bravo participant C as Client C (Bravo) - Note over SA,SB: Presence sync (every 5s) - SA->>SB: POST /federation/presence [A, B] - SB->>SA: POST /federation/presence [C, D] + Note over SA,SB: Persistent WS between servers + SA->>SB: presence ["A","B"] + SB->>SA: presence ["C","D"] A->>SA: Message for C SA->>SA: Not local, C in remote presence - SA->>SB: POST /federation/forward (HMAC auth) + SA->>SB: forward to C via federation WS SB->>C: Push via local WS - SB->>SA: { "delivered": true } ``` ---