diff --git a/warzone/Cargo.lock b/warzone/Cargo.lock index 4d6da74..0e78f9f 100644 --- a/warzone/Cargo.lock +++ b/warzone/Cargo.lock @@ -2330,7 +2330,9 @@ checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" dependencies = [ "futures-util", "log", + "native-tls", "tokio", + "tokio-native-tls", "tungstenite", ] @@ -2501,6 +2503,7 @@ dependencies = [ "http", "httparse", "log", + "native-tls", "rand", "sha1", "thiserror 1.0.69", @@ -2654,6 +2657,7 @@ dependencies = [ "chrono", "clap", "crossterm", + "futures-util", "hex", "libc", "rand", @@ -2663,8 +2667,10 @@ dependencies = [ "serde_json", "sled", "tokio", + "tokio-tungstenite", "tracing", "tracing-subscriber", + "url", "uuid", "warzone-protocol", "x25519-dalek", diff --git a/warzone/crates/warzone-client/Cargo.toml b/warzone/crates/warzone-client/Cargo.toml index 11312b9..3c611d8 100644 --- a/warzone/crates/warzone-client/Cargo.toml +++ b/warzone/crates/warzone-client/Cargo.toml @@ -27,3 +27,6 @@ bincode.workspace = true libc = "0.2" uuid.workspace = true chrono.workspace = true +tokio-tungstenite = { version = "0.24", features = ["native-tls"] } +futures-util = "0.3" +url = "2" diff --git a/warzone/crates/warzone-client/src/tui/app.rs b/warzone/crates/warzone-client/src/tui/app.rs index c858265..52ef0ff 100644 --- a/warzone/crates/warzone-client/src/tui/app.rs +++ b/warzone/crates/warzone-client/src/tui/app.rs @@ -631,7 +631,99 @@ fn normfp(fp: &str) -> String { fp.chars().filter(|c| c.is_ascii_hexdigit()).collect::().to_lowercase() } -/// Poll for incoming messages in the background. +/// Process a single incoming raw message (shared by WS and HTTP paths). +fn process_incoming( + raw: &[u8], + identity: &IdentityKeyPair, + db: &LocalDb, + messages: &Arc>>, +) { + match bincode::deserialize::(raw) { + Ok(wire) => process_wire_message(wire, identity, db, messages), + Err(_) => {} + } +} + +fn process_wire_message( + wire: WireMessage, + identity: &IdentityKeyPair, + db: &LocalDb, + messages: &Arc>>, +) { + match wire { + WireMessage::KeyExchange { + sender_fingerprint, + sender_identity_encryption_key, + ephemeral_public, + used_one_time_pre_key_id, + ratchet_message, + } => { + let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { + Ok(fp) => fp, + Err(_) => return, + }; + let spk_secret = match db.load_signed_pre_key(1) { + Ok(Some(s)) => s, + _ => return, + }; + let otpk_secret = if let Some(id) = used_one_time_pre_key_id { + db.take_one_time_pre_key(id).ok().flatten() + } else { + None + }; + let their_id_x25519 = PublicKey::from(sender_identity_encryption_key); + let their_eph = PublicKey::from(ephemeral_public); + let shared_secret = match x3dh::respond( + identity, &spk_secret, otpk_secret.as_ref(), &their_id_x25519, &their_eph, + ) { + Ok(s) => s, + Err(_) => return, + }; + let mut state = RatchetState::init_bob(shared_secret, spk_secret); + match state.decrypt(&ratchet_message) { + Ok(plaintext) => { + let text = String::from_utf8_lossy(&plaintext).to_string(); + let _ = db.save_session(&sender_fp, &state); + messages.lock().unwrap().push(ChatLine { + sender: sender_fingerprint[..sender_fingerprint.len().min(12)].to_string(), + text, + is_system: false, + is_self: false, + }); + } + Err(_) => {} + } + } + WireMessage::Message { + sender_fingerprint, + ratchet_message, + } => { + let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { + Ok(fp) => fp, + Err(_) => return, + }; + let mut state = match db.load_session(&sender_fp) { + Ok(Some(s)) => s, + _ => return, + }; + match state.decrypt(&ratchet_message) { + Ok(plaintext) => { + let text = String::from_utf8_lossy(&plaintext).to_string(); + let _ = db.save_session(&sender_fp, &state); + messages.lock().unwrap().push(ChatLine { + sender: sender_fingerprint[..sender_fingerprint.len().min(12)].to_string(), + text, + is_system: false, + is_self: false, + }); + } + Err(_) => {} + } + } + } +} + +/// Real-time message loop via WebSocket (falls back to HTTP polling). pub async fn poll_loop( messages: Arc>>, our_fp: String, @@ -639,97 +731,51 @@ pub async fn poll_loop( db: Arc, client: ServerClient, ) { + let fp = normfp(&our_fp); + + // Try WebSocket first + let ws_url = client.base_url + .replace("http://", "ws://") + .replace("https://", "wss://"); + let ws_url = format!("{}/v1/ws/{}", ws_url, fp); + loop { - tokio::time::sleep(Duration::from_secs(2)).await; + match tokio_tungstenite::connect_async(&ws_url).await { + Ok((ws_stream, _)) => { + messages.lock().unwrap().push(ChatLine { + sender: "system".into(), + text: "Real-time connection established".into(), + is_system: true, + is_self: false, + }); - let raw_msgs = match client.poll_messages(&our_fp).await { - Ok(m) => m, - Err(_) => continue, - }; + use futures_util::StreamExt; + let (_, mut read) = ws_stream.split(); - for raw in &raw_msgs { - match bincode::deserialize::(raw) { - Ok(WireMessage::KeyExchange { - sender_fingerprint, - sender_identity_encryption_key, - ephemeral_public, - used_one_time_pre_key_id, - ratchet_message, - }) => { - let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { - Ok(fp) => fp, - Err(_) => continue, - }; - - let spk_secret = match db.load_signed_pre_key(1) { - Ok(Some(s)) => s, - _ => continue, - }; - - let otpk_secret = if let Some(id) = used_one_time_pre_key_id { - db.take_one_time_pre_key(id).ok().flatten() - } else { - None - }; - - let their_id_x25519 = PublicKey::from(sender_identity_encryption_key); - let their_eph = PublicKey::from(ephemeral_public); - - let shared_secret = match x3dh::respond( - &identity, - &spk_secret, - otpk_secret.as_ref(), - &their_id_x25519, - &their_eph, - ) { - Ok(s) => s, - Err(_) => continue, - }; - - let mut state = RatchetState::init_bob(shared_secret, spk_secret); - match state.decrypt(&ratchet_message) { - Ok(plaintext) => { - let text = String::from_utf8_lossy(&plaintext).to_string(); - let _ = db.save_session(&sender_fp, &state); - messages.lock().unwrap().push(ChatLine { - sender: sender_fingerprint[..12].to_string(), - text, - is_system: false, - is_self: false, - }); - } - Err(_) => continue, + while let Some(Ok(msg)) = read.next().await { + if let tokio_tungstenite::tungstenite::Message::Binary(data) = msg { + process_incoming(&data, &identity, &db, &messages); } } - Ok(WireMessage::Message { - sender_fingerprint, - ratchet_message, - }) => { - let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) { - Ok(fp) => fp, - Err(_) => continue, - }; - let mut state = match db.load_session(&sender_fp) { - Ok(Some(s)) => s, - _ => continue, - }; - - match state.decrypt(&ratchet_message) { - Ok(plaintext) => { - let text = String::from_utf8_lossy(&plaintext).to_string(); - let _ = db.save_session(&sender_fp, &state); - messages.lock().unwrap().push(ChatLine { - sender: sender_fingerprint[..12].to_string(), - text, - is_system: false, - is_self: false, - }); - } - Err(_) => continue, - } + messages.lock().unwrap().push(ChatLine { + sender: "system".into(), + text: "Connection lost, reconnecting...".into(), + is_system: true, + is_self: false, + }); + tokio::time::sleep(Duration::from_secs(3)).await; + } + Err(_) => { + // Fallback to HTTP polling + tokio::time::sleep(Duration::from_secs(2)).await; + let raw_msgs = match client.poll_messages(&our_fp).await { + Ok(m) => m, + Err(_) => continue, + }; + for raw in &raw_msgs { + process_incoming(raw, &identity, &db, &messages); } - Err(_) => continue, } } }