use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use sha2::{Sha256, Digest};
use warzone_protocol::identity::IdentityKeyPair;
use warzone_protocol::message::{ReceiptType, WireMessage};
use warzone_protocol::ratchet::RatchetState;
use warzone_protocol::types::Fingerprint;
use warzone_protocol::x3dh;
use x25519_dalek::PublicKey;

use crate::net::ServerClient;
use crate::storage::LocalDb;
use chrono::Local;

use super::types::{ChatLine, PendingFileTransfer, ReceiptStatus, normfp};

/// Send a delivery receipt for a message back to its sender.
///
/// Best-effort: if the receipt cannot be serialized nothing is sent, and the
/// result of the send (performed on a spawned task) is ignored.
fn send_receipt(
    our_fp: &str,
    sender_fp: &str,
    message_id: &str,
    receipt_type: ReceiptType,
    client: &ServerClient,
) {
    let receipt = WireMessage::Receipt {
        sender_fingerprint: our_fp.to_string(),
        message_id: message_id.to_string(),
        receipt_type,
    };
    let encoded = match bincode::serialize(&receipt) {
        Ok(e) => e,
        Err(_) => return,
    };

    // The spawned task must own everything it touches.
    let client = client.clone();
    let to = sender_fp.to_string();
    let from = our_fp.to_string();
    tokio::spawn(async move {
        let _ = client.send_message(&to, Some(&from), &encoded).await;
    });
}

/// ETH address cache: fingerprint → ETH address (populated async, read sync).
pub type EthCache = Arc<Mutex<HashMap<String, String>>>;

/// Display a fingerprint as short ETH address if cached, otherwise truncated fingerprint.
///
/// Both forms are limited to at most 12 bytes; a cached ETH address is
/// followed by `...`. Truncation never splits a UTF-8 character, so a
/// malformed fingerprint or address cannot cause a slicing panic while the
/// cache lock is held.
fn display_sender(fp: &str, eth_cache: &EthCache) -> String {
    /// Longest prefix of `s` that is at most `max_bytes` long and ends on a
    /// character boundary.
    fn clip(s: &str, max_bytes: usize) -> &str {
        let mut end = s.len().min(max_bytes);
        // Index 0 is always a boundary, so this terminates.
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }

    let cache = eth_cache.lock().unwrap();
    match cache.get(fp) {
        Some(eth) => format!("{}...", clip(eth, 12)),
        None => clip(fp, 12).to_string(),
    }
}

/// Async: look up ETH address for a fingerprint and cache it.
fn cache_eth_lookup(fp: &str, client: &ServerClient, eth_cache: &EthCache) { let fp = fp.to_string(); let client = client.clone(); let cache = eth_cache.clone(); // Check if already cached if cache.lock().unwrap().contains_key(&fp) { return; } tokio::spawn(async move { let url = format!("{}/v1/resolve/{}", client.base_url, fp); if let Ok(resp) = client.client.get(&url).send().await { if let Ok(data) = resp.json::().await { if let Some(eth) = data.get("eth_address").and_then(|v| v.as_str()) { cache.lock().unwrap().insert(fp, eth.to_string()); } } } }); } fn store_received(db: &LocalDb, sender_fp: &str, text: &str) { let _ = db.touch_contact(sender_fp, None); let _ = db.store_message(sender_fp, sender_fp, text, false); } /// Process a single incoming raw message (shared by WS and HTTP paths). pub fn process_incoming( raw: &[u8], identity: &IdentityKeyPair, db: &LocalDb, messages: &Arc>>, receipts: &Arc>>, pending_files: &Arc>>, our_fp: &str, client: &ServerClient, eth_cache: &EthCache, last_dm_peer: &Arc>>, ) { match bincode::deserialize::(raw) { Ok(wire) => process_wire_message(wire, identity, db, messages, receipts, pending_files, our_fp, client, last_dm_peer, eth_cache), Err(_) => {} } } fn process_wire_message( wire: WireMessage, identity: &IdentityKeyPair, db: &LocalDb, messages: &Arc>>, receipts: &Arc>>, pending_files: &Arc>>, our_fp: &str, client: &ServerClient, last_dm_peer: &Arc>>, eth_cache: &EthCache, ) { match wire { WireMessage::KeyExchange { id, sender_fingerprint, sender_identity_encryption_key, ephemeral_public, used_one_time_pre_key_id, ratchet_message, } => { let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { Ok(fp) => fp, Err(_) => return, }; let spk_secret = match db.load_signed_pre_key(1) { Ok(Some(s)) => s, _ => return, }; let otpk_secret = if let Some(otpk_id) = used_one_time_pre_key_id { db.take_one_time_pre_key(otpk_id).ok().flatten() } else { None }; let their_id_x25519 = PublicKey::from(sender_identity_encryption_key); let 
their_eph = PublicKey::from(ephemeral_public); let shared_secret = match x3dh::respond( identity, &spk_secret, otpk_secret.as_ref(), &their_id_x25519, &their_eph, ) { Ok(s) => s, Err(_) => return, }; let mut state = RatchetState::init_bob(shared_secret, spk_secret); match state.decrypt(&ratchet_message) { Ok(plaintext) => { let text = String::from_utf8_lossy(&plaintext).to_string(); let _ = db.save_session(&sender_fp, &state); if normfp(&sender_fingerprint) != normfp(our_fp) { *last_dm_peer.lock().unwrap() = Some(sender_fingerprint.clone()); } store_received(db, &sender_fingerprint, &text); messages.lock().unwrap().push(ChatLine { sender: { cache_eth_lookup(&sender_fingerprint, client, eth_cache); display_sender(&sender_fingerprint, eth_cache) }, text, is_system: false, is_self: false, message_id: None, timestamp: Local::now(), }); send_receipt(our_fp, &sender_fingerprint, &id, ReceiptType::Delivered, client); // Terminal bell for incoming DM print!("\x07"); } Err(e) => { // Session auto-recovery: delete corrupted session, show warning let _ = db.delete_session(&sender_fp); messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "[session reset] Decryption failed for {}. 
Session cleared — next message will re-establish.", &sender_fingerprint[..sender_fingerprint.len().min(12)] ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); tracing::warn!("Session auto-recovery: cleared session for {} after decrypt error: {}", sender_fingerprint, e); } } } WireMessage::Message { id, sender_fingerprint, ratchet_message, } => { let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { Ok(fp) => fp, Err(_) => return, }; let mut state = match db.load_session(&sender_fp) { Ok(Some(s)) => s, _ => return, }; match state.decrypt(&ratchet_message) { Ok(plaintext) => { let text = String::from_utf8_lossy(&plaintext).to_string(); let _ = db.save_session(&sender_fp, &state); if normfp(&sender_fingerprint) != normfp(our_fp) { *last_dm_peer.lock().unwrap() = Some(sender_fingerprint.clone()); } store_received(db, &sender_fingerprint, &text); messages.lock().unwrap().push(ChatLine { sender: { cache_eth_lookup(&sender_fingerprint, client, eth_cache); display_sender(&sender_fingerprint, eth_cache) }, text, is_system: false, is_self: false, message_id: None, timestamp: Local::now(), }); send_receipt(our_fp, &sender_fingerprint, &id, ReceiptType::Delivered, client); // Terminal bell for incoming DM print!("\x07"); } Err(e) => { // Session auto-recovery: delete corrupted session, show warning let _ = db.delete_session(&sender_fp); messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "[session reset] Decryption failed for {}. 
Session cleared — next message will re-establish.", &sender_fingerprint[..sender_fingerprint.len().min(12)] ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); tracing::warn!("Session auto-recovery: cleared session for {} after decrypt error: {}", sender_fingerprint, e); } } } WireMessage::Receipt { sender_fingerprint: _, message_id, receipt_type, } => { // Update receipt status for the referenced message let mut r = receipts.lock().unwrap(); let current = r.get(&message_id); let should_update = match (&receipt_type, current) { (ReceiptType::Read, _) => true, (ReceiptType::Delivered, Some(ReceiptStatus::Sent)) => true, (ReceiptType::Delivered, None) => true, _ => false, }; if should_update { let new_status = match receipt_type { ReceiptType::Delivered => ReceiptStatus::Delivered, ReceiptType::Read => ReceiptStatus::Read, }; r.insert(message_id, new_status); } } WireMessage::FileHeader { id, sender_fingerprint, filename, file_size, total_chunks, sha256, } => { let short_sender = &sender_fingerprint[..sender_fingerprint.len().min(12)]; messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "Incoming file '{}' from {} ({} bytes, {} chunks)", filename, short_sender, file_size, total_chunks ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); let transfer = PendingFileTransfer { filename, total_chunks, received: 0, chunks: vec![None; total_chunks as usize], sha256, file_size, }; pending_files.lock().unwrap().insert(id, transfer); } WireMessage::FileChunk { id, sender_fingerprint, filename: _, chunk_index, total_chunks: _, data, } => { // Decrypt the chunk data using our ratchet session with the sender let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { Ok(fp) => fp, Err(_) => return, }; let mut state = match db.load_session(&sender_fp) { Ok(Some(s)) => s, _ => return, }; // The data field is a bincode-serialized RatchetMessage let ratchet_msg = match 
bincode::deserialize(&data) { Ok(m) => m, Err(_) => return, }; let plaintext = match state.decrypt(&ratchet_msg) { Ok(pt) => { let _ = db.save_session(&sender_fp, &state); pt } Err(_) => return, }; let mut pf = pending_files.lock().unwrap(); if let Some(transfer) = pf.get_mut(&id) { if (chunk_index as usize) < transfer.chunks.len() { if transfer.chunks[chunk_index as usize].is_none() { transfer.chunks[chunk_index as usize] = Some(plaintext); transfer.received += 1; } messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "Receiving {} [{}/{}]...", transfer.filename, transfer.received, transfer.total_chunks ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); // Check if all chunks received if transfer.received == transfer.total_chunks { let mut assembled = Vec::with_capacity(transfer.file_size as usize); for chunk in &transfer.chunks { if let Some(data) = chunk { assembled.extend_from_slice(data); } } // Verify SHA-256 let mut hasher = Sha256::new(); hasher.update(&assembled); let computed_hash = format!("{:x}", hasher.finalize()); if computed_hash != transfer.sha256 { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "File '{}' integrity check FAILED (hash mismatch)", transfer.filename ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } else { // Save to data_dir/downloads/ let download_dir = crate::keystore::data_dir().join("downloads"); let _ = std::fs::create_dir_all(&download_dir); let save_path = download_dir.join(&transfer.filename); match std::fs::write(&save_path, &assembled) { Ok(_) => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "File saved: {}", save_path.display() ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } Err(e) => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!("Failed to save file: {}", e), is_system: true, is_self: 
false, message_id: None, timestamp: Local::now(), }); } } } // Remove completed transfer pf.remove(&id); } } } else { // Received chunk without header — ignore } } WireMessage::GroupSenderKey { id: _, sender_fingerprint, group_name, generation, counter, ciphertext, } => { match db.load_sender_key(&sender_fingerprint, &group_name) { Ok(Some(mut sender_key)) => { let msg = warzone_protocol::sender_keys::SenderKeyMessage { sender_fingerprint: sender_fingerprint.clone(), group_name: group_name.clone(), generation, counter, ciphertext, }; match sender_key.decrypt(&msg) { Ok(plaintext) => { let text = String::from_utf8_lossy(&plaintext).to_string(); // Save updated sender key (counter advanced) let _ = db.save_sender_key(&sender_fingerprint, &group_name, &sender_key); store_received(db, &sender_fingerprint, &text); messages.lock().unwrap().push(ChatLine { sender: format!( "{} [#{}]", &sender_fingerprint[..sender_fingerprint.len().min(12)], group_name ), text, is_system: false, is_self: false, message_id: None, timestamp: Local::now(), }); } Err(e) => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "[group #{}] decrypt failed from {}: {}", group_name, &sender_fingerprint[..sender_fingerprint.len().min(12)], e ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } } } _ => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "[group #{}] no sender key for {} — key distribution needed", group_name, &sender_fingerprint[..sender_fingerprint.len().min(12)] ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } } } WireMessage::SenderKeyDistribution { sender_fingerprint, group_name, chain_key, generation, } => { let dist = warzone_protocol::sender_keys::SenderKeyDistribution { sender_fingerprint: sender_fingerprint.clone(), group_name: group_name.clone(), chain_key, generation, }; let sender_key = dist.into_sender_key(); let _ = 
db.save_sender_key(&sender_fingerprint, &group_name, &sender_key); messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!( "Received sender key from {} for #{}", &sender_fingerprint[..sender_fingerprint.len().min(12)], group_name ), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } WireMessage::CallSignal { id: _, sender_fingerprint, signal_type, payload: _, target: _, } => { use warzone_protocol::message::CallSignalType; let sender_short = { cache_eth_lookup(&sender_fingerprint, client, eth_cache); display_sender(&sender_fingerprint, eth_cache) }; match signal_type { CallSignalType::Offer => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!("\u{1f4de} Incoming call from {} \u{2014} /accept or /reject", sender_short), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); // Terminal bell for incoming call print!("\x07"); } CallSignalType::Answer => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!("\u{2713} {} accepted the call", sender_short), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } CallSignalType::Hangup => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: "Call ended".into(), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } CallSignalType::Reject => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!("{} rejected the call", sender_short), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } CallSignalType::Ringing => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: "Ringing...".into(), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); } CallSignalType::Busy => { messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!("{} is busy", sender_short), is_system: true, is_self: false, 
message_id: None, timestamp: Local::now(), }); } _ => { messages.lock().unwrap().push(ChatLine { sender: sender_short, text: format!("\u{1f4de} Call signal: {:?}", signal_type), is_system: false, is_self: false, message_id: None, timestamp: Local::now(), }); } } } } } /// Real-time message loop via WebSocket (falls back to HTTP polling). pub async fn poll_loop( messages: Arc>>, receipts: Arc>>, pending_files: Arc>>, our_fp: String, identity: IdentityKeyPair, db: Arc, client: ServerClient, last_dm_peer: Arc>>, connected: Arc, ) { let fp = normfp(&our_fp); let eth_cache: EthCache = Arc::new(std::sync::Mutex::new(HashMap::new())); // Try WebSocket first let ws_url = client.base_url .replace("http://", "ws://") .replace("https://", "wss://"); let ws_url = format!("{}/v1/ws/{}", ws_url, fp); loop { match tokio_tungstenite::connect_async(&ws_url).await { Ok((ws_stream, _)) => { connected.store(true, Ordering::Relaxed); messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: "Real-time connection established".into(), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); use futures_util::StreamExt; let (_, mut read) = ws_stream.split(); while let Some(Ok(msg)) = read.next().await { match msg { tokio_tungstenite::tungstenite::Message::Binary(data) => { process_incoming(&data, &identity, &db, &messages, &receipts, &pending_files, &our_fp, &client, ð_cache, &last_dm_peer); } tokio_tungstenite::tungstenite::Message::Text(text) => { if let Ok(json) = serde_json::from_str::(&text) { if json.get("type").and_then(|v| v.as_str()) == Some("missed_call") { let data = json.get("data").cloned().unwrap_or_default(); let caller = data.get("caller_fp").and_then(|v| v.as_str()).unwrap_or("unknown"); let ts = data.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0); let when = chrono::DateTime::from_timestamp(ts, 0) .map(|dt| dt.with_timezone(&Local).format("%H:%M").to_string()) .unwrap_or_else(|| "?".to_string()); 
messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: format!("\u{1f4de} Missed call from {} at {}", &caller[..caller.len().min(12)], when), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); print!("\x07"); } else if json.get("type").and_then(|v| v.as_str()) == Some("bot_message") { let from = json.get("from_name").or(json.get("from")).and_then(|v| v.as_str()).unwrap_or("bot"); let text_content = json.get("text").and_then(|v| v.as_str()).unwrap_or(""); messages.lock().unwrap().push(ChatLine { sender: format!("@{}", from), text: text_content.to_string(), is_system: false, is_self: false, message_id: None, timestamp: Local::now(), }); print!("\x07"); } } } _ => {} } } connected.store(false, Ordering::Relaxed); messages.lock().unwrap().push(ChatLine { sender: "system".into(), text: "Connection lost, reconnecting...".into(), is_system: true, is_self: false, message_id: None, timestamp: Local::now(), }); tokio::time::sleep(Duration::from_secs(3)).await; } Err(_) => { connected.store(false, Ordering::Relaxed); // Fallback to HTTP polling tokio::time::sleep(Duration::from_secs(2)).await; let raw_msgs = match client.poll_messages(&our_fp).await { Ok(m) => m, Err(_) => continue, }; for raw in &raw_msgs { process_incoming(raw, &identity, &db, &messages, &receipts, &pending_files, &our_fp, &client, ð_cache, &last_dm_peer); } } } } }