feat: rewrite desktop audio I/O with lock-free ring buffers

- Replace Mutex-based CPAL callbacks with atomic SPSC ring buffers
- Proper async send/recv loops (no block_on), 20ms playout tick
- Add signal task for RoomUpdate presence display
- Add --alias, --raw-room flags and key persistence (~/.wzp/identity)
- Add SetAlias signal variant and relay-side handling
- Graceful Ctrl+C shutdown with force-quit on second press

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-06 09:04:51 +04:00
parent 6228ab32c1
commit 860c90394d
7 changed files with 552 additions and 201 deletions

View File

@@ -141,6 +141,17 @@ impl Room {
self.participants.iter().map(|p| p.sender.clone()).collect()
}
/// Update a participant's alias. Returns true if the participant was found.
fn set_alias(&mut self, id: ParticipantId, alias: String) -> bool {
if let Some(p) = self.participants.iter_mut().find(|p| p.id == id) {
info!(participant = id, %alias, "alias updated");
p.alias = Some(alias);
true
} else {
false
}
}
fn is_empty(&self) -> bool {
self.participants.is_empty()
}
@@ -255,6 +266,26 @@ impl RoomManager {
}
}
/// Update a participant's alias and return a RoomUpdate + senders for broadcasting.
pub fn set_alias(
&mut self,
room_name: &str,
participant_id: ParticipantId,
alias: String,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
if let Some(room) = self.rooms.get_mut(room_name) {
if room.set_alias(participant_id, alias) {
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
return Some((update, senders));
}
}
None
}
/// Get senders for all OTHER participants in a room.
pub fn others(
&self,
@@ -374,68 +405,111 @@ async fn run_participant_plain(
session_id: &str,
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
loop {
let pkt = match transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
info!(%addr, participant = participant_id, "disconnected");
break;
}
Err(e) => {
let msg = e.to_string();
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
info!(%addr, participant = participant_id, "connection closed: {e}");
} else {
error!(%addr, participant = participant_id, "recv error: {e}");
// Media forwarding task
let media_room_mgr = room_mgr.clone();
let media_room_name = room_name.clone();
let media_transport = transport.clone();
let media_metrics = metrics.clone();
let media_session_id = session_id.to_string();
let media_task = async move {
let mut packets_forwarded = 0u64;
loop {
let pkt = match media_transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
info!(%addr, participant = participant_id, "disconnected");
break;
}
break;
}
};
// Update per-session quality metrics if a quality report is present
if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report);
}
// Get current list of other participants
let others = {
let mgr = room_mgr.lock().await;
mgr.others(&room_name, participant_id)
};
// Forward to all others
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
match other {
ParticipantSender::Quic(t) => {
let _ = t.send_media(&pkt).await;
Err(e) => {
let msg = e.to_string();
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
info!(%addr, participant = participant_id, "connection closed: {e}");
} else {
error!(%addr, participant = participant_id, "recv error: {e}");
}
break;
}
ParticipantSender::WebSocket(_) => {
// WS clients receive raw payload bytes
let _ = other.send_raw(&pkt.payload).await;
}
}
}
let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
if packets_forwarded % 500 == 0 {
let room_size = {
let mgr = room_mgr.lock().await;
mgr.room_size(&room_name)
};
info!(
room = %room_name,
participant = participant_id,
forwarded = packets_forwarded,
room_size,
"participant stats"
);
if let Some(ref report) = pkt.quality_report {
media_metrics.update_session_quality(&media_session_id, report);
}
let others = {
let mgr = media_room_mgr.lock().await;
mgr.others(&media_room_name, participant_id)
};
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
match other {
ParticipantSender::Quic(t) => {
let _ = t.send_media(&pkt).await;
}
ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await;
}
}
}
let fan_out = others.len() as u64;
media_metrics.packets_forwarded.inc_by(fan_out);
media_metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
if packets_forwarded % 500 == 0 {
let room_size = {
let mgr = media_room_mgr.lock().await;
mgr.room_size(&media_room_name)
};
info!(
room = %media_room_name,
participant = participant_id,
forwarded = packets_forwarded,
room_size,
"participant stats"
);
}
}
};
// Signal handling task — processes SetAlias and other in-call signals
let signal_room_mgr = room_mgr.clone();
let signal_room_name = room_name.clone();
let signal_transport = transport.clone();
let signal_task = async move {
loop {
match signal_transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::SetAlias { alias })) => {
info!(%addr, participant = participant_id, %alias, "SetAlias received");
let mut mgr = signal_room_mgr.lock().await;
if let Some((update, senders)) =
mgr.set_alias(&signal_room_name, participant_id, alias)
{
drop(mgr);
broadcast_signal(&senders, &update).await;
}
}
Ok(Some(wzp_proto::SignalMessage::Hangup { .. })) => {
info!(%addr, participant = participant_id, "hangup received");
break;
}
Ok(Some(msg)) => {
info!(%addr, participant = participant_id, "signal: {:?}", std::mem::discriminant(&msg));
}
Ok(None) => break,
Err(e) => {
warn!(%addr, participant = participant_id, "signal recv error: {e}");
break;
}
}
}
};
// Run both in parallel — exit when either finishes (disconnection)
tokio::select! {
_ = media_task => {}
_ = signal_task => {}
}
// Clean up — leave room and broadcast update to remaining participants