feat(signal): transparent reconnect + auto-swap on relay change
Two related UX fixes, same state-machine surface: 1. Relay drops / goes offline / restarts: the client now auto- reconnects in the background instead of silently falling to "not registered" and requiring the user to tap Deregister + Register. 2. User switches relay in settings: client auto-swaps — close old transport, register against new, all transparent. ## Signal state additions (desktop/src-tauri/src/lib.rs) - `SignalState.desired_relay_addr: Option<String>` — what the user CURRENTLY wants. `Some(x)` means "keep me connected to x", `None` means "user explicitly asked for idle". This is the pivot that distinguishes "connection dropped, retry" from "user deregistered, stop". - `SignalState.reconnect_in_progress: bool` — single-flight guard so concurrent triggers (recv-loop exit + manual register_signal + another recv-loop exit after a brief success) don't spawn duplicate supervisors. ## Refactor The old `register_signal` Tauri command was doing the whole connect + Register + spawn-recv-loop flow inline. Split into: - `internal_deregister(signal_state, keep_desired)` — shared teardown helper that nulls out transport/endpoint/call state and optionally clears `desired_relay_addr`. - `do_register_signal(signal_state, app, relay)` — core connect + register + spawn-recv-loop flow, callable from both the Tauri command and the reconnect supervisor. Returns an explicit `impl Future<...> + Send` to avoid auto-trait inference bailing inside the tokio::spawn chain (rustc loses the Send trail through the recv-loop spawn inside the fn body). - `register_signal` Tauri command — now thin: if already registered to the same relay, no-op; otherwise internal_deregister(keep_desired=false), set desired_relay_addr = Some(new), call do_register_signal. The Rust side handles the "change of server" transition entirely on its own, no deregister+register dance from JS needed. - `deregister` Tauri command — internal_deregister(keep_desired = false) so the recv-loop exit path sees the cleared desired addr and does NOT spawn a supervisor. ## Reconnect supervisor New `signal_reconnect_supervisor(signal_state, app, relay)` task. Spawned from the recv-loop exit path when the loop exits unexpectedly AND `desired_relay_addr.is_some()` AND no supervisor is already running. - Exponential backoff: 1s, 2s, 4s, 8s, 15s, 30s (capped at 30s, never gives up). First attempt is immediate (attempt 0 skips the wait). - On each iteration checks whether `desired_relay_addr` was cleared (user deregistered mid-flight) or another path already re-registered; either short-circuits the supervisor. - Also detects if the user changed relays while the supervisor was sleeping — resets the backoff counter and retries against the new addr. - On success, exits so the newly-spawned recv loop owns the connection from that point. If THAT drops again, a fresh supervisor spawns. - Emits `call-debug-log` and `signal-event` events at every state transition so the GUI can display "reconnecting...", "registered" banners. ## UI wiring (desktop/src/main.ts) - signal-event handler gets two new cases: - `"reconnecting"` — amber "🔄 reconnecting to <relay>…" in the registered banner area - `"registered"` — green "✓ registered (<fp prefix>…)" to clear the reconnecting badge - Relay-selection click handler checks if a signal is currently registered and, if the user picked a different relay, fires `register_signal` with the new address. Rust side handles the swap transparently. Full workspace test: 423 passing (no regressions). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -648,6 +648,23 @@ struct SignalState {
|
||||
/// Tauri command to compute the deterministic role for the
|
||||
/// dual-path QUIC race against `peer_direct_addr`.
|
||||
own_reflex_addr: Option<String>,
|
||||
/// The relay address the user currently wants to be registered
|
||||
/// against. `Some` means "keep me connected" — the supervisor
|
||||
/// will auto-reconnect after unexpected drops. `None` means
|
||||
/// "user explicitly deregistered" — do not retry.
|
||||
///
|
||||
/// Distinguishing these two cases is what lets relay
|
||||
/// restarts + transient network blips be transparent to the
|
||||
/// user: the recv loop dies, but because `desired_relay_addr`
|
||||
/// is still set, a supervisor task retries the full
|
||||
/// connect+register flow with exponential backoff until the
|
||||
/// relay is reachable again.
|
||||
desired_relay_addr: Option<String>,
|
||||
/// Single-flight guard: `true` while the reconnect supervisor
|
||||
/// task is actively trying to re-establish the signal
|
||||
/// connection. Prevents duplicate supervisors from spawning
|
||||
/// (recv loop exit races with a manual register_signal call).
|
||||
reconnect_in_progress: bool,
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
@@ -656,6 +673,80 @@ async fn register_signal(
|
||||
app: tauri::AppHandle,
|
||||
relay: String,
|
||||
) -> Result<String, String> {
|
||||
// Set the desired relay and handle the "already registered to
|
||||
// a different relay" transition. This is the public entry
|
||||
// point — settings-screen changes come through here.
|
||||
let already_same = {
|
||||
let sig = state.signal.lock().await;
|
||||
sig.transport.is_some()
|
||||
&& sig.desired_relay_addr.as_deref() == Some(relay.as_str())
|
||||
};
|
||||
if already_same {
|
||||
// Idempotent: user hit "Register" twice on the same relay,
|
||||
// or the JS side re-called after a settings save that
|
||||
// didn't actually change the relay.
|
||||
let sig = state.signal.lock().await;
|
||||
return Ok(sig.fingerprint.clone());
|
||||
}
|
||||
|
||||
// Tear down any existing registration (different relay → swap).
|
||||
internal_deregister(&state.signal, /*keep_desired=*/ false).await;
|
||||
|
||||
// Announce the new desired state so the recv-loop exit path and
|
||||
// any running supervisor can see it.
|
||||
{
|
||||
let mut sig = state.signal.lock().await;
|
||||
sig.desired_relay_addr = Some(relay.clone());
|
||||
}
|
||||
|
||||
do_register_signal(state.signal.clone(), app, relay).await
|
||||
}
|
||||
|
||||
/// Close the current signal transport + clear derived state.
|
||||
/// Used by `deregister` (with `keep_desired = false`, clearing
|
||||
/// `desired_relay_addr`) and by the relay-swap path in
|
||||
/// `register_signal` (also `keep_desired = false` — the caller
|
||||
/// is about to set a new desired addr).
|
||||
async fn internal_deregister(
|
||||
signal_state: &Arc<tokio::sync::Mutex<SignalState>>,
|
||||
keep_desired: bool,
|
||||
) {
|
||||
let mut sig = signal_state.lock().await;
|
||||
if let Some(t) = sig.transport.take() {
|
||||
// Dropping the transport Arc closes the quinn connection;
|
||||
// calling close() explicitly is a no-op but neat.
|
||||
let _ = t.close().await;
|
||||
}
|
||||
sig.endpoint = None;
|
||||
sig.signal_status = "idle".into();
|
||||
sig.incoming_call_id = None;
|
||||
sig.incoming_caller_fp = None;
|
||||
sig.incoming_caller_alias = None;
|
||||
sig.pending_reflect = None;
|
||||
sig.own_reflex_addr = None;
|
||||
if !keep_desired {
|
||||
sig.desired_relay_addr = None;
|
||||
}
|
||||
}
|
||||
|
||||
/// Core register flow, extracted so the Tauri command AND the
|
||||
/// reconnect supervisor can both call it. Does the connect +
|
||||
/// RegisterPresence + spawn-recv-loop dance.
|
||||
///
|
||||
/// Contract: `signal_state.desired_relay_addr` must already be
|
||||
/// set to `Some(relay)` by the caller. On recv-loop exit, the
|
||||
/// spawned task will check `desired_relay_addr` and (if still
|
||||
/// Some) trigger the reconnect supervisor.
|
||||
///
|
||||
/// Explicit `+ Send` on the return type so the reconnect
|
||||
/// supervisor (which lives inside a `tokio::spawn`) can await
|
||||
/// this future without hitting auto-trait inference issues.
|
||||
fn do_register_signal(
|
||||
signal_state: Arc<tokio::sync::Mutex<SignalState>>,
|
||||
app: tauri::AppHandle,
|
||||
relay: String,
|
||||
) -> impl std::future::Future<Output = Result<String, String>> + Send {
|
||||
async move {
|
||||
use wzp_proto::SignalMessage;
|
||||
|
||||
emit_call_debug(&app, "register_signal:start", serde_json::json!({ "relay": relay }));
|
||||
@@ -696,13 +787,27 @@ async fn register_signal(
|
||||
}
|
||||
}
|
||||
|
||||
{ let mut sig = state.signal.lock().await; sig.transport = Some(transport.clone()); sig.endpoint = Some(endpoint.clone()); sig.fingerprint = fp.clone(); sig.signal_status = "registered".into(); }
|
||||
{
|
||||
let mut sig = signal_state.lock().await;
|
||||
sig.transport = Some(transport.clone());
|
||||
sig.endpoint = Some(endpoint.clone());
|
||||
sig.fingerprint = fp.clone();
|
||||
sig.signal_status = "registered".into();
|
||||
}
|
||||
// Let the JS side know we've (re-)entered "registered" so any
|
||||
// "reconnecting..." banner can clear.
|
||||
let _ = app.emit(
|
||||
"signal-event",
|
||||
serde_json::json!({ "type": "registered", "fingerprint": fp }),
|
||||
);
|
||||
|
||||
tracing::info!(%fp, "signal registered, spawning recv loop");
|
||||
emit_call_debug(&app, "register_signal:recv_loop_spawning", serde_json::json!({ "fingerprint": fp }));
|
||||
let signal_state = Arc::clone(&state.signal);
|
||||
let signal_state_loop = signal_state.clone();
|
||||
let app_clone = app.clone();
|
||||
tokio::spawn(async move {
|
||||
// Capture for the exit-path reconnect trigger below.
|
||||
let signal_state = signal_state_loop.clone();
|
||||
loop {
|
||||
match transport.recv_signal().await {
|
||||
Ok(Some(SignalMessage::CallRinging { call_id })) => {
|
||||
@@ -837,9 +942,165 @@ async fn register_signal(
|
||||
}
|
||||
}
|
||||
tracing::warn!("signal recv loop exited — signal_status=idle, transport dropped");
|
||||
let mut sig = signal_state.lock().await; sig.signal_status = "idle".into(); sig.transport = None;
|
||||
// Determine whether this was a user-requested close or an
|
||||
// unexpected drop. `desired_relay_addr.is_some()` means the
|
||||
// user still wants to be registered — spawn the reconnect
|
||||
// supervisor with exponential backoff.
|
||||
let (should_reconnect, desired_relay, already_reconnecting) = {
|
||||
let mut sig = signal_state.lock().await;
|
||||
sig.signal_status = "idle".into();
|
||||
sig.transport = None;
|
||||
(
|
||||
sig.desired_relay_addr.is_some(),
|
||||
sig.desired_relay_addr.clone(),
|
||||
sig.reconnect_in_progress,
|
||||
)
|
||||
};
|
||||
if should_reconnect && !already_reconnecting {
|
||||
if let Some(relay) = desired_relay {
|
||||
tracing::info!(%relay, "signal recv loop exited unexpectedly — spawning reconnect supervisor");
|
||||
emit_call_debug(
|
||||
&app_clone,
|
||||
"signal:reconnect_supervisor_spawning",
|
||||
serde_json::json!({ "relay": relay }),
|
||||
);
|
||||
let _ = app_clone.emit(
|
||||
"signal-event",
|
||||
serde_json::json!({ "type": "reconnecting", "relay": relay }),
|
||||
);
|
||||
let state_for_sup = signal_state.clone();
|
||||
let app_for_sup = app_clone.clone();
|
||||
tokio::spawn(async move {
|
||||
signal_reconnect_supervisor(state_for_sup, app_for_sup, relay).await;
|
||||
});
|
||||
}
|
||||
} else if should_reconnect && already_reconnecting {
|
||||
tracing::debug!("signal recv loop exited; reconnect supervisor already running");
|
||||
}
|
||||
});
|
||||
Ok(fp)
|
||||
} // end async move
|
||||
} // end fn do_register_signal
|
||||
|
||||
/// Supervisor task: loops with exponential backoff, calling
|
||||
/// `do_register_signal` until the relay comes back online. Exits
|
||||
/// as soon as one attempt succeeds (the newly-spawned recv loop
|
||||
/// owns the connection from that point on) OR the user clears
|
||||
/// `desired_relay_addr` via `deregister`.
|
||||
///
|
||||
/// Backoff schedule: 1s, 2s, 4s, 8s, 15s, 30s (capped). Reset on
|
||||
/// success or exit.
|
||||
async fn signal_reconnect_supervisor(
|
||||
signal_state: Arc<tokio::sync::Mutex<SignalState>>,
|
||||
app: tauri::AppHandle,
|
||||
initial_relay: String,
|
||||
) {
|
||||
// Claim the single-flight slot so a second exit-path trigger
|
||||
// or a manual register_signal doesn't spawn a duplicate.
|
||||
{
|
||||
let mut sig = signal_state.lock().await;
|
||||
if sig.reconnect_in_progress {
|
||||
tracing::debug!("reconnect supervisor: another already running, exiting");
|
||||
return;
|
||||
}
|
||||
sig.reconnect_in_progress = true;
|
||||
}
|
||||
|
||||
let backoff_schedule_ms: [u64; 6] = [1_000, 2_000, 4_000, 8_000, 15_000, 30_000];
|
||||
let mut attempt: usize = 0;
|
||||
let mut current_relay = initial_relay;
|
||||
|
||||
loop {
|
||||
// Has the user cleared the desired relay? If so, exit.
|
||||
let (desired, transport_is_some) = {
|
||||
let sig = signal_state.lock().await;
|
||||
(sig.desired_relay_addr.clone(), sig.transport.is_some())
|
||||
};
|
||||
let Some(desired) = desired else {
|
||||
tracing::info!("reconnect supervisor: desired_relay_addr cleared, exiting");
|
||||
break;
|
||||
};
|
||||
|
||||
// Has something else already re-registered us (manual
|
||||
// register_signal won the race)? If so, exit.
|
||||
if transport_is_some {
|
||||
tracing::info!("reconnect supervisor: transport already set by another path, exiting");
|
||||
break;
|
||||
}
|
||||
|
||||
// Has the desired relay changed under us? Switch to the new one.
|
||||
if desired != current_relay {
|
||||
tracing::info!(old = %current_relay, new = %desired, "reconnect supervisor: desired relay changed");
|
||||
current_relay = desired.clone();
|
||||
attempt = 0;
|
||||
}
|
||||
|
||||
// Back off before the retry (skip on attempt 0 so the first
|
||||
// reconnect kicks in fast).
|
||||
if attempt > 0 {
|
||||
let idx = (attempt - 1).min(backoff_schedule_ms.len() - 1);
|
||||
let wait_ms = backoff_schedule_ms[idx];
|
||||
tracing::info!(
|
||||
attempt,
|
||||
wait_ms,
|
||||
relay = %current_relay,
|
||||
"reconnect supervisor: backing off"
|
||||
);
|
||||
emit_call_debug(
|
||||
&app,
|
||||
"signal:reconnect_backoff",
|
||||
serde_json::json!({ "attempt": attempt, "wait_ms": wait_ms, "relay": current_relay }),
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await;
|
||||
}
|
||||
attempt += 1;
|
||||
|
||||
// One-shot attempt. do_register_signal will set the
|
||||
// transport + spawn a fresh recv loop on success.
|
||||
//
|
||||
// CRITICAL: release our single-flight guard BEFORE
|
||||
// do_register_signal spawns the new recv loop, because that
|
||||
// recv loop's exit path also checks `reconnect_in_progress`
|
||||
// to decide whether to spawn a supervisor of its own. If we
|
||||
// held it here and later exited, the slot would be released
|
||||
// too late for the next drop to trigger a fresh supervisor.
|
||||
{
|
||||
let mut sig = signal_state.lock().await;
|
||||
sig.reconnect_in_progress = false;
|
||||
}
|
||||
|
||||
emit_call_debug(
|
||||
&app,
|
||||
"signal:reconnect_attempt",
|
||||
serde_json::json!({ "attempt": attempt, "relay": current_relay }),
|
||||
);
|
||||
match do_register_signal(signal_state.clone(), app.clone(), current_relay.clone()).await {
|
||||
Ok(fp) => {
|
||||
tracing::info!(%fp, relay = %current_relay, "reconnect supervisor: success");
|
||||
emit_call_debug(
|
||||
&app,
|
||||
"signal:reconnect_ok",
|
||||
serde_json::json!({ "fingerprint": fp, "relay": current_relay }),
|
||||
);
|
||||
return; // recv loop now owns the connection
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, relay = %current_relay, "reconnect supervisor: attempt failed");
|
||||
emit_call_debug(
|
||||
&app,
|
||||
"signal:reconnect_failed",
|
||||
serde_json::json!({ "attempt": attempt, "error": e, "relay": current_relay }),
|
||||
);
|
||||
// Re-claim the single-flight slot for the next iteration.
|
||||
let mut sig = signal_state.lock().await;
|
||||
sig.reconnect_in_progress = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Loop exited — clean up the slot if we still hold it.
|
||||
let mut sig = signal_state.lock().await;
|
||||
sig.reconnect_in_progress = false;
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
@@ -1162,19 +1423,13 @@ async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<ser
|
||||
|
||||
/// Tear down the signal connection so the user goes back to idle. Called
|
||||
/// when the user clicks "Deregister" on the direct-call screen. The
|
||||
/// spawned recv loop will break out naturally when the transport closes.
|
||||
/// spawned recv loop will break out naturally when the transport closes,
|
||||
/// AND — critically — clearing `desired_relay_addr` here tells that
|
||||
/// exit path NOT to spawn a reconnect supervisor.
|
||||
#[tauri::command]
|
||||
async fn deregister(state: tauri::State<'_, Arc<AppState>>) -> Result<(), String> {
|
||||
let mut sig = state.signal.lock().await;
|
||||
if let Some(transport) = sig.transport.take() {
|
||||
tracing::info!("deregister: closing signal transport");
|
||||
transport.close().await.ok();
|
||||
}
|
||||
sig.endpoint = None;
|
||||
sig.signal_status = "idle".into();
|
||||
sig.incoming_call_id = None;
|
||||
sig.incoming_caller_fp = None;
|
||||
sig.incoming_caller_alias = None;
|
||||
internal_deregister(&state.signal, /*keep_desired=*/ false).await;
|
||||
tracing::info!("deregister: user-requested, desired_relay_addr cleared");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1192,6 +1447,8 @@ pub fn run() {
|
||||
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
|
||||
pending_reflect: None,
|
||||
own_reflex_addr: None,
|
||||
desired_relay_addr: None,
|
||||
reconnect_in_progress: false,
|
||||
})),
|
||||
});
|
||||
|
||||
|
||||
@@ -347,6 +347,9 @@ function renderRelayDialogList() {
|
||||
|
||||
// Click to select
|
||||
item.addEventListener("click", () => {
|
||||
const prev = loadSettings();
|
||||
const prevRelayAddr = prev.relays[prev.selectedRelay]?.address;
|
||||
|
||||
const s = loadSettings();
|
||||
s.selectedRelay = i;
|
||||
|
||||
@@ -358,6 +361,30 @@ function renderRelayDialogList() {
|
||||
saveSettingsObj(s);
|
||||
renderRelayDialogList();
|
||||
renderRelayButton();
|
||||
|
||||
// If the user switched relays and we're currently registered,
|
||||
// transparently re-register against the new one. The Rust
|
||||
// `register_signal` command is idempotent and handles the
|
||||
// swap internally (close old transport → connect new). This
|
||||
// makes "change server" a single-click operation instead of
|
||||
// manual deregister + re-register.
|
||||
const newRelayAddr = r.address;
|
||||
if (newRelayAddr && newRelayAddr !== prevRelayAddr) {
|
||||
(async () => {
|
||||
// Is a signal currently registered? get_signal_status is
|
||||
// cheap and lets us decide whether to kick the swap.
|
||||
try {
|
||||
const st: any = await invoke("get_signal_status");
|
||||
if (st && st.status === "registered") {
|
||||
await invoke<string>("register_signal", { relay: newRelayAddr });
|
||||
// `signal-event { type: "registered" }` from Rust will
|
||||
// update directRegistered for us — no manual render here.
|
||||
}
|
||||
} catch (e) {
|
||||
console.warn("relay swap: failed to re-register", e);
|
||||
}
|
||||
})();
|
||||
}
|
||||
});
|
||||
|
||||
relayDialogList.appendChild(item);
|
||||
@@ -1237,5 +1264,26 @@ listen("signal-event", (event: any) => {
|
||||
callStatusText.textContent = "";
|
||||
incomingCallPanel.classList.add("hidden");
|
||||
break;
|
||||
case "reconnecting":
|
||||
// Signal supervisor is retrying the relay connection. Show
|
||||
// a non-blocking indicator; the user can keep using
|
||||
// everything that doesn't need a live signal.
|
||||
{
|
||||
const relay = typeof data.relay === "string" ? data.relay : "relay";
|
||||
directRegistered.textContent = `🔄 reconnecting to ${relay}…`;
|
||||
directRegistered.style.color = "var(--yellow)";
|
||||
directRegistered.classList.remove("hidden");
|
||||
}
|
||||
break;
|
||||
case "registered":
|
||||
// Supervisor (re-)succeeded, or the first register landed.
|
||||
// Clear the banner and show the registered state.
|
||||
{
|
||||
const fp = typeof data.fingerprint === "string" ? data.fingerprint : "";
|
||||
directRegistered.textContent = `✓ registered${fp ? ` (${fp.slice(0, 16)}…)` : ""}`;
|
||||
directRegistered.style.color = "var(--green)";
|
||||
directRegistered.classList.remove("hidden");
|
||||
}
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user