From 20375eceb91bd7b94c5a39c0f7cf9ff10dde8b77 Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Sat, 11 Apr 2026 18:40:11 +0400 Subject: [PATCH] feat(signal): transparent reconnect + auto-swap on relay change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related UX fixes, same state-machine surface: 1. Relay drops / goes offline / restarts: the client now auto- reconnects in the background instead of silently falling to "not registered" and requiring the user to tap Deregister + Register. 2. User switches relay in settings: client auto-swaps — close old transport, register against new, all transparent. ## Signal state additions (desktop/src-tauri/src/lib.rs) - `SignalState.desired_relay_addr: Option<String>` — what the user CURRENTLY wants. `Some(x)` means "keep me connected to x", `None` means "user explicitly asked for idle". This is the pivot that distinguishes "connection dropped, retry" from "user deregistered, stop". - `SignalState.reconnect_in_progress: bool` — single-flight guard so concurrent triggers (recv-loop exit + manual register_signal + another recv-loop exit after a brief success) don't spawn duplicate supervisors. ## Refactor The old `register_signal` Tauri command was doing the whole connect + Register + spawn-recv-loop flow inline. Split into: - `internal_deregister(signal_state, keep_desired)` — shared teardown helper that nulls out transport/endpoint/call state and optionally clears `desired_relay_addr`. - `do_register_signal(signal_state, app, relay)` — core connect + register + spawn-recv-loop flow, callable from both the Tauri command and the reconnect supervisor. Returns an explicit `impl Future<...> + Send` to avoid auto-trait inference bailing inside the tokio::spawn chain (rustc loses the Send trail through the recv-loop spawn inside the fn body). 
- `register_signal` Tauri command — now thin: if already registered to the same relay, no-op; otherwise internal_deregister(keep_desired=false), set desired_relay_addr = Some(new), call do_register_signal. The Rust side handles the "change of server" transition entirely on its own, no deregister+register dance from JS needed. - `deregister` Tauri command — internal_deregister(keep_desired = false) so the recv-loop exit path sees the cleared desired addr and does NOT spawn a supervisor. ## Reconnect supervisor New `signal_reconnect_supervisor(signal_state, app, relay)` task. Spawned from the recv-loop exit path when the loop exits unexpectedly AND `desired_relay_addr.is_some()` AND no supervisor is already running. - Exponential backoff: 1s, 2s, 4s, 8s, 15s, 30s (capped at 30s, never gives up). First attempt is immediate (attempt 0 skips the wait). - On each iteration checks whether `desired_relay_addr` was cleared (user deregistered mid-flight) or another path already re-registered; either short-circuits the supervisor. - Also detects if the user changed relays while the supervisor was sleeping — resets the backoff counter and retries against the new addr. - On success, exits so the newly-spawned recv loop owns the connection from that point. If THAT drops again, a fresh supervisor spawns. - Emits `call-debug-log` and `signal-event` events at every state transition so the GUI can display "reconnecting...", "registered" banners. ## UI wiring (desktop/src/main.ts) - signal-event handler gets two new cases: - `"reconnecting"` — amber "🔄 reconnecting to …" in the registered banner area - `"registered"` — green "✓ registered (…)" to clear the reconnecting badge - Relay-selection click handler checks if a signal is currently registered and, if the user picked a different relay, fires `register_signal` with the new address. Rust side handles the swap transparently. Full workspace test: 423 passing (no regressions). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- desktop/src-tauri/src/lib.rs | 285 +++++++++++++++++++++++++++++++++-- desktop/src/main.ts | 48 ++++++ 2 files changed, 319 insertions(+), 14 deletions(-) diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index a28210f..5da51d3 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -648,6 +648,23 @@ struct SignalState { /// Tauri command to compute the deterministic role for the /// dual-path QUIC race against `peer_direct_addr`. own_reflex_addr: Option, + /// The relay address the user currently wants to be registered + /// against. `Some` means "keep me connected" — the supervisor + /// will auto-reconnect after unexpected drops. `None` means + /// "user explicitly deregistered" — do not retry. + /// + /// Distinguishing these two cases is what lets relay + /// restarts + transient network blips be transparent to the + /// user: the recv loop dies, but because `desired_relay_addr` + /// is still set, a supervisor task retries the full + /// connect+register flow with exponential backoff until the + /// relay is reachable again. + desired_relay_addr: Option<String>, + /// Single-flight guard: `true` while the reconnect supervisor + /// task is actively trying to re-establish the signal + /// connection. Prevents duplicate supervisors from spawning + /// (recv loop exit races with a manual register_signal call). + reconnect_in_progress: bool, } #[tauri::command] @@ -656,6 +673,80 @@ async fn register_signal( app: tauri::AppHandle, relay: String, ) -> Result<String, String> { + // Set the desired relay and handle the "already registered to + // a different relay" transition. This is the public entry + // point — settings-screen changes come through here. 
+ let already_same = { + let sig = state.signal.lock().await; + sig.transport.is_some() + && sig.desired_relay_addr.as_deref() == Some(relay.as_str()) + }; + if already_same { + // Idempotent: user hit "Register" twice on the same relay, + // or the JS side re-called after a settings save that + // didn't actually change the relay. + let sig = state.signal.lock().await; + return Ok(sig.fingerprint.clone()); + } + + // Tear down any existing registration (different relay → swap). + internal_deregister(&state.signal, /*keep_desired=*/ false).await; + + // Announce the new desired state so the recv-loop exit path and + // any running supervisor can see it. + { + let mut sig = state.signal.lock().await; + sig.desired_relay_addr = Some(relay.clone()); + } + + do_register_signal(state.signal.clone(), app, relay).await +} + +/// Close the current signal transport + clear derived state. +/// Used by `deregister` (with `keep_desired = false`, clearing +/// `desired_relay_addr`) and by the relay-swap path in +/// `register_signal` (also `keep_desired = false` — the caller +/// is about to set a new desired addr). +async fn internal_deregister( + signal_state: &Arc<Mutex<SignalState>>, + keep_desired: bool, +) { + let mut sig = signal_state.lock().await; + if let Some(t) = sig.transport.take() { + // Dropping the transport Arc closes the quinn connection; + // calling close() explicitly is a no-op but neat. + let _ = t.close().await; + } + sig.endpoint = None; + sig.signal_status = "idle".into(); + sig.incoming_call_id = None; + sig.incoming_caller_fp = None; + sig.incoming_caller_alias = None; + sig.pending_reflect = None; + sig.own_reflex_addr = None; + if !keep_desired { + sig.desired_relay_addr = None; + } +} + +/// Core register flow, extracted so the Tauri command AND the +/// reconnect supervisor can both call it. Does the connect + +/// RegisterPresence + spawn-recv-loop dance. +/// +/// Contract: `signal_state.desired_relay_addr` must already be +/// set to `Some(relay)` by the caller. 
On recv-loop exit, the +/// spawned task will check `desired_relay_addr` and (if still +/// Some) trigger the reconnect supervisor. +/// +/// Explicit `+ Send` on the return type so the reconnect +/// supervisor (which lives inside a `tokio::spawn`) can await +/// this future without hitting auto-trait inference issues. +fn do_register_signal( + signal_state: Arc<Mutex<SignalState>>, + app: tauri::AppHandle, + relay: String, +) -> impl std::future::Future<Output = Result<String, String>> + Send { + async move { use wzp_proto::SignalMessage; emit_call_debug(&app, "register_signal:start", serde_json::json!({ "relay": relay })); @@ -696,13 +787,27 @@ } } - { let mut sig = state.signal.lock().await; sig.transport = Some(transport.clone()); sig.endpoint = Some(endpoint.clone()); sig.fingerprint = fp.clone(); sig.signal_status = "registered".into(); } + { + let mut sig = signal_state.lock().await; + sig.transport = Some(transport.clone()); + sig.endpoint = Some(endpoint.clone()); + sig.fingerprint = fp.clone(); + sig.signal_status = "registered".into(); + } + // Let the JS side know we've (re-)entered "registered" so any + // "reconnecting..." banner can clear. + let _ = app.emit( + "signal-event", + serde_json::json!({ "type": "registered", "fingerprint": fp }), + ); tracing::info!(%fp, "signal registered, spawning recv loop"); emit_call_debug(&app, "register_signal:recv_loop_spawning", serde_json::json!({ "fingerprint": fp })); - let signal_state = Arc::clone(&state.signal); + let signal_state_loop = signal_state.clone(); let app_clone = app.clone(); tokio::spawn(async move { + // Capture for the exit-path reconnect trigger below. 
+ let signal_state = signal_state_loop.clone(); loop { match transport.recv_signal().await { Ok(Some(SignalMessage::CallRinging { call_id })) => { @@ -837,9 +942,165 @@ async fn register_signal( } } tracing::warn!("signal recv loop exited — signal_status=idle, transport dropped"); - let mut sig = signal_state.lock().await; sig.signal_status = "idle".into(); sig.transport = None; + // Determine whether this was a user-requested close or an + // unexpected drop. `desired_relay_addr.is_some()` means the + // user still wants to be registered — spawn the reconnect + // supervisor with exponential backoff. + let (should_reconnect, desired_relay, already_reconnecting) = { + let mut sig = signal_state.lock().await; + sig.signal_status = "idle".into(); + sig.transport = None; + ( + sig.desired_relay_addr.is_some(), + sig.desired_relay_addr.clone(), + sig.reconnect_in_progress, + ) + }; + if should_reconnect && !already_reconnecting { + if let Some(relay) = desired_relay { + tracing::info!(%relay, "signal recv loop exited unexpectedly — spawning reconnect supervisor"); + emit_call_debug( + &app_clone, + "signal:reconnect_supervisor_spawning", + serde_json::json!({ "relay": relay }), + ); + let _ = app_clone.emit( + "signal-event", + serde_json::json!({ "type": "reconnecting", "relay": relay }), + ); + let state_for_sup = signal_state.clone(); + let app_for_sup = app_clone.clone(); + tokio::spawn(async move { + signal_reconnect_supervisor(state_for_sup, app_for_sup, relay).await; + }); + } + } else if should_reconnect && already_reconnecting { + tracing::debug!("signal recv loop exited; reconnect supervisor already running"); + } }); Ok(fp) + } // end async move +} // end fn do_register_signal + +/// Supervisor task: loops with exponential backoff, calling +/// `do_register_signal` until the relay comes back online. 
Exits +/// as soon as one attempt succeeds (the newly-spawned recv loop +/// owns the connection from that point on) OR the user clears +/// `desired_relay_addr` via `deregister`. +/// +/// Backoff schedule: 1s, 2s, 4s, 8s, 15s, 30s (capped). Reset on +/// success or exit. +async fn signal_reconnect_supervisor( + signal_state: Arc<Mutex<SignalState>>, + app: tauri::AppHandle, + initial_relay: String, +) { + // Claim the single-flight slot so a second exit-path trigger + // or a manual register_signal doesn't spawn a duplicate. + { + let mut sig = signal_state.lock().await; + if sig.reconnect_in_progress { + tracing::debug!("reconnect supervisor: another already running, exiting"); + return; + } + sig.reconnect_in_progress = true; + } + + let backoff_schedule_ms: [u64; 6] = [1_000, 2_000, 4_000, 8_000, 15_000, 30_000]; + let mut attempt: usize = 0; + let mut current_relay = initial_relay; + + loop { + // Has the user cleared the desired relay? If so, exit. + let (desired, transport_is_some) = { + let sig = signal_state.lock().await; + (sig.desired_relay_addr.clone(), sig.transport.is_some()) + }; + let Some(desired) = desired else { + tracing::info!("reconnect supervisor: desired_relay_addr cleared, exiting"); + break; + }; + + // Has something else already re-registered us (manual + // register_signal won the race)? If so, exit. + if transport_is_some { + tracing::info!("reconnect supervisor: transport already set by another path, exiting"); + break; + } + + // Has the desired relay changed under us? Switch to the new one. + if desired != current_relay { + tracing::info!(old = %current_relay, new = %desired, "reconnect supervisor: desired relay changed"); + current_relay = desired.clone(); + attempt = 0; + } + + // Back off before the retry (skip on attempt 0 so the first + // reconnect kicks in fast). 
+ if attempt > 0 { + let idx = (attempt - 1).min(backoff_schedule_ms.len() - 1); + let wait_ms = backoff_schedule_ms[idx]; + tracing::info!( + attempt, + wait_ms, + relay = %current_relay, + "reconnect supervisor: backing off" + ); + emit_call_debug( + &app, + "signal:reconnect_backoff", + serde_json::json!({ "attempt": attempt, "wait_ms": wait_ms, "relay": current_relay }), + ); + tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await; + } + attempt += 1; + + // One-shot attempt. do_register_signal will set the + // transport + spawn a fresh recv loop on success. + // + // CRITICAL: release our single-flight guard BEFORE + // do_register_signal spawns the new recv loop, because that + // recv loop's exit path also checks `reconnect_in_progress` + // to decide whether to spawn a supervisor of its own. If we + // held it here and later exited, the slot would be released + // too late for the next drop to trigger a fresh supervisor. + { + let mut sig = signal_state.lock().await; + sig.reconnect_in_progress = false; + } + + emit_call_debug( + &app, + "signal:reconnect_attempt", + serde_json::json!({ "attempt": attempt, "relay": current_relay }), + ); + match do_register_signal(signal_state.clone(), app.clone(), current_relay.clone()).await { + Ok(fp) => { + tracing::info!(%fp, relay = %current_relay, "reconnect supervisor: success"); + emit_call_debug( + &app, + "signal:reconnect_ok", + serde_json::json!({ "fingerprint": fp, "relay": current_relay }), + ); + return; // recv loop now owns the connection + } + Err(e) => { + tracing::warn!(error = %e, relay = %current_relay, "reconnect supervisor: attempt failed"); + emit_call_debug( + &app, + "signal:reconnect_failed", + serde_json::json!({ "attempt": attempt, "error": e, "relay": current_relay }), + ); + // Re-claim the single-flight slot for the next iteration. 
+ let mut sig = signal_state.lock().await; + sig.reconnect_in_progress = true; + } + } + } + + // Loop exited — clean up the slot if we still hold it. + let mut sig = signal_state.lock().await; + sig.reconnect_in_progress = false; } #[tauri::command] @@ -1162,19 +1423,13 @@ async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<String, String> #[tauri::command] async fn deregister(state: tauri::State<'_, Arc<AppState>>) -> Result<(), String> { - let mut sig = state.signal.lock().await; - if let Some(transport) = sig.transport.take() { - tracing::info!("deregister: closing signal transport"); - transport.close().await.ok(); - } - sig.endpoint = None; - sig.signal_status = "idle".into(); - sig.incoming_call_id = None; - sig.incoming_caller_fp = None; - sig.incoming_caller_alias = None; + internal_deregister(&state.signal, /*keep_desired=*/ false).await; + tracing::info!("deregister: user-requested, desired_relay_addr cleared"); Ok(()) } @@ -1192,6 +1447,8 @@ pub fn run() { incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None, pending_reflect: None, own_reflex_addr: None, + desired_relay_addr: None, + reconnect_in_progress: false, })), }); diff --git a/desktop/src/main.ts b/desktop/src/main.ts index 634b302..87c43a2 100644 --- a/desktop/src/main.ts +++ b/desktop/src/main.ts @@ -347,6 +347,9 @@ function renderRelayDialogList() { // Click to select item.addEventListener("click", () => { + const prev = loadSettings(); + const prevRelayAddr = prev.relays[prev.selectedRelay]?.address; + const s = loadSettings(); s.selectedRelay = i; @@ -358,6 +361,30 @@ function renderRelayDialogList() { saveSettingsObj(s); renderRelayDialogList(); renderRelayButton(); + + // If the user switched relays and we're currently registered, + // transparently re-register against the new one. The Rust + // `register_signal` command is idempotent and handles the + // swap internally (close old transport → connect new). This + // makes "change server" a single-click operation instead of + // manual deregister + re-register. 
+ const newRelayAddr = r.address; + if (newRelayAddr && newRelayAddr !== prevRelayAddr) { + (async () => { + // Is a signal currently registered? get_signal_status is + // cheap and lets us decide whether to kick the swap. + try { + const st: any = await invoke("get_signal_status"); + if (st && st.status === "registered") { + await invoke("register_signal", { relay: newRelayAddr }); + // `signal-event { type: "registered" }` from Rust will + // update directRegistered for us — no manual render here. + } + } catch (e) { + console.warn("relay swap: failed to re-register", e); + } + })(); + } }); relayDialogList.appendChild(item); @@ -1237,5 +1264,26 @@ listen("signal-event", (event: any) => { callStatusText.textContent = ""; incomingCallPanel.classList.add("hidden"); break; + case "reconnecting": + // Signal supervisor is retrying the relay connection. Show + // a non-blocking indicator; the user can keep using + // everything that doesn't need a live signal. + { + const relay = typeof data.relay === "string" ? data.relay : "relay"; + directRegistered.textContent = `🔄 reconnecting to ${relay}…`; + directRegistered.style.color = "var(--yellow)"; + directRegistered.classList.remove("hidden"); + } + break; + case "registered": + // Supervisor (re-)succeeded, or the first register landed. + // Clear the banner and show the registered state. + { + const fp = typeof data.fingerprint === "string" ? data.fingerprint : ""; + directRegistered.textContent = `✓ registered${fp ? ` (${fp.slice(0, 16)}…)` : ""}`; + directRegistered.style.color = "var(--green)"; + directRegistered.classList.remove("hidden"); + } + break; } });