Files
featherChat/warzone/crates/warzone-server/src/routes/messages.rs
Siavash Sameni 2599ce956a v0.0.8: Server-side message deduplication
Server:
- DedupTracker in AppState: bounded HashSet (10,000 IDs, FIFO eviction)
- send_message: extracts message ID from bincode, drops duplicates
- WS handler: dedup on both binary and JSON message frames
- extract_message_id() parses all WireMessage variants

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 11:00:58 +04:00

143 lines
4.7 KiB
Rust

use axum::{
extract::{Path, State},
routing::{delete, get, post},
Json, Router,
};
use serde::Deserialize;
use warzone_protocol::message::WireMessage;
use crate::errors::AppResult;
use crate::state::AppState;
/// Try to extract the message ID from raw bincode-serialized WireMessage bytes.
fn extract_message_id(data: &[u8]) -> Option<String> {
if let Ok(wire) = bincode::deserialize::<WireMessage>(data) {
match wire {
WireMessage::KeyExchange { id, .. } => Some(id),
WireMessage::Message { id, .. } => Some(id),
WireMessage::FileHeader { id, .. } => Some(id),
WireMessage::FileChunk { id, .. } => Some(id),
WireMessage::Receipt { message_id, .. } => Some(message_id),
}
} else {
None
}
}
/// Touch the alias TTL for a fingerprint (renew on authenticated action).
pub fn renew_alias_ttl(db: &sled::Tree, fp: &str) {
let alias_key = format!("fp:{}", fp);
if let Ok(Some(alias_bytes)) = db.get(alias_key.as_bytes()) {
let alias = String::from_utf8_lossy(&alias_bytes).to_string();
let rec_key = format!("rec:{}", alias);
if let Ok(Some(rec_data)) = db.get(rec_key.as_bytes()) {
if let Ok(mut record) = serde_json::from_slice::<serde_json::Value>(&rec_data) {
if let Some(obj) = record.as_object_mut() {
obj.insert("last_active".into(), serde_json::json!(chrono::Utc::now().timestamp()));
if let Ok(updated) = serde_json::to_vec(&record) {
let _ = db.insert(rec_key.as_bytes(), updated);
}
}
}
}
}
}
pub fn routes() -> Router<AppState> {
Router::new()
.route("/messages/send", post(send_message))
.route("/messages/poll/:fingerprint", get(poll_messages))
.route("/messages/:id/ack", delete(ack_message))
}
#[derive(Deserialize)]
struct SendRequest {
to: String,
#[serde(default)]
from: Option<String>,
message: Vec<u8>,
}
fn normalize_fp(fp: &str) -> String {
fp.chars()
.filter(|c| c.is_ascii_hexdigit())
.collect::<String>()
.to_lowercase()
}
async fn send_message(
State(state): State<AppState>,
Json(req): Json<SendRequest>,
) -> AppResult<Json<serde_json::Value>> {
let to = normalize_fp(&req.to);
// Dedup: if we have already seen this message ID, silently drop it
if let Some(msg_id) = extract_message_id(&req.message) {
if state.dedup.check_and_insert(&msg_id) {
tracing::debug!("Dedup: dropping duplicate message {}", msg_id);
return Ok(Json(serde_json::json!({ "ok": true })));
}
}
// Try WebSocket push first (instant delivery)
if state.push_to_client(&to, &req.message).await {
tracing::info!("Pushed message to {} via WS ({} bytes)", to, req.message.len());
} else {
// Queue in DB (offline delivery)
let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4());
tracing::info!("Queuing message for {} ({} bytes)", to, req.message.len());
state.db.messages.insert(key.as_bytes(), req.message)?;
}
// Renew sender's alias TTL (sending = authenticated action)
if let Some(ref from) = req.from {
renew_alias_ttl(&state.db.aliases, &normalize_fp(from));
}
Ok(Json(serde_json::json!({ "ok": true })))
}
/// Poll fetches all queued messages and deletes them from the server.
/// This is store-and-forward: once delivered, the server drops them.
async fn poll_messages(
State(state): State<AppState>,
Path(fingerprint): Path<String>,
) -> AppResult<Json<Vec<String>>> {
let prefix = format!("queue:{}", normalize_fp(&fingerprint));
let mut messages = Vec::new();
let mut keys_to_delete = Vec::new();
for item in state.db.messages.scan_prefix(prefix.as_bytes()) {
let (key, value) = item?;
messages.push(base64::Engine::encode(
&base64::engine::general_purpose::STANDARD,
&value,
));
keys_to_delete.push(key);
}
// Delete after collecting (fetch-and-delete)
for key in &keys_to_delete {
state.db.messages.remove(key)?;
}
if !messages.is_empty() {
tracing::info!(
"Delivered {} message(s) to {}, deleted from queue",
messages.len(),
normalize_fp(&fingerprint)
);
}
Ok(Json(messages))
}
/// Explicit ack endpoint (for future use with selective delivery).
async fn ack_message(
State(state): State<AppState>,
Path(id): Path<String>,
) -> AppResult<Json<serde_json::Value>> {
state.db.messages.remove(id.as_bytes())?;
Ok(Json(serde_json::json!({ "ok": true })))
}