Fetch-and-delete: server deletes messages after poll delivery
poll_messages now collects all queued messages, returns them, then deletes them from sled. No more duplicate delivery. This is correct for store-and-forward: once the client receives the messages, the server's job is done. If the client crashes before processing, the messages are lost — acceptable for Phase 1. Phase 2 can add explicit ack-based delivery if needed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -22,37 +22,59 @@ struct SendRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn normalize_fp(fp: &str) -> String {
|
fn normalize_fp(fp: &str) -> String {
|
||||||
fp.chars().filter(|c| c.is_ascii_hexdigit()).collect::<String>().to_lowercase()
|
fp.chars()
|
||||||
|
.filter(|c| c.is_ascii_hexdigit())
|
||||||
|
.collect::<String>()
|
||||||
|
.to_lowercase()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_message(
|
async fn send_message(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Json(req): Json<SendRequest>,
|
Json(req): Json<SendRequest>,
|
||||||
) -> AppResult<Json<serde_json::Value>> {
|
) -> AppResult<Json<serde_json::Value>> {
|
||||||
let key = format!("queue:{}", normalize_fp(&req.to));
|
let to = normalize_fp(&req.to);
|
||||||
state.db.messages.insert(
|
let key = format!("queue:{}:{}", to, uuid::Uuid::new_v4());
|
||||||
format!("{}:{}", key, uuid::Uuid::new_v4()).as_bytes(),
|
tracing::info!("Queuing message for {} ({} bytes)", to, req.message.len());
|
||||||
req.message,
|
state.db.messages.insert(key.as_bytes(), req.message)?;
|
||||||
)?;
|
|
||||||
Ok(Json(serde_json::json!({ "ok": true })))
|
Ok(Json(serde_json::json!({ "ok": true })))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Poll fetches all queued messages and deletes them from the server.
|
||||||
|
/// This is store-and-forward: once delivered, the server drops them.
|
||||||
async fn poll_messages(
|
async fn poll_messages(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(fingerprint): Path<String>,
|
Path(fingerprint): Path<String>,
|
||||||
) -> AppResult<Json<Vec<String>>> {
|
) -> AppResult<Json<Vec<String>>> {
|
||||||
let prefix = format!("queue:{}", normalize_fp(&fingerprint));
|
let prefix = format!("queue:{}", normalize_fp(&fingerprint));
|
||||||
let mut messages = Vec::new();
|
let mut messages = Vec::new();
|
||||||
|
let mut keys_to_delete = Vec::new();
|
||||||
|
|
||||||
for item in state.db.messages.scan_prefix(prefix.as_bytes()) {
|
for item in state.db.messages.scan_prefix(prefix.as_bytes()) {
|
||||||
let (_, value) = item?;
|
let (key, value) = item?;
|
||||||
messages.push(base64::Engine::encode(
|
messages.push(base64::Engine::encode(
|
||||||
&base64::engine::general_purpose::STANDARD,
|
&base64::engine::general_purpose::STANDARD,
|
||||||
&value,
|
&value,
|
||||||
));
|
));
|
||||||
|
keys_to_delete.push(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete after collecting (fetch-and-delete)
|
||||||
|
for key in &keys_to_delete {
|
||||||
|
state.db.messages.remove(key)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !messages.is_empty() {
|
||||||
|
tracing::info!(
|
||||||
|
"Delivered {} message(s) to {}, deleted from queue",
|
||||||
|
messages.len(),
|
||||||
|
normalize_fp(&fingerprint)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Json(messages))
|
Ok(Json(messages))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Explicit ack endpoint (for future use with selective delivery).
|
||||||
async fn ack_message(
|
async fn ack_message(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(id): Path<String>,
|
Path(id): Path<String>,
|
||||||
|
|||||||
Reference in New Issue
Block a user