diff --git a/Cargo.lock b/Cargo.lock index 8253f4d..db39b7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1191,6 +1191,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dasp" version = "0.11.0" @@ -2341,6 +2355,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -7785,6 +7805,7 @@ dependencies = [ "axum 0.7.9", "bytes", "chrono", + "dashmap", "dirs", "futures-util", "prometheus", diff --git a/crates/wzp-relay/Cargo.toml b/crates/wzp-relay/Cargo.toml index 8e7a3e0..68704b3 100644 --- a/crates/wzp-relay/Cargo.toml +++ b/crates/wzp-relay/Cargo.toml @@ -28,6 +28,7 @@ prometheus = "0.13" axum = { version = "0.7", default-features = false, features = ["tokio", "http1", "ws"] } tower-http = { version = "0.6", features = ["fs"] } futures-util = "0.3" +dashmap = "6" dirs = "6" sha2 = { workspace = true } chrono = "0.4" diff --git a/crates/wzp-relay/src/federation.rs b/crates/wzp-relay/src/federation.rs index 287d7ad..7b4370e 100644 --- a/crates/wzp-relay/src/federation.rs +++ b/crates/wzp-relay/src/federation.rs @@ -134,7 +134,7 @@ pub struct FederationManager { peers: Vec, trusted: Vec, global_rooms: HashSet, - room_mgr: Arc>, + room_mgr: Arc, endpoint: quinn::Endpoint, local_tls_fp: String, metrics: Arc, @@ -161,7 +161,7 @@ impl FederationManager { peers: Vec, trusted: Vec, global_rooms: HashSet, - room_mgr: Arc>, + room_mgr: Arc, endpoint: quinn::Endpoint, local_tls_fp: String, metrics: Arc, @@ -333,10 +333,7 @@ impl FederationManager { } // Room event dispatcher - let room_events = { - let mgr = self.room_mgr.lock().await; - mgr.subscribe_events() - }; + let room_events = self.room_mgr.subscribe_events(); let this = self.clone(); handles.push(tokio::spawn(async move { run_room_event_dispatcher(this, room_events).await; @@ -483,10 +480,7 @@ async fn run_room_event_dispatcher( match events.recv().await { Ok(RoomEvent::LocalJoin { room }) => { if fm.is_global_room(&room) { - let participants = { - let mgr = fm.room_mgr.lock().await; - mgr.local_participant_list(&room) - }; + let participants = fm.room_mgr.local_participant_list(&room); info!(room = %room, count = participants.len(), "global room now active, announcing to peers"); let msg = SignalMessage::GlobalRoomActive { room, participants }; let links = fm.peer_links.lock().await; @@ -560,11 +554,11 @@ async fn run_stale_presence_sweeper(fm: Arc) { // Broadcast updated RoomUpdate for affected rooms for room in &affected_rooms { - let mgr = fm.room_mgr.lock().await; - for local_room in mgr.active_rooms() { - if fm.resolve_global_room(&local_room) == fm.resolve_global_room(room) { - let mut all_participants = mgr.local_participant_list(&local_room); - let remote = fm.get_remote_participants(&local_room).await; + let active = fm.room_mgr.active_rooms(); + for local_room in &active { + if fm.resolve_global_room(local_room) == fm.resolve_global_room(room) { + 
let mut all_participants = fm.room_mgr.local_participant_list(local_room); + let remote = fm.get_remote_participants(local_room).await; all_participants.extend(remote); let mut seen = HashSet::new(); all_participants.retain(|p| seen.insert(p.fingerprint.clone())); @@ -572,8 +566,7 @@ async fn run_stale_presence_sweeper(fm: Arc) { count: all_participants.len() as u32, participants: all_participants, }; - let senders = mgr.local_senders(&local_room); - drop(mgr); + let senders = fm.room_mgr.local_senders(local_room); room::broadcast_signal(&senders, &update).await; info!(room = %room, "swept stale presence — broadcast updated RoomUpdate"); break; @@ -651,14 +644,13 @@ async fn run_federation_link( // Announce our currently active global rooms to this new peer // Collect all announcements first, then send (avoid holding locks across await) let announcements = { - let mgr = fm.room_mgr.lock().await; - let active = mgr.active_rooms(); + let active = fm.room_mgr.active_rooms(); let mut msgs = Vec::new(); // Local rooms for room_name in &active { if fm.is_global_room(room_name) { - let participants = mgr.local_participant_list(room_name); + let participants = fm.room_mgr.local_participant_list(room_name); info!(peer = %peer_label, room = %room_name, participants = participants.len(), "announcing local global room to new peer"); msgs.push(SignalMessage::GlobalRoomActive { room: room_name.clone(), participants }); } @@ -828,22 +820,24 @@ async fn handle_signal( // Broadcast updated RoomUpdate to local clients in this room // Find the local room name (may be hashed or raw) - let mgr = fm.room_mgr.lock().await; - for local_room in mgr.active_rooms() { - if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) { + let active = fm.room_mgr.active_rooms(); + for local_room in &active { + if fm.is_global_room(local_room) && fm.resolve_global_room(local_room) == fm.resolve_global_room(&room) { // Build merged participant list: local + all remote (deduped) - let mut all_participants = mgr.local_participant_list(&local_room); - let links = fm.peer_links.lock().await; - for link in links.values() { - if let Some(ref canonical) = fm.resolve_global_room(&local_room) { - if let Some(remote) = link.remote_participants.get(canonical.as_str()) { - all_participants.extend(remote.iter().cloned()); - } - // Also check raw room name, but only if different from canonical - if canonical != &local_room { - if let Some(remote) = link.remote_participants.get(&local_room) { + let mut all_participants = fm.room_mgr.local_participant_list(local_room); + { + let links = fm.peer_links.lock().await; + for link in links.values() { + if let Some(ref canonical) = fm.resolve_global_room(local_room) { + if let Some(remote) = link.remote_participants.get(canonical.as_str()) { all_participants.extend(remote.iter().cloned()); } + // Also check raw room name, but only if different from canonical + if canonical != local_room { + if let Some(remote) = link.remote_participants.get(local_room) { + all_participants.extend(remote.iter().cloned()); + } + } } } } @@ -854,9 +848,7 @@ async fn handle_signal( count: all_participants.len() as u32, participants: all_participants, }; - let senders = mgr.local_senders(&local_room); - drop(links); - drop(mgr); + let senders = fm.room_mgr.local_senders(local_room); room::broadcast_signal(&senders, &update).await; break; } @@ -899,10 +891,7 @@ async fn handle_signal( // Propagate to other peers: send updated GlobalRoomActive with revised list, // or 
GlobalRoomInactive if no participants remain anywhere - let local_active = { - let mgr = fm.room_mgr.lock().await; - mgr.active_rooms().iter().any(|r| fm.resolve_global_room(r) == fm.resolve_global_room(&room)) - }; + let local_active = fm.room_mgr.active_rooms().iter().any(|r| fm.resolve_global_room(r) == fm.resolve_global_room(&room)); let has_remaining = !remaining_remote.is_empty() || local_active; // Collect peer transports to send to (avoid holding lock across await) @@ -916,10 +905,9 @@ async fn handle_signal( // Send updated participant list to other peers let mut updated_participants = remaining_remote.clone(); if local_active { - let mgr = fm.room_mgr.lock().await; - for local_room in mgr.active_rooms() { + for local_room in fm.room_mgr.active_rooms() { if fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) { - updated_participants.extend(mgr.local_participant_list(&local_room)); + updated_participants.extend(fm.room_mgr.local_participant_list(&local_room)); break; } } @@ -940,10 +928,10 @@ async fn handle_signal( } // Broadcast updated RoomUpdate to local clients (remote participant removed) - let mgr = fm.room_mgr.lock().await; - for local_room in mgr.active_rooms() { - if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) { - let mut all_participants = mgr.local_participant_list(&local_room); + let active = fm.room_mgr.active_rooms(); + for local_room in &active { + if fm.is_global_room(local_room) && fm.resolve_global_room(local_room) == fm.resolve_global_room(&room) { + let mut all_participants = fm.room_mgr.local_participant_list(local_room); all_participants.extend(remaining_remote.iter().cloned()); // Deduplicate by fingerprint let mut seen = HashSet::new(); @@ -952,8 +940,7 @@ async fn handle_signal( count: all_participants.len() as u32, participants: all_participants, }; - let senders = mgr.local_senders(&local_room); - drop(mgr); + let senders = fm.room_mgr.local_senders(local_room); room::broadcast_signal(&senders, &update).await; info!(room = %room, "broadcast updated presence (remote participant removed)"); break; @@ -1070,10 +1057,9 @@ async fn handle_datagram( } } - // Find room by hash — check local rooms AND global room config + // Find room by hash -- check local rooms AND global room config let room_name = { - let mgr = fm.room_mgr.lock().await; - let active = mgr.active_rooms(); + let active = fm.room_mgr.active_rooms(); // First: check local rooms (has participants) active.iter().find(|r| room_hash(r) == rh).cloned() .or_else(|| active.iter().find(|r| fm.global_room_hash(r) == rh).cloned()) @@ -1093,10 +1079,7 @@ async fn handle_datagram( // for a room we don't have locally — could be a // timing issue (peer joined before us) or a hash // mismatch. - let active = { - let mgr = fm.room_mgr.lock().await; - mgr.active_rooms() - }; + let active = fm.room_mgr.active_rooms(); warn!( room_hash = ?rh, active_rooms = ?active, @@ -1121,10 +1104,7 @@ async fn handle_datagram( // Deliver to all local participants — forward the raw bytes as-is. // The original sender's MediaPacket is preserved exactly (no re-serialization). 
- let locals = { - let mgr = fm.room_mgr.lock().await; - mgr.local_senders(&room_name) - }; + let locals = fm.room_mgr.local_senders(&room_name); for sender in &locals { match sender { room::ParticipantSender::Quic(t) => { diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 7493da6..9e16091 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -416,7 +416,7 @@ async fn main() -> anyhow::Result<()> { }; // Room manager (room mode only) - let room_mgr = Arc::new(Mutex::new(RoomManager::new())); + let room_mgr = Arc::new(RoomManager::new()); // Event log for protocol analysis let event_log = wzp_relay::event_log::start_event_log( @@ -1621,9 +1621,7 @@ async fn main() -> anyhow::Result<()> { // Call rooms: enforce 2-participant limit if room_name.starts_with("call-") { - let mgr = room_mgr.lock().await; - if mgr.room_size(&room_name) >= 2 { - drop(mgr); + if room_mgr.room_size(&room_name) >= 2 { warn!(%addr, room = %room_name, "call room full (max 2 participants)"); metrics.active_sessions.dec(); let mut smgr = session_mgr.lock().await; @@ -1634,8 +1632,7 @@ async fn main() -> anyhow::Result<()> { } let participant_id = { - let mut mgr = room_mgr.lock().await; - match mgr.join( + match room_mgr.join( &room_name, addr, room::ParticipantSender::Quic(transport.clone()), @@ -1643,8 +1640,7 @@ async fn main() -> anyhow::Result<()> { caller_alias.as_deref(), ) { Ok((id, update, senders)) => { - metrics.active_rooms.set(mgr.list().len() as i64); - drop(mgr); // release lock before async broadcast + metrics.active_rooms.set(room_mgr.list().len() as i64); // Merge federated participants into RoomUpdate if this is a global room let merged_update = if let Some(ref fm) = federation_mgr { @@ -1729,10 +1725,7 @@ async fn main() -> anyhow::Result<()> { } metrics.remove_session_metrics(&session_id_str); metrics.active_sessions.dec(); - { - let mgr = room_mgr.lock().await; - metrics.active_rooms.set(mgr.list().len() as i64); - } + metrics.active_rooms.set(room_mgr.list().len() as i64); { let mut smgr = session_mgr.lock().await; smgr.remove_session(session_id); diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 41eee22..830f5a5 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use tokio::sync::Mutex; +use dashmap::DashMap; use tracing::{error, info, warn}; use wzp_proto::packet::TrunkFrame; @@ -277,12 +277,18 @@ struct Participant { /// A room holding multiple participants. struct Room { participants: Vec, + /// Per-participant quality tracking, keyed by participant_id. + qualities: HashMap, + /// Current room-wide tier (to avoid repeated broadcasts). + current_tier: Tier, } impl Room { fn new() -> Self { Self { participants: Vec::new(), + qualities: HashMap::new(), + current_tier: Tier::Good, } } @@ -339,29 +345,27 @@ impl Room { } /// Manages all rooms on the relay. +/// +/// Uses `DashMap` for per-room sharded locking -- rooms are independently +/// lockable so the media hot-path never contends on a single mutex. pub struct RoomManager { - rooms: HashMap, - /// Room access control list. Maps hashed room name → allowed fingerprints. + rooms: DashMap, + /// Room access control list. Maps hashed room name -> allowed fingerprints. /// When `None`, rooms are open (no auth mode). When `Some`, only listed - /// fingerprints can join the corresponding room. - acl: Option>>, + /// fingerprints can join the corresponding room. 
Protected by std Mutex + /// since ACL mutations are rare (only during call setup). + acl: Option>>>, /// Channel for room lifecycle events (federation subscribes). event_tx: tokio::sync::broadcast::Sender, - /// Per-participant quality tracking, keyed by (room_name, participant_id). - qualities: HashMap<(String, ParticipantId), ParticipantQuality>, - /// Current room-wide tier per room (to avoid repeated broadcasts). - room_tiers: HashMap, } impl RoomManager { pub fn new() -> Self { let (event_tx, _) = tokio::sync::broadcast::channel(64); Self { - rooms: HashMap::new(), + rooms: DashMap::new(), acl: None, event_tx, - qualities: HashMap::new(), - room_tiers: HashMap::new(), } } @@ -369,11 +373,9 @@ impl RoomManager { pub fn with_acl() -> Self { let (event_tx, _) = tokio::sync::broadcast::channel(64); Self { - rooms: HashMap::new(), - acl: Some(HashMap::new()), + rooms: DashMap::new(), + acl: Some(std::sync::Mutex::new(HashMap::new())), event_tx, - qualities: HashMap::new(), - room_tiers: HashMap::new(), } } @@ -383,9 +385,10 @@ impl RoomManager { } /// Grant a fingerprint access to a room. - pub fn allow(&mut self, room_name: &str, fingerprint: &str) { - if let Some(ref mut acl) = self.acl { - acl.entry(room_name.to_string()) + pub fn allow(&self, room_name: &str, fingerprint: &str) { + if let Some(ref acl) = self.acl { + acl.lock().unwrap() + .entry(room_name.to_string()) .or_default() .insert(fingerprint.to_string()); } @@ -398,6 +401,7 @@ impl RoomManager { (None, _) => true, // no ACL = open (Some(_), None) => false, // ACL enabled but no fingerprint (Some(acl), Some(fp)) => { + let acl = acl.lock().unwrap(); // Room not in ACL = open room (allow anyone authenticated) match acl.get(room_name) { None => true, @@ -409,7 +413,7 @@ impl RoomManager { /// Join a room. Returns (participant_id, room_update_msg, all_senders) for broadcasting. pub fn join( - &mut self, + &self, room_name: &str, addr: std::net::SocketAddr, sender: ParticipantSender, @@ -420,25 +424,25 @@ impl RoomManager { warn!(room = room_name, fingerprint = ?fingerprint, "unauthorized room join attempt"); return Err("not authorized for this room".to_string()); } - let was_empty = !self.rooms.contains_key(room_name) - || self.rooms.get(room_name).map_or(true, |r| r.is_empty()); - let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); + let was_empty = self.rooms.get(room_name).map_or(true, |r| r.is_empty()); + let mut room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string())); - self.qualities.insert((room_name.to_string(), id), ParticipantQuality::new()); - if was_empty { - let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() }); - } + room.qualities.insert(id, ParticipantQuality::new()); let update = wzp_proto::SignalMessage::RoomUpdate { count: room.len() as u32, participants: room.participant_list(), }; let senders = room.all_senders(); + drop(room); // release DashMap guard before event_tx send (not async, but good practice) + if was_empty { + let _ = self.event_tx.send(RoomEvent::LocalJoin { room: room_name.to_string() }); + } Ok((id, update, senders)) } /// Join a room via WebSocket. Convenience wrapper around `join()`. pub fn join_ws( - &mut self, + &self, room_name: &str, addr: std::net::SocketAddr, sender: tokio::sync::mpsc::Sender, @@ -450,7 +454,7 @@ impl RoomManager { /// Get list of active room names. 
pub fn active_rooms(&self) -> Vec { - self.rooms.keys().cloned().collect() + self.rooms.iter().map(|r| r.key().clone()).collect() } /// Get participant list for a room (fingerprint + alias). @@ -470,26 +474,29 @@ impl RoomManager { } /// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty. - pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec)> { - self.qualities.remove(&(room_name.to_string(), participant_id)); - if let Some(room) = self.rooms.get_mut(room_name) { - room.remove(participant_id); - if room.is_empty() { - self.rooms.remove(room_name); - self.room_tiers.remove(room_name); - let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() }); - info!(room = room_name, "room closed (empty)"); - return None; + pub fn leave(&self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec)> { + let result = { + if let Some(mut room) = self.rooms.get_mut(room_name) { + room.qualities.remove(&participant_id); + room.remove(participant_id); + if room.is_empty() { + drop(room); // release write guard before remove + self.rooms.remove(room_name); + let _ = self.event_tx.send(RoomEvent::LocalLeave { room: room_name.to_string() }); + info!(room = room_name, "room closed (empty)"); + return None; + } + let update = wzp_proto::SignalMessage::RoomUpdate { + count: room.len() as u32, + participants: room.participant_list(), + }; + let senders = room.all_senders(); + Some((update, senders)) + } else { + None } - let update = wzp_proto::SignalMessage::RoomUpdate { - count: room.len() as u32, - participants: room.participant_list(), - }; - let senders = room.all_senders(); - Some((update, senders)) - } else { - None - } + }; + result } /// Get senders for all OTHER participants in a room. @@ -509,23 +516,29 @@ impl RoomManager { self.rooms.get(room_name).map(|r| r.len()).unwrap_or(0) } + /// Check if a room exists and has participants. + pub fn is_room_active(&self, room_name: &str) -> bool { + self.rooms.contains_key(room_name) + } + /// List all rooms with their sizes. pub fn list(&self) -> Vec<(String, usize)> { - self.rooms.iter().map(|(k, v)| (k.clone(), v.len())).collect() + self.rooms.iter().map(|r| (r.key().clone(), r.len())).collect() } /// Feed a quality report from a participant. If the room-wide weakest /// tier changes, returns `(QualityDirective signal, all senders)` for /// broadcasting. 
pub fn observe_quality( - &mut self, + &self, room_name: &str, participant_id: ParticipantId, report: &wzp_proto::packet::QualityReport, ) -> Option<(wzp_proto::SignalMessage, Vec)> { - let key = (room_name.to_string(), participant_id); - let tier_changed = self.qualities - .get_mut(&key) + let mut room = self.rooms.get_mut(room_name)?; + + let tier_changed = room.qualities + .get_mut(&participant_id) .and_then(|pq| pq.observe(report)) .is_some(); @@ -534,22 +547,19 @@ impl RoomManager { } // Compute the weakest tier across all participants in this room - let room_qualities = self.qualities.iter() - .filter(|((rn, _), _)| rn == room_name) - .map(|(_, pq)| pq); - let weakest = weakest_tier(room_qualities); + let weakest = weakest_tier(room.qualities.values()); - let current_room_tier = self.room_tiers.get(room_name).copied().unwrap_or(Tier::Good); - if weakest == current_room_tier { + if weakest == room.current_tier { return None; } - // Room-wide tier changed — update and broadcast directive - self.room_tiers.insert(room_name.to_string(), weakest); + // Room-wide tier changed -- update and broadcast directive + let old_tier = room.current_tier; + room.current_tier = weakest; let profile = weakest.profile(); info!( room = room_name, - old_tier = ?current_room_tier, + old_tier = ?old_tier, new_tier = ?weakest, codec = ?profile.codec, fec_ratio = profile.fec_ratio, @@ -560,9 +570,7 @@ impl RoomManager { recommended_profile: profile, reason: Some(format!("weakest link: {weakest:?}")), }; - let senders = self.rooms.get(room_name) - .map(|r| r.all_senders()) - .unwrap_or_default(); + let senders = room.all_senders(); Some((directive, senders)) } } @@ -646,7 +654,7 @@ impl TrunkedForwarder { /// into [`TrunkedForwarder`]s and flushed every 5 ms or when the batcher is /// full, reducing QUIC datagram overhead. pub async fn run_participant( - room_mgr: Arc>, + room_mgr: Arc, room_name: String, participant_id: ParticipantId, transport: Arc, @@ -672,7 +680,7 @@ pub async fn run_participant( /// Plain (non-trunked) forwarding loop — original behaviour. 
async fn run_participant_plain( - room_mgr: Arc>, + room_mgr: Arc, room_name: String, participant_id: ParticipantId, transport: Arc, @@ -746,13 +754,12 @@ async fn run_participant_plain( // Get current list of other participants + check quality directive let lock_start = std::time::Instant::now(); let (others, quality_directive) = { - let mut mgr = room_mgr.lock().await; let directive = if let Some(ref report) = pkt.quality_report { - mgr.observe_quality(&room_name, participant_id, report) + room_mgr.observe_quality(&room_name, participant_id, report) } else { None }; - let o = mgr.others(&room_name, participant_id); + let o = room_mgr.others(&room_name, participant_id); (o, directive) }; let lock_ms = lock_start.elapsed().as_millis() as u64; @@ -841,10 +848,7 @@ async fn run_participant_plain( // Periodic stats log every 5 seconds if last_log_instant.elapsed() >= Duration::from_secs(5) { - let room_size = { - let mgr = room_mgr.lock().await; - mgr.room_size(&room_name) - }; + let room_size = room_mgr.room_size(&room_name); info!( room = %room_name, participant = participant_id, @@ -867,9 +871,7 @@ async fn run_participant_plain( } // Clean up — leave room and broadcast update to remaining participants - let mut mgr = room_mgr.lock().await; - if let Some((update, senders)) = mgr.leave(&room_name, participant_id) { - drop(mgr); // release lock before async broadcast + if let Some((update, senders)) = room_mgr.leave(&room_name, participant_id) { if let Some(ref tap) = debug_tap { if tap.matches(&room_name) { tap.log_event(&room_name, "leave", &format!( @@ -890,7 +892,7 @@ async fn run_participant_plain( /// Trunked forwarding loop — batches outgoing packets per peer. async fn run_participant_trunked( - room_mgr: Arc>, + room_mgr: Arc, room_name: String, participant_id: ParticipantId, transport: Arc, @@ -965,13 +967,12 @@ async fn run_participant_trunked( let lock_start = std::time::Instant::now(); let (others, quality_directive) = { - let mut mgr = room_mgr.lock().await; let directive = if let Some(ref report) = pkt.quality_report { - mgr.observe_quality(&room_name, participant_id, report) + room_mgr.observe_quality(&room_name, participant_id, report) } else { None }; - let o = mgr.others(&room_name, participant_id); + let o = room_mgr.others(&room_name, participant_id); (o, directive) }; let lock_ms = lock_start.elapsed().as_millis() as u64; @@ -1037,10 +1038,7 @@ async fn run_participant_trunked( // Periodic stats every 5 seconds if last_log_instant.elapsed() >= Duration::from_secs(5) { - let room_size = { - let mgr = room_mgr.lock().await; - mgr.room_size(&room_name) - }; + let room_size = room_mgr.room_size(&room_name); info!( room = %room_name, participant = participant_id, @@ -1081,9 +1079,7 @@ async fn run_participant_trunked( let _ = fwd.flush().await; } - let mut mgr = room_mgr.lock().await; - if let Some((update, senders)) = mgr.leave(&room_name, participant_id) { - drop(mgr); + if let Some((update, senders)) = room_mgr.leave(&room_name, participant_id) { broadcast_signal(&senders, &update).await; } } @@ -1129,7 +1125,7 @@ mod tests { #[test] fn acl_restricts_to_allowed() { - let mut mgr = RoomManager::with_acl(); + let mgr = RoomManager::with_acl(); mgr.allow("room1", "alice"); mgr.allow("room1", "bob"); assert!(mgr.is_authorized("room1", Some("alice"))); diff --git a/crates/wzp-relay/src/ws.rs b/crates/wzp-relay/src/ws.rs index 58d32cf..3fa1f66 100644 --- a/crates/wzp-relay/src/ws.rs +++ b/crates/wzp-relay/src/ws.rs @@ -31,7 +31,7 @@ use crate::session_mgr::SessionManager; /// 
Shared state for WebSocket handlers. #[derive(Clone)] pub struct WsState { - pub room_mgr: Arc>, + pub room_mgr: Arc, pub session_mgr: Arc>, pub auth_url: Option, pub metrics: Arc, @@ -143,10 +143,9 @@ async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) { // 4. Join room with WS sender let addr: SocketAddr = ([0, 0, 0, 0], 0).into(); let participant_id = { - let mut mgr = state.room_mgr.lock().await; - match mgr.join_ws(&room, addr, tx, fingerprint.as_deref()) { + match state.room_mgr.join_ws(&room, addr, tx, fingerprint.as_deref()) { Ok(id) => { - state.metrics.active_rooms.set(mgr.list().len() as i64); + state.metrics.active_rooms.set(state.room_mgr.list().len() as i64); id } Err(e) => { @@ -184,10 +183,7 @@ async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) { loop { match ws_rx.next().await { Some(Ok(Message::Binary(data))) => { - let others = { - let mgr = state.room_mgr.lock().await; - mgr.others(&room, participant_id) - }; + let others = state.room_mgr.others(&room, participant_id); for other in &others { let _ = other.send_raw(&data).await; } @@ -214,11 +210,8 @@ async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) { reg.unregister_local(fp); } - { - let mut mgr = state.room_mgr.lock().await; - mgr.leave(&room, participant_id); - state.metrics.active_rooms.set(mgr.list().len() as i64); - } + state.room_mgr.leave(&room, participant_id); + state.metrics.active_rooms.set(state.room_mgr.list().len() as i64); let session_id_str: String = session_id.iter().map(|b| format!("{b:02x}")).collect(); state.metrics.remove_session_metrics(&session_id_str); diff --git a/docs/PRD-relay-concurrency.md b/docs/PRD-relay-concurrency.md index 0cee2ef..ef9c3d2 100644 --- a/docs/PRD-relay-concurrency.md +++ b/docs/PRD-relay-concurrency.md @@ -1,4 +1,4 @@ -# PRD: Relay Concurrency — Per-Room Lock Sharding +# PRD: Relay Concurrency — DashMap Room Sharding ## Problem @@ -188,72 +188,119 @@ for sender in senders.iter() { - Snapshot doesn't include mutable room state (quality tiers) - More complex join/leave (must rebuild snapshot atomically) -**Verdict: Best theoretical performance, but adds complexity. Worth it if Option A proves insufficient.** +**Verdict: Best theoretical performance, but adds complexity. Consider if DashMap proves insufficient.** -## Recommended Implementation: Option A + Federation Fix +## Recommended Implementation: Option B (DashMap) + Federation Fix -### Phase 1: Per-Room Locks (Biggest Win) +DashMap is the right tool here. The original objections don't hold up: -1. Move `qualities` and `room_tiers` into the `Room` struct (they're per-room anyway) -2. Wrap each Room in `Arc>` -3. RoomManager outer lock becomes a thin room-lookup layer -4. 
Per-packet hot path acquires only the per-room lock +- "Guards can't be held across `.await`" — we already drop locks before any async sends +- "Less control" — DashMap's 64 internal shards give finer granularity than manual per-room locks +- "New dependency" — one crate, battle-tested, widely used in the Rust ecosystem + +DashMap's advantages over manual per-room `Arc>`: +- **No two-level locking** — single `rooms.get()` vs outer-lock → Arc clone → drop → inner-lock +- **Read/write separation** — `get()` is a shared shard lock, multiple rooms on the same shard can read concurrently +- **Less code** — no manual Arc/Mutex wrapping, no explicit lock choreography +- **Iteration without global lock** — federation room announcements don't block media forwarding + +### Phase 1: DashMap Room Storage (Biggest Win) + +1. Add `dashmap` dependency to `wzp-relay` +2. Replace `rooms: HashMap` with `rooms: DashMap` +3. Move `qualities` and `room_tiers` into the `Room` struct (per-room state, not global) +4. RoomManager no longer needs a wrapping Mutex — it becomes `Arc` directly +5. Per-packet hot path: `rooms.get(&name)` takes a shared shard lock, releases on drop + +```rust +pub struct RoomManager { + rooms: DashMap, + acl: Option>>, // read-only after init + event_tx: broadcast::Sender, +} + +struct Room { + participants: Vec, + qualities: HashMap, + current_tier: Tier, +} + +// Hot path becomes: +let (others, directive) = if let Some(mut room) = room_mgr.rooms.get_mut(&room_name) { + let directive = if let Some(ref qr) = pkt.quality_report { + room.observe_quality(participant_id, qr) + } else { + None + }; + let o = room.others(participant_id); + (o, directive) +} else { + (vec![], None) +}; +// Shard lock released here — fan-out sends are lock-free +``` **Files to modify:** -- `crates/wzp-relay/src/room.rs` — Room struct, RoomManager refactor -- `crates/wzp-relay/src/lib.rs` — re-exports if needed +- `crates/wzp-relay/Cargo.toml` — add `dashmap` dependency +- `crates/wzp-relay/src/room.rs` — RoomManager struct, Room struct, all methods +- `crates/wzp-relay/src/lib.rs` — change from `Arc>` to `Arc` +- `crates/wzp-relay/src/main.rs` — update RoomManager construction and all `.lock().await` call sites +- `crates/wzp-relay/src/federation.rs` — update room_mgr usage (no more `.lock().await`) -**Expected change:** ~100 lines modified, ~20 new +**Key behavior change:** `Arc>` → `Arc`. Every call site that does `room_mgr.lock().await.some_method()` becomes `room_mgr.some_method()` directly. The DashMap handles internal locking. 
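To make the call-site change concrete, here is a minimal before/after sketch mirroring the `call-` room-size check in main.rs. The `RoomManager` stand-in below is reduced to a participant count purely so the snippet compiles on its own; the real struct and methods live in room.rs.

```rust
use std::sync::Arc;
use dashmap::DashMap;

// Stand-in for the real RoomManager: just enough state to show the call-site shape.
struct RoomManager {
    rooms: DashMap<String, usize>, // room name -> participant count (illustrative only)
}

impl RoomManager {
    fn room_size(&self, room: &str) -> usize {
        // get() takes only the shard lock for this key; the guard drops at the end of the expression
        self.rooms.get(room).map(|r| *r).unwrap_or(0)
    }
}

async fn check_call_room(room_mgr: Arc<RoomManager>, room_name: &str) -> Result<(), String> {
    // Before: let mgr = room_mgr.lock().await;
    //         if mgr.room_size(room_name) >= 2 { drop(mgr); ... }
    // After: no outer mutex, no .await, no guard to drop before broadcasting.
    if room_name.starts_with("call-") && room_mgr.room_size(room_name) >= 2 {
        return Err("call room full (max 2 participants)".to_string());
    }
    Ok(())
}
```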
**Concurrency improvement:** -- Before: 100 rooms × 10 people = all 1000 tasks compete for 1 lock -- After: 100 rooms × 10 people = 10 tasks compete for 1 lock per room (100× improvement) +- Before: 100 rooms × 10 people = all 1000 tasks compete for 1 Mutex +- After: 100 rooms × 10 people = distributed across 64 shards, ~15 tasks per shard average +- Within a room: participants still serialize through the shard lock, but hold time is <0.1ms for `get()` and `others()` (just Vec clone of Arcs) ### Phase 2: Federation Lock Fix -Fix `forward_to_peers()` and `broadcast_signal()` to clone the peer list, release the lock, then send: +Clone the peer list, release lock, then send: ```rust pub async fn forward_to_peers(&self, room_hash: &[u8; 8], media_data: &Bytes) { let peers: Vec<_> = { let links = self.peer_links.lock().await; links.values().map(|l| (l.label.clone(), l.transport.clone())).collect() - }; // lock released + }; // lock released immediately for (label, transport) in &peers { - // send without holding lock + // send without holding lock — slow peer doesn't block others } } ``` +Also apply to `broadcast_signal()` and `send_signal_to_peer()`. + **Files to modify:** -- `crates/wzp-relay/src/federation.rs` — `forward_to_peers()`, `broadcast_signal()`, `send_signal_to_peer()` +- `crates/wzp-relay/src/federation.rs` — 3 methods -**Expected change:** ~30 lines modified - -**Concurrency improvement:** Federation sends no longer block each other or room operations. +**Concurrency improvement:** A slow federation peer no longer blocks all other peers' media delivery. ### Phase 3: Quality Tracking Optimization (Optional) -Move `observe_quality()` out of the per-packet critical path: +With DashMap, quality tracking uses `get_mut()` (exclusive shard lock) on every packet that carries a QualityReport. For rooms where quality reports are frequent, this creates write contention on the shard. -1. Accumulate quality reports in a lock-free counter per participant -2. A background task (every 1s) reads counters, computes tiers, broadcasts directives -3. Per-packet path becomes: `lock → others() → unlock` (no quality computation) +Option: Move quality observation to a background task: +1. Per-participant `AtomicU8` for latest loss/RTT (lock-free write from hot path) +2. Background task every 1s reads atomics, computes tiers, broadcasts directives +3. Hot path becomes read-only: `rooms.get()` (shared lock) → `others()` → done -**Reduces per-packet lock hold time from ~1ms to ~0.1ms.** +**Reduces shard lock from exclusive (`get_mut`) to shared (`get`) on every packet.** ## Verification -1. **Correctness:** Run existing relay tests (`cargo test -p wzp-relay`) — must pass -2. **Load test:** 10 rooms × 10 participants, verify all 10 rooms forward concurrently -3. **Large room test:** 1 room × 50 participants, verify no deadlocks -4. **Federation test:** 3 relays, verify media still bridges with new lock pattern -5. **Benchmark:** Before/after packets-per-second on a multi-core machine with `wzp-bench` +1. **Correctness:** `cargo test -p wzp-relay` — all existing tests must pass +2. **Compile check:** `cargo check --workspace` — no regressions +3. **Load test:** 10 rooms × 10 participants, verify rooms forward concurrently +4. **Large room:** 1 room × 50 participants, no deadlocks +5. **Federation:** 3 relays, media bridges correctly with new lock pattern +6. 
**Benchmark:** Before/after packets-per-second on multi-core with `wzp-bench` ## Effort -- Phase 1: 1 day -- Phase 2: 0.5 day -- Phase 3: 1 day (optional) -- Total: 1.5–2.5 days +- Phase 1: 1 day (DashMap migration + test updates) +- Phase 2: 0.5 day (federation clone-and-release) +- Phase 3: 0.5 day (optional, quality tracking with atomics) +- Total: 1.5–2 days
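As a companion to the optional Phase 3, here is a minimal sketch of the atomics-based quality path. It is illustrative only: the PRD suggests `AtomicU8`, but `AtomicU32` is used here so RTT in milliseconds fits, and the `record()` inputs stand in for whatever fields the real `QualityReport` in wzp-proto carries. Tier mapping and the directive broadcast are left as comments.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

/// Lock-free slot the per-packet path writes into; a background task reads it once per second.
#[derive(Default)]
pub struct QualitySlot {
    loss_pct: AtomicU32,
    rtt_ms: AtomicU32,
}

impl QualitySlot {
    /// Hot path: two relaxed stores, no shard lock, no allocation.
    pub fn record(&self, loss_pct: u32, rtt_ms: u32) {
        self.loss_pct.store(loss_pct, Ordering::Relaxed);
        self.rtt_ms.store(rtt_ms, Ordering::Relaxed);
    }
}

/// Sweeper task: once per second, read every participant's slot and derive the
/// room's weakest link. Broadcasting the QualityDirective is elided here.
pub async fn quality_sweeper(slots: Arc<Vec<Arc<QualitySlot>>>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    loop {
        ticker.tick().await;
        let worst_loss = slots
            .iter()
            .map(|s| s.loss_pct.load(Ordering::Relaxed))
            .max()
            .unwrap_or(0);
        let worst_rtt = slots
            .iter()
            .map(|s| s.rtt_ms.load(Ordering::Relaxed))
            .max()
            .unwrap_or(0);
        // Map (worst_loss, worst_rtt) to a Tier and, if it differs from the room's
        // current_tier, send a QualityDirective to all senders in the room.
        let _ = (worst_loss, worst_rtt);
    }
}
```

With something like this in place, `observe_quality()` drops out of the forwarding loop entirely and the hot path only needs the shared `rooms.get()` for `others()`.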