diff --git a/warzone/Cargo.lock b/warzone/Cargo.lock index 5caf093..77bb10c 100644 --- a/warzone/Cargo.lock +++ b/warzone/Cargo.lock @@ -3040,6 +3040,7 @@ dependencies = [ "serde_json", "sha2", "sled", + "tempfile", "thiserror 2.0.18", "tokio", "tokio-tungstenite 0.21.0", diff --git a/warzone/Cargo.toml b/warzone/Cargo.toml index d4acd1e..17dab0e 100644 --- a/warzone/Cargo.toml +++ b/warzone/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.0.39" +version = "0.0.40" edition = "2021" license = "MIT" rust-version = "1.75" diff --git a/warzone/crates/warzone-client/src/tui/network.rs b/warzone/crates/warzone-client/src/tui/network.rs index 3047cc7..c2df8ee 100644 --- a/warzone/crates/warzone-client/src/tui/network.rs +++ b/warzone/crates/warzone-client/src/tui/network.rs @@ -75,6 +75,30 @@ fn cache_eth_lookup(fp: &str, client: &ServerClient, eth_cache: &EthCache) { }); } +/// Pre-populate the ETH cache for all known contacts. +pub async fn prefill_eth_cache( + db: &crate::storage::LocalDb, + client: &ServerClient, + eth_cache: &EthCache, +) { + if let Ok(contacts) = db.list_contacts() { + for c in &contacts { + if let Some(fp) = c.get("fingerprint").and_then(|v| v.as_str()) { + let fp = fp.to_string(); + if eth_cache.lock().unwrap().contains_key(&fp) { continue; } + let url = format!("{}/v1/resolve/{}", client.base_url, fp); + if let Ok(resp) = client.client.get(&url).send().await { + if let Ok(data) = resp.json::().await { + if let Some(eth) = data.get("eth_address").and_then(|v| v.as_str()) { + eth_cache.lock().unwrap().insert(fp, eth.to_string()); + } + } + } + } + } + } +} + fn store_received(db: &LocalDb, sender_fp: &str, text: &str) { let _ = db.touch_contact(sender_fp, None); let _ = db.store_message(sender_fp, sender_fp, text, false); @@ -584,6 +608,9 @@ pub async fn poll_loop( let fp = normfp(&our_fp); let eth_cache: EthCache = Arc::new(std::sync::Mutex::new(HashMap::new())); + // Pre-populate ETH cache for known contacts + prefill_eth_cache(&db, &client, ð_cache).await; + // Try WebSocket first let ws_url = client.base_url .replace("http://", "ws://") diff --git a/warzone/crates/warzone-protocol/Cargo.toml b/warzone/crates/warzone-protocol/Cargo.toml index 9ad9f13..11cbf80 100644 --- a/warzone/crates/warzone-protocol/Cargo.toml +++ b/warzone/crates/warzone-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "warzone-protocol" -version = "0.0.39" +version = "0.0.40" edition = "2021" license = "MIT" description = "Core crypto & wire protocol for featherChat (Warzone messenger)" diff --git a/warzone/crates/warzone-server/Cargo.toml b/warzone/crates/warzone-server/Cargo.toml index a6bb3b5..9884419 100644 --- a/warzone/crates/warzone-server/Cargo.toml +++ b/warzone/crates/warzone-server/Cargo.toml @@ -28,3 +28,7 @@ bincode.workspace = true sha2.workspace = true reqwest = { workspace = true, features = ["rustls-tls", "json"] } tokio-tungstenite.workspace = true + +[dev-dependencies] +tempfile = "3" +tokio = { workspace = true, features = ["test-util"] } diff --git a/warzone/crates/warzone-server/src/main.rs b/warzone/crates/warzone-server/src/main.rs index a42b5c3..467c671 100644 --- a/warzone/crates/warzone-server/src/main.rs +++ b/warzone/crates/warzone-server/src/main.rs @@ -47,6 +47,38 @@ async fn main() -> anyhow::Result<()> { let mut state = state::AppState::new(&cli.data_dir)?; + // Reload active calls from DB + { + let now = chrono::Utc::now().timestamp(); + let mut loaded = 0u32; + let mut expired = 0u32; + for item in state.db.calls.iter().flatten() { + if let Ok(call) = serde_json::from_slice::(&item.1) { + match call.status { + state::CallStatus::Ringing | state::CallStatus::Active => { + if now - call.created_at > 86400 { + let mut ended = call.clone(); + ended.status = state::CallStatus::Ended; + ended.ended_at = Some(now); + let _ = state.db.calls.insert( + &item.0, + serde_json::to_vec(&ended).unwrap_or_default(), + ); + expired += 1; + } else { + state.active_calls.lock().await.insert(call.call_id.clone(), call); + loaded += 1; + } + } + _ => {} // Ended calls stay in DB but not in memory + } + } + } + if loaded > 0 || expired > 0 { + tracing::info!("Calls: loaded {} active, expired {} stale", loaded, expired); + } + } + // Load federation config if provided if let Some(ref fed_path) = cli.federation { let fed_config = federation::load_config(fed_path)?; diff --git a/warzone/crates/warzone-server/src/routes/web.rs b/warzone/crates/warzone-server/src/routes/web.rs index 7c83aef..1ccca03 100644 --- a/warzone/crates/warzone-server/src/routes/web.rs +++ b/warzone/crates/warzone-server/src/routes/web.rs @@ -50,7 +50,7 @@ async fn pwa_manifest() -> impl IntoResponse { async fn service_worker() -> impl IntoResponse { ([(header::CONTENT_TYPE, "application/javascript")], r##" -const CACHE = 'wz-v21'; +const CACHE = 'wz-v22'; const SHELL = ['/', '/wasm/warzone_wasm.js', '/wasm/warzone_wasm_bg.wasm', '/icon.svg', '/manifest.json']; self.addEventListener('install', e => { @@ -287,7 +287,7 @@ let pollTimer = null; let ws = null; // WebSocket connection let wasmReady = false; -const VERSION = '0.0.39'; +const VERSION = '0.0.40'; let DEBUG = true; // toggle with /debug command // ── Receipt tracking ── diff --git a/warzone/crates/warzone-server/src/state.rs b/warzone/crates/warzone-server/src/state.rs index e3f1374..d9512a2 100644 --- a/warzone/crates/warzone-server/src/state.rs +++ b/warzone/crates/warzone-server/src/state.rs @@ -273,3 +273,142 @@ impl AppState { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_state() -> AppState { + let dir = tempfile::tempdir().unwrap(); + AppState::new(dir.path().to_str().unwrap()).unwrap() + } + + #[tokio::test] + async fn push_to_client_returns_false_when_offline() { + let state = test_state(); + assert!(!state.push_to_client("abc123", b"hello").await); + } + + #[tokio::test] + async fn register_ws_and_push() { + let state = test_state(); + let (_, mut rx) = state.register_ws("test_fp", None).await.unwrap(); + + assert!(state.push_to_client("test_fp", b"hello").await); + let msg = rx.recv().await.unwrap(); + assert_eq!(msg, b"hello"); + } + + #[tokio::test] + async fn ws_connection_cap() { + let state = test_state(); + // Hold receivers so senders stay open (register_ws prunes closed senders). + let mut _holders = Vec::new(); + for i in 0..5 { + let res = state.register_ws("same_fp", None).await; + assert!(res.is_some(), "connection {} should succeed", i); + _holders.push(res.unwrap()); + } + // 6th should fail + assert!(state.register_ws("same_fp", None).await.is_none()); + } + + #[tokio::test] + async fn is_online_and_device_count() { + let state = test_state(); + assert!(!state.is_online("fp1").await); + assert_eq!(state.device_count("fp1").await, 0); + + // Must hold receivers so the senders are not marked as closed. + let _r1 = state.register_ws("fp1", None).await; + assert!(state.is_online("fp1").await); + assert_eq!(state.device_count("fp1").await, 1); + + let _r2 = state.register_ws("fp1", None).await; + assert_eq!(state.device_count("fp1").await, 2); + } + + #[tokio::test] + async fn kick_device() { + let state = test_state(); + let (device_id, _) = state.register_ws("fp1", None).await.unwrap(); + + assert!(state.kick_device("fp1", &device_id).await); + assert!(!state.is_online("fp1").await); + } + + #[tokio::test] + async fn revoke_all_except() { + let state = test_state(); + let (id1, _rx1) = state.register_ws("fp1", None).await.unwrap(); + let (_id2, _rx2) = state.register_ws("fp1", None).await.unwrap(); + let (_id3, _rx3) = state.register_ws("fp1", None).await.unwrap(); + + let removed = state.revoke_all_except("fp1", &id1).await; + assert_eq!(removed, 2); + assert_eq!(state.device_count("fp1").await, 1); + } + + #[tokio::test] + async fn deliver_or_queue_offline() { + let state = test_state(); + // No WS connected -- should queue + let delivered = state.deliver_or_queue("offline_fp", b"test message").await; + assert!(!delivered); + + // Check message was queued in DB + let prefix = "queue:offline_fp"; + let count = state.db.messages.scan_prefix(prefix.as_bytes()).count(); + assert_eq!(count, 1); + } + + #[tokio::test] + async fn deliver_or_queue_online() { + let state = test_state(); + let (_, mut rx) = state.register_ws("online_fp", None).await.unwrap(); + + let delivered = state.deliver_or_queue("online_fp", b"instant").await; + assert!(delivered); + + let msg = rx.recv().await.unwrap(); + assert_eq!(msg, b"instant"); + } + + #[tokio::test] + async fn call_state_lifecycle() { + let state = test_state(); + + let call = CallState { + call_id: "call-001".into(), + caller_fp: "alice".into(), + callee_fp: "bob".into(), + group_name: None, + room_id: None, + status: CallStatus::Ringing, + created_at: chrono::Utc::now().timestamp(), + answered_at: None, + ended_at: None, + }; + + state.active_calls.lock().await.insert("call-001".into(), call); + assert_eq!(state.active_calls.lock().await.len(), 1); + + // End the call + if let Some(mut c) = state.active_calls.lock().await.remove("call-001") { + c.status = CallStatus::Ended; + c.ended_at = Some(chrono::Utc::now().timestamp()); + let _ = state.db.calls.insert(b"call-001", serde_json::to_vec(&c).unwrap()); + } + assert_eq!(state.active_calls.lock().await.len(), 0); + } + + #[tokio::test] + async fn list_devices() { + let state = test_state(); + let _r1 = state.register_ws("fp1", None).await; + let _r2 = state.register_ws("fp1", None).await; + + let devices = state.list_devices("fp1").await; + assert_eq!(devices.len(), 2); + } +}