TUI client: WebSocket with HTTP fallback

poll_loop now:
1. Tries WebSocket connection to /v1/ws/<fingerprint>
2. On success: receives messages in real-time (instant push)
3. On disconnect: reconnects after 3 seconds
4. On WS failure: falls back to HTTP polling every 2 seconds

Refactored message processing into shared functions:
- process_incoming() handles raw bytes
- process_wire_message() handles deserialized WireMessage
- Used by both WS and HTTP paths

Both CLI TUI and web client now use WebSocket:
- No more HTTP polling spam in server logs
- Messages arrive instantly on both clients
- HTTP poll kept as fallback for scripts/mules

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-27 09:49:46 +04:00
parent c8a95e27e4
commit fe2b7d8e8a
3 changed files with 140 additions and 85 deletions

6
warzone/Cargo.lock generated
View File

@@ -2330,7 +2330,9 @@ checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite",
]
@@ -2501,6 +2503,7 @@ dependencies = [
"http",
"httparse",
"log",
"native-tls",
"rand",
"sha1",
"thiserror 1.0.69",
@@ -2654,6 +2657,7 @@ dependencies = [
"chrono",
"clap",
"crossterm",
"futures-util",
"hex",
"libc",
"rand",
@@ -2663,8 +2667,10 @@ dependencies = [
"serde_json",
"sled",
"tokio",
"tokio-tungstenite",
"tracing",
"tracing-subscriber",
"url",
"uuid",
"warzone-protocol",
"x25519-dalek",

View File

@@ -27,3 +27,6 @@ bincode.workspace = true
libc = "0.2"
uuid.workspace = true
chrono.workspace = true
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
futures-util = "0.3"
url = "2"

View File

@@ -631,7 +631,99 @@ fn normfp(fp: &str) -> String {
fp.chars().filter(|c| c.is_ascii_hexdigit()).collect::<String>().to_lowercase()
}
/// Poll for incoming messages in the background.
/// Process a single incoming raw message (shared by WS and HTTP paths).
fn process_incoming(
raw: &[u8],
identity: &IdentityKeyPair,
db: &LocalDb,
messages: &Arc<Mutex<Vec<ChatLine>>>,
) {
match bincode::deserialize::<WireMessage>(raw) {
Ok(wire) => process_wire_message(wire, identity, db, messages),
Err(_) => {}
}
}
fn process_wire_message(
wire: WireMessage,
identity: &IdentityKeyPair,
db: &LocalDb,
messages: &Arc<Mutex<Vec<ChatLine>>>,
) {
match wire {
WireMessage::KeyExchange {
sender_fingerprint,
sender_identity_encryption_key,
ephemeral_public,
used_one_time_pre_key_id,
ratchet_message,
} => {
let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) {
Ok(fp) => fp,
Err(_) => return,
};
let spk_secret = match db.load_signed_pre_key(1) {
Ok(Some(s)) => s,
_ => return,
};
let otpk_secret = if let Some(id) = used_one_time_pre_key_id {
db.take_one_time_pre_key(id).ok().flatten()
} else {
None
};
let their_id_x25519 = PublicKey::from(sender_identity_encryption_key);
let their_eph = PublicKey::from(ephemeral_public);
let shared_secret = match x3dh::respond(
identity, &spk_secret, otpk_secret.as_ref(), &their_id_x25519, &their_eph,
) {
Ok(s) => s,
Err(_) => return,
};
let mut state = RatchetState::init_bob(shared_secret, spk_secret);
match state.decrypt(&ratchet_message) {
Ok(plaintext) => {
let text = String::from_utf8_lossy(&plaintext).to_string();
let _ = db.save_session(&sender_fp, &state);
messages.lock().unwrap().push(ChatLine {
sender: sender_fingerprint[..sender_fingerprint.len().min(12)].to_string(),
text,
is_system: false,
is_self: false,
});
}
Err(_) => {}
}
}
WireMessage::Message {
sender_fingerprint,
ratchet_message,
} => {
let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) {
Ok(fp) => fp,
Err(_) => return,
};
let mut state = match db.load_session(&sender_fp) {
Ok(Some(s)) => s,
_ => return,
};
match state.decrypt(&ratchet_message) {
Ok(plaintext) => {
let text = String::from_utf8_lossy(&plaintext).to_string();
let _ = db.save_session(&sender_fp, &state);
messages.lock().unwrap().push(ChatLine {
sender: sender_fingerprint[..sender_fingerprint.len().min(12)].to_string(),
text,
is_system: false,
is_self: false,
});
}
Err(_) => {}
}
}
}
}
/// Real-time message loop via WebSocket (falls back to HTTP polling).
pub async fn poll_loop(
messages: Arc<Mutex<Vec<ChatLine>>>,
our_fp: String,
@@ -639,97 +731,51 @@ pub async fn poll_loop(
db: Arc<LocalDb>,
client: ServerClient,
) {
let fp = normfp(&our_fp);
// Try WebSocket first
let ws_url = client.base_url
.replace("http://", "ws://")
.replace("https://", "wss://");
let ws_url = format!("{}/v1/ws/{}", ws_url, fp);
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
match tokio_tungstenite::connect_async(&ws_url).await {
Ok((ws_stream, _)) => {
messages.lock().unwrap().push(ChatLine {
sender: "system".into(),
text: "Real-time connection established".into(),
is_system: true,
is_self: false,
});
let raw_msgs = match client.poll_messages(&our_fp).await {
Ok(m) => m,
Err(_) => continue,
};
use futures_util::StreamExt;
let (_, mut read) = ws_stream.split();
for raw in &raw_msgs {
match bincode::deserialize::<WireMessage>(raw) {
Ok(WireMessage::KeyExchange {
sender_fingerprint,
sender_identity_encryption_key,
ephemeral_public,
used_one_time_pre_key_id,
ratchet_message,
}) => {
let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) {
Ok(fp) => fp,
Err(_) => continue,
};
let spk_secret = match db.load_signed_pre_key(1) {
Ok(Some(s)) => s,
_ => continue,
};
let otpk_secret = if let Some(id) = used_one_time_pre_key_id {
db.take_one_time_pre_key(id).ok().flatten()
} else {
None
};
let their_id_x25519 = PublicKey::from(sender_identity_encryption_key);
let their_eph = PublicKey::from(ephemeral_public);
let shared_secret = match x3dh::respond(
&identity,
&spk_secret,
otpk_secret.as_ref(),
&their_id_x25519,
&their_eph,
) {
Ok(s) => s,
Err(_) => continue,
};
let mut state = RatchetState::init_bob(shared_secret, spk_secret);
match state.decrypt(&ratchet_message) {
Ok(plaintext) => {
let text = String::from_utf8_lossy(&plaintext).to_string();
let _ = db.save_session(&sender_fp, &state);
messages.lock().unwrap().push(ChatLine {
sender: sender_fingerprint[..12].to_string(),
text,
is_system: false,
is_self: false,
});
}
Err(_) => continue,
while let Some(Ok(msg)) = read.next().await {
if let tokio_tungstenite::tungstenite::Message::Binary(data) = msg {
process_incoming(&data, &identity, &db, &messages);
}
}
Ok(WireMessage::Message {
sender_fingerprint,
ratchet_message,
}) => {
let sender_fp = match Fingerprint::from_hex(&sender_fingerprint) {
Ok(fp) => fp,
Err(_) => continue,
};
let mut state = match db.load_session(&sender_fp) {
Ok(Some(s)) => s,
_ => continue,
};
match state.decrypt(&ratchet_message) {
Ok(plaintext) => {
let text = String::from_utf8_lossy(&plaintext).to_string();
let _ = db.save_session(&sender_fp, &state);
messages.lock().unwrap().push(ChatLine {
sender: sender_fingerprint[..12].to_string(),
text,
is_system: false,
is_self: false,
});
}
Err(_) => continue,
}
messages.lock().unwrap().push(ChatLine {
sender: "system".into(),
text: "Connection lost, reconnecting...".into(),
is_system: true,
is_self: false,
});
tokio::time::sleep(Duration::from_secs(3)).await;
}
Err(_) => {
// Fallback to HTTP polling
tokio::time::sleep(Duration::from_secs(2)).await;
let raw_msgs = match client.poll_messages(&our_fp).await {
Ok(m) => m,
Err(_) => continue,
};
for raw in &raw_msgs {
process_incoming(raw, &identity, &db, &messages);
}
Err(_) => continue,
}
}
}