v0.0.21: TUI overhaul, WZP call infrastructure, security hardening, federation

TUI:
- Split 1,756-line app.rs monolith into 7 modules (types, draw, commands, input, file_transfer, network, mod)
- Message timestamps [HH:MM], scrolling (PageUp/Down/arrows), connection status dot, unread badge
- /help command, terminal bell on incoming DM, /devices + /kick commands
- 44 unit tests (types, input, draw with TestBackend)

Server — WZP Call Infrastructure (FC-2/3/5/6/7/10):
- Call state management (CallState, CallStatus, active_calls, calls + missed_calls sled trees)
- WS call signal awareness (Offer/Answer/Hangup update state, missed call on offline)
- Group call endpoint (POST /groups/:name/call with SHA-256 room ID, fan-out)
- Presence API (GET /presence/:fp, POST /presence/batch)
- Missed call flush on WS reconnect
- WZP relay config + CORS

Server — Security (FC-P1):
- Auth enforcement middleware (AuthFingerprint extractor on 13 write handlers)
- Session auto-recovery (delete corrupted ratchet, show [session reset])
- WS connection cap (5/fingerprint) + global concurrency limit (200)
- Device management (GET /devices, POST /devices/:id/kick, POST /devices/revoke-all)

Server — Federation:
- Two-server federation via JSON config (--federation flag)
- Periodic presence sync (every 5s, full-state, self-healing)
- Message forwarding via HTTP POST with SHA-256(secret||body) auth
- Graceful degradation (peer down = queue locally)
- deliver_or_queue() replaces push-or-queue in ws.rs + messages.rs

Client — Group Messaging:
- SenderKeyDistribution storage + GroupSenderKey decryption in TUI
- sender_keys sled tree in LocalDb

WASM:
- All 8 WireMessage variants handled (no more "unsupported")
- decrypt_group_message() + create_sender_key_from_distribution() exports
- CallSignal parsing with signal_type mapping

Docs:
- ARCHITECTURE.md rewritten with Mermaid diagrams
- README.md created
- TASK_PLAN.md with FC-P{phase}-T{task} naming
- PROGRESS.md updated to v0.0.21

WZP submodule updated to 6f4e8eb (IAX2 trunking, adaptive quality, metrics, all S-tasks done)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-28 16:45:58 +04:00
parent 4a4fa9fab4
commit 3e0889e5dc
36 changed files with 5237 additions and 2232 deletions

View File

@@ -0,0 +1,84 @@
//! Auth enforcement middleware: axum extractor that validates bearer tokens.
//!
//! Reads `Authorization: Bearer <token>` from request headers, validates via
//! [`crate::routes::auth::validate_token`], and returns the authenticated
//! fingerprint or a 401 rejection.
use axum::{
extract::FromRequestParts,
http::{request::Parts, StatusCode},
response::{IntoResponse, Response},
};
use crate::state::AppState;
/// Extractor that validates a bearer token and provides the authenticated fingerprint.
///
/// Place this as the **first** parameter in any handler that requires authentication.
/// The extractor will reject the request with 401 if the token is missing or invalid.
///
/// # Example
///
/// ```ignore
/// async fn my_handler(
/// auth: AuthFingerprint,
/// State(state): State<AppState>,
/// ) -> impl IntoResponse {
/// let fp = auth.fingerprint; // guaranteed valid
/// // ...
/// }
/// ```
pub struct AuthFingerprint {
pub fingerprint: String,
}
#[axum::async_trait]
impl FromRequestParts<AppState> for AuthFingerprint {
type Rejection = AuthError;
async fn from_request_parts(
parts: &mut Parts,
state: &AppState,
) -> Result<Self, Self::Rejection> {
let header = parts
.headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.strip_prefix("Bearer "))
.map(|s| s.trim().to_string());
let token = match header {
Some(t) if !t.is_empty() => t,
_ => return Err(AuthError::MissingToken),
};
match crate::routes::auth::validate_token(&state.db.tokens, &token) {
Some(fingerprint) => Ok(AuthFingerprint { fingerprint }),
None => Err(AuthError::InvalidToken),
}
}
}
/// Rejection type for [`AuthFingerprint`] extractor failures.
pub enum AuthError {
/// No `Authorization: Bearer <token>` header was present (or it was empty).
MissingToken,
/// The token was present but did not pass validation (expired or unknown).
InvalidToken,
}
impl IntoResponse for AuthError {
fn into_response(self) -> Response {
let (status, msg) = match self {
AuthError::MissingToken => (
StatusCode::UNAUTHORIZED,
"missing or empty Authorization: Bearer <token> header",
),
AuthError::InvalidToken => (
StatusCode::UNAUTHORIZED,
"invalid or expired token",
),
};
(status, axum::Json(serde_json::json!({ "error": msg }))).into_response()
}
}

View File

@@ -6,6 +6,8 @@ pub struct Database {
pub groups: sled::Tree,
pub aliases: sled::Tree,
pub tokens: sled::Tree,
pub calls: sled::Tree,
pub missed_calls: sled::Tree,
_db: sled::Db,
}
@@ -17,12 +19,16 @@ impl Database {
let groups = db.open_tree("groups")?;
let aliases = db.open_tree("aliases")?;
let tokens = db.open_tree("tokens")?;
let calls = db.open_tree("calls")?;
let missed_calls = db.open_tree("missed_calls")?;
Ok(Database {
keys,
messages,
groups,
aliases,
tokens,
calls,
missed_calls,
_db: db,
})
}

View File

@@ -0,0 +1,212 @@
//! Federation: two-server message relay with shared-secret authentication.
//!
//! Each server periodically announces its connected clients to the peer.
//! When a message is destined for a remote client, it's forwarded via HTTP.
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use sha2::{Sha256, Digest};
/// Federation configuration loaded from JSON.
#[derive(Clone, Debug, serde::Deserialize)]
pub struct FederationConfig {
pub server_id: String,
pub shared_secret: String,
pub peer: PeerConfig,
#[serde(default = "default_interval")]
pub presence_interval_secs: u64,
}
#[derive(Clone, Debug, serde::Deserialize)]
pub struct PeerConfig {
pub id: String,
pub url: String,
}
fn default_interval() -> u64 { 5 }
/// Load federation config from a JSON file. Returns None if path is empty.
pub fn load_config(path: &str) -> anyhow::Result<FederationConfig> {
let data = std::fs::read_to_string(path)
.map_err(|e| anyhow::anyhow!("failed to read federation config '{}': {}", path, e))?;
let config: FederationConfig = serde_json::from_str(&data)
.map_err(|e| anyhow::anyhow!("invalid federation config: {}", e))?;
Ok(config)
}
/// Remote presence: which fingerprints are on the peer server.
#[derive(Clone, Debug)]
pub struct RemotePresence {
pub peer_url: String,
pub peer_id: String,
pub fingerprints: HashSet<String>,
pub last_updated: i64,
}
impl RemotePresence {
pub fn new(peer_url: String, peer_id: String) -> Self {
RemotePresence {
peer_url,
peer_id,
fingerprints: HashSet::new(),
last_updated: 0,
}
}
/// Check if a fingerprint is on the remote server.
pub fn contains(&self, fp: &str) -> bool {
self.fingerprints.contains(fp)
}
/// Is the peer still alive? (heard from within 3 intervals)
pub fn is_alive(&self, interval_secs: u64) -> bool {
let now = chrono::Utc::now().timestamp();
now - self.last_updated < (interval_secs as i64 * 3)
}
}
/// Handle for communicating with the federation peer.
#[derive(Clone)]
pub struct FederationHandle {
pub config: FederationConfig,
pub client: reqwest::Client,
pub remote_presence: Arc<Mutex<RemotePresence>>,
}
impl FederationHandle {
pub fn new(config: FederationConfig) -> Self {
let remote_presence = Arc::new(Mutex::new(RemotePresence::new(
config.peer.url.clone(),
config.peer.id.clone(),
)));
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.expect("failed to build HTTP client");
FederationHandle { config, client, remote_presence }
}
/// Check if a fingerprint is known to be on the peer server.
pub async fn is_remote(&self, fp: &str) -> bool {
let rp = self.remote_presence.lock().await;
rp.is_alive(self.config.presence_interval_secs) && rp.contains(fp)
}
/// Forward a message to the peer server for delivery.
/// Returns true if the peer accepted it.
pub async fn forward_message(&self, to_fp: &str, message: &[u8]) -> bool {
let url = format!("{}/v1/federation/forward", self.config.peer.url);
let body = serde_json::json!({
"to": to_fp,
"message": base64::Engine::encode(&base64::engine::general_purpose::STANDARD, message),
"from_server": self.config.server_id,
});
let body_str = serde_json::to_string(&body).unwrap_or_default();
let token = compute_token(&self.config.shared_secret, body_str.as_bytes());
match self.client.post(&url)
.header("X-Federation-Token", &token)
.header("Content-Type", "application/json")
.body(body_str)
.send()
.await
{
Ok(resp) if resp.status().is_success() => {
tracing::debug!("Federation: forwarded message to {} for {}", self.config.peer.id, to_fp);
true
}
Ok(resp) => {
tracing::warn!("Federation: peer {} rejected forward: {}", self.config.peer.id, resp.status());
false
}
Err(e) => {
tracing::warn!("Federation: failed to forward to {}: {}", self.config.peer.id, e);
false
}
}
}
/// Send our local presence to the peer.
pub async fn announce_presence(&self, fingerprints: Vec<String>) -> bool {
let url = format!("{}/v1/federation/presence", self.config.peer.url);
let body = serde_json::json!({
"server_id": self.config.server_id,
"fingerprints": fingerprints,
"timestamp": chrono::Utc::now().timestamp(),
});
let body_str = serde_json::to_string(&body).unwrap_or_default();
let token = compute_token(&self.config.shared_secret, body_str.as_bytes());
match self.client.post(&url)
.header("X-Federation-Token", &token)
.header("Content-Type", "application/json")
.body(body_str)
.send()
.await
{
Ok(resp) if resp.status().is_success() => true,
Ok(resp) => {
tracing::warn!("Federation: presence announce to {} failed: {}", self.config.peer.id, resp.status());
false
}
Err(e) => {
tracing::warn!("Federation: presence announce to {} error: {}", self.config.peer.id, e);
false
}
}
}
}
/// Background task: periodically sync presence with peer.
pub async fn presence_sync_loop(
handle: FederationHandle,
connections: crate::state::Connections,
) {
let interval = std::time::Duration::from_secs(handle.config.presence_interval_secs);
tracing::info!(
"Federation: presence sync started (peer={}, interval={}s)",
handle.config.peer.id, handle.config.presence_interval_secs
);
loop {
// Collect local fingerprints
let fps: Vec<String> = {
let conns = connections.lock().await;
conns.keys().cloned().collect()
};
// Announce to peer
let ok = handle.announce_presence(fps.clone()).await;
if ok {
tracing::debug!("Federation: announced {} fingerprints to {}", fps.len(), handle.config.peer.id);
}
// Clear stale remote presence if peer hasn't responded
{
let mut rp = handle.remote_presence.lock().await;
if !rp.is_alive(handle.config.presence_interval_secs) && !rp.fingerprints.is_empty() {
tracing::warn!("Federation: peer {} stale — clearing remote presence ({} fps)",
handle.config.peer.id, rp.fingerprints.len());
rp.fingerprints.clear();
}
}
tokio::time::sleep(interval).await;
}
}
/// Compute an auth token: SHA-256(secret || body). Simple HMAC-like construction.
pub fn compute_token(secret: &str, body: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(secret.as_bytes());
hasher.update(body);
hex::encode(hasher.finalize())
}
/// Verify an auth token.
pub fn verify_token(secret: &str, body: &[u8], token: &str) -> bool {
let expected = compute_token(secret, body);
// Constant-time comparison to prevent timing attacks
expected.len() == token.len() && expected.as_bytes().iter().zip(token.as_bytes()).all(|(a, b)| a == b)
}

View File

@@ -1,5 +1,7 @@
pub mod auth_middleware;
pub mod config;
pub mod db;
pub mod errors;
pub mod federation;
pub mod routes;
pub mod state;

View File

@@ -1,8 +1,10 @@
use clap::Parser;
pub mod auth_middleware;
mod config;
mod db;
mod errors;
mod federation;
mod routes;
mod state;
@@ -16,6 +18,10 @@ struct Cli {
/// Database directory
#[arg(short, long, default_value = "./warzone-data")]
data_dir: String,
/// Federation config file (JSON). Enables server-to-server message relay.
#[arg(short, long)]
federation: Option<String>,
}
#[tokio::main]
@@ -30,11 +36,38 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
tracing::info!("Warzone server starting on {}", cli.bind);
let state = state::AppState::new(&cli.data_dir)?;
let mut state = state::AppState::new(&cli.data_dir)?;
// Load federation config if provided
if let Some(ref fed_path) = cli.federation {
let fed_config = federation::load_config(fed_path)?;
tracing::info!(
"Federation enabled: server_id={}, peer={}@{}",
fed_config.server_id, fed_config.peer.id, fed_config.peer.url
);
let handle = federation::FederationHandle::new(fed_config);
state.federation = Some(handle);
}
// Spawn federation presence sync if enabled
if let Some(ref federation) = state.federation {
let handle = federation.clone();
let connections = state.connections.clone();
tokio::spawn(async move {
federation::presence_sync_loop(handle, connections).await;
});
}
let cors = tower_http::cors::CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods(tower_http::cors::Any)
.allow_headers(tower_http::cors::Any);
let app = axum::Router::new()
.merge(routes::web_router())
.nest("/v1", routes::router())
.layer(cors)
.layer(tower::limit::ConcurrencyLimitLayer::new(200))
.layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(state);

View File

@@ -112,6 +112,7 @@ struct RegisterRequest {
/// - Expired aliases (past grace period) can be reclaimed by anyone
/// - Expired aliases (within grace period) can only be reclaimed by recovery key
async fn register_alias(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<RegisterRequest>,
) -> AppResult<Json<serde_json::Value>> {
@@ -190,6 +191,7 @@ struct RecoverRequest {
/// Recover an alias using the recovery key. Works even if expired (within or past grace).
async fn recover_alias(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<RecoverRequest>,
) -> AppResult<Json<serde_json::Value>> {
@@ -244,6 +246,7 @@ struct RenewRequest {
/// Renew/heartbeat — resets the TTL. Called automatically on activity.
async fn renew_alias(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<RenewRequest>,
) -> AppResult<Json<serde_json::Value>> {
@@ -347,6 +350,7 @@ struct UnregisterRequest {
/// Remove your own alias.
async fn unregister_alias(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<UnregisterRequest>,
) -> AppResult<Json<serde_json::Value>> {
@@ -381,6 +385,7 @@ struct AdminRemoveRequest {
/// Admin: remove any alias.
async fn admin_remove_alias(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<AdminRemoveRequest>,
) -> AppResult<Json<serde_json::Value>> {

View File

@@ -0,0 +1,233 @@
use axum::{
extract::{Path, Query, State},
routing::{get, post},
Json, Router,
};
use serde::Deserialize;
use sha2::{Sha256, Digest};
use crate::errors::AppResult;
use crate::state::{AppState, CallState, CallStatus};
pub fn routes() -> Router<AppState> {
Router::new()
.route("/calls/initiate", post(initiate_call))
.route("/calls/:id", get(get_call))
.route("/calls/:id/end", post(end_call))
.route("/calls/active", get(active_calls))
.route("/calls/missed", post(get_missed_calls))
.route("/groups/:name/call", post(initiate_group_call))
}
fn normalize_fp(fp: &str) -> String {
fp.chars().filter(|c| c.is_ascii_hexdigit()).collect::<String>().to_lowercase()
}
#[derive(Deserialize)]
struct InitiateRequest {
caller: String,
callee: String,
}
async fn initiate_call(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<InitiateRequest>,
) -> AppResult<Json<serde_json::Value>> {
let call_id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().timestamp();
let call = CallState {
call_id: call_id.clone(),
caller_fp: normalize_fp(&req.caller),
callee_fp: normalize_fp(&req.callee),
group_name: None,
room_id: None,
status: CallStatus::Ringing,
created_at: now,
answered_at: None,
ended_at: None,
};
state.active_calls.lock().await.insert(call_id.clone(), call.clone());
state.db.calls.insert(call_id.as_bytes(), serde_json::to_vec(&call)?.as_slice())?;
tracing::info!("Call initiated: {} -> {}", call.caller_fp, call.callee_fp);
Ok(Json(serde_json::json!({
"call_id": call_id,
"status": "ringing",
})))
}
async fn get_call(
State(state): State<AppState>,
Path(id): Path<String>,
) -> AppResult<Json<serde_json::Value>> {
// Try in-memory first, then DB
if let Some(call) = state.active_calls.lock().await.get(&id) {
return Ok(Json(serde_json::to_value(call)?));
}
if let Some(data) = state.db.calls.get(id.as_bytes())? {
let call: CallState = serde_json::from_slice(&data)?;
return Ok(Json(serde_json::to_value(&call)?));
}
Ok(Json(serde_json::json!({ "error": "call not found" })))
}
#[derive(Deserialize)]
struct EndCallRequest {
fingerprint: String,
}
async fn end_call(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Path(id): Path<String>,
Json(req): Json<EndCallRequest>,
) -> AppResult<Json<serde_json::Value>> {
let now = chrono::Utc::now().timestamp();
let _fp = normalize_fp(&req.fingerprint);
let mut calls = state.active_calls.lock().await;
if let Some(mut call) = calls.remove(&id) {
call.status = CallStatus::Ended;
call.ended_at = Some(now);
state.db.calls.insert(id.as_bytes(), serde_json::to_vec(&call)?.as_slice())?;
return Ok(Json(serde_json::json!({ "ok": true, "call_id": id })));
}
Ok(Json(serde_json::json!({ "error": "call not found or already ended" })))
}
#[derive(Deserialize)]
struct ActiveQuery {
fingerprint: Option<String>,
}
async fn active_calls(
State(state): State<AppState>,
Query(q): Query<ActiveQuery>,
) -> AppResult<Json<serde_json::Value>> {
let calls = state.active_calls.lock().await;
let filtered: Vec<&CallState> = match q.fingerprint {
Some(ref fp) => {
let fp = normalize_fp(fp);
calls.values().filter(|c| c.caller_fp == fp || c.callee_fp == fp).collect()
}
None => calls.values().collect(),
};
Ok(Json(serde_json::json!({ "calls": filtered })))
}
#[derive(Deserialize)]
struct MissedRequest {
fingerprint: String,
}
async fn get_missed_calls(
State(state): State<AppState>,
Json(req): Json<MissedRequest>,
) -> AppResult<Json<serde_json::Value>> {
let fp = normalize_fp(&req.fingerprint);
let prefix = format!("missed:{}", fp);
let mut missed = Vec::new();
let mut keys = Vec::new();
for (key, value) in state.db.missed_calls.scan_prefix(prefix.as_bytes()).flatten() {
if let Ok(record) = serde_json::from_slice::<serde_json::Value>(&value) {
missed.push(record);
keys.push(key);
}
}
// Delete after reading
for key in &keys {
let _ = state.db.missed_calls.remove(key);
}
Ok(Json(serde_json::json!({ "missed_calls": missed })))
}
// --- FC-5: Group call ---
#[derive(Deserialize)]
struct GroupCallRequest {
fingerprint: String,
}
/// Deterministic room ID from group name: hex(SHA-256("featherchat-group:" + name)[:16])
fn hash_room_name(group_name: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(format!("featherchat-group:{}", group_name).as_bytes());
let hash = hasher.finalize();
hex::encode(&hash[..16])
}
async fn initiate_group_call(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Path(name): Path<String>,
Json(req): Json<GroupCallRequest>,
) -> AppResult<Json<serde_json::Value>> {
let caller_fp = normalize_fp(&req.fingerprint);
// Load group
let group_data = match state.db.groups.get(name.as_bytes())? {
Some(d) => d,
None => return Ok(Json(serde_json::json!({ "error": "group not found" }))),
};
let group: serde_json::Value = serde_json::from_slice(&group_data)?;
let members: Vec<String> = group.get("members")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect())
.unwrap_or_default();
// Verify caller is a member
if !members.contains(&caller_fp) {
return Ok(Json(serde_json::json!({ "error": "not a member of this group" })));
}
let room_id = hash_room_name(&name);
let call_id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().timestamp();
// Create call state
let call = CallState {
call_id: call_id.clone(),
caller_fp: caller_fp.clone(),
callee_fp: "group".to_string(),
group_name: Some(name.clone()),
room_id: Some(room_id.clone()),
status: CallStatus::Ringing,
created_at: now,
answered_at: None,
ended_at: None,
};
state.active_calls.lock().await.insert(call_id.clone(), call.clone());
state.db.calls.insert(call_id.as_bytes(), serde_json::to_vec(&call)?.as_slice())?;
// Fan out CallSignal::Offer to all online members (except caller)
let offer = warzone_protocol::message::WireMessage::CallSignal {
id: call_id.clone(),
sender_fingerprint: caller_fp.clone(),
signal_type: warzone_protocol::message::CallSignalType::Offer,
payload: serde_json::json!({ "room_id": room_id, "group": name }).to_string(),
target: format!("#{}", name),
};
let encoded = bincode::serialize(&offer)?;
let mut delivered = 0;
for member in &members {
if *member == caller_fp { continue; }
if state.push_to_client(member, &encoded).await {
delivered += 1;
} else {
// Queue for offline members
let key = format!("queue:{}:{}", member, uuid::Uuid::new_v4());
state.db.messages.insert(key.as_bytes(), encoded.as_slice())?;
}
}
tracing::info!("Group call #{}: room={}, caller={}, notified={}/{}",
name, room_id, caller_fp, delivered, members.len() - 1);
Ok(Json(serde_json::json!({
"call_id": call_id,
"room_id": room_id,
"group": name,
"members_notified": delivered,
"members_total": members.len() - 1,
})))
}

View File

@@ -0,0 +1,102 @@
use axum::{
extract::State,
routing::{get, post},
Json, Router,
};
use crate::auth_middleware::AuthFingerprint;
use crate::errors::AppResult;
use crate::state::AppState;
pub fn routes() -> Router<AppState> {
Router::new()
.route("/devices", get(list_devices))
.route("/devices/:id/kick", post(kick_device))
.route("/devices/revoke-all", post(revoke_all))
}
/// List active WS connections for the authenticated user.
async fn list_devices(
auth: AuthFingerprint,
State(state): State<AppState>,
) -> AppResult<Json<serde_json::Value>> {
let devices = state.list_devices(&auth.fingerprint).await;
let list: Vec<serde_json::Value> = devices
.iter()
.map(|(id, connected_at)| {
serde_json::json!({
"device_id": id,
"connected_at": connected_at,
})
})
.collect();
let count = list.len();
Ok(Json(serde_json::json!({
"fingerprint": auth.fingerprint,
"devices": list,
"count": count,
})))
}
/// Kick a specific device by ID. Requires auth -- only the device owner can kick.
async fn kick_device(
auth: AuthFingerprint,
State(state): State<AppState>,
axum::extract::Path(device_id): axum::extract::Path<String>,
) -> AppResult<Json<serde_json::Value>> {
let kicked = state.kick_device(&auth.fingerprint, &device_id).await;
if kicked {
tracing::info!("Device {} kicked by {}", device_id, auth.fingerprint);
Ok(Json(serde_json::json!({ "ok": true, "kicked": device_id })))
} else {
Ok(Json(serde_json::json!({ "error": "device not found" })))
}
}
/// Revoke all sessions except the current one. Panic button.
async fn revoke_all(
auth: AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<serde_json::Value>,
) -> AppResult<Json<serde_json::Value>> {
let keep_device = req
.get("keep_device_id")
.and_then(|v| v.as_str())
.unwrap_or("");
let removed = state
.revoke_all_except(&auth.fingerprint, keep_device)
.await;
// Also clear all tokens for this fingerprint except the current one
// Scan tokens tree for this fingerprint
let mut tokens_to_remove = Vec::new();
for item in state.db.tokens.iter().flatten() {
if let Ok(val) = serde_json::from_slice::<serde_json::Value>(&item.1) {
if val.get("fingerprint").and_then(|v| v.as_str()) == Some(&auth.fingerprint) {
tokens_to_remove.push(item.0.clone());
}
}
}
// Only remove tokens if we actually revoked devices
let tokens_cleared = if removed > 0 {
let count = tokens_to_remove.len();
for key in &tokens_to_remove {
let _ = state.db.tokens.remove(key);
}
count
} else {
0
};
tracing::info!(
"Revoke-all for {}: {} devices removed, {} tokens cleared",
auth.fingerprint,
removed,
tokens_cleared,
);
Ok(Json(serde_json::json!({
"ok": true,
"devices_removed": removed,
"tokens_cleared": tokens_cleared,
})))
}

View File

@@ -0,0 +1,144 @@
//! Federation route handlers: receive presence updates and forwarded messages from peer server.
use axum::{
body::Bytes,
extract::State,
http::{HeaderMap, StatusCode},
response::IntoResponse,
routing::post,
Json, Router,
};
use crate::state::AppState;
pub fn routes() -> Router<AppState> {
Router::new()
.route("/federation/presence", post(receive_presence))
.route("/federation/forward", post(receive_forward))
.route("/federation/status", axum::routing::get(federation_status))
}
/// Extract and validate the federation token from headers.
fn validate_request(state: &AppState, headers: &HeaderMap, body: &[u8]) -> Result<(), (StatusCode, String)> {
let federation = state.federation.as_ref()
.ok_or((StatusCode::SERVICE_UNAVAILABLE, "federation not configured".to_string()))?;
let token = headers.get("x-federation-token")
.and_then(|v| v.to_str().ok())
.ok_or((StatusCode::UNAUTHORIZED, "missing X-Federation-Token header".to_string()))?;
if !crate::federation::verify_token(&federation.config.shared_secret, body, token) {
return Err((StatusCode::UNAUTHORIZED, "invalid federation token".to_string()));
}
Ok(())
}
/// Receive presence announcement from peer.
/// POST /v1/federation/presence
/// Body: { "server_id": "...", "fingerprints": [...], "timestamp": ... }
async fn receive_presence(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
if let Err((status, msg)) = validate_request(&state, &headers, &body) {
return (status, Json(serde_json::json!({ "error": msg }))).into_response();
}
let parsed: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("invalid JSON: {}", e) }))).into_response(),
};
let fingerprints: Vec<String> = parsed.get("fingerprints")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect())
.unwrap_or_default();
let server_id = parsed.get("server_id").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(ref federation) = state.federation {
let mut rp = federation.remote_presence.lock().await;
let count = fingerprints.len();
rp.fingerprints = fingerprints.into_iter().collect();
rp.last_updated = chrono::Utc::now().timestamp();
tracing::debug!("Federation: received {} fingerprints from {}", count, server_id);
}
(StatusCode::OK, Json(serde_json::json!({ "ok": true }))).into_response()
}
/// Receive a forwarded message from peer.
/// POST /v1/federation/forward
/// Body: { "to": "fingerprint", "message": "base64...", "from_server": "..." }
async fn receive_forward(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
if let Err((status, msg)) = validate_request(&state, &headers, &body) {
return (status, Json(serde_json::json!({ "error": msg }))).into_response();
}
let parsed: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("invalid JSON: {}", e) }))).into_response(),
};
let to = match parsed.get("to").and_then(|v| v.as_str()) {
Some(fp) => fp.to_string(),
None => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "missing 'to' field" }))).into_response(),
};
let message_b64 = match parsed.get("message").and_then(|v| v.as_str()) {
Some(m) => m.to_string(),
None => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "missing 'message' field" }))).into_response(),
};
let message = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &message_b64) {
Ok(m) => m,
Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("invalid base64: {}", e) }))).into_response(),
};
let from_server = parsed.get("from_server").and_then(|v| v.as_str()).unwrap_or("unknown");
// Try to deliver locally
let delivered = state.push_to_client(&to, &message).await;
if !delivered {
// Queue for later pickup
let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4());
let _ = state.db.messages.insert(key.as_bytes(), message.as_slice());
tracing::info!("Federation: queued forwarded message from {} for offline user {}", from_server, to);
} else {
tracing::info!("Federation: delivered forwarded message from {} to {}", from_server, to);
}
(StatusCode::OK, Json(serde_json::json!({ "ok": true, "delivered": delivered }))).into_response()
}
/// Federation health status.
/// GET /v1/federation/status
async fn federation_status(
State(state): State<AppState>,
) -> Json<serde_json::Value> {
match state.federation {
Some(ref federation) => {
let rp = federation.remote_presence.lock().await;
Json(serde_json::json!({
"enabled": true,
"server_id": federation.config.server_id,
"peer_id": federation.config.peer.id,
"peer_url": federation.config.peer.url,
"peer_alive": rp.is_alive(federation.config.presence_interval_secs),
"remote_clients": rp.fingerprints.len(),
"last_sync": rp.last_updated,
}))
}
None => {
Json(serde_json::json!({
"enabled": false,
}))
}
}
}

View File

@@ -75,6 +75,7 @@ fn save_group(db: &sled::Tree, group: &GroupInfo) -> anyhow::Result<()> {
}
async fn create_group(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<CreateRequest>,
) -> AppResult<Json<serde_json::Value>> {
@@ -99,6 +100,7 @@ async fn create_group(
}
async fn join_group(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Path(name): Path<String>,
Json(req): Json<JoinRequest>,
@@ -169,6 +171,7 @@ async fn list_groups(
/// queue infrastructure — group messages look like 1:1 messages to the
/// recipient, but with a group tag.
async fn send_to_group(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Path(name): Path<String>,
Json(req): Json<GroupSendRequest>,
@@ -210,6 +213,7 @@ async fn send_to_group(
}
async fn leave_group(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Path(name): Path<String>,
Json(req): Json<JoinRequest>,
@@ -235,6 +239,7 @@ struct KickRequest {
}
async fn kick_member(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Path(name): Path<String>,
Json(req): Json<KickRequest>,

View File

@@ -54,6 +54,7 @@ struct RegisterResponse {
}
async fn register_keys(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<RegisterRequest>,
) -> Json<RegisterResponse> {
@@ -129,6 +130,7 @@ struct OtpkEntry {
/// Upload additional one-time pre-keys.
async fn replenish_otpks(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<ReplenishRequest>,
) -> Json<serde_json::Value> {

View File

@@ -71,6 +71,7 @@ fn normalize_fp(fp: &str) -> String {
}
async fn send_message(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<SendRequest>,
) -> AppResult<Json<serde_json::Value>> {
@@ -84,14 +85,11 @@ async fn send_message(
}
}
// Try WebSocket push first (instant delivery)
if state.push_to_client(&to, &req.message).await {
tracing::info!("Pushed message to {} via WS ({} bytes)", to, req.message.len());
let delivered = state.deliver_or_queue(&to, &req.message).await;
if delivered {
tracing::info!("Delivered message to {} ({} bytes)", to, req.message.len());
} else {
// Queue in DB (offline delivery)
let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4());
tracing::info!("Queuing message for {} ({} bytes)", to, req.message.len());
state.db.messages.insert(key.as_bytes(), req.message)?;
tracing::info!("Queued message for {} ({} bytes)", to, req.message.len());
}
// Renew sender's alias TTL (sending = authenticated action)

View File

@@ -1,11 +1,16 @@
mod aliases;
pub mod auth;
mod calls;
mod devices;
mod federation;
mod groups;
mod health;
mod keys;
pub mod messages;
mod presence;
mod web;
mod ws;
mod wzp;
use axum::Router;
@@ -20,6 +25,11 @@ pub fn router() -> Router<AppState> {
.merge(aliases::routes())
.merge(auth::routes())
.merge(ws::routes())
.merge(calls::routes())
.merge(devices::routes())
.merge(presence::routes())
.merge(wzp::routes())
.merge(federation::routes())
}
/// Web UI router (served at root, outside /v1)

View File

@@ -0,0 +1,57 @@
use axum::{
extract::{Path, State},
routing::{get, post},
Json, Router,
};
use serde::Deserialize;
use crate::errors::AppResult;
use crate::state::AppState;
pub fn routes() -> Router<AppState> {
Router::new()
.route("/presence/:fingerprint", get(get_presence))
.route("/presence/batch", post(batch_presence))
}
fn normalize_fp(fp: &str) -> String {
fp.chars().filter(|c| c.is_ascii_hexdigit()).collect::<String>().to_lowercase()
}
async fn get_presence(
State(state): State<AppState>,
Path(fingerprint): Path<String>,
) -> AppResult<Json<serde_json::Value>> {
let fp = normalize_fp(&fingerprint);
let online = state.is_online(&fp).await;
let devices = state.device_count(&fp).await;
Ok(Json(serde_json::json!({
"fingerprint": fp,
"online": online,
"devices": devices,
})))
}
#[derive(Deserialize)]
struct BatchRequest {
fingerprints: Vec<String>,
}
async fn batch_presence(
_auth: crate::auth_middleware::AuthFingerprint,
State(state): State<AppState>,
Json(req): Json<BatchRequest>,
) -> AppResult<Json<serde_json::Value>> {
let mut results = Vec::new();
for fp in &req.fingerprints {
let fp = normalize_fp(fp);
let online = state.is_online(&fp).await;
let devices = state.device_count(&fp).await;
results.push(serde_json::json!({
"fingerprint": fp,
"online": online,
"devices": devices,
}));
}
Ok(Json(serde_json::json!({ "results": results })))
}

View File

@@ -66,16 +66,20 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String)
let (mut ws_tx, mut ws_rx) = socket.split();
// Register for push delivery
let mut push_rx = state.register_ws(&fingerprint).await;
let (_device_id, mut push_rx) = match state.register_ws(&fingerprint, None).await {
Some(pair) => pair,
None => {
tracing::warn!("WS {}: rejected — connection limit reached", fingerprint);
return; // closes the socket
}
};
// Send any queued messages from DB
let prefix = format!("queue:{}", fingerprint);
let mut keys_to_delete = Vec::new();
for item in state.db.messages.scan_prefix(prefix.as_bytes()) {
if let Ok((key, value)) = item {
if ws_tx.send(Message::Binary(value.to_vec().into())).await.is_ok() {
keys_to_delete.push(key);
}
for (key, value) in state.db.messages.scan_prefix(prefix.as_bytes()).flatten() {
if ws_tx.send(Message::Binary(value.to_vec())).await.is_ok() {
keys_to_delete.push(key);
}
}
for key in &keys_to_delete {
@@ -85,11 +89,34 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String)
tracing::info!("WS {}: flushed {} queued messages", fingerprint, keys_to_delete.len());
}
// Flush missed calls (FC-7)
let missed_prefix = format!("missed:{}", fingerprint);
let mut missed_keys = Vec::new();
for (key, value) in state.db.missed_calls.scan_prefix(missed_prefix.as_bytes()).flatten() {
if let Ok(missed) = serde_json::from_slice::<serde_json::Value>(&value) {
let wrapper = serde_json::json!({
"type": "missed_call",
"data": missed,
});
if let Ok(json_str) = serde_json::to_string(&wrapper) {
if ws_tx.send(Message::Text(json_str)).await.is_ok() {
missed_keys.push(key);
}
}
}
}
for key in &missed_keys {
let _ = state.db.missed_calls.remove(key);
}
if !missed_keys.is_empty() {
tracing::info!("WS {}: flushed {} missed call notifications", fingerprint, missed_keys.len());
}
// Spawn task to forward push messages to WS
let _fp_clone = fingerprint.clone();
let mut push_task = tokio::spawn(async move {
while let Some(msg) = push_rx.recv().await {
if ws_tx.send(Message::Binary(msg.into())).await.is_err() {
if ws_tx.send(Message::Binary(msg)).await.is_err() {
break;
}
}
@@ -119,13 +146,77 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String)
}
}
// Try push to connected client first
if !state_clone.push_to_client(&to_fp, message).await {
// Queue in DB
let key = format!("queue:{}:{}", to_fp, uuid::Uuid::new_v4());
let _ = state_clone.db.messages.insert(key.as_bytes(), message);
// Call signal side effects
if let Ok(WireMessage::CallSignal { ref id, ref sender_fingerprint, ref signal_type, .. }) = bincode::deserialize::<WireMessage>(message) {
use warzone_protocol::message::CallSignalType;
let now = chrono::Utc::now().timestamp();
match signal_type {
CallSignalType::Offer => {
let call = crate::state::CallState {
call_id: id.clone(),
caller_fp: sender_fingerprint.clone(),
callee_fp: to_fp.clone(),
group_name: None,
room_id: None,
status: crate::state::CallStatus::Ringing,
created_at: now,
answered_at: None,
ended_at: None,
};
state_clone.active_calls.lock().await.insert(id.clone(), call.clone());
// Persist to DB
let _ = state_clone.db.calls.insert(
id.as_bytes(),
serde_json::to_vec(&call).unwrap_or_default(),
);
tracing::info!("Call {} started: {} -> {}", id, sender_fingerprint, to_fp);
// If callee is offline, record missed call (FC-7)
if !state_clone.is_online(&to_fp).await {
let missed_key = format!("missed:{}:{}", to_fp, id);
let missed = serde_json::json!({
"call_id": id,
"caller_fp": sender_fingerprint,
"timestamp": now,
});
let _ = state_clone.db.missed_calls.insert(
missed_key.as_bytes(),
serde_json::to_vec(&missed).unwrap_or_default(),
);
tracing::info!("Missed call recorded for offline user {}", to_fp);
}
}
CallSignalType::Answer => {
let mut calls = state_clone.active_calls.lock().await;
if let Some(call) = calls.get_mut(id) {
call.status = crate::state::CallStatus::Active;
call.answered_at = Some(now);
let _ = state_clone.db.calls.insert(
id.as_bytes(),
serde_json::to_vec(&call).unwrap_or_default(),
);
}
tracing::info!("Call {} answered", id);
}
CallSignalType::Hangup | CallSignalType::Reject => {
let mut calls = state_clone.active_calls.lock().await;
if let Some(mut call) = calls.remove(id) {
call.status = crate::state::CallStatus::Ended;
call.ended_at = Some(now);
let _ = state_clone.db.calls.insert(
id.as_bytes(),
serde_json::to_vec(&call).unwrap_or_default(),
);
}
tracing::info!("Call {} ended", id);
}
_ => {} // Ringing, Busy, IceCandidate — route opaquely
}
}
// Deliver via local WS, federation, or queue in DB
state_clone.deliver_or_queue(&to_fp, message).await;
tracing::debug!("WS {}: routed message to {}", fp_clone2, to_fp);
}
}
@@ -147,10 +238,8 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String)
}
}
if !state_clone.push_to_client(&to_fp, &message).await {
let key = format!("queue:{}:{}", to_fp, uuid::Uuid::new_v4());
let _ = state_clone.db.messages.insert(key.as_bytes(), message);
}
// Deliver via local WS, federation, or queue in DB
state_clone.deliver_or_queue(&to_fp, &message).await;
// Renew alias TTL
crate::routes::messages::renew_alias_ttl(
@@ -181,9 +270,9 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String)
// We can't easily get the sender ref here, so just clean up by fingerprint
// In production, use a unique connection ID
let mut conns = state.connections.lock().await;
if let Some(senders) = conns.get_mut(&fingerprint) {
senders.retain(|s| !s.is_closed());
if senders.is_empty() {
if let Some(devices) = conns.get_mut(&fingerprint) {
devices.retain(|d| !d.sender.is_closed());
if devices.is_empty() {
conns.remove(&fingerprint);
}
}

View File

@@ -0,0 +1,45 @@
use axum::{
extract::State,
routing::get,
Json, Router,
};
use crate::errors::AppResult;
use crate::state::AppState;
pub fn routes() -> Router<AppState> {
Router::new()
.route("/wzp/relay-config", get(relay_config))
}
/// Returns the WZP relay address and a short-lived service token.
///
/// The web client calls this to discover where to connect for voice/video
/// and gets a token to present to the relay for authentication.
async fn relay_config(
State(state): State<AppState>,
) -> AppResult<Json<serde_json::Value>> {
// Issue a short-lived service token (5 minutes) for WZP relay auth.
let token = hex::encode(rand::random::<[u8; 32]>());
let expires = chrono::Utc::now().timestamp() + 300; // 5 minutes
state.db.tokens.insert(
token.as_bytes(),
serde_json::to_vec(&serde_json::json!({
"fingerprint": "service:wzp",
"service": "wzp",
"expires_at": expires,
}))?.as_slice(),
)?;
// The relay address is configured server-side. For now, return a
// placeholder that the admin sets via environment variable.
let relay_addr = std::env::var("WZP_RELAY_ADDR")
.unwrap_or_else(|_| "127.0.0.1:4433".to_string());
Ok(Json(serde_json::json!({
"relay_addr": relay_addr,
"token": token,
"expires_in": 300,
})))
}

View File

@@ -4,14 +4,26 @@ use tokio::sync::{Mutex, mpsc};
use crate::db::Database;
/// Maximum WebSocket connections per fingerprint (multi-device cap).
const MAX_WS_PER_FINGERPRINT: usize = 5;
/// Maximum number of message IDs to track for deduplication.
const DEDUP_CAPACITY: usize = 10_000;
/// Per-connection sender: messages are pushed here for instant delivery.
pub type WsSender = mpsc::UnboundedSender<Vec<u8>>;
/// Connected clients: fingerprint → list of WS senders (multiple devices).
pub type Connections = Arc<Mutex<HashMap<String, Vec<WsSender>>>>;
/// Metadata for a single connected device.
#[derive(Clone)]
pub struct DeviceConnection {
pub device_id: String,
pub sender: WsSender,
pub connected_at: i64,
pub token: Option<String>,
}
/// Connected clients: fingerprint → list of device connections (multiple devices).
pub type Connections = Arc<Mutex<HashMap<String, Vec<DeviceConnection>>>>;
/// Bounded dedup tracker: FIFO eviction when capacity is exceeded.
#[derive(Clone)]
@@ -47,11 +59,35 @@ impl DedupTracker {
}
}
/// Call lifecycle status.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum CallStatus {
Ringing,
Active,
Ended,
}
/// Server-side state for an active or recently ended call.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct CallState {
pub call_id: String,
pub caller_fp: String,
pub callee_fp: String,
pub group_name: Option<String>,
pub room_id: Option<String>,
pub status: CallStatus,
pub created_at: i64,
pub answered_at: Option<i64>,
pub ended_at: Option<i64>,
}
#[derive(Clone)]
pub struct AppState {
pub db: Arc<Database>,
pub connections: Connections,
pub dedup: DedupTracker,
pub active_calls: Arc<Mutex<HashMap<String, CallState>>>,
pub federation: Option<crate::federation::FederationHandle>,
}
impl AppState {
@@ -61,16 +97,18 @@ impl AppState {
db: Arc::new(db),
connections: Arc::new(Mutex::new(HashMap::new())),
dedup: DedupTracker::new(),
active_calls: Arc::new(Mutex::new(HashMap::new())),
federation: None,
})
}
/// Try to push a message to a connected client. Returns true if delivered.
pub async fn push_to_client(&self, fingerprint: &str, message: &[u8]) -> bool {
let conns = self.connections.lock().await;
if let Some(senders) = conns.get(fingerprint) {
if let Some(devices) = conns.get(fingerprint) {
let mut delivered = false;
for sender in senders {
if sender.send(message.to_vec()).is_ok() {
for device in devices {
if device.sender.send(message.to_vec()).is_ok() {
delivered = true;
}
}
@@ -81,25 +119,127 @@ impl AppState {
}
/// Register a WS connection for a fingerprint.
pub async fn register_ws(&self, fingerprint: &str) -> mpsc::UnboundedReceiver<Vec<u8>> {
///
/// Returns `None` if the per-fingerprint connection cap has been reached.
/// On success, returns the assigned device ID and a receiver for push messages.
pub async fn register_ws(&self, fingerprint: &str, token: Option<String>) -> Option<(String, mpsc::UnboundedReceiver<Vec<u8>>)> {
let (tx, rx) = mpsc::unbounded_channel();
let device_id = uuid::Uuid::new_v4().to_string()[..8].to_string();
let mut conns = self.connections.lock().await;
conns.entry(fingerprint.to_string()).or_default().push(tx);
tracing::info!("WS registered for {} ({} total connections)", fingerprint,
conns.values().map(|v| v.len()).sum::<usize>());
rx
let entry = conns.entry(fingerprint.to_string()).or_default();
// Clean up closed connections first
entry.retain(|d| !d.sender.is_closed());
if entry.len() >= MAX_WS_PER_FINGERPRINT {
tracing::warn!(
"WS connection cap reached for {} ({} connections)",
fingerprint,
entry.len()
);
return None;
}
entry.push(DeviceConnection {
device_id: device_id.clone(),
sender: tx,
connected_at: chrono::Utc::now().timestamp(),
token,
});
tracing::info!(
"WS registered for {} device={} ({} total)",
fingerprint,
device_id,
conns.values().map(|v| v.len()).sum::<usize>()
);
Some((device_id, rx))
}
/// Unregister a WS connection.
#[allow(dead_code)]
pub async fn unregister_ws(&self, fingerprint: &str, sender: &WsSender) {
let mut conns = self.connections.lock().await;
if let Some(senders) = conns.get_mut(fingerprint) {
senders.retain(|s| !s.same_channel(sender));
if senders.is_empty() {
if let Some(devices) = conns.get_mut(fingerprint) {
devices.retain(|d| !d.sender.same_channel(sender));
if devices.is_empty() {
conns.remove(fingerprint);
}
}
tracing::info!("WS unregistered for {}", fingerprint);
}
/// Try to deliver a message: local push → federation forward → DB queue.
/// Returns true if delivered instantly (local or remote).
pub async fn deliver_or_queue(&self, to_fp: &str, message: &[u8]) -> bool {
// 1. Try local WebSocket push
if self.push_to_client(to_fp, message).await {
return true;
}
// 2. Try federation forward
if let Some(ref federation) = self.federation {
if federation.is_remote(to_fp).await {
if federation.forward_message(to_fp, message).await {
return true;
}
}
}
// 3. Queue in local DB
let key = format!("queue:{}:{}", to_fp, uuid::Uuid::new_v4());
let _ = self.db.messages.insert(key.as_bytes(), message);
false
}
/// Check if a fingerprint has any active WS connections.
pub async fn is_online(&self, fingerprint: &str) -> bool {
let conns = self.connections.lock().await;
conns.get(fingerprint).map(|d| !d.is_empty()).unwrap_or(false)
}
/// Count active WS connections for a fingerprint (multi-device).
pub async fn device_count(&self, fingerprint: &str) -> usize {
let conns = self.connections.lock().await;
conns.get(fingerprint).map(|d| d.len()).unwrap_or(0)
}
/// List devices for a fingerprint with metadata.
pub async fn list_devices(&self, fingerprint: &str) -> Vec<(String, i64)> {
let conns = self.connections.lock().await;
conns.get(fingerprint)
.map(|devices| devices.iter().map(|d| (d.device_id.clone(), d.connected_at)).collect())
.unwrap_or_default()
}
/// Kick a specific device by ID. Returns true if found and kicked.
pub async fn kick_device(&self, fingerprint: &str, device_id: &str) -> bool {
let mut conns = self.connections.lock().await;
if let Some(devices) = conns.get_mut(fingerprint) {
let before = devices.len();
devices.retain(|d| d.device_id != device_id);
let kicked = devices.len() < before;
if devices.is_empty() {
conns.remove(fingerprint);
}
kicked
} else {
false
}
}
/// Revoke all connections for a fingerprint except one device_id.
pub async fn revoke_all_except(&self, fingerprint: &str, keep_device_id: &str) -> usize {
let mut conns = self.connections.lock().await;
if let Some(devices) = conns.get_mut(fingerprint) {
let before = devices.len();
devices.retain(|d| d.device_id == keep_device_id);
let removed = before - devices.len();
if devices.is_empty() {
conns.remove(fingerprint);
}
removed
} else {
0
}
}
}