fix: federation presence dedup, stale cleanup, and Android SIGSEGV crash

- Deduplicate remote participants by fingerprint at all merge sites
  (canonical == raw room name caused a double lookup, doubling every
  remote participant); see the dedup sketch below
- GlobalRoomInactive now propagates the updated participant list to the
  other peers (hub relay B was not informing A when C's participants left)
- Add a 15-second stale-presence sweeper that purges remote participants
  from peers that stop sending data (safety net for QUIC timeout delays)
- Add @Synchronized to WzpEngine.getStats/stopCall/destroy to prevent a
  TOCTOU race between the stats-polling coroutine and engine teardown
  (SIGSEGV)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Siavash Sameni
2026-04-08 15:07:59 +04:00
parent 7dadc1ddd6
commit 2eab56beec
3 changed files with 187 additions and 44 deletions
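
Every merge site in the diff below uses the same one-pass dedup: a HashSet of fingerprints plus Vec::retain. A minimal, self-contained sketch of that pattern (the RoomParticipant here is an illustrative stand-in that keeps only the fingerprint field of the real wzp_proto type):

```rust
use std::collections::HashSet;

// Illustrative stand-in for wzp_proto::packet::RoomParticipant;
// only the `fingerprint` field is taken from the real type.
struct RoomParticipant {
    fingerprint: String,
}

/// Keep the first occurrence of each fingerprint, preserving order.
/// `HashSet::insert` returns false for values already present, so
/// `retain` drops every later duplicate in a single O(n) pass.
fn dedup_by_fingerprint(participants: &mut Vec<RoomParticipant>) {
    let mut seen = HashSet::new();
    participants.retain(|p| seen.insert(p.fingerprint.clone()));
}

fn main() {
    let mut parts = vec![
        RoomParticipant { fingerprint: "aa".into() }, // seen via canonical room name
        RoomParticipant { fingerprint: "bb".into() },
        RoomParticipant { fingerprint: "aa".into() }, // same peer, seen via raw room name
    ];
    dedup_by_fingerprint(&mut parts);
    assert_eq!(parts.len(), 2); // "aa" is kept once
}
```

Because `retain` keeps the first occurrence, merge order matters: each site extends the local list with the remote one, so the local entry wins when a fingerprint appears on both sides.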


@@ -53,6 +53,7 @@ class WzpEngine(private val callback: WzpCallback) {
     }
     /** Stop the active call. Safe to call when no call is active. */
+    @Synchronized
     fun stopCall() {
         if (nativeHandle != 0L) {
             nativeStopCall(nativeHandle)
@@ -76,6 +77,7 @@ class WzpEngine(private val callback: WzpCallback) {
      *
      * @return JSON-serialised [CallStats], or `"{}"` if the engine is not initialised.
      */
+    @Synchronized
     fun getStats(): String {
         if (nativeHandle == 0L) return "{}"
         return try {
@@ -95,6 +97,7 @@ class WzpEngine(private val callback: WzpCallback) {
     }
     /** Destroy the native engine and free all resources. The instance must not be reused. */
+    @Synchronized
     fun destroy() {
         if (nativeHandle != 0L) {
             nativeDestroy(nativeHandle)
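
For the SIGSEGV fix above: @Synchronized makes each of these methods acquire the object's monitor, so the nativeHandle check and the native call become one atomic step relative to destroy(). A Rust analogue of the race being closed, with a Mutex standing in for the JVM monitor and a u64 for the native pointer (an illustrative sketch, not the project's code):

```rust
use std::sync::Mutex;

/// Minimal stand-in for the engine: 0 means "destroyed". The Mutex plays
/// the role of the JVM monitor that @Synchronized takes on every call.
struct Engine {
    native_handle: Mutex<u64>,
}

impl Engine {
    fn get_stats(&self) -> String {
        let handle = self.native_handle.lock().unwrap();
        if *handle == 0 {
            return "{}".into(); // engine already destroyed
        }
        // Without the lock, destroy() could free the handle right here,
        // between the check above and the use below (time-of-check to
        // time-of-use), and the native call would dereference a dangling
        // pointer, the SIGSEGV the commit fixes.
        format!("{{\"handle\":{}}}", *handle)
    }

    fn destroy(&self) {
        let mut handle = self.native_handle.lock().unwrap();
        *handle = 0; // free + clear under the same lock
    }
}

fn main() {
    let engine = Engine { native_handle: Mutex::new(42) };
    assert_eq!(engine.get_stats(), "{\"handle\":42}");
    engine.destroy();
    assert_eq!(engine.get_stats(), "{}");
}
```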


@@ -114,12 +114,16 @@ struct PeerLink {
     active_rooms: HashSet<String>,
     /// Remote participants per room (for federated presence in RoomUpdate).
     remote_participants: HashMap<String, Vec<wzp_proto::packet::RoomParticipant>>,
+    /// Last time we received any data (signal or media) from this peer.
+    last_seen: Instant,
 }
 /// Max federation packets per second per room (0 = unlimited).
 const FEDERATION_RATE_LIMIT_PPS: u32 = 500;
 /// Dedup window size (number of recent packets to remember).
 const DEDUP_WINDOW_SIZE: usize = 4096;
+/// Remote participants are considered stale after this duration with no updates.
+const REMOTE_PARTICIPANT_STALE_SECS: u64 = 15;
 /// Manages federation connections and global room forwarding.
 pub struct FederationManager {
@@ -222,6 +226,12 @@ impl FederationManager {
            run_room_event_dispatcher(this, room_events).await;
        }));
+       // Stale presence sweeper — purges remote participants from dead peers
+       let this = self.clone();
+       handles.push(tokio::spawn(async move {
+           run_stale_presence_sweeper(this).await;
+       }));
        for h in handles {
            let _ = h.await;
        }
@@ -242,6 +252,7 @@ impl FederationManager {
     }
     /// Get all remote participants for a room from all peer links.
+    /// Deduplicates by fingerprint (same participant may appear via multiple links).
     pub async fn get_remote_participants(&self, room: &str) -> Vec<wzp_proto::packet::RoomParticipant> {
         let canonical = self.resolve_global_room(room);
         let links = self.peer_links.lock().await;
@@ -252,12 +263,21 @@ impl FederationManager {
                 if let Some(remote) = link.remote_participants.get(c) {
                     result.extend(remote.iter().cloned());
                 }
-            }
-            // Also check raw room name
-            if let Some(remote) = link.remote_participants.get(room) {
-                result.extend(remote.iter().cloned());
-            }
+                // Also check raw room name, but only if different from canonical
+                if c != room {
+                    if let Some(remote) = link.remote_participants.get(room) {
+                        result.extend(remote.iter().cloned());
+                    }
+                }
+            } else {
+                if let Some(remote) = link.remote_participants.get(room) {
+                    result.extend(remote.iter().cloned());
+                }
+            }
         }
+        // Deduplicate by fingerprint
+        let mut seen = HashSet::new();
+        result.retain(|p| seen.insert(p.fingerprint.clone()));
         result
     }
@@ -372,6 +392,76 @@ async fn run_room_event_dispatcher(
     }
 }
+// ── Stale presence sweeper ──
+/// Periodically checks for stale remote participants and purges them.
+/// This handles the case where a peer link dies without sending GlobalRoomInactive
+/// (e.g., QUIC timeout, network partition, crash).
+async fn run_stale_presence_sweeper(fm: Arc<FederationManager>) {
+    let mut interval = tokio::time::interval(Duration::from_secs(5));
+    loop {
+        interval.tick().await;
+        let stale_threshold = Duration::from_secs(REMOTE_PARTICIPANT_STALE_SECS);
+        // Find peers with stale remote_participants whose link is also gone or idle
+        let stale_rooms: Vec<(String, String)> = {
+            let links = fm.peer_links.lock().await;
+            let mut stale = Vec::new();
+            for (fp, link) in links.iter() {
+                if link.last_seen.elapsed() > stale_threshold && !link.remote_participants.is_empty() {
+                    for room in link.remote_participants.keys() {
+                        stale.push((fp.clone(), room.clone()));
+                    }
+                }
+            }
+            stale
+        };
+        if stale_rooms.is_empty() {
+            continue;
+        }
+        // Purge stale entries and collect affected rooms
+        let mut affected_rooms = HashSet::new();
+        {
+            let mut links = fm.peer_links.lock().await;
+            for (fp, room) in &stale_rooms {
+                if let Some(link) = links.get_mut(fp.as_str()) {
+                    if link.last_seen.elapsed() > stale_threshold {
+                        info!(peer = %link.label, room = %room, "purging stale remote participants (no data for {}s)", link.last_seen.elapsed().as_secs());
+                        link.remote_participants.remove(room);
+                        link.active_rooms.remove(room);
+                        affected_rooms.insert(room.clone());
+                    }
+                }
+            }
+        }
+        // Broadcast updated RoomUpdate for affected rooms
+        for room in &affected_rooms {
+            let mgr = fm.room_mgr.lock().await;
+            for local_room in mgr.active_rooms() {
+                if fm.resolve_global_room(&local_room) == fm.resolve_global_room(room) {
+                    let mut all_participants = mgr.local_participant_list(&local_room);
+                    let remote = fm.get_remote_participants(&local_room).await;
+                    all_participants.extend(remote);
+                    let mut seen = HashSet::new();
+                    all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
+                    let update = SignalMessage::RoomUpdate {
+                        count: all_participants.len() as u32,
+                        participants: all_participants,
+                    };
+                    let senders = mgr.local_senders(&local_room);
+                    drop(mgr);
+                    room::broadcast_signal(&senders, &update).await;
+                    info!(room = %room, "swept stale presence — broadcast updated RoomUpdate");
+                    break;
+                }
+            }
+        }
+    }
+}
 // ── Peer connection management ──
 /// Persistent connection loop for one peer — reconnects with backoff.
@@ -433,6 +523,7 @@ async fn run_federation_link(
         label: peer_label.clone(),
         active_rooms: HashSet::new(),
         remote_participants: HashMap::new(),
+        last_seen: Instant::now(),
     });
 }
@@ -552,6 +643,14 @@ async fn handle_signal(
     peer_label: &str,
     msg: SignalMessage,
 ) {
+    // Update last_seen for this peer
+    {
+        let mut links = fm.peer_links.lock().await;
+        if let Some(link) = links.get_mut(peer_fp) {
+            link.last_seen = Instant::now();
+        }
+    }
     match msg {
         SignalMessage::GlobalRoomActive { room, participants } => {
             if fm.is_global_room(&room) {
@@ -590,7 +689,7 @@ async fn handle_signal(
             let mgr = fm.room_mgr.lock().await;
             for local_room in mgr.active_rooms() {
                 if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
-                    // Build merged participant list: local + all remote
+                    // Build merged participant list: local + all remote (deduped)
                     let mut all_participants = mgr.local_participant_list(&local_room);
                     let links = fm.peer_links.lock().await;
                     for link in links.values() {
@@ -598,12 +697,17 @@ async fn handle_signal(
                         if let Some(remote) = link.remote_participants.get(canonical) {
                             all_participants.extend(remote.iter().cloned());
                         }
-                        // Also check raw room name
-                        if let Some(remote) = link.remote_participants.get(&local_room) {
-                            all_participants.extend(remote.iter().cloned());
-                        }
+                        // Also check raw room name, but only if different from canonical
+                        if canonical != local_room {
+                            if let Some(remote) = link.remote_participants.get(&local_room) {
+                                all_participants.extend(remote.iter().cloned());
+                            }
+                        }
                         }
                     }
+                    // Deduplicate by fingerprint
+                    let mut seen = HashSet::new();
+                    all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
                     let update = SignalMessage::RoomUpdate {
                         count: all_participants.len() as u32,
                         participants: all_participants,
@@ -634,45 +738,79 @@ async fn handle_signal(
             let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
             fm.metrics.federation_active_rooms.set(total as i64);
-            // Check if any other peer still has this room — if none, propagate inactive
-            let any_other_active = links.iter()
-                .any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room));
-            let local_active = {
-                let mgr = fm.room_mgr.lock().await;
-                mgr.active_rooms().iter().any(|r| r == &room)
-            };
-            if !any_other_active && !local_active {
-                for (fp, link) in links.iter() {
-                    if fp != peer_fp {
-                        let _ = link.transport.send_signal(&SignalMessage::GlobalRoomInactive { room: room.clone() }).await;
-                    }
-                }
-            }
+            // Build remaining remote participants (from all peers except the one going inactive)
+            let remaining_remote: Vec<wzp_proto::packet::RoomParticipant> = {
+                let canonical = fm.resolve_global_room(&room);
+                let mut result = Vec::new();
+                for (fp, link) in links.iter() {
+                    if fp == peer_fp { continue; }
+                    if let Some(c) = canonical {
+                        if let Some(remote) = link.remote_participants.get(c) {
+                            result.extend(remote.iter().cloned());
+                        }
+                    }
+                }
+                let mut seen = HashSet::new();
+                result.retain(|p| seen.insert(p.fingerprint.clone()));
+                result
+            };
+            // Propagate to other peers: send updated GlobalRoomActive with revised list,
+            // or GlobalRoomInactive if no participants remain anywhere
+            let local_active = {
+                let mgr = fm.room_mgr.lock().await;
+                mgr.active_rooms().iter().any(|r| fm.resolve_global_room(r) == fm.resolve_global_room(&room))
+            };
+            let has_remaining = !remaining_remote.is_empty() || local_active;
+            // Collect peer transports to send to (avoid holding lock across await)
+            let peer_sends: Vec<_> = links.iter()
+                .filter(|(fp, _)| *fp != peer_fp)
+                .map(|(_, link)| link.transport.clone())
+                .collect();
             drop(links);
+            if has_remaining {
+                // Send updated participant list to other peers
+                let mut updated_participants = remaining_remote.clone();
+                if local_active {
+                    let mgr = fm.room_mgr.lock().await;
+                    for local_room in mgr.active_rooms() {
+                        if fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
+                            updated_participants.extend(mgr.local_participant_list(&local_room));
+                            break;
+                        }
+                    }
+                }
+                let msg = SignalMessage::GlobalRoomActive {
+                    room: room.clone(),
+                    participants: updated_participants,
+                };
+                for transport in &peer_sends {
+                    let _ = transport.send_signal(&msg).await;
+                }
+            } else {
+                // No participants left anywhere — propagate inactive
+                let msg = SignalMessage::GlobalRoomInactive { room: room.clone() };
+                for transport in &peer_sends {
+                    let _ = transport.send_signal(&msg).await;
+                }
+            }
             // Broadcast updated RoomUpdate to local clients (remote participant removed)
             let mgr = fm.room_mgr.lock().await;
             for local_room in mgr.active_rooms() {
                 if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
                     let mut all_participants = mgr.local_participant_list(&local_room);
-                    // Merge remaining remote participants from other peers
-                    let links = fm.peer_links.lock().await;
-                    for link in links.values() {
-                        if let Some(canonical) = fm.resolve_global_room(&local_room) {
-                            if let Some(remote) = link.remote_participants.get(canonical) {
-                                all_participants.extend(remote.iter().cloned());
-                            }
-                            if let Some(remote) = link.remote_participants.get(&local_room) {
-                                all_participants.extend(remote.iter().cloned());
-                            }
-                        }
-                    }
+                    all_participants.extend(remaining_remote.iter().cloned());
+                    // Deduplicate by fingerprint
+                    let mut seen = HashSet::new();
+                    all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
                     let update = SignalMessage::RoomUpdate {
                         count: all_participants.len() as u32,
                         participants: all_participants,
                     };
                     let senders = mgr.local_senders(&local_room);
-                    drop(links);
                     drop(mgr);
                     room::broadcast_signal(&senders, &update).await;
                     info!(room = %room, "broadcast updated presence (remote participant removed)");
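
The arm above boils down to one decision: if anyone remains in the room (remote participants via other links, or local ones), re-announce the revised roster with GlobalRoomActive; only a room that is empty everywhere propagates GlobalRoomInactive. A reduced sketch of that decision (participants are plain strings here and the local-participant merge is omitted; the variant names follow the diff, but the enum below is a stand-in for the real protocol type):

```rust
// Illustrative mirror of the two SignalMessage variants involved; the
// real enum lives in the project's protocol crate.
enum SignalMessage {
    GlobalRoomActive { room: String, participants: Vec<String> },
    GlobalRoomInactive { room: String },
}

/// Hub-relay decision after one peer leaves a room: if anyone is left,
/// re-announce the revised roster so peers like A learn that C's
/// participants are gone; otherwise propagate inactive.
fn relay_decision(room: &str, remaining: Vec<String>, local_active: bool) -> SignalMessage {
    if !remaining.is_empty() || local_active {
        SignalMessage::GlobalRoomActive { room: room.into(), participants: remaining }
    } else {
        SignalMessage::GlobalRoomInactive { room: room.into() }
    }
}

fn main() {
    // B still sees a participant via another link: re-announce.
    let msg = relay_decision("lobby", vec!["fp-a".into()], false);
    assert!(matches!(msg, SignalMessage::GlobalRoomActive { .. }));
    // Nobody left anywhere: tear the room down across the federation.
    let msg = relay_decision("lobby", vec![], false);
    assert!(matches!(msg, SignalMessage::GlobalRoomInactive { .. }));
}
```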
@@ -701,9 +839,15 @@ async fn handle_datagram(
         None => return,
     };
-    // Count inbound federation packet
+    // Count inbound federation packet + update last_seen
     fm.metrics.federation_packets_forwarded
         .with_label_values(&[source_peer_fp, "in"]).inc();
+    {
+        let mut links = fm.peer_links.lock().await;
+        if let Some(link) = links.get_mut(source_peer_fp) {
+            link.last_seen = Instant::now();
+        }
+    }
     // Dedup: drop packets we've already seen (multi-path duplicates)
     {
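
Both handle_signal and handle_datagram now refresh last_seen, so the sweeper's staleness test reduces to an Instant comparison. A self-contained sketch of just that test (the threshold constant is taken from the diff; PeerLink is cut down to the single field the sweeper reads):

```rust
use std::time::{Duration, Instant};

/// Threshold from the diff: no data for this long marks a peer stale.
const REMOTE_PARTICIPANT_STALE_SECS: u64 = 15;

/// PeerLink reduced to the one field the sweeper reads.
struct PeerLink {
    last_seen: Instant,
}

impl PeerLink {
    /// True once no signal or media datagram has arrived within the
    /// threshold; both handlers refresh `last_seen`, so a link only goes
    /// stale when the peer is truly silent (QUIC timeout, partition, crash).
    fn is_stale(&self) -> bool {
        self.last_seen.elapsed() > Duration::from_secs(REMOTE_PARTICIPANT_STALE_SECS)
    }
}

fn main() {
    let link = PeerLink { last_seen: Instant::now() };
    assert!(!link.is_stale()); // a fresh link survives the 5-second sweep ticks
}
```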


@@ -765,17 +765,13 @@ async fn main() -> anyhow::Result<()> {
                 if fm.is_global_room(&room_name) {
                     if let SignalMessage::RoomUpdate { count: _, participants: mut local_parts } = update {
                         let remote = fm.get_remote_participants(&room_name).await;
-                        if !remote.is_empty() {
-                            local_parts.extend(remote);
-                            SignalMessage::RoomUpdate {
-                                count: local_parts.len() as u32,
-                                participants: local_parts,
-                            }
-                        } else {
-                            SignalMessage::RoomUpdate {
-                                count: local_parts.len() as u32,
-                                participants: local_parts,
-                            }
+                        local_parts.extend(remote);
+                        // Deduplicate by fingerprint
+                        let mut seen = std::collections::HashSet::new();
+                        local_parts.retain(|p| seen.insert(p.fingerprint.clone()));
+                        SignalMessage::RoomUpdate {
+                            count: local_parts.len() as u32,
+                            participants: local_parts,
                         }
                     } else { update }
                 } else { update }