diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index a28210f..5da51d3 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -648,6 +648,23 @@ struct SignalState { /// Tauri command to compute the deterministic role for the /// dual-path QUIC race against `peer_direct_addr`. own_reflex_addr: Option, + /// The relay address the user currently wants to be registered + /// against. `Some` means "keep me connected" — the supervisor + /// will auto-reconnect after unexpected drops. `None` means + /// "user explicitly deregistered" — do not retry. + /// + /// Distinguishing these two cases is what lets relay + /// restarts + transient network blips be transparent to the + /// user: the recv loop dies, but because `desired_relay_addr` + /// is still set, a supervisor task retries the full + /// connect+register flow with exponential backoff until the + /// relay is reachable again. + desired_relay_addr: Option, + /// Single-flight guard: `true` while the reconnect supervisor + /// task is actively trying to re-establish the signal + /// connection. Prevents duplicate supervisors from spawning + /// (recv loop exit races with a manual register_signal call). + reconnect_in_progress: bool, } #[tauri::command] @@ -656,6 +673,80 @@ async fn register_signal( app: tauri::AppHandle, relay: String, ) -> Result { + // Set the desired relay and handle the "already registered to + // a different relay" transition. This is the public entry + // point — settings-screen changes come through here. + let already_same = { + let sig = state.signal.lock().await; + sig.transport.is_some() + && sig.desired_relay_addr.as_deref() == Some(relay.as_str()) + }; + if already_same { + // Idempotent: user hit "Register" twice on the same relay, + // or the JS side re-called after a settings save that + // didn't actually change the relay. + let sig = state.signal.lock().await; + return Ok(sig.fingerprint.clone()); + } + + // Tear down any existing registration (different relay → swap). + internal_deregister(&state.signal, /*keep_desired=*/ false).await; + + // Announce the new desired state so the recv-loop exit path and + // any running supervisor can see it. + { + let mut sig = state.signal.lock().await; + sig.desired_relay_addr = Some(relay.clone()); + } + + do_register_signal(state.signal.clone(), app, relay).await +} + +/// Close the current signal transport + clear derived state. +/// Used by `deregister` (with `keep_desired = false`, clearing +/// `desired_relay_addr`) and by the relay-swap path in +/// `register_signal` (also `keep_desired = false` — the caller +/// is about to set a new desired addr). +async fn internal_deregister( + signal_state: &Arc>, + keep_desired: bool, +) { + let mut sig = signal_state.lock().await; + if let Some(t) = sig.transport.take() { + // Dropping the transport Arc closes the quinn connection; + // calling close() explicitly is a no-op but neat. + let _ = t.close().await; + } + sig.endpoint = None; + sig.signal_status = "idle".into(); + sig.incoming_call_id = None; + sig.incoming_caller_fp = None; + sig.incoming_caller_alias = None; + sig.pending_reflect = None; + sig.own_reflex_addr = None; + if !keep_desired { + sig.desired_relay_addr = None; + } +} + +/// Core register flow, extracted so the Tauri command AND the +/// reconnect supervisor can both call it. Does the connect + +/// RegisterPresence + spawn-recv-loop dance. +/// +/// Contract: `signal_state.desired_relay_addr` must already be +/// set to `Some(relay)` by the caller. On recv-loop exit, the +/// spawned task will check `desired_relay_addr` and (if still +/// Some) trigger the reconnect supervisor. +/// +/// Explicit `+ Send` on the return type so the reconnect +/// supervisor (which lives inside a `tokio::spawn`) can await +/// this future without hitting auto-trait inference issues. +fn do_register_signal( + signal_state: Arc>, + app: tauri::AppHandle, + relay: String, +) -> impl std::future::Future> + Send { + async move { use wzp_proto::SignalMessage; emit_call_debug(&app, "register_signal:start", serde_json::json!({ "relay": relay })); @@ -696,13 +787,27 @@ async fn register_signal( } } - { let mut sig = state.signal.lock().await; sig.transport = Some(transport.clone()); sig.endpoint = Some(endpoint.clone()); sig.fingerprint = fp.clone(); sig.signal_status = "registered".into(); } + { + let mut sig = signal_state.lock().await; + sig.transport = Some(transport.clone()); + sig.endpoint = Some(endpoint.clone()); + sig.fingerprint = fp.clone(); + sig.signal_status = "registered".into(); + } + // Let the JS side know we've (re-)entered "registered" so any + // "reconnecting..." banner can clear. + let _ = app.emit( + "signal-event", + serde_json::json!({ "type": "registered", "fingerprint": fp }), + ); tracing::info!(%fp, "signal registered, spawning recv loop"); emit_call_debug(&app, "register_signal:recv_loop_spawning", serde_json::json!({ "fingerprint": fp })); - let signal_state = Arc::clone(&state.signal); + let signal_state_loop = signal_state.clone(); let app_clone = app.clone(); tokio::spawn(async move { + // Capture for the exit-path reconnect trigger below. + let signal_state = signal_state_loop.clone(); loop { match transport.recv_signal().await { Ok(Some(SignalMessage::CallRinging { call_id })) => { @@ -837,9 +942,165 @@ async fn register_signal( } } tracing::warn!("signal recv loop exited — signal_status=idle, transport dropped"); - let mut sig = signal_state.lock().await; sig.signal_status = "idle".into(); sig.transport = None; + // Determine whether this was a user-requested close or an + // unexpected drop. `desired_relay_addr.is_some()` means the + // user still wants to be registered — spawn the reconnect + // supervisor with exponential backoff. + let (should_reconnect, desired_relay, already_reconnecting) = { + let mut sig = signal_state.lock().await; + sig.signal_status = "idle".into(); + sig.transport = None; + ( + sig.desired_relay_addr.is_some(), + sig.desired_relay_addr.clone(), + sig.reconnect_in_progress, + ) + }; + if should_reconnect && !already_reconnecting { + if let Some(relay) = desired_relay { + tracing::info!(%relay, "signal recv loop exited unexpectedly — spawning reconnect supervisor"); + emit_call_debug( + &app_clone, + "signal:reconnect_supervisor_spawning", + serde_json::json!({ "relay": relay }), + ); + let _ = app_clone.emit( + "signal-event", + serde_json::json!({ "type": "reconnecting", "relay": relay }), + ); + let state_for_sup = signal_state.clone(); + let app_for_sup = app_clone.clone(); + tokio::spawn(async move { + signal_reconnect_supervisor(state_for_sup, app_for_sup, relay).await; + }); + } + } else if should_reconnect && already_reconnecting { + tracing::debug!("signal recv loop exited; reconnect supervisor already running"); + } }); Ok(fp) + } // end async move +} // end fn do_register_signal + +/// Supervisor task: loops with exponential backoff, calling +/// `do_register_signal` until the relay comes back online. Exits +/// as soon as one attempt succeeds (the newly-spawned recv loop +/// owns the connection from that point on) OR the user clears +/// `desired_relay_addr` via `deregister`. +/// +/// Backoff schedule: 1s, 2s, 4s, 8s, 15s, 30s (capped). Reset on +/// success or exit. +async fn signal_reconnect_supervisor( + signal_state: Arc>, + app: tauri::AppHandle, + initial_relay: String, +) { + // Claim the single-flight slot so a second exit-path trigger + // or a manual register_signal doesn't spawn a duplicate. + { + let mut sig = signal_state.lock().await; + if sig.reconnect_in_progress { + tracing::debug!("reconnect supervisor: another already running, exiting"); + return; + } + sig.reconnect_in_progress = true; + } + + let backoff_schedule_ms: [u64; 6] = [1_000, 2_000, 4_000, 8_000, 15_000, 30_000]; + let mut attempt: usize = 0; + let mut current_relay = initial_relay; + + loop { + // Has the user cleared the desired relay? If so, exit. + let (desired, transport_is_some) = { + let sig = signal_state.lock().await; + (sig.desired_relay_addr.clone(), sig.transport.is_some()) + }; + let Some(desired) = desired else { + tracing::info!("reconnect supervisor: desired_relay_addr cleared, exiting"); + break; + }; + + // Has something else already re-registered us (manual + // register_signal won the race)? If so, exit. + if transport_is_some { + tracing::info!("reconnect supervisor: transport already set by another path, exiting"); + break; + } + + // Has the desired relay changed under us? Switch to the new one. + if desired != current_relay { + tracing::info!(old = %current_relay, new = %desired, "reconnect supervisor: desired relay changed"); + current_relay = desired.clone(); + attempt = 0; + } + + // Back off before the retry (skip on attempt 0 so the first + // reconnect kicks in fast). + if attempt > 0 { + let idx = (attempt - 1).min(backoff_schedule_ms.len() - 1); + let wait_ms = backoff_schedule_ms[idx]; + tracing::info!( + attempt, + wait_ms, + relay = %current_relay, + "reconnect supervisor: backing off" + ); + emit_call_debug( + &app, + "signal:reconnect_backoff", + serde_json::json!({ "attempt": attempt, "wait_ms": wait_ms, "relay": current_relay }), + ); + tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await; + } + attempt += 1; + + // One-shot attempt. do_register_signal will set the + // transport + spawn a fresh recv loop on success. + // + // CRITICAL: release our single-flight guard BEFORE + // do_register_signal spawns the new recv loop, because that + // recv loop's exit path also checks `reconnect_in_progress` + // to decide whether to spawn a supervisor of its own. If we + // held it here and later exited, the slot would be released + // too late for the next drop to trigger a fresh supervisor. + { + let mut sig = signal_state.lock().await; + sig.reconnect_in_progress = false; + } + + emit_call_debug( + &app, + "signal:reconnect_attempt", + serde_json::json!({ "attempt": attempt, "relay": current_relay }), + ); + match do_register_signal(signal_state.clone(), app.clone(), current_relay.clone()).await { + Ok(fp) => { + tracing::info!(%fp, relay = %current_relay, "reconnect supervisor: success"); + emit_call_debug( + &app, + "signal:reconnect_ok", + serde_json::json!({ "fingerprint": fp, "relay": current_relay }), + ); + return; // recv loop now owns the connection + } + Err(e) => { + tracing::warn!(error = %e, relay = %current_relay, "reconnect supervisor: attempt failed"); + emit_call_debug( + &app, + "signal:reconnect_failed", + serde_json::json!({ "attempt": attempt, "error": e, "relay": current_relay }), + ); + // Re-claim the single-flight slot for the next iteration. + let mut sig = signal_state.lock().await; + sig.reconnect_in_progress = true; + } + } + } + + // Loop exited — clean up the slot if we still hold it. + let mut sig = signal_state.lock().await; + sig.reconnect_in_progress = false; } #[tauri::command] @@ -1162,19 +1423,13 @@ async fn get_signal_status(state: tauri::State<'_, Arc>) -> Result>) -> Result<(), String> { - let mut sig = state.signal.lock().await; - if let Some(transport) = sig.transport.take() { - tracing::info!("deregister: closing signal transport"); - transport.close().await.ok(); - } - sig.endpoint = None; - sig.signal_status = "idle".into(); - sig.incoming_call_id = None; - sig.incoming_caller_fp = None; - sig.incoming_caller_alias = None; + internal_deregister(&state.signal, /*keep_desired=*/ false).await; + tracing::info!("deregister: user-requested, desired_relay_addr cleared"); Ok(()) } @@ -1192,6 +1447,8 @@ pub fn run() { incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None, pending_reflect: None, own_reflex_addr: None, + desired_relay_addr: None, + reconnect_in_progress: false, })), }); diff --git a/desktop/src/main.ts b/desktop/src/main.ts index 634b302..87c43a2 100644 --- a/desktop/src/main.ts +++ b/desktop/src/main.ts @@ -347,6 +347,9 @@ function renderRelayDialogList() { // Click to select item.addEventListener("click", () => { + const prev = loadSettings(); + const prevRelayAddr = prev.relays[prev.selectedRelay]?.address; + const s = loadSettings(); s.selectedRelay = i; @@ -358,6 +361,30 @@ function renderRelayDialogList() { saveSettingsObj(s); renderRelayDialogList(); renderRelayButton(); + + // If the user switched relays and we're currently registered, + // transparently re-register against the new one. The Rust + // `register_signal` command is idempotent and handles the + // swap internally (close old transport → connect new). This + // makes "change server" a single-click operation instead of + // manual deregister + re-register. + const newRelayAddr = r.address; + if (newRelayAddr && newRelayAddr !== prevRelayAddr) { + (async () => { + // Is a signal currently registered? get_signal_status is + // cheap and lets us decide whether to kick the swap. + try { + const st: any = await invoke("get_signal_status"); + if (st && st.status === "registered") { + await invoke("register_signal", { relay: newRelayAddr }); + // `signal-event { type: "registered" }` from Rust will + // update directRegistered for us — no manual render here. + } + } catch (e) { + console.warn("relay swap: failed to re-register", e); + } + })(); + } }); relayDialogList.appendChild(item); @@ -1237,5 +1264,26 @@ listen("signal-event", (event: any) => { callStatusText.textContent = ""; incomingCallPanel.classList.add("hidden"); break; + case "reconnecting": + // Signal supervisor is retrying the relay connection. Show + // a non-blocking indicator; the user can keep using + // everything that doesn't need a live signal. + { + const relay = typeof data.relay === "string" ? data.relay : "relay"; + directRegistered.textContent = `🔄 reconnecting to ${relay}…`; + directRegistered.style.color = "var(--yellow)"; + directRegistered.classList.remove("hidden"); + } + break; + case "registered": + // Supervisor (re-)succeeded, or the first register landed. + // Clear the banner and show the registered state. + { + const fp = typeof data.fingerprint === "string" ? data.fingerprint : ""; + directRegistered.textContent = `✓ registered${fp ? ` (${fp.slice(0, 16)}…)` : ""}`; + directRegistered.style.color = "var(--green)"; + directRegistered.classList.remove("hidden"); + } + break; } });