v0.0.40: reliability — call reload, ETH cache prefill, 10 server tests
Call state reload on restart: - Loads Ringing/Active calls from sled into active_calls on startup - Expires calls older than 24h automatically TUI sender ETH cache prefill: - prefill_eth_cache() resolves all known contacts on poll_loop start - First message from known contacts now shows ETH address immediately Server integration tests (10 new): - push_to_client offline/online - register_ws + connection cap (5 max) - is_online + device_count - kick_device + revoke_all_except - deliver_or_queue offline/online - call state lifecycle - list_devices 155 tests passing (was 135) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
1
warzone/Cargo.lock
generated
1
warzone/Cargo.lock
generated
@@ -3040,6 +3040,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"sled",
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.21.0",
|
||||
|
||||
@@ -9,7 +9,7 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "0.0.39"
|
||||
version = "0.0.40"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
rust-version = "1.75"
|
||||
|
||||
@@ -75,6 +75,30 @@ fn cache_eth_lookup(fp: &str, client: &ServerClient, eth_cache: &EthCache) {
|
||||
});
|
||||
}
|
||||
|
||||
/// Pre-populate the ETH cache for all known contacts.
|
||||
pub async fn prefill_eth_cache(
|
||||
db: &crate::storage::LocalDb,
|
||||
client: &ServerClient,
|
||||
eth_cache: &EthCache,
|
||||
) {
|
||||
if let Ok(contacts) = db.list_contacts() {
|
||||
for c in &contacts {
|
||||
if let Some(fp) = c.get("fingerprint").and_then(|v| v.as_str()) {
|
||||
let fp = fp.to_string();
|
||||
if eth_cache.lock().unwrap().contains_key(&fp) { continue; }
|
||||
let url = format!("{}/v1/resolve/{}", client.base_url, fp);
|
||||
if let Ok(resp) = client.client.get(&url).send().await {
|
||||
if let Ok(data) = resp.json::<serde_json::Value>().await {
|
||||
if let Some(eth) = data.get("eth_address").and_then(|v| v.as_str()) {
|
||||
eth_cache.lock().unwrap().insert(fp, eth.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn store_received(db: &LocalDb, sender_fp: &str, text: &str) {
|
||||
let _ = db.touch_contact(sender_fp, None);
|
||||
let _ = db.store_message(sender_fp, sender_fp, text, false);
|
||||
@@ -584,6 +608,9 @@ pub async fn poll_loop(
|
||||
let fp = normfp(&our_fp);
|
||||
let eth_cache: EthCache = Arc::new(std::sync::Mutex::new(HashMap::new()));
|
||||
|
||||
// Pre-populate ETH cache for known contacts
|
||||
prefill_eth_cache(&db, &client, ð_cache).await;
|
||||
|
||||
// Try WebSocket first
|
||||
let ws_url = client.base_url
|
||||
.replace("http://", "ws://")
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "warzone-protocol"
|
||||
version = "0.0.39"
|
||||
version = "0.0.40"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
description = "Core crypto & wire protocol for featherChat (Warzone messenger)"
|
||||
|
||||
@@ -28,3 +28,7 @@ bincode.workspace = true
|
||||
sha2.workspace = true
|
||||
reqwest = { workspace = true, features = ["rustls-tls", "json"] }
|
||||
tokio-tungstenite.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
|
||||
@@ -47,6 +47,38 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let mut state = state::AppState::new(&cli.data_dir)?;
|
||||
|
||||
// Reload active calls from DB
|
||||
{
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let mut loaded = 0u32;
|
||||
let mut expired = 0u32;
|
||||
for item in state.db.calls.iter().flatten() {
|
||||
if let Ok(call) = serde_json::from_slice::<state::CallState>(&item.1) {
|
||||
match call.status {
|
||||
state::CallStatus::Ringing | state::CallStatus::Active => {
|
||||
if now - call.created_at > 86400 {
|
||||
let mut ended = call.clone();
|
||||
ended.status = state::CallStatus::Ended;
|
||||
ended.ended_at = Some(now);
|
||||
let _ = state.db.calls.insert(
|
||||
&item.0,
|
||||
serde_json::to_vec(&ended).unwrap_or_default(),
|
||||
);
|
||||
expired += 1;
|
||||
} else {
|
||||
state.active_calls.lock().await.insert(call.call_id.clone(), call);
|
||||
loaded += 1;
|
||||
}
|
||||
}
|
||||
_ => {} // Ended calls stay in DB but not in memory
|
||||
}
|
||||
}
|
||||
}
|
||||
if loaded > 0 || expired > 0 {
|
||||
tracing::info!("Calls: loaded {} active, expired {} stale", loaded, expired);
|
||||
}
|
||||
}
|
||||
|
||||
// Load federation config if provided
|
||||
if let Some(ref fed_path) = cli.federation {
|
||||
let fed_config = federation::load_config(fed_path)?;
|
||||
|
||||
@@ -50,7 +50,7 @@ async fn pwa_manifest() -> impl IntoResponse {
|
||||
|
||||
async fn service_worker() -> impl IntoResponse {
|
||||
([(header::CONTENT_TYPE, "application/javascript")], r##"
|
||||
const CACHE = 'wz-v21';
|
||||
const CACHE = 'wz-v22';
|
||||
const SHELL = ['/', '/wasm/warzone_wasm.js', '/wasm/warzone_wasm_bg.wasm', '/icon.svg', '/manifest.json'];
|
||||
|
||||
self.addEventListener('install', e => {
|
||||
@@ -287,7 +287,7 @@ let pollTimer = null;
|
||||
let ws = null; // WebSocket connection
|
||||
let wasmReady = false;
|
||||
|
||||
const VERSION = '0.0.39';
|
||||
const VERSION = '0.0.40';
|
||||
let DEBUG = true; // toggle with /debug command
|
||||
|
||||
// ── Receipt tracking ──
|
||||
|
||||
@@ -273,3 +273,142 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn test_state() -> AppState {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
AppState::new(dir.path().to_str().unwrap()).unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn push_to_client_returns_false_when_offline() {
|
||||
let state = test_state();
|
||||
assert!(!state.push_to_client("abc123", b"hello").await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_ws_and_push() {
|
||||
let state = test_state();
|
||||
let (_, mut rx) = state.register_ws("test_fp", None).await.unwrap();
|
||||
|
||||
assert!(state.push_to_client("test_fp", b"hello").await);
|
||||
let msg = rx.recv().await.unwrap();
|
||||
assert_eq!(msg, b"hello");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ws_connection_cap() {
|
||||
let state = test_state();
|
||||
// Hold receivers so senders stay open (register_ws prunes closed senders).
|
||||
let mut _holders = Vec::new();
|
||||
for i in 0..5 {
|
||||
let res = state.register_ws("same_fp", None).await;
|
||||
assert!(res.is_some(), "connection {} should succeed", i);
|
||||
_holders.push(res.unwrap());
|
||||
}
|
||||
// 6th should fail
|
||||
assert!(state.register_ws("same_fp", None).await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn is_online_and_device_count() {
|
||||
let state = test_state();
|
||||
assert!(!state.is_online("fp1").await);
|
||||
assert_eq!(state.device_count("fp1").await, 0);
|
||||
|
||||
// Must hold receivers so the senders are not marked as closed.
|
||||
let _r1 = state.register_ws("fp1", None).await;
|
||||
assert!(state.is_online("fp1").await);
|
||||
assert_eq!(state.device_count("fp1").await, 1);
|
||||
|
||||
let _r2 = state.register_ws("fp1", None).await;
|
||||
assert_eq!(state.device_count("fp1").await, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn kick_device() {
|
||||
let state = test_state();
|
||||
let (device_id, _) = state.register_ws("fp1", None).await.unwrap();
|
||||
|
||||
assert!(state.kick_device("fp1", &device_id).await);
|
||||
assert!(!state.is_online("fp1").await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn revoke_all_except() {
|
||||
let state = test_state();
|
||||
let (id1, _rx1) = state.register_ws("fp1", None).await.unwrap();
|
||||
let (_id2, _rx2) = state.register_ws("fp1", None).await.unwrap();
|
||||
let (_id3, _rx3) = state.register_ws("fp1", None).await.unwrap();
|
||||
|
||||
let removed = state.revoke_all_except("fp1", &id1).await;
|
||||
assert_eq!(removed, 2);
|
||||
assert_eq!(state.device_count("fp1").await, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deliver_or_queue_offline() {
|
||||
let state = test_state();
|
||||
// No WS connected -- should queue
|
||||
let delivered = state.deliver_or_queue("offline_fp", b"test message").await;
|
||||
assert!(!delivered);
|
||||
|
||||
// Check message was queued in DB
|
||||
let prefix = "queue:offline_fp";
|
||||
let count = state.db.messages.scan_prefix(prefix.as_bytes()).count();
|
||||
assert_eq!(count, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deliver_or_queue_online() {
|
||||
let state = test_state();
|
||||
let (_, mut rx) = state.register_ws("online_fp", None).await.unwrap();
|
||||
|
||||
let delivered = state.deliver_or_queue("online_fp", b"instant").await;
|
||||
assert!(delivered);
|
||||
|
||||
let msg = rx.recv().await.unwrap();
|
||||
assert_eq!(msg, b"instant");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn call_state_lifecycle() {
|
||||
let state = test_state();
|
||||
|
||||
let call = CallState {
|
||||
call_id: "call-001".into(),
|
||||
caller_fp: "alice".into(),
|
||||
callee_fp: "bob".into(),
|
||||
group_name: None,
|
||||
room_id: None,
|
||||
status: CallStatus::Ringing,
|
||||
created_at: chrono::Utc::now().timestamp(),
|
||||
answered_at: None,
|
||||
ended_at: None,
|
||||
};
|
||||
|
||||
state.active_calls.lock().await.insert("call-001".into(), call);
|
||||
assert_eq!(state.active_calls.lock().await.len(), 1);
|
||||
|
||||
// End the call
|
||||
if let Some(mut c) = state.active_calls.lock().await.remove("call-001") {
|
||||
c.status = CallStatus::Ended;
|
||||
c.ended_at = Some(chrono::Utc::now().timestamp());
|
||||
let _ = state.db.calls.insert(b"call-001", serde_json::to_vec(&c).unwrap());
|
||||
}
|
||||
assert_eq!(state.active_calls.lock().await.len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_devices() {
|
||||
let state = test_state();
|
||||
let _r1 = state.register_ws("fp1", None).await;
|
||||
let _r2 = state.register_ws("fp1", None).await;
|
||||
|
||||
let devices = state.list_devices("fp1").await;
|
||||
assert_eq!(devices.len(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user