diff --git a/android/app/src/main/java/com/wzp/engine/CallStats.kt b/android/app/src/main/java/com/wzp/engine/CallStats.kt index 900113e..d4e4f41 100644 --- a/android/app/src/main/java/com/wzp/engine/CallStats.kt +++ b/android/app/src/main/java/com/wzp/engine/CallStats.kt @@ -1,5 +1,6 @@ package com.wzp.engine +import org.json.JSONArray import org.json.JSONObject /** @@ -32,6 +33,10 @@ data class CallStats( val fecRecovered: Long = 0, /** Current mic audio level (RMS, 0-32767). */ val audioLevel: Int = 0, + /** Number of participants in the room. */ + val roomParticipantCount: Int = 0, + /** Participants in the room (fingerprint + optional alias). */ + val roomParticipants: List = emptyList(), ) { /** Human-readable quality label. */ val qualityLabel: String @@ -43,6 +48,17 @@ data class CallStats( } companion object { + private fun parseParticipants(arr: JSONArray?): List { + if (arr == null) return emptyList() + return (0 until arr.length()).map { i -> + val o = arr.getJSONObject(i) + RoomMember( + fingerprint = o.optString("fingerprint", ""), + alias = o.optString("alias", null) + ) + } + } + /** Deserialise from the JSON string produced by the native engine. */ fun fromJson(json: String): CallStats { return try { @@ -59,7 +75,9 @@ data class CallStats( framesDecoded = obj.optLong("frames_decoded", 0), underruns = obj.optLong("underruns", 0), fecRecovered = obj.optLong("fec_recovered", 0), - audioLevel = obj.optInt("audio_level", 0) + audioLevel = obj.optInt("audio_level", 0), + roomParticipantCount = obj.optInt("room_participant_count", 0), + roomParticipants = parseParticipants(obj.optJSONArray("room_participants")) ) } catch (e: Exception) { CallStats() @@ -67,3 +85,12 @@ data class CallStats( } } } + +data class RoomMember( + val fingerprint: String, + val alias: String? = null +) { + /** Short display name: alias if set, otherwise first 8 chars of fingerprint. */ + val displayName: String + get() = alias ?: fingerprint.take(8) +} diff --git a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt index 6b1a75e..649199a 100644 --- a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt +++ b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt @@ -228,6 +228,22 @@ fun InCallScreen( QualityIndicator(qualityTier, stats.qualityLabel) + if (stats.roomParticipantCount > 0) { + Spacer(modifier = Modifier.height(8.dp)) + Text( + text = "${stats.roomParticipantCount} in room", + style = MaterialTheme.typography.bodySmall, + color = MaterialTheme.colorScheme.onSurfaceVariant + ) + stats.roomParticipants.forEach { member -> + Text( + text = member.displayName, + style = MaterialTheme.typography.labelSmall, + color = MaterialTheme.colorScheme.onSurfaceVariant + ) + } + } + Spacer(modifier = Modifier.height(32.dp)) AudioLevelBar(stats.audioLevel) diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 5dce97d..00dfb8d 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -529,10 +529,45 @@ async fn run_call( } }; + // Signal recv task — listens for RoomUpdate and other signaling messages + let transport_signal = transport.clone(); + let state_signal = state.clone(); + let signal_task = async { + loop { + match transport_signal.recv_signal().await { + Ok(Some(SignalMessage::RoomUpdate { count, participants })) => { + info!(count, "RoomUpdate received"); + let members: Vec = participants + .iter() + .map(|p| crate::stats::RoomMember { + fingerprint: p.fingerprint.clone(), + alias: p.alias.clone(), + }) + .collect(); + let mut stats = state_signal.stats.lock().unwrap(); + stats.room_participant_count = count; + stats.room_participants = members; + } + Ok(Some(msg)) => { + info!("signal received: {:?}", std::mem::discriminant(&msg)); + } + Ok(None) => { + info!("signal stream closed"); + break; + } + Err(e) => { + warn!("signal recv error: {e}"); + break; + } + } + } + }; + tokio::select! { _ = send_task => {} _ = recv_task => {} _ = stats_task => {} + _ = signal_task => {} } transport.close().await.ok(); diff --git a/crates/wzp-android/src/stats.rs b/crates/wzp-android/src/stats.rs index 0dbb2db..49ea3c7 100644 --- a/crates/wzp-android/src/stats.rs +++ b/crates/wzp-android/src/stats.rs @@ -53,4 +53,15 @@ pub struct CallStats { pub fec_recovered: u64, /// Current mic audio level (RMS of i16 samples, 0-32767). pub audio_level: u32, + /// Number of participants in the room (from last RoomUpdate). + pub room_participant_count: u32, + /// Participant list (fingerprint + optional alias) serialized as JSON array. + pub room_participants: Vec, +} + +/// A room member entry, serialized into the stats JSON. +#[derive(Clone, Debug, Default, serde::Serialize)] +pub struct RoomMember { + pub fingerprint: String, + pub alias: Option, } diff --git a/crates/wzp-proto/src/lib.rs b/crates/wzp-proto/src/lib.rs index bb23aae..6f15d8d 100644 --- a/crates/wzp-proto/src/lib.rs +++ b/crates/wzp-proto/src/lib.rs @@ -26,7 +26,7 @@ pub use codec_id::{CodecId, QualityProfile}; pub use error::*; pub use packet::{ HangupReason, MediaHeader, MediaPacket, MiniFrameContext, MiniHeader, QualityReport, - SignalMessage, TrunkEntry, TrunkFrame, FRAME_TYPE_FULL, FRAME_TYPE_MINI, + RoomParticipant, SignalMessage, TrunkEntry, TrunkFrame, FRAME_TYPE_FULL, FRAME_TYPE_MINI, }; pub use bandwidth::{BandwidthEstimator, CongestionState}; pub use quality::{AdaptiveQualityController, NetworkContext, Tier}; diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index 9b326e8..807efab 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -645,6 +645,23 @@ pub enum SignalMessage { session_id: String, room_name: String, }, + + /// Room membership update — sent by relay to all participants when someone joins or leaves. + RoomUpdate { + /// Current participant count. + count: u32, + /// List of participants currently in the room. + participants: Vec, + }, +} + +/// A participant entry in a RoomUpdate message. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RoomParticipant { + /// Identity fingerprint (hex string, stable across reconnects if seed is persisted). + pub fingerprint: String, + /// Optional display name set by the client. + pub alias: Option, } /// Reasons for ending a call. diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 4095ca7..9fc7612 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -502,14 +502,21 @@ async fn main() -> anyhow::Result<()> { let participant_id = { let mut mgr = room_mgr.lock().await; - match mgr.join(&room_name, addr, room::ParticipantSender::Quic(transport.clone()), authenticated_fp.as_deref()) { - Ok(id) => { + match mgr.join( + &room_name, + addr, + room::ParticipantSender::Quic(transport.clone()), + authenticated_fp.as_deref(), + None, // alias — TODO: accept from client + ) { + Ok((id, update, senders)) => { metrics.active_rooms.set(mgr.list().len() as i64); + drop(mgr); // release lock before async broadcast + room::broadcast_signal(&senders, &update).await; id } Err(e) => { error!(%addr, room = %room_name, "room join denied: {e}"); - // Clean up the session we just created metrics.active_sessions.dec(); let mut smgr = session_mgr.lock().await; smgr.remove_session(session_id); diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 0cb175d..616538f 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -67,11 +67,24 @@ impl ParticipantSender { } } +/// Broadcast a signal message to a list of participant senders. +pub async fn broadcast_signal(senders: &[ParticipantSender], msg: &wzp_proto::SignalMessage) { + for sender in senders { + if let ParticipantSender::Quic(t) = sender { + if let Err(e) = t.send_signal(msg).await { + warn!("broadcast_signal error: {e}"); + } + } + } +} + /// A participant in a room. struct Participant { id: ParticipantId, _addr: std::net::SocketAddr, sender: ParticipantSender, + fingerprint: Option, + alias: Option, } /// A room holding multiple participants. @@ -86,10 +99,16 @@ impl Room { } } - fn add(&mut self, addr: std::net::SocketAddr, sender: ParticipantSender) -> ParticipantId { + fn add( + &mut self, + addr: std::net::SocketAddr, + sender: ParticipantSender, + fingerprint: Option, + alias: Option, + ) -> ParticipantId { let id = next_id(); info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room"); - self.participants.push(Participant { id, _addr: addr, sender }); + self.participants.push(Participant { id, _addr: addr, sender, fingerprint, alias }); id } @@ -106,6 +125,22 @@ impl Room { .collect() } + /// Build a RoomUpdate participant list. + fn participant_list(&self) -> Vec { + self.participants + .iter() + .map(|p| wzp_proto::packet::RoomParticipant { + fingerprint: p.fingerprint.clone().unwrap_or_default(), + alias: p.alias.clone(), + }) + .collect() + } + + /// Get all senders (for broadcasting to everyone including the joiner). + fn all_senders(&self) -> Vec { + self.participants.iter().map(|p| p.sender.clone()).collect() + } + fn is_empty(&self) -> bool { self.participants.is_empty() } @@ -165,20 +200,27 @@ impl RoomManager { } } - /// Join a room. Returns the participant ID or an error if unauthorized. + /// Join a room. Returns (participant_id, room_update_msg, all_senders) for broadcasting. pub fn join( &mut self, room_name: &str, addr: std::net::SocketAddr, sender: ParticipantSender, fingerprint: Option<&str>, - ) -> Result { + alias: Option<&str>, + ) -> Result<(ParticipantId, wzp_proto::SignalMessage, Vec), String> { if !self.is_authorized(room_name, fingerprint) { warn!(room = room_name, fingerprint = ?fingerprint, "unauthorized room join attempt"); return Err("not authorized for this room".to_string()); } let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new); - Ok(room.add(addr, sender)) + let id = room.add(addr, sender, fingerprint.map(|s| s.to_string()), alias.map(|s| s.to_string())); + let update = wzp_proto::SignalMessage::RoomUpdate { + count: room.len() as u32, + participants: room.participant_list(), + }; + let senders = room.all_senders(); + Ok((id, update, senders)) } /// Join a room via WebSocket. Convenience wrapper around `join()`. @@ -189,17 +231,27 @@ impl RoomManager { sender: tokio::sync::mpsc::Sender, fingerprint: Option<&str>, ) -> Result { - self.join(room_name, addr, ParticipantSender::WebSocket(sender), fingerprint) + let (id, _update, _senders) = self.join(room_name, addr, ParticipantSender::WebSocket(sender), fingerprint, None)?; + Ok(id) } - /// Leave a room. Removes the room if empty. - pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) { + /// Leave a room. Returns (room_update_msg, remaining_senders) for broadcasting, or None if room is now empty. + pub fn leave(&mut self, room_name: &str, participant_id: ParticipantId) -> Option<(wzp_proto::SignalMessage, Vec)> { if let Some(room) = self.rooms.get_mut(room_name) { room.remove(participant_id); if room.is_empty() { self.rooms.remove(room_name); info!(room = room_name, "room closed (empty)"); + return None; } + let update = wzp_proto::SignalMessage::RoomUpdate { + count: room.len() as u32, + participants: room.participant_list(), + }; + let senders = room.all_senders(); + Some((update, senders)) + } else { + None } } @@ -386,9 +438,12 @@ async fn run_participant_plain( } } - // Clean up + // Clean up — leave room and broadcast update to remaining participants let mut mgr = room_mgr.lock().await; - mgr.leave(&room_name, participant_id); + if let Some((update, senders)) = mgr.leave(&room_name, participant_id) { + drop(mgr); // release lock before async broadcast + broadcast_signal(&senders, &update).await; + } } /// Trunked forwarding loop — batches outgoing packets per peer. @@ -497,7 +552,10 @@ async fn run_participant_trunked( } let mut mgr = room_mgr.lock().await; - mgr.leave(&room_name, participant_id); + if let Some((update, senders)) = mgr.leave(&room_name, participant_id) { + drop(mgr); + broadcast_signal(&senders, &update).await; + } } /// Parse up to the first 2 bytes of a hex session-id string into `[u8; 2]`. diff --git a/wzp-release.apk b/wzp-release.apk index ee9b5b7..1d35aba 100644 Binary files a/wzp-release.apk and b/wzp-release.apk differ