From 2eab56beecffa4afaba7bb0ce4fbf42f5d8e268c Mon Sep 17 00:00:00 2001
From: Siavash Sameni
Date: Wed, 8 Apr 2026 15:07:59 +0400
Subject: [PATCH] fix: federation presence dedup, stale cleanup, and Android SIGSEGV crash

- Deduplicate remote participants by fingerprint in all merge sites (canonical == raw room name caused double-lookup, doubling every remote participant)
- GlobalRoomInactive now propagates updated participant list to other peers (hub relay B was not informing A when C's participants left)
- Add 15-second stale presence sweeper that purges remote participants from peers that stop sending data (safety net for QUIC timeout delays)
- Add @Synchronized to WzpEngine.getStats/stopCall/destroy to prevent TOCTOU race between stats polling coroutine and engine teardown (SIGSEGV)

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../src/main/java/com/wzp/engine/WzpEngine.kt |   3 +
 crates/wzp-relay/src/federation.rs            | 210 +++++++++++++++---
 crates/wzp-relay/src/main.rs                  |  18 +-
 3 files changed, 187 insertions(+), 44 deletions(-)

diff --git a/android/app/src/main/java/com/wzp/engine/WzpEngine.kt b/android/app/src/main/java/com/wzp/engine/WzpEngine.kt
index 64c37ae..0db3ff5 100644
--- a/android/app/src/main/java/com/wzp/engine/WzpEngine.kt
+++ b/android/app/src/main/java/com/wzp/engine/WzpEngine.kt
@@ -53,6 +53,7 @@ class WzpEngine(private val callback: WzpCallback) {
     }
 
     /** Stop the active call. Safe to call when no call is active. */
+    @Synchronized
     fun stopCall() {
         if (nativeHandle != 0L) {
             nativeStopCall(nativeHandle)
         }
     }
@@ -76,6 +77,7 @@
      *
      * @return JSON-serialised [CallStats], or `"{}"` if the engine is not initialised.
      */
+    @Synchronized
     fun getStats(): String {
         if (nativeHandle == 0L) return "{}"
         return try {
@@ -95,6 +97,7 @@
     }
 
     /** Destroy the native engine and free all resources. The instance must not be reused. */
+    @Synchronized
     fun destroy() {
         if (nativeHandle != 0L) {
             nativeDestroy(nativeHandle)
diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs
index 8e4e4d4..49d65bb 100644
--- a/crates/wzp-relay/src/federation.rs
+++ b/crates/wzp-relay/src/federation.rs
@@ -114,12 +114,16 @@ struct PeerLink {
     active_rooms: HashSet<String>,
     /// Remote participants per room (for federated presence in RoomUpdate).
     remote_participants: HashMap>,
+    /// Last time we received any data (signal or media) from this peer.
+    last_seen: Instant,
 }
 
 /// Max federation packets per second per room (0 = unlimited).
 const FEDERATION_RATE_LIMIT_PPS: u32 = 500;
 /// Dedup window size (number of recent packets to remember).
 const DEDUP_WINDOW_SIZE: usize = 4096;
+/// Remote participants are considered stale after this duration with no updates.
+const REMOTE_PARTICIPANT_STALE_SECS: u64 = 15;
 
 /// Manages federation connections and global room forwarding.
 pub struct FederationManager {
@@ -222,6 +226,12 @@ impl FederationManager {
             run_room_event_dispatcher(this, room_events).await;
         }));
 
+        // Stale presence sweeper — purges remote participants from dead peers
+        let this = self.clone();
+        handles.push(tokio::spawn(async move {
+            run_stale_presence_sweeper(this).await;
+        }));
+
        for h in handles {
            let _ = h.await;
        }
@@ -242,6 +252,7 @@ impl FederationManager {
    }
 
    /// Get all remote participants for a room from all peer links.
+   /// Deduplicates by fingerprint (same participant may appear via multiple links).
    pub async fn get_remote_participants(&self, room: &str) -> Vec {
        let canonical = self.resolve_global_room(room);
        let links = self.peer_links.lock().await;
@@ -252,12 +263,21 @@ impl FederationManager {
                if let Some(remote) = link.remote_participants.get(c) {
                    result.extend(remote.iter().cloned());
                }
-           }
-           // Also check raw room name
-           if let Some(remote) = link.remote_participants.get(room) {
-               result.extend(remote.iter().cloned());
+               // Also check raw room name, but only if different from canonical
+               if c != room {
+                   if let Some(remote) = link.remote_participants.get(room) {
+                       result.extend(remote.iter().cloned());
+                   }
+               }
+           } else {
+               if let Some(remote) = link.remote_participants.get(room) {
+                   result.extend(remote.iter().cloned());
+               }
            }
        }
+       // Deduplicate by fingerprint
+       let mut seen = HashSet::new();
+       result.retain(|p| seen.insert(p.fingerprint.clone()));
        result
    }
@@ -372,6 +392,76 @@ async fn run_room_event_dispatcher(
 ) {
     }
 }
+// ── Stale presence sweeper ──
+
+/// Periodically checks for stale remote participants and purges them.
+/// This handles the case where a peer link dies without sending GlobalRoomInactive
+/// (e.g., QUIC timeout, network partition, crash).
+async fn run_stale_presence_sweeper(fm: Arc<FederationManager>) {
+    let mut interval = tokio::time::interval(Duration::from_secs(5));
+    loop {
+        interval.tick().await;
+        let stale_threshold = Duration::from_secs(REMOTE_PARTICIPANT_STALE_SECS);
+
+        // Find peers with stale remote_participants whose link is also gone or idle
+        let stale_rooms: Vec<(String, String)> = {
+            let links = fm.peer_links.lock().await;
+            let mut stale = Vec::new();
+            for (fp, link) in links.iter() {
+                if link.last_seen.elapsed() > stale_threshold && !link.remote_participants.is_empty() {
+                    for room in link.remote_participants.keys() {
+                        stale.push((fp.clone(), room.clone()));
+                    }
+                }
+            }
+            stale
+        };
+
+        if stale_rooms.is_empty() {
+            continue;
+        }
+
+        // Purge stale entries and collect affected rooms
+        let mut affected_rooms = HashSet::new();
+        {
+            let mut links = fm.peer_links.lock().await;
+            for (fp, room) in &stale_rooms {
+                if let Some(link) = links.get_mut(fp.as_str()) {
+                    if link.last_seen.elapsed() > stale_threshold {
+                        info!(peer = %link.label, room = %room, "purging stale remote participants (no data for {}s)", link.last_seen.elapsed().as_secs());
+                        link.remote_participants.remove(room);
+                        link.active_rooms.remove(room);
+                        affected_rooms.insert(room.clone());
+                    }
+                }
+            }
+        }
+
+        // Broadcast updated RoomUpdate for affected rooms
+        for room in &affected_rooms {
+            let mgr = fm.room_mgr.lock().await;
+            for local_room in mgr.active_rooms() {
+                if fm.resolve_global_room(&local_room) == fm.resolve_global_room(room) {
+                    let mut all_participants = mgr.local_participant_list(&local_room);
+                    let remote = fm.get_remote_participants(&local_room).await;
+                    all_participants.extend(remote);
+                    let mut seen = HashSet::new();
+                    all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
+                    let update = SignalMessage::RoomUpdate {
+                        count: all_participants.len() as u32,
+                        participants: all_participants,
+                    };
+                    let senders = mgr.local_senders(&local_room);
+                    drop(mgr);
+                    room::broadcast_signal(&senders, &update).await;
+                    info!(room = %room, "swept stale presence — broadcast updated RoomUpdate");
+                    break;
+                }
+            }
+        }
+    }
+}
+
 // ── Peer connection management ──
 
 /// Persistent connection loop for one peer — reconnects with backoff.
@@ -433,6 +523,7 @@ async fn run_federation_link(
             label: peer_label.clone(),
             active_rooms: HashSet::new(),
             remote_participants: HashMap::new(),
+            last_seen: Instant::now(),
         });
     }
@@ -552,6 +643,14 @@ async fn handle_signal(
     peer_label: &str,
     msg: SignalMessage,
 ) {
+    // Update last_seen for this peer
+    {
+        let mut links = fm.peer_links.lock().await;
+        if let Some(link) = links.get_mut(peer_fp) {
+            link.last_seen = Instant::now();
+        }
+    }
+
     match msg {
         SignalMessage::GlobalRoomActive { room, participants } => {
             if fm.is_global_room(&room) {
@@ -590,7 +689,7 @@
                let mgr = fm.room_mgr.lock().await;
                for local_room in mgr.active_rooms() {
                    if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
-                       // Build merged participant list: local + all remote
+                       // Build merged participant list: local + all remote (deduped)
                        let mut all_participants = mgr.local_participant_list(&local_room);
                        let links = fm.peer_links.lock().await;
                        for link in links.values() {
                            if let Some(remote) = link.remote_participants.get(canonical) {
                                all_participants.extend(remote.iter().cloned());
                            }
-                           // Also check raw room name
-                           if let Some(remote) = link.remote_participants.get(&local_room) {
-                               all_participants.extend(remote.iter().cloned());
+                           // Also check raw room name, but only if different from canonical
+                           if canonical != local_room {
+                               if let Some(remote) = link.remote_participants.get(&local_room) {
+                                   all_participants.extend(remote.iter().cloned());
+                               }
                            }
                        }
+                       // Deduplicate by fingerprint
+                       let mut seen = HashSet::new();
+                       all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
                        let update = SignalMessage::RoomUpdate {
                            count: all_participants.len() as u32,
                            participants: all_participants,
@@ -634,45 +738,79 @@
                let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
                fm.metrics.federation_active_rooms.set(total as i64);
 
-               // Check if any other peer still has this room — if none, propagate inactive
-               let any_other_active = links.iter()
-                   .any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room));
-               let local_active = {
-                   let mgr = fm.room_mgr.lock().await;
-                   mgr.active_rooms().iter().any(|r| r == &room)
-               };
-               if !any_other_active && !local_active {
+               // Build remaining remote participants (from all peers except the one going inactive)
+               let remaining_remote: Vec = {
+                   let canonical = fm.resolve_global_room(&room);
+                   let mut result = Vec::new();
                    for (fp, link) in links.iter() {
-                       if fp != peer_fp {
-                           let _ = link.transport.send_signal(&SignalMessage::GlobalRoomInactive { room: room.clone() }).await;
+                       if fp == peer_fp { continue; }
+                       if let Some(c) = canonical {
+                           if let Some(remote) = link.remote_participants.get(c) {
+                               result.extend(remote.iter().cloned());
+                           }
                        }
                    }
-               }
+                   let mut seen = HashSet::new();
+                   result.retain(|p| seen.insert(p.fingerprint.clone()));
+                   result
+               };
+
+               // Propagate to other peers: send updated GlobalRoomActive with revised list,
+               // or GlobalRoomInactive if no participants remain anywhere
+               let local_active = {
+                   let mgr = fm.room_mgr.lock().await;
+                   mgr.active_rooms().iter().any(|r| fm.resolve_global_room(r) == fm.resolve_global_room(&room))
+               };
+               let has_remaining = !remaining_remote.is_empty() || local_active;
+
+               // Collect peer transports to send to (avoid holding lock across await)
+               let peer_sends: Vec<_> = links.iter()
+                   .filter(|(fp, _)| *fp != peer_fp)
+                   .map(|(_, link)| link.transport.clone())
+                   .collect();
                drop(links);
+               if has_remaining {
+                   // Send updated participant list to other peers
+                   let mut updated_participants = remaining_remote.clone();
+                   if local_active {
+                       let mgr = fm.room_mgr.lock().await;
+                       for local_room in mgr.active_rooms() {
+                           if fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
+                               updated_participants.extend(mgr.local_participant_list(&local_room));
+                               break;
+                           }
+                       }
+                   }
+                   let msg = SignalMessage::GlobalRoomActive {
+                       room: room.clone(),
+                       participants: updated_participants,
+                   };
+                   for transport in &peer_sends {
+                       let _ = transport.send_signal(&msg).await;
+                   }
+               } else {
+                   // No participants left anywhere — propagate inactive
+                   let msg = SignalMessage::GlobalRoomInactive { room: room.clone() };
+                   for transport in &peer_sends {
+                       let _ = transport.send_signal(&msg).await;
+                   }
+               }
+
                // Broadcast updated RoomUpdate to local clients (remote participant removed)
                let mgr = fm.room_mgr.lock().await;
                for local_room in mgr.active_rooms() {
                    if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
                        let mut all_participants = mgr.local_participant_list(&local_room);
-                       // Merge remaining remote participants from other peers
-                       let links = fm.peer_links.lock().await;
-                       for link in links.values() {
-                           if let Some(canonical) = fm.resolve_global_room(&local_room) {
-                               if let Some(remote) = link.remote_participants.get(canonical) {
-                                   all_participants.extend(remote.iter().cloned());
-                               }
-                               if let Some(remote) = link.remote_participants.get(&local_room) {
-                                   all_participants.extend(remote.iter().cloned());
-                               }
-                           }
-                       }
+                       all_participants.extend(remaining_remote.iter().cloned());
+                       // Deduplicate by fingerprint
+                       let mut seen = HashSet::new();
+                       all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
                        let update = SignalMessage::RoomUpdate {
                            count: all_participants.len() as u32,
                            participants: all_participants,
                        };
                        let senders = mgr.local_senders(&local_room);
-                       drop(links);
                        drop(mgr);
                        room::broadcast_signal(&senders, &update).await;
                        info!(room = %room, "broadcast updated presence (remote participant removed)");
@@ -701,9 +839,15 @@ async fn handle_datagram(
        None => return,
    };
 
-   // Count inbound federation packet
+   // Count inbound federation packet + update last_seen
    fm.metrics.federation_packets_forwarded
        .with_label_values(&[source_peer_fp, "in"]).inc();
+   {
+       let mut links = fm.peer_links.lock().await;
+       if let Some(link) = links.get_mut(source_peer_fp) {
+           link.last_seen = Instant::now();
+       }
+   }
 
    // Dedup: drop packets we've already seen (multi-path duplicates)
    {
diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs
index 134942f..eaff258 100644
--- a/crates/wzp-relay/src/main.rs
+++ b/crates/wzp-relay/src/main.rs
@@ -765,17 +765,13 @@ async fn main() -> anyhow::Result<()> {
            if fm.is_global_room(&room_name) {
                if let SignalMessage::RoomUpdate { count: _, participants: mut local_parts } = update {
                    let remote = fm.get_remote_participants(&room_name).await;
-                   if !remote.is_empty() {
-                       local_parts.extend(remote);
-                       SignalMessage::RoomUpdate {
-                           count: local_parts.len() as u32,
-                           participants: local_parts,
-                       }
-                   } else {
-                       SignalMessage::RoomUpdate {
-                           count: local_parts.len() as u32,
-                           participants: local_parts,
-                       }
+                   local_parts.extend(remote);
+                   // Deduplicate by fingerprint
+                   let mut seen = std::collections::HashSet::new();
+                   local_parts.retain(|p| seen.insert(p.fingerprint.clone()));
+                   SignalMessage::RoomUpdate {
+                       count: local_parts.len() as u32,
+                       participants: local_parts,
                    }
                } else { update }
            } else { update }
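
Why @Synchronized closes the Android crash: the sketch below is a hypothetical, self-contained illustration (FakeNativeEngine, MiniEngine, and the polling loop are stand-ins, not code from this patch or from WzpEngine). It models the commit-message claim that the stats-polling coroutine and engine teardown race on the native handle. Without a shared monitor, getStats() can pass its handle check, destroy() can free the native engine in between, and the poller then calls into freed memory, which is the SIGSEGV; with @Synchronized on both methods the check and the call run atomically with respect to teardown.

    import kotlinx.coroutines.*

    // Hypothetical stand-in for the JNI layer: a "native" resource that fails loudly
    // if used after free, mimicking the SIGSEGV from a native call on a destroyed engine.
    private class FakeNativeEngine {
        @Volatile var freed = false
        fun getStats(): String {
            check(!freed) { "use-after-free (would be a SIGSEGV through JNI)" }
            return "{}"
        }
        fun destroy() { freed = true }
    }

    private class MiniEngine {
        private var handle: FakeNativeEngine? = FakeNativeEngine()

        // Without @Synchronized the null check and the "native" call are not atomic:
        // destroy() can run between them and the poller then touches freed memory.
        @Synchronized fun getStats(): String = handle?.getStats() ?: "{}"

        @Synchronized fun destroy() {
            handle?.destroy()
            handle = null
        }
    }

    fun main() = runBlocking {
        val engine = MiniEngine()
        // Stats-polling coroutine, as described in the commit message.
        val poller = launch(Dispatchers.Default) {
            repeat(1_000) {
                engine.getStats()
                delay(1)
            }
        }
        delay(20)
        engine.destroy()   // teardown racing with the poller; @Synchronized serialises them
        poller.cancelAndJoin()
        println("no use-after-free observed")
    }

The same reasoning applies to all three annotated methods: any unsynchronized check of nativeHandle can interleave with destroy(), so stopCall, getStats, and destroy all take the instance monitor.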