feat(signal): transparent reconnect + auto-swap on relay change

Two related UX fixes, same state-machine surface:

1. Relay drops / goes offline / restarts: the client now auto-
   reconnects in the background instead of silently falling to
   "not registered" and requiring the user to tap Deregister +
   Register.
2. User switches relay in settings: client auto-swaps — close
   old transport, register against new, all transparent.

## Signal state additions (desktop/src-tauri/src/lib.rs)

- `SignalState.desired_relay_addr: Option<String>` — what the
  user CURRENTLY wants. `Some(x)` means "keep me connected to x",
  `None` means "user explicitly asked for idle". This is the
  pivot that distinguishes "connection dropped, retry" from
  "user deregistered, stop".
- `SignalState.reconnect_in_progress: bool` — single-flight
  guard so concurrent triggers (recv-loop exit + manual
  register_signal + another recv-loop exit after a brief
  success) don't spawn duplicate supervisors.

## Refactor

The old `register_signal` Tauri command was doing the whole
connect + Register + spawn-recv-loop flow inline. Split into:

- `internal_deregister(signal_state, keep_desired)` — shared
  teardown helper that nulls out transport/endpoint/call state
  and optionally clears `desired_relay_addr`.
- `do_register_signal(signal_state, app, relay)` — core
  connect + register + spawn-recv-loop flow, callable from both
  the Tauri command and the reconnect supervisor. Returns an
  explicit `impl Future<...> + Send` to avoid auto-trait
  inference bailing inside the tokio::spawn chain (rustc loses
  the Send trail through the recv-loop spawn inside the fn
  body).
- `register_signal` Tauri command — now thin: if already
  registered to the same relay, no-op; otherwise
  internal_deregister(keep_desired=false), set
  desired_relay_addr = Some(new), call do_register_signal. The
  Rust side handles the "change of server" transition entirely
  on its own, no deregister+register dance from JS needed.
- `deregister` Tauri command — internal_deregister(keep_desired
  = false) so the recv-loop exit path sees the cleared desired
  addr and does NOT spawn a supervisor.

## Reconnect supervisor

New `signal_reconnect_supervisor(signal_state, app, relay)`
task. Spawned from the recv-loop exit path when the loop exits
unexpectedly AND `desired_relay_addr.is_some()` AND no
supervisor is already running.

- Exponential backoff: 1s, 2s, 4s, 8s, 15s, 30s (capped at 30s,
  never gives up). First attempt is immediate (attempt 0 skips
  the wait).
- On each iteration checks whether `desired_relay_addr` was
  cleared (user deregistered mid-flight) or another path
  already re-registered; either short-circuits the supervisor.
- Also detects if the user changed relays while the supervisor
  was sleeping — resets the backoff counter and retries against
  the new addr.
- On success, exits so the newly-spawned recv loop owns the
  connection from that point. If THAT drops again, a fresh
  supervisor spawns.
- Emits `call-debug-log` and `signal-event` events at every
  state transition so the GUI can display "reconnecting...",
  "registered" banners.

## UI wiring (desktop/src/main.ts)

- signal-event handler gets two new cases:
  - `"reconnecting"` — amber "🔄 reconnecting to <relay>…" in
    the registered banner area
  - `"registered"` — green "✓ registered (<fp prefix>…)" to
    clear the reconnecting badge
- Relay-selection click handler checks if a signal is
  currently registered and, if the user picked a different
  relay, fires `register_signal` with the new address. Rust
  side handles the swap transparently.

Full workspace test: 423 passing (no regressions).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-11 18:40:11 +04:00
parent 00deb97a5d
commit 20375eceb9
2 changed files with 319 additions and 14 deletions

View File

@@ -648,6 +648,23 @@ struct SignalState {
/// Tauri command to compute the deterministic role for the
/// dual-path QUIC race against `peer_direct_addr`.
own_reflex_addr: Option<String>,
/// The relay address the user currently wants to be registered
/// against. `Some` means "keep me connected" — the supervisor
/// will auto-reconnect after unexpected drops. `None` means
/// "user explicitly deregistered" — do not retry.
///
/// Distinguishing these two cases is what lets relay
/// restarts + transient network blips be transparent to the
/// user: the recv loop dies, but because `desired_relay_addr`
/// is still set, a supervisor task retries the full
/// connect+register flow with exponential backoff until the
/// relay is reachable again.
desired_relay_addr: Option<String>,
/// Single-flight guard: `true` while the reconnect supervisor
/// task is actively trying to re-establish the signal
/// connection. Prevents duplicate supervisors from spawning
/// (recv loop exit races with a manual register_signal call).
reconnect_in_progress: bool,
}
#[tauri::command]
@@ -656,6 +673,80 @@ async fn register_signal(
app: tauri::AppHandle,
relay: String,
) -> Result<String, String> {
// Set the desired relay and handle the "already registered to
// a different relay" transition. This is the public entry
// point — settings-screen changes come through here.
let already_same = {
let sig = state.signal.lock().await;
sig.transport.is_some()
&& sig.desired_relay_addr.as_deref() == Some(relay.as_str())
};
if already_same {
// Idempotent: user hit "Register" twice on the same relay,
// or the JS side re-called after a settings save that
// didn't actually change the relay.
let sig = state.signal.lock().await;
return Ok(sig.fingerprint.clone());
}
// Tear down any existing registration (different relay → swap).
internal_deregister(&state.signal, /*keep_desired=*/ false).await;
// Announce the new desired state so the recv-loop exit path and
// any running supervisor can see it.
{
let mut sig = state.signal.lock().await;
sig.desired_relay_addr = Some(relay.clone());
}
do_register_signal(state.signal.clone(), app, relay).await
}
/// Close the current signal transport + clear derived state.
/// Used by `deregister` (with `keep_desired = false`, clearing
/// `desired_relay_addr`) and by the relay-swap path in
/// `register_signal` (also `keep_desired = false` — the caller
/// is about to set a new desired addr).
async fn internal_deregister(
signal_state: &Arc<tokio::sync::Mutex<SignalState>>,
keep_desired: bool,
) {
let mut sig = signal_state.lock().await;
if let Some(t) = sig.transport.take() {
// Dropping the transport Arc closes the quinn connection;
// calling close() explicitly is a no-op but neat.
let _ = t.close().await;
}
sig.endpoint = None;
sig.signal_status = "idle".into();
sig.incoming_call_id = None;
sig.incoming_caller_fp = None;
sig.incoming_caller_alias = None;
sig.pending_reflect = None;
sig.own_reflex_addr = None;
if !keep_desired {
sig.desired_relay_addr = None;
}
}
/// Core register flow, extracted so the Tauri command AND the
/// reconnect supervisor can both call it. Does the connect +
/// RegisterPresence + spawn-recv-loop dance.
///
/// Contract: `signal_state.desired_relay_addr` must already be
/// set to `Some(relay)` by the caller. On recv-loop exit, the
/// spawned task will check `desired_relay_addr` and (if still
/// Some) trigger the reconnect supervisor.
///
/// Explicit `+ Send` on the return type so the reconnect
/// supervisor (which lives inside a `tokio::spawn`) can await
/// this future without hitting auto-trait inference issues.
fn do_register_signal(
signal_state: Arc<tokio::sync::Mutex<SignalState>>,
app: tauri::AppHandle,
relay: String,
) -> impl std::future::Future<Output = Result<String, String>> + Send {
async move {
use wzp_proto::SignalMessage;
emit_call_debug(&app, "register_signal:start", serde_json::json!({ "relay": relay }));
@@ -696,13 +787,27 @@ async fn register_signal(
}
}
{ let mut sig = state.signal.lock().await; sig.transport = Some(transport.clone()); sig.endpoint = Some(endpoint.clone()); sig.fingerprint = fp.clone(); sig.signal_status = "registered".into(); }
{
let mut sig = signal_state.lock().await;
sig.transport = Some(transport.clone());
sig.endpoint = Some(endpoint.clone());
sig.fingerprint = fp.clone();
sig.signal_status = "registered".into();
}
// Let the JS side know we've (re-)entered "registered" so any
// "reconnecting..." banner can clear.
let _ = app.emit(
"signal-event",
serde_json::json!({ "type": "registered", "fingerprint": fp }),
);
tracing::info!(%fp, "signal registered, spawning recv loop");
emit_call_debug(&app, "register_signal:recv_loop_spawning", serde_json::json!({ "fingerprint": fp }));
let signal_state = Arc::clone(&state.signal);
let signal_state_loop = signal_state.clone();
let app_clone = app.clone();
tokio::spawn(async move {
// Capture for the exit-path reconnect trigger below.
let signal_state = signal_state_loop.clone();
loop {
match transport.recv_signal().await {
Ok(Some(SignalMessage::CallRinging { call_id })) => {
@@ -837,9 +942,165 @@ async fn register_signal(
}
}
tracing::warn!("signal recv loop exited — signal_status=idle, transport dropped");
let mut sig = signal_state.lock().await; sig.signal_status = "idle".into(); sig.transport = None;
// Determine whether this was a user-requested close or an
// unexpected drop. `desired_relay_addr.is_some()` means the
// user still wants to be registered — spawn the reconnect
// supervisor with exponential backoff.
let (should_reconnect, desired_relay, already_reconnecting) = {
let mut sig = signal_state.lock().await;
sig.signal_status = "idle".into();
sig.transport = None;
(
sig.desired_relay_addr.is_some(),
sig.desired_relay_addr.clone(),
sig.reconnect_in_progress,
)
};
if should_reconnect && !already_reconnecting {
if let Some(relay) = desired_relay {
tracing::info!(%relay, "signal recv loop exited unexpectedly — spawning reconnect supervisor");
emit_call_debug(
&app_clone,
"signal:reconnect_supervisor_spawning",
serde_json::json!({ "relay": relay }),
);
let _ = app_clone.emit(
"signal-event",
serde_json::json!({ "type": "reconnecting", "relay": relay }),
);
let state_for_sup = signal_state.clone();
let app_for_sup = app_clone.clone();
tokio::spawn(async move {
signal_reconnect_supervisor(state_for_sup, app_for_sup, relay).await;
});
}
} else if should_reconnect && already_reconnecting {
tracing::debug!("signal recv loop exited; reconnect supervisor already running");
}
});
Ok(fp)
} // end async move
} // end fn do_register_signal
/// Supervisor task: loops with exponential backoff, calling
/// `do_register_signal` until the relay comes back online. Exits
/// as soon as one attempt succeeds (the newly-spawned recv loop
/// owns the connection from that point on) OR the user clears
/// `desired_relay_addr` via `deregister`.
///
/// Backoff schedule: 1s, 2s, 4s, 8s, 15s, 30s (capped). Reset on
/// success or exit.
async fn signal_reconnect_supervisor(
signal_state: Arc<tokio::sync::Mutex<SignalState>>,
app: tauri::AppHandle,
initial_relay: String,
) {
// Claim the single-flight slot so a second exit-path trigger
// or a manual register_signal doesn't spawn a duplicate.
{
let mut sig = signal_state.lock().await;
if sig.reconnect_in_progress {
tracing::debug!("reconnect supervisor: another already running, exiting");
return;
}
sig.reconnect_in_progress = true;
}
let backoff_schedule_ms: [u64; 6] = [1_000, 2_000, 4_000, 8_000, 15_000, 30_000];
let mut attempt: usize = 0;
let mut current_relay = initial_relay;
loop {
// Has the user cleared the desired relay? If so, exit.
let (desired, transport_is_some) = {
let sig = signal_state.lock().await;
(sig.desired_relay_addr.clone(), sig.transport.is_some())
};
let Some(desired) = desired else {
tracing::info!("reconnect supervisor: desired_relay_addr cleared, exiting");
break;
};
// Has something else already re-registered us (manual
// register_signal won the race)? If so, exit.
if transport_is_some {
tracing::info!("reconnect supervisor: transport already set by another path, exiting");
break;
}
// Has the desired relay changed under us? Switch to the new one.
if desired != current_relay {
tracing::info!(old = %current_relay, new = %desired, "reconnect supervisor: desired relay changed");
current_relay = desired.clone();
attempt = 0;
}
// Back off before the retry (skip on attempt 0 so the first
// reconnect kicks in fast).
if attempt > 0 {
let idx = (attempt - 1).min(backoff_schedule_ms.len() - 1);
let wait_ms = backoff_schedule_ms[idx];
tracing::info!(
attempt,
wait_ms,
relay = %current_relay,
"reconnect supervisor: backing off"
);
emit_call_debug(
&app,
"signal:reconnect_backoff",
serde_json::json!({ "attempt": attempt, "wait_ms": wait_ms, "relay": current_relay }),
);
tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await;
}
attempt += 1;
// One-shot attempt. do_register_signal will set the
// transport + spawn a fresh recv loop on success.
//
// CRITICAL: release our single-flight guard BEFORE
// do_register_signal spawns the new recv loop, because that
// recv loop's exit path also checks `reconnect_in_progress`
// to decide whether to spawn a supervisor of its own. If we
// held it here and later exited, the slot would be released
// too late for the next drop to trigger a fresh supervisor.
{
let mut sig = signal_state.lock().await;
sig.reconnect_in_progress = false;
}
emit_call_debug(
&app,
"signal:reconnect_attempt",
serde_json::json!({ "attempt": attempt, "relay": current_relay }),
);
match do_register_signal(signal_state.clone(), app.clone(), current_relay.clone()).await {
Ok(fp) => {
tracing::info!(%fp, relay = %current_relay, "reconnect supervisor: success");
emit_call_debug(
&app,
"signal:reconnect_ok",
serde_json::json!({ "fingerprint": fp, "relay": current_relay }),
);
return; // recv loop now owns the connection
}
Err(e) => {
tracing::warn!(error = %e, relay = %current_relay, "reconnect supervisor: attempt failed");
emit_call_debug(
&app,
"signal:reconnect_failed",
serde_json::json!({ "attempt": attempt, "error": e, "relay": current_relay }),
);
// Re-claim the single-flight slot for the next iteration.
let mut sig = signal_state.lock().await;
sig.reconnect_in_progress = true;
}
}
}
// Loop exited — clean up the slot if we still hold it.
let mut sig = signal_state.lock().await;
sig.reconnect_in_progress = false;
}
#[tauri::command]
@@ -1162,19 +1423,13 @@ async fn get_signal_status(state: tauri::State<'_, Arc<AppState>>) -> Result<ser
/// Tear down the signal connection so the user goes back to idle. Called
/// when the user clicks "Deregister" on the direct-call screen. The
/// spawned recv loop will break out naturally when the transport closes.
/// spawned recv loop will break out naturally when the transport closes,
/// AND — critically — clearing `desired_relay_addr` here tells that
/// exit path NOT to spawn a reconnect supervisor.
#[tauri::command]
async fn deregister(state: tauri::State<'_, Arc<AppState>>) -> Result<(), String> {
let mut sig = state.signal.lock().await;
if let Some(transport) = sig.transport.take() {
tracing::info!("deregister: closing signal transport");
transport.close().await.ok();
}
sig.endpoint = None;
sig.signal_status = "idle".into();
sig.incoming_call_id = None;
sig.incoming_caller_fp = None;
sig.incoming_caller_alias = None;
internal_deregister(&state.signal, /*keep_desired=*/ false).await;
tracing::info!("deregister: user-requested, desired_relay_addr cleared");
Ok(())
}
@@ -1192,6 +1447,8 @@ pub fn run() {
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
pending_reflect: None,
own_reflex_addr: None,
desired_relay_addr: None,
reconnect_in_progress: false,
})),
});

View File

@@ -347,6 +347,9 @@ function renderRelayDialogList() {
// Click to select
item.addEventListener("click", () => {
const prev = loadSettings();
const prevRelayAddr = prev.relays[prev.selectedRelay]?.address;
const s = loadSettings();
s.selectedRelay = i;
@@ -358,6 +361,30 @@ function renderRelayDialogList() {
saveSettingsObj(s);
renderRelayDialogList();
renderRelayButton();
// If the user switched relays and we're currently registered,
// transparently re-register against the new one. The Rust
// `register_signal` command is idempotent and handles the
// swap internally (close old transport → connect new). This
// makes "change server" a single-click operation instead of
// manual deregister + re-register.
const newRelayAddr = r.address;
if (newRelayAddr && newRelayAddr !== prevRelayAddr) {
(async () => {
// Is a signal currently registered? get_signal_status is
// cheap and lets us decide whether to kick the swap.
try {
const st: any = await invoke("get_signal_status");
if (st && st.status === "registered") {
await invoke<string>("register_signal", { relay: newRelayAddr });
// `signal-event { type: "registered" }` from Rust will
// update directRegistered for us — no manual render here.
}
} catch (e) {
console.warn("relay swap: failed to re-register", e);
}
})();
}
});
relayDialogList.appendChild(item);
@@ -1237,5 +1264,26 @@ listen("signal-event", (event: any) => {
callStatusText.textContent = "";
incomingCallPanel.classList.add("hidden");
break;
case "reconnecting":
// Signal supervisor is retrying the relay connection. Show
// a non-blocking indicator; the user can keep using
// everything that doesn't need a live signal.
{
const relay = typeof data.relay === "string" ? data.relay : "relay";
directRegistered.textContent = `🔄 reconnecting to ${relay}`;
directRegistered.style.color = "var(--yellow)";
directRegistered.classList.remove("hidden");
}
break;
case "registered":
// Supervisor (re-)succeeded, or the first register landed.
// Clear the banner and show the registered state.
{
const fp = typeof data.fingerprint === "string" ? data.fingerprint : "";
directRegistered.textContent = `✓ registered${fp ? ` (${fp.slice(0, 16)}…)` : ""}`;
directRegistered.style.color = "var(--green)";
directRegistered.classList.remove("hidden");
}
break;
}
});