From 2599ce956aaa42b5294e629f24bc4662e11c75fc Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Fri, 27 Mar 2026 11:00:58 +0400 Subject: [PATCH] v0.0.8: Server-side message deduplication Server: - DedupTracker in AppState: bounded HashSet (10,000 IDs, FIFO eviction) - send_message: extracts message ID from bincode, drops duplicates - WS handler: dedup on both binary and JSON message frames - extract_message_id() parses all WireMessage variants Co-Authored-By: Claude Opus 4.6 (1M context) --- warzone/Cargo.lock | 10 ++--- warzone/Cargo.toml | 2 +- .../warzone-server/src/routes/messages.rs | 24 +++++++++++ .../crates/warzone-server/src/routes/ws.rs | 32 +++++++++++++++ warzone/crates/warzone-server/src/state.rs | 41 ++++++++++++++++++- 5 files changed, 102 insertions(+), 7 deletions(-) diff --git a/warzone/Cargo.lock b/warzone/Cargo.lock index a778e53..d786776 100644 --- a/warzone/Cargo.lock +++ b/warzone/Cargo.lock @@ -2647,7 +2647,7 @@ dependencies = [ [[package]] name = "warzone-client" -version = "0.0.7" +version = "0.0.8" dependencies = [ "anyhow", "argon2", @@ -2680,7 +2680,7 @@ dependencies = [ [[package]] name = "warzone-mule" -version = "0.0.7" +version = "0.0.8" dependencies = [ "anyhow", "clap", @@ -2689,7 +2689,7 @@ dependencies = [ [[package]] name = "warzone-protocol" -version = "0.0.7" +version = "0.0.8" dependencies = [ "base64", "bincode", @@ -2712,7 +2712,7 @@ dependencies = [ [[package]] name = "warzone-server" -version = "0.0.7" +version = "0.0.8" dependencies = [ "anyhow", "axum", @@ -2739,7 +2739,7 @@ dependencies = [ [[package]] name = "warzone-wasm" -version = "0.0.7" +version = "0.0.8" dependencies = [ "base64", "bincode", diff --git a/warzone/Cargo.toml b/warzone/Cargo.toml index 25ffa44..58eaa99 100644 --- a/warzone/Cargo.toml +++ b/warzone/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.0.7" +version = "0.0.8" edition = "2021" license = "MIT" rust-version = "1.75" diff --git a/warzone/crates/warzone-server/src/routes/messages.rs b/warzone/crates/warzone-server/src/routes/messages.rs index ed09511..6fe757a 100644 --- a/warzone/crates/warzone-server/src/routes/messages.rs +++ b/warzone/crates/warzone-server/src/routes/messages.rs @@ -4,10 +4,26 @@ use axum::{ Json, Router, }; use serde::Deserialize; +use warzone_protocol::message::WireMessage; use crate::errors::AppResult; use crate::state::AppState; +/// Try to extract the message ID from raw bincode-serialized WireMessage bytes. +fn extract_message_id(data: &[u8]) -> Option { + if let Ok(wire) = bincode::deserialize::(data) { + match wire { + WireMessage::KeyExchange { id, .. } => Some(id), + WireMessage::Message { id, .. } => Some(id), + WireMessage::FileHeader { id, .. } => Some(id), + WireMessage::FileChunk { id, .. } => Some(id), + WireMessage::Receipt { message_id, .. } => Some(message_id), + } + } else { + None + } +} + /// Touch the alias TTL for a fingerprint (renew on authenticated action). pub fn renew_alias_ttl(db: &sled::Tree, fp: &str) { let alias_key = format!("fp:{}", fp); @@ -55,6 +71,14 @@ async fn send_message( ) -> AppResult> { let to = normalize_fp(&req.to); + // Dedup: if we have already seen this message ID, silently drop it + if let Some(msg_id) = extract_message_id(&req.message) { + if state.dedup.check_and_insert(&msg_id) { + tracing::debug!("Dedup: dropping duplicate message {}", msg_id); + return Ok(Json(serde_json::json!({ "ok": true }))); + } + } + // Try WebSocket push first (instant delivery) if state.push_to_client(&to, &req.message).await { tracing::info!("Pushed message to {} via WS ({} bytes)", to, req.message.len()); diff --git a/warzone/crates/warzone-server/src/routes/ws.rs b/warzone/crates/warzone-server/src/routes/ws.rs index a2d4f1e..216c50d 100644 --- a/warzone/crates/warzone-server/src/routes/ws.rs +++ b/warzone/crates/warzone-server/src/routes/ws.rs @@ -17,9 +17,25 @@ use axum::{ Router, }; use futures_util::{SinkExt, StreamExt}; +use warzone_protocol::message::WireMessage; use crate::state::AppState; +/// Try to extract the message ID from raw bincode-serialized WireMessage bytes. +fn extract_message_id(data: &[u8]) -> Option { + if let Ok(wire) = bincode::deserialize::(data) { + match wire { + WireMessage::KeyExchange { id, .. } => Some(id), + WireMessage::Message { id, .. } => Some(id), + WireMessage::FileHeader { id, .. } => Some(id), + WireMessage::FileChunk { id, .. } => Some(id), + WireMessage::Receipt { message_id, .. } => Some(message_id), + } + } else { + None + } +} + pub fn routes() -> Router { Router::new().route("/ws/:fingerprint", get(ws_handler)) } @@ -90,6 +106,14 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String) let to_fp = normalize_fp(&header); let message = &data[64..]; + // Dedup: skip if we already processed this message ID + if let Some(msg_id) = extract_message_id(message) { + if state_clone.dedup.check_and_insert(&msg_id) { + tracing::debug!("WS dedup: dropping duplicate binary message {}", msg_id); + continue; + } + } + // Try push to connected client first if !state_clone.push_to_client(&to_fp, message).await { // Queue in DB @@ -110,6 +134,14 @@ async fn handle_socket(socket: WebSocket, state: AppState, fingerprint: String) .filter_map(|v| v.as_u64().map(|n| n as u8)) .collect(); + // Dedup: skip if we already processed this message ID + if let Some(msg_id) = extract_message_id(&message) { + if state_clone.dedup.check_and_insert(&msg_id) { + tracing::debug!("WS dedup: dropping duplicate JSON message {}", msg_id); + continue; + } + } + if !state_clone.push_to_client(&to_fp, &message).await { let key = format!("queue:{}:{}", to_fp, uuid::Uuid::new_v4()); let _ = state_clone.db.messages.insert(key.as_bytes(), message); diff --git a/warzone/crates/warzone-server/src/state.rs b/warzone/crates/warzone-server/src/state.rs index b275f26..899cd1d 100644 --- a/warzone/crates/warzone-server/src/state.rs +++ b/warzone/crates/warzone-server/src/state.rs @@ -1,19 +1,57 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use tokio::sync::{Mutex, mpsc}; use crate::db::Database; +/// Maximum number of message IDs to track for deduplication. +const DEDUP_CAPACITY: usize = 10_000; + /// Per-connection sender: messages are pushed here for instant delivery. pub type WsSender = mpsc::UnboundedSender>; /// Connected clients: fingerprint → list of WS senders (multiple devices). pub type Connections = Arc>>>; +/// Bounded dedup tracker: FIFO eviction when capacity is exceeded. +#[derive(Clone)] +pub struct DedupTracker { + seen: Arc>>, + order: Arc>>, +} + +impl DedupTracker { + pub fn new() -> Self { + DedupTracker { + seen: Arc::new(std::sync::Mutex::new(HashSet::with_capacity(DEDUP_CAPACITY))), + order: Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(DEDUP_CAPACITY))), + } + } + + /// Returns `true` if this ID was already seen (i.e. it is a duplicate). + /// If new, inserts it and evicts the oldest if over capacity. + pub fn check_and_insert(&self, id: &str) -> bool { + let mut seen = self.seen.lock().unwrap(); + if seen.contains(id) { + return true; // duplicate + } + let mut order = self.order.lock().unwrap(); + if seen.len() >= DEDUP_CAPACITY { + if let Some(oldest) = order.pop_front() { + seen.remove(&oldest); + } + } + seen.insert(id.to_string()); + order.push_back(id.to_string()); + false // not a duplicate + } +} + #[derive(Clone)] pub struct AppState { pub db: Arc, pub connections: Connections, + pub dedup: DedupTracker, } impl AppState { @@ -22,6 +60,7 @@ impl AppState { Ok(AppState { db: Arc::new(db), connections: Arc::new(Mutex::new(HashMap::new())), + dedup: DedupTracker::new(), }) }