Compare commits
4 Commits
55d4004f86
...
ec437afbce
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec437afbce | ||
|
|
137e7973c4 | ||
|
|
aa09275015 | ||
|
|
59bf3f6587 |
45
Cargo.lock
generated
45
Cargo.lock
generated
@@ -169,6 +169,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum-core 0.4.5",
|
"axum-core 0.4.5",
|
||||||
|
"base64",
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"http",
|
"http",
|
||||||
@@ -184,8 +185,10 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rustversion",
|
"rustversion",
|
||||||
"serde",
|
"serde",
|
||||||
|
"sha1",
|
||||||
"sync_wrapper",
|
"sync_wrapper",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-tungstenite 0.24.0",
|
||||||
"tower",
|
"tower",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
@@ -220,7 +223,7 @@ dependencies = [
|
|||||||
"sha1",
|
"sha1",
|
||||||
"sync_wrapper",
|
"sync_wrapper",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-tungstenite",
|
"tokio-tungstenite 0.28.0",
|
||||||
"tower",
|
"tower",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
@@ -380,6 +383,12 @@ version = "3.20.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
|
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "byteorder"
|
||||||
|
version = "1.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bytes"
|
name = "bytes"
|
||||||
version = "1.11.1"
|
version = "1.11.1"
|
||||||
@@ -3140,6 +3149,18 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-tungstenite"
|
||||||
|
version = "0.24.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"tungstenite 0.24.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-tungstenite"
|
name = "tokio-tungstenite"
|
||||||
version = "0.28.0"
|
version = "0.28.0"
|
||||||
@@ -3149,7 +3170,7 @@ dependencies = [
|
|||||||
"futures-util",
|
"futures-util",
|
||||||
"log",
|
"log",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tungstenite",
|
"tungstenite 0.28.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -3366,6 +3387,24 @@ version = "0.2.5"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.24.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
|
||||||
|
dependencies = [
|
||||||
|
"byteorder",
|
||||||
|
"bytes",
|
||||||
|
"data-encoding",
|
||||||
|
"http",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"rand 0.8.5",
|
||||||
|
"sha1",
|
||||||
|
"thiserror 1.0.69",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tungstenite"
|
name = "tungstenite"
|
||||||
version = "0.28.0"
|
version = "0.28.0"
|
||||||
@@ -4228,6 +4267,7 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"axum 0.7.9",
|
"axum 0.7.9",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
"prometheus",
|
"prometheus",
|
||||||
"quinn",
|
"quinn",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
@@ -4236,6 +4276,7 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
|
"tower-http",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"wzp-client",
|
"wzp-client",
|
||||||
|
|||||||
@@ -46,6 +46,23 @@ impl MediaHeader {
|
|||||||
/// Header size in bytes on the wire.
|
/// Header size in bytes on the wire.
|
||||||
pub const WIRE_SIZE: usize = 12;
|
pub const WIRE_SIZE: usize = 12;
|
||||||
|
|
||||||
|
/// Create a default header for raw PCM relay (used by WebSocket bridge).
|
||||||
|
pub fn default_pcm() -> Self {
|
||||||
|
Self {
|
||||||
|
version: 0,
|
||||||
|
is_repair: false,
|
||||||
|
codec_id: CodecId::Opus24k,
|
||||||
|
has_quality_report: false,
|
||||||
|
fec_ratio_encoded: 0,
|
||||||
|
seq: 0,
|
||||||
|
timestamp: 0,
|
||||||
|
fec_block: 0,
|
||||||
|
fec_symbol: 0,
|
||||||
|
reserved: 0,
|
||||||
|
csrc_count: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Encode the FEC ratio float (0.0-2.0+) to a 7-bit value (0-127).
|
/// Encode the FEC ratio float (0.0-2.0+) to a 7-bit value (0-127).
|
||||||
pub fn encode_fec_ratio(ratio: f32) -> u8 {
|
pub fn encode_fec_ratio(ratio: f32) -> u8 {
|
||||||
// Map 0.0-2.0 to 0-127, clamping at 127
|
// Map 0.0-2.0 to 0-127, clamping at 127
|
||||||
|
|||||||
@@ -25,7 +25,9 @@ serde_json = "1"
|
|||||||
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
|
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
|
||||||
quinn = { workspace = true }
|
quinn = { workspace = true }
|
||||||
prometheus = "0.13"
|
prometheus = "0.13"
|
||||||
axum = { version = "0.7", default-features = false, features = ["tokio", "http1"] }
|
axum = { version = "0.7", default-features = false, features = ["tokio", "http1", "ws"] }
|
||||||
|
tower-http = { version = "0.6", features = ["fs"] }
|
||||||
|
futures-util = "0.3"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
name = "wzp-relay"
|
name = "wzp-relay"
|
||||||
|
|||||||
@@ -39,6 +39,11 @@ pub struct RelayConfig {
|
|||||||
/// reducing per-packet QUIC datagram overhead.
|
/// reducing per-packet QUIC datagram overhead.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub trunking_enabled: bool,
|
pub trunking_enabled: bool,
|
||||||
|
/// Port for the WebSocket listener (browser clients connect here).
|
||||||
|
/// If None, WebSocket support is disabled.
|
||||||
|
pub ws_port: Option<u16>,
|
||||||
|
/// Directory to serve static files from (HTML/JS/WASM for web clients).
|
||||||
|
pub static_dir: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RelayConfig {
|
impl Default for RelayConfig {
|
||||||
@@ -55,6 +60,8 @@ impl Default for RelayConfig {
|
|||||||
probe_targets: Vec::new(),
|
probe_targets: Vec::new(),
|
||||||
probe_mesh: false,
|
probe_mesh: false,
|
||||||
trunking_enabled: false,
|
trunking_enabled: false,
|
||||||
|
ws_port: None,
|
||||||
|
static_dir: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ pub mod room;
|
|||||||
pub mod route;
|
pub mod route;
|
||||||
pub mod session_mgr;
|
pub mod session_mgr;
|
||||||
pub mod trunk;
|
pub mod trunk;
|
||||||
|
pub mod ws;
|
||||||
|
|
||||||
pub use config::RelayConfig;
|
pub use config::RelayConfig;
|
||||||
pub use handshake::accept_handshake;
|
pub use handshake::accept_handshake;
|
||||||
|
|||||||
@@ -68,6 +68,19 @@ fn parse_args() -> RelayConfig {
|
|||||||
"--trunking" => {
|
"--trunking" => {
|
||||||
config.trunking_enabled = true;
|
config.trunking_enabled = true;
|
||||||
}
|
}
|
||||||
|
"--ws-port" => {
|
||||||
|
i += 1;
|
||||||
|
config.ws_port = Some(
|
||||||
|
args.get(i).expect("--ws-port requires a port number")
|
||||||
|
.parse().expect("invalid --ws-port number"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
"--static-dir" => {
|
||||||
|
i += 1;
|
||||||
|
config.static_dir = Some(
|
||||||
|
args.get(i).expect("--static-dir requires a directory path").to_string(),
|
||||||
|
);
|
||||||
|
}
|
||||||
"--mesh-status" => {
|
"--mesh-status" => {
|
||||||
// Print mesh table from a fresh registry and exit.
|
// Print mesh table from a fresh registry and exit.
|
||||||
// In practice this is useful after the relay has been running;
|
// In practice this is useful after the relay has been running;
|
||||||
@@ -89,6 +102,8 @@ fn parse_args() -> RelayConfig {
|
|||||||
eprintln!(" --probe-mesh Enable mesh mode (mark config flag, probes all --probe targets).");
|
eprintln!(" --probe-mesh Enable mesh mode (mark config flag, probes all --probe targets).");
|
||||||
eprintln!(" --mesh-status Print mesh health table and exit (diagnostic).");
|
eprintln!(" --mesh-status Print mesh health table and exit (diagnostic).");
|
||||||
eprintln!(" --trunking Enable trunk batching for outgoing media in room mode.");
|
eprintln!(" --trunking Enable trunk batching for outgoing media in room mode.");
|
||||||
|
eprintln!(" --ws-port <port> WebSocket listener port for browser clients (e.g., 8080).");
|
||||||
|
eprintln!(" --static-dir <dir> Directory to serve static files from (HTML/JS/WASM).");
|
||||||
eprintln!();
|
eprintln!();
|
||||||
eprintln!("Room mode (default):");
|
eprintln!("Room mode (default):");
|
||||||
eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU).");
|
eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU).");
|
||||||
@@ -233,6 +248,20 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
tokio::spawn(async move { mesh.run_all().await });
|
tokio::spawn(async move { mesh.run_all().await });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WebSocket server for browser clients
|
||||||
|
if let Some(ws_port) = config.ws_port {
|
||||||
|
let ws_state = wzp_relay::ws::WsState {
|
||||||
|
room_mgr: room_mgr.clone(),
|
||||||
|
session_mgr: session_mgr.clone(),
|
||||||
|
auth_url: config.auth_url.clone(),
|
||||||
|
metrics: metrics.clone(),
|
||||||
|
presence: presence.clone(),
|
||||||
|
};
|
||||||
|
let static_dir = config.static_dir.clone();
|
||||||
|
tokio::spawn(wzp_relay::ws::run_ws_server(ws_port, ws_state, static_dir));
|
||||||
|
info!(ws_port, "WebSocket listener enabled for browser clients");
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(ref url) = config.auth_url {
|
if let Some(ref url) = config.auth_url {
|
||||||
info!(url, "auth enabled — clients must present featherChat token");
|
info!(url, "auth enabled — clients must present featherChat token");
|
||||||
} else {
|
} else {
|
||||||
@@ -473,7 +502,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let participant_id = {
|
let participant_id = {
|
||||||
let mut mgr = room_mgr.lock().await;
|
let mut mgr = room_mgr.lock().await;
|
||||||
match mgr.join(&room_name, addr, transport.clone(), authenticated_fp.as_deref()) {
|
match mgr.join(&room_name, addr, room::ParticipantSender::Quic(transport.clone()), authenticated_fp.as_deref()) {
|
||||||
Ok(id) => {
|
Ok(id) => {
|
||||||
metrics.active_rooms.set(mgr.list().len() as i64);
|
metrics.active_rooms.set(mgr.list().len() as i64);
|
||||||
id
|
id
|
||||||
|
|||||||
@@ -27,11 +27,51 @@ fn next_id() -> ParticipantId {
|
|||||||
NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed)
|
NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// How to send data to a participant — either via QUIC transport or WebSocket channel.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum ParticipantSender {
|
||||||
|
Quic(Arc<wzp_transport::QuinnTransport>),
|
||||||
|
WebSocket(tokio::sync::mpsc::Sender<Bytes>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ParticipantSender {
|
||||||
|
/// Send raw bytes to this participant.
|
||||||
|
pub async fn send_raw(&self, data: &[u8]) -> Result<(), String> {
|
||||||
|
match self {
|
||||||
|
ParticipantSender::WebSocket(tx) => {
|
||||||
|
tx.try_send(Bytes::copy_from_slice(data))
|
||||||
|
.map_err(|e| format!("ws send: {e}"))
|
||||||
|
}
|
||||||
|
ParticipantSender::Quic(transport) => {
|
||||||
|
let pkt = wzp_proto::MediaPacket {
|
||||||
|
header: wzp_proto::packet::MediaHeader::default_pcm(),
|
||||||
|
payload: Bytes::copy_from_slice(data),
|
||||||
|
quality_report: None,
|
||||||
|
};
|
||||||
|
transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if this is a QUIC participant.
|
||||||
|
pub fn is_quic(&self) -> bool {
|
||||||
|
matches!(self, ParticipantSender::Quic(_))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the QUIC transport if this is a QUIC participant.
|
||||||
|
pub fn as_quic(&self) -> Option<&Arc<wzp_transport::QuinnTransport>> {
|
||||||
|
match self {
|
||||||
|
ParticipantSender::Quic(t) => Some(t),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A participant in a room.
|
/// A participant in a room.
|
||||||
struct Participant {
|
struct Participant {
|
||||||
id: ParticipantId,
|
id: ParticipantId,
|
||||||
_addr: std::net::SocketAddr,
|
_addr: std::net::SocketAddr,
|
||||||
transport: Arc<wzp_transport::QuinnTransport>,
|
sender: ParticipantSender,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A room holding multiple participants.
|
/// A room holding multiple participants.
|
||||||
@@ -46,10 +86,10 @@ impl Room {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add(&mut self, addr: std::net::SocketAddr, transport: Arc<wzp_transport::QuinnTransport>) -> ParticipantId {
|
fn add(&mut self, addr: std::net::SocketAddr, sender: ParticipantSender) -> ParticipantId {
|
||||||
let id = next_id();
|
let id = next_id();
|
||||||
info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room");
|
info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room");
|
||||||
self.participants.push(Participant { id, _addr: addr, transport });
|
self.participants.push(Participant { id, _addr: addr, sender });
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -58,11 +98,11 @@ impl Room {
|
|||||||
info!(room_size = self.participants.len(), participant = id, "left room");
|
info!(room_size = self.participants.len(), participant = id, "left room");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn others(&self, exclude_id: ParticipantId) -> Vec<Arc<wzp_transport::QuinnTransport>> {
|
fn others(&self, exclude_id: ParticipantId) -> Vec<ParticipantSender> {
|
||||||
self.participants
|
self.participants
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|p| p.id != exclude_id)
|
.filter(|p| p.id != exclude_id)
|
||||||
.map(|p| p.transport.clone())
|
.map(|p| p.sender.clone())
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,7 +170,7 @@ impl RoomManager {
|
|||||||
&mut self,
|
&mut self,
|
||||||
room_name: &str,
|
room_name: &str,
|
||||||
addr: std::net::SocketAddr,
|
addr: std::net::SocketAddr,
|
||||||
transport: Arc<wzp_transport::QuinnTransport>,
|
sender: ParticipantSender,
|
||||||
fingerprint: Option<&str>,
|
fingerprint: Option<&str>,
|
||||||
) -> Result<ParticipantId, String> {
|
) -> Result<ParticipantId, String> {
|
||||||
if !self.is_authorized(room_name, fingerprint) {
|
if !self.is_authorized(room_name, fingerprint) {
|
||||||
@@ -138,7 +178,18 @@ impl RoomManager {
|
|||||||
return Err("not authorized for this room".to_string());
|
return Err("not authorized for this room".to_string());
|
||||||
}
|
}
|
||||||
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
|
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
|
||||||
Ok(room.add(addr, transport))
|
Ok(room.add(addr, sender))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Join a room via WebSocket. Convenience wrapper around `join()`.
|
||||||
|
pub fn join_ws(
|
||||||
|
&mut self,
|
||||||
|
room_name: &str,
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
sender: tokio::sync::mpsc::Sender<Bytes>,
|
||||||
|
fingerprint: Option<&str>,
|
||||||
|
) -> Result<ParticipantId, String> {
|
||||||
|
self.join(room_name, addr, ParticipantSender::WebSocket(sender), fingerprint)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Leave a room. Removes the room if empty.
|
/// Leave a room. Removes the room if empty.
|
||||||
@@ -152,12 +203,12 @@ impl RoomManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get transports for all OTHER participants in a room.
|
/// Get senders for all OTHER participants in a room.
|
||||||
pub fn others(
|
pub fn others(
|
||||||
&self,
|
&self,
|
||||||
room_name: &str,
|
room_name: &str,
|
||||||
participant_id: ParticipantId,
|
participant_id: ParticipantId,
|
||||||
) -> Vec<Arc<wzp_transport::QuinnTransport>> {
|
) -> Vec<ParticipantSender> {
|
||||||
self.rooms
|
self.rooms
|
||||||
.get(room_name)
|
.get(room_name)
|
||||||
.map(|r| r.others(participant_id))
|
.map(|r| r.others(participant_id))
|
||||||
@@ -305,10 +356,14 @@ async fn run_participant_plain(
|
|||||||
// Forward to all others
|
// Forward to all others
|
||||||
let pkt_bytes = pkt.payload.len() as u64;
|
let pkt_bytes = pkt.payload.len() as u64;
|
||||||
for other in &others {
|
for other in &others {
|
||||||
// Best-effort: if one send fails, continue to others
|
match other {
|
||||||
if let Err(e) = other.send_media(&pkt).await {
|
ParticipantSender::Quic(t) => {
|
||||||
// Don't log every failure — they'll be cleaned up when their recv loop breaks
|
let _ = t.send_media(&pkt).await;
|
||||||
let _ = e;
|
}
|
||||||
|
ParticipantSender::WebSocket(_) => {
|
||||||
|
// WS clients receive raw payload bytes
|
||||||
|
let _ = other.send_raw(&pkt.payload).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -390,12 +445,20 @@ async fn run_participant_trunked(
|
|||||||
|
|
||||||
let pkt_bytes = pkt.payload.len() as u64;
|
let pkt_bytes = pkt.payload.len() as u64;
|
||||||
for other in &others {
|
for other in &others {
|
||||||
let peer_addr = other.connection().remote_address();
|
match other {
|
||||||
let fwd = forwarders
|
ParticipantSender::Quic(t) => {
|
||||||
.entry(peer_addr)
|
let peer_addr = t.connection().remote_address();
|
||||||
.or_insert_with(|| TrunkedForwarder::new(other.clone(), sid_bytes));
|
let fwd = forwarders
|
||||||
if let Err(e) = fwd.send(&pkt).await {
|
.entry(peer_addr)
|
||||||
let _ = e;
|
.or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes));
|
||||||
|
if let Err(e) = fwd.send(&pkt).await {
|
||||||
|
let _ = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ParticipantSender::WebSocket(_) => {
|
||||||
|
// WS clients bypass trunking — send raw payload directly
|
||||||
|
let _ = other.send_raw(&pkt.payload).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
243
crates/wzp-relay/src/ws.rs
Normal file
243
crates/wzp-relay/src/ws.rs
Normal file
@@ -0,0 +1,243 @@
|
|||||||
|
//! WebSocket transport for browser clients.
|
||||||
|
//!
|
||||||
|
//! Browsers connect via `GET /ws/{room}` → WebSocket upgrade.
|
||||||
|
//! First message must be auth JSON (if auth is enabled).
|
||||||
|
//! Subsequent messages are binary PCM frames forwarded to/from the room.
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
extract::{
|
||||||
|
ws::{Message, WebSocket},
|
||||||
|
Path, State, WebSocketUpgrade,
|
||||||
|
},
|
||||||
|
response::IntoResponse,
|
||||||
|
routing::get,
|
||||||
|
Router,
|
||||||
|
};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures_util::{SinkExt, StreamExt};
|
||||||
|
use tokio::sync::{mpsc, Mutex};
|
||||||
|
use tower_http::services::ServeDir;
|
||||||
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
|
use crate::auth;
|
||||||
|
use crate::metrics::RelayMetrics;
|
||||||
|
use crate::presence::PresenceRegistry;
|
||||||
|
use crate::room::RoomManager;
|
||||||
|
use crate::session_mgr::SessionManager;
|
||||||
|
|
||||||
|
/// Shared state for WebSocket handlers.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct WsState {
|
||||||
|
pub room_mgr: Arc<Mutex<RoomManager>>,
|
||||||
|
pub session_mgr: Arc<Mutex<SessionManager>>,
|
||||||
|
pub auth_url: Option<String>,
|
||||||
|
pub metrics: Arc<RelayMetrics>,
|
||||||
|
pub presence: Arc<Mutex<PresenceRegistry>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the WebSocket + static file server.
|
||||||
|
pub async fn run_ws_server(port: u16, state: WsState, static_dir: Option<String>) {
|
||||||
|
let mut app = Router::new()
|
||||||
|
.route("/ws/{room}", get(ws_upgrade_handler))
|
||||||
|
.with_state(state);
|
||||||
|
|
||||||
|
if let Some(dir) = static_dir {
|
||||||
|
info!(dir = %dir, "serving static files");
|
||||||
|
app = app.fallback_service(ServeDir::new(dir));
|
||||||
|
}
|
||||||
|
|
||||||
|
let addr: SocketAddr = ([0, 0, 0, 0], port).into();
|
||||||
|
info!(%addr, "WebSocket server listening");
|
||||||
|
|
||||||
|
let listener = tokio::net::TcpListener::bind(addr)
|
||||||
|
.await
|
||||||
|
.expect("failed to bind WS listener");
|
||||||
|
axum::serve(listener, app).await.expect("WS server failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ws_upgrade_handler(
|
||||||
|
Path(room): Path<String>,
|
||||||
|
State(state): State<WsState>,
|
||||||
|
ws: WebSocketUpgrade,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
ws.on_upgrade(move |socket| handle_ws_connection(socket, room, state))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) {
|
||||||
|
let (mut ws_tx, mut ws_rx) = socket.split();
|
||||||
|
|
||||||
|
// 1. Auth: if auth_url is set, first message must be {"type":"auth","token":"..."}
|
||||||
|
let fingerprint: Option<String> = if let Some(ref auth_url) = state.auth_url {
|
||||||
|
match ws_rx.next().await {
|
||||||
|
Some(Ok(Message::Text(text))) => {
|
||||||
|
match serde_json::from_str::<serde_json::Value>(&text) {
|
||||||
|
Ok(parsed) if parsed["type"] == "auth" => {
|
||||||
|
if let Some(token) = parsed["token"].as_str() {
|
||||||
|
match auth::validate_token(auth_url, token).await {
|
||||||
|
Ok(client) => {
|
||||||
|
state.metrics.auth_attempts.with_label_values(&["ok"]).inc();
|
||||||
|
info!(fingerprint = %client.fingerprint, "WS authenticated");
|
||||||
|
let _ = ws_tx
|
||||||
|
.send(Message::Text(r#"{"type":"auth_ok"}"#.into()))
|
||||||
|
.await;
|
||||||
|
Some(client.fingerprint)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
state
|
||||||
|
.metrics
|
||||||
|
.auth_attempts
|
||||||
|
.with_label_values(&["fail"])
|
||||||
|
.inc();
|
||||||
|
let _ = ws_tx
|
||||||
|
.send(Message::Text(
|
||||||
|
format!(r#"{{"type":"auth_error","error":"{e}"}}"#)
|
||||||
|
.into(),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
warn!("WS auth failed: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("WS auth: missing token field");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
warn!("WS: expected auth message as first frame");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
warn!("WS: connection closed before auth");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let _ = ws_tx
|
||||||
|
.send(Message::Text(r#"{"type":"auth_ok"}"#.into()))
|
||||||
|
.await;
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// 2. Create mpsc channel for outbound frames (room → browser)
|
||||||
|
let (tx, mut rx) = mpsc::channel::<Bytes>(64);
|
||||||
|
|
||||||
|
// 3. Create session
|
||||||
|
let session_id = {
|
||||||
|
let mut smgr = state.session_mgr.lock().await;
|
||||||
|
match smgr.create_session(&room, fingerprint.clone()) {
|
||||||
|
Ok(id) => id,
|
||||||
|
Err(e) => {
|
||||||
|
error!(room = %room, "WS session rejected: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
state.metrics.active_sessions.inc();
|
||||||
|
|
||||||
|
// 4. Join room with WS sender
|
||||||
|
let addr: SocketAddr = ([0, 0, 0, 0], 0).into();
|
||||||
|
let participant_id = {
|
||||||
|
let mut mgr = state.room_mgr.lock().await;
|
||||||
|
match mgr.join_ws(&room, addr, tx, fingerprint.as_deref()) {
|
||||||
|
Ok(id) => {
|
||||||
|
state.metrics.active_rooms.set(mgr.list().len() as i64);
|
||||||
|
id
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(room = %room, "WS room join denied: {e}");
|
||||||
|
state.metrics.active_sessions.dec();
|
||||||
|
let mut smgr = state.session_mgr.lock().await;
|
||||||
|
smgr.remove_session(session_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 5. Register presence
|
||||||
|
if let Some(ref fp) = fingerprint {
|
||||||
|
let mut reg = state.presence.lock().await;
|
||||||
|
reg.register_local(fp, None, Some(room.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(room = %room, participant = participant_id, "WS client joined");
|
||||||
|
|
||||||
|
// 6. Outbound task: mpsc rx → WS binary frames
|
||||||
|
let send_task = tokio::spawn(async move {
|
||||||
|
while let Some(data) = rx.recv().await {
|
||||||
|
if ws_tx
|
||||||
|
.send(Message::Binary(data.to_vec().into()))
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 7. Inbound: WS recv → fan-out to room
|
||||||
|
loop {
|
||||||
|
match ws_rx.next().await {
|
||||||
|
Some(Ok(Message::Binary(data))) => {
|
||||||
|
let others = {
|
||||||
|
let mgr = state.room_mgr.lock().await;
|
||||||
|
mgr.others(&room, participant_id)
|
||||||
|
};
|
||||||
|
for other in &others {
|
||||||
|
let _ = other.send_raw(&data).await;
|
||||||
|
}
|
||||||
|
state
|
||||||
|
.metrics
|
||||||
|
.packets_forwarded
|
||||||
|
.inc_by(others.len() as u64);
|
||||||
|
state
|
||||||
|
.metrics
|
||||||
|
.bytes_forwarded
|
||||||
|
.inc_by(data.len() as u64 * others.len() as u64);
|
||||||
|
}
|
||||||
|
Some(Ok(Message::Close(_))) | None => break,
|
||||||
|
_ => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 8. Cleanup
|
||||||
|
send_task.abort();
|
||||||
|
info!(room = %room, participant = participant_id, "WS client disconnected");
|
||||||
|
|
||||||
|
if let Some(ref fp) = fingerprint {
|
||||||
|
let mut reg = state.presence.lock().await;
|
||||||
|
reg.unregister_local(fp);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut mgr = state.room_mgr.lock().await;
|
||||||
|
mgr.leave(&room, participant_id);
|
||||||
|
state.metrics.active_rooms.set(mgr.list().len() as i64);
|
||||||
|
}
|
||||||
|
|
||||||
|
let session_id_str: String = session_id.iter().map(|b| format!("{b:02x}")).collect();
|
||||||
|
state.metrics.remove_session_metrics(&session_id_str);
|
||||||
|
state.metrics.active_sessions.dec();
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut smgr = state.session_mgr.lock().await;
|
||||||
|
smgr.remove_session(session_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ws_state_is_clone() {
|
||||||
|
// WsState must be Clone for axum's State extractor
|
||||||
|
fn assert_clone<T: Clone>() {}
|
||||||
|
assert_clone::<WsState>();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -117,8 +117,9 @@ function wzpBoot() {
|
|||||||
var proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
|
var proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||||
var wsUrl = proto + '//' + location.host + '/ws/' + encodeURIComponent(room);
|
var wsUrl = proto + '//' + location.host + '/ws/' + encodeURIComponent(room);
|
||||||
|
|
||||||
// Create client (currently always WZPPureClient; future: switch on variant)
|
// Create client based on selected variant
|
||||||
client = new WZPPureClient({
|
var variant = WZPCore.detectVariant();
|
||||||
|
var clientOpts = {
|
||||||
wsUrl: wsUrl,
|
wsUrl: wsUrl,
|
||||||
room: room,
|
room: room,
|
||||||
onAudio: function(pcm) {
|
onAudio: function(pcm) {
|
||||||
@@ -130,7 +131,17 @@ function wzpBoot() {
|
|||||||
onStats: function(stats) {
|
onStats: function(stats) {
|
||||||
WZPCore.updateStats(stats);
|
WZPCore.updateStats(stats);
|
||||||
},
|
},
|
||||||
});
|
};
|
||||||
|
|
||||||
|
if (variant === 'full' && typeof WZPFullClient !== 'undefined') {
|
||||||
|
// Full variant: add WebTransport URL, falls back to WS if WT unavailable
|
||||||
|
clientOpts.url = location.origin.replace('http', 'https');
|
||||||
|
client = new WZPFullClient(clientOpts);
|
||||||
|
} else if (variant === 'hybrid' && typeof WZPHybridClient !== 'undefined') {
|
||||||
|
client = new WZPHybridClient(clientOpts);
|
||||||
|
} else {
|
||||||
|
client = new WZPPureClient(clientOpts);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await client.connect();
|
await client.connect();
|
||||||
|
|||||||
@@ -34,12 +34,14 @@ class WZPFullClient {
|
|||||||
*/
|
*/
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
this.url = options.url;
|
this.url = options.url;
|
||||||
|
this.wsUrl = options.wsUrl; // WS fallback URL
|
||||||
this.room = options.room;
|
this.room = options.room;
|
||||||
this.onAudio = options.onAudio || null;
|
this.onAudio = options.onAudio || null;
|
||||||
this.onStatus = options.onStatus || null;
|
this.onStatus = options.onStatus || null;
|
||||||
this.onStats = options.onStats || null;
|
this.onStats = options.onStats || null;
|
||||||
|
|
||||||
this.wt = null; // WebTransport instance
|
this.wt = null; // WebTransport instance
|
||||||
|
this.ws = null; // WebSocket fallback
|
||||||
this.datagramWriter = null; // WritableStreamDefaultWriter
|
this.datagramWriter = null; // WritableStreamDefaultWriter
|
||||||
this.datagramReader = null; // ReadableStreamDefaultReader
|
this.datagramReader = null; // ReadableStreamDefaultReader
|
||||||
this.cryptoSession = null; // WzpCryptoSession (WASM)
|
this.cryptoSession = null; // WzpCryptoSession (WASM)
|
||||||
@@ -48,6 +50,7 @@ class WZPFullClient {
|
|||||||
this.sequence = 0;
|
this.sequence = 0;
|
||||||
this._wasmModule = null;
|
this._wasmModule = null;
|
||||||
this._connected = false;
|
this._connected = false;
|
||||||
|
this._useWebTransport = false; // true if WT connected, false = WS fallback
|
||||||
this._startTime = 0;
|
this._startTime = 0;
|
||||||
this._statsInterval = null;
|
this._statsInterval = null;
|
||||||
this._recvLoopRunning = false;
|
this._recvLoopRunning = false;
|
||||||
@@ -61,49 +64,45 @@ class WZPFullClient {
|
|||||||
async connect() {
|
async connect() {
|
||||||
if (this._connected) return;
|
if (this._connected) return;
|
||||||
|
|
||||||
// --- Guard: WebTransport support ---
|
|
||||||
if (typeof WebTransport === 'undefined') {
|
|
||||||
throw new Error(
|
|
||||||
'WebTransport is not supported in this browser. ' +
|
|
||||||
'Use the hybrid (?variant=hybrid) or pure (?variant=pure) variant instead.'
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
this._status('Loading WASM module...');
|
this._status('Loading WASM module...');
|
||||||
|
|
||||||
// 1. Load WASM
|
// 1. Load WASM (FEC + crypto)
|
||||||
this._wasmModule = await import(WZP_WASM_PATH);
|
this._wasmModule = await import(WZP_WASM_PATH);
|
||||||
await this._wasmModule.default();
|
await this._wasmModule.default();
|
||||||
|
|
||||||
this._status('Connecting via WebTransport to ' + this.url + '...');
|
// 2. Try WebTransport first, fall back to WebSocket
|
||||||
|
let wtSuccess = false;
|
||||||
// 2. WebTransport connection
|
if (typeof WebTransport !== 'undefined' && this.url) {
|
||||||
// The URL should include the room, e.g. https://host:port/room
|
try {
|
||||||
const wtUrl = this.url + '/' + encodeURIComponent(this.room);
|
this._status('Trying WebTransport...');
|
||||||
this.wt = new WebTransport(wtUrl);
|
const wtUrl = this.url + '/' + encodeURIComponent(this.room);
|
||||||
|
this.wt = new WebTransport(wtUrl);
|
||||||
this.wt.closed.then(() => {
|
await Promise.race([
|
||||||
const wasConnected = this._connected;
|
this.wt.ready,
|
||||||
this._cleanup();
|
new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), 3000)),
|
||||||
if (wasConnected) {
|
]);
|
||||||
this._status('WebTransport closed');
|
this.datagramWriter = this.wt.datagrams.writable.getWriter();
|
||||||
|
this.datagramReader = this.wt.datagrams.readable.getReader();
|
||||||
|
this._status('Performing key exchange...');
|
||||||
|
await this._performKeyExchange();
|
||||||
|
wtSuccess = true;
|
||||||
|
this._useWebTransport = true;
|
||||||
|
} catch (e) {
|
||||||
|
console.warn('[wzp-full] WebTransport failed, falling back to WebSocket:', e.message);
|
||||||
|
if (this.wt) { try { this.wt.close(); } catch (_) {} }
|
||||||
|
this.wt = null;
|
||||||
|
this.datagramWriter = null;
|
||||||
|
this.datagramReader = null;
|
||||||
}
|
}
|
||||||
}).catch((err) => {
|
}
|
||||||
this._cleanup();
|
|
||||||
this._status('WebTransport error: ' + err.message);
|
|
||||||
});
|
|
||||||
|
|
||||||
await this.wt.ready;
|
if (!wtSuccess) {
|
||||||
|
// WebSocket fallback (same as hybrid — WASM loaded but uses WS transport)
|
||||||
|
this._useWebTransport = false;
|
||||||
|
await this._connectWebSocket();
|
||||||
|
}
|
||||||
|
|
||||||
// 3. Get datagram streams (unreliable, QUIC DATAGRAM frames)
|
// 3. Initialise FEC
|
||||||
this.datagramWriter = this.wt.datagrams.writable.getWriter();
|
|
||||||
this.datagramReader = this.wt.datagrams.readable.getReader();
|
|
||||||
|
|
||||||
// 4. Key exchange over a bidirectional stream
|
|
||||||
this._status('Performing key exchange...');
|
|
||||||
await this._performKeyExchange();
|
|
||||||
|
|
||||||
// 5. Initialise FEC (5 source symbols per block, 256-byte symbols)
|
|
||||||
this.fecEncoder = new this._wasmModule.WzpFecEncoder(5, 256);
|
this.fecEncoder = new this._wasmModule.WzpFecEncoder(5, 256);
|
||||||
this.fecDecoder = new this._wasmModule.WzpFecDecoder(5, 256);
|
this.fecDecoder = new this._wasmModule.WzpFecDecoder(5, 256);
|
||||||
|
|
||||||
@@ -113,10 +112,50 @@ class WZPFullClient {
|
|||||||
this._startTime = Date.now();
|
this._startTime = Date.now();
|
||||||
this._startStatsTimer();
|
this._startStatsTimer();
|
||||||
|
|
||||||
// 6. Start receive loop (runs until disconnect)
|
// 4. Start receive loop (WebTransport only — WS uses onmessage)
|
||||||
this._recvLoop();
|
if (this._useWebTransport) {
|
||||||
|
this._recvLoop();
|
||||||
|
this._status('Connected to room: ' + this.room + ' (WebTransport, encrypted, FEC active)');
|
||||||
|
} else {
|
||||||
|
this._status('Connected to room: ' + this.room + ' (WebSocket fallback, WASM FEC loaded)');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this._status('Connected to room: ' + this.room + ' (encrypted, FEC active)');
|
/**
|
||||||
|
* WebSocket fallback connection (used when WebTransport unavailable).
|
||||||
|
*/
|
||||||
|
async _connectWebSocket() {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this._status('Connecting via WebSocket (fallback)...');
|
||||||
|
this.ws = new WebSocket(this.wsUrl);
|
||||||
|
this.ws.binaryType = 'arraybuffer';
|
||||||
|
|
||||||
|
this.ws.onopen = () => {
|
||||||
|
this._status('WebSocket connected to room: ' + this.room);
|
||||||
|
resolve();
|
||||||
|
};
|
||||||
|
|
||||||
|
this.ws.onmessage = (event) => {
|
||||||
|
if (!(event.data instanceof ArrayBuffer)) return;
|
||||||
|
const pcm = new Int16Array(event.data);
|
||||||
|
this.stats.recv++;
|
||||||
|
if (this.onAudio) this.onAudio(pcm);
|
||||||
|
};
|
||||||
|
|
||||||
|
this.ws.onclose = () => {
|
||||||
|
if (this._connected) {
|
||||||
|
this._cleanup();
|
||||||
|
this._status('Disconnected');
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
this.ws.onerror = () => {
|
||||||
|
if (!this._connected) {
|
||||||
|
this._cleanup();
|
||||||
|
reject(new Error('WebSocket connection failed'));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -128,6 +167,10 @@ class WZPFullClient {
|
|||||||
try { this.wt.close(); } catch (_) { /* ignore */ }
|
try { this.wt.close(); } catch (_) { /* ignore */ }
|
||||||
this.wt = null;
|
this.wt = null;
|
||||||
}
|
}
|
||||||
|
if (this.ws) {
|
||||||
|
try { this.ws.close(); } catch (_) { /* ignore */ }
|
||||||
|
this.ws = null;
|
||||||
|
}
|
||||||
this._cleanup();
|
this._cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,7 +182,19 @@ class WZPFullClient {
|
|||||||
* @param {ArrayBuffer} pcmBuffer 960-sample Int16 PCM (1920 bytes)
|
* @param {ArrayBuffer} pcmBuffer 960-sample Int16 PCM (1920 bytes)
|
||||||
*/
|
*/
|
||||||
async sendAudio(pcmBuffer) {
|
async sendAudio(pcmBuffer) {
|
||||||
if (!this._connected || !this.datagramWriter || !this.cryptoSession) return;
|
if (!this._connected) return;
|
||||||
|
|
||||||
|
// WebSocket fallback: send raw PCM like pure/hybrid
|
||||||
|
if (!this._useWebTransport) {
|
||||||
|
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||||
|
this.ws.send(pcmBuffer);
|
||||||
|
this.sequence++;
|
||||||
|
this.stats.sent++;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.datagramWriter || !this.cryptoSession) return;
|
||||||
|
|
||||||
const pcmBytes = new Uint8Array(pcmBuffer);
|
const pcmBytes = new Uint8Array(pcmBuffer);
|
||||||
|
|
||||||
|
|||||||
257
docs/WS_RELAY_SPEC.md
Normal file
257
docs/WS_RELAY_SPEC.md
Normal file
@@ -0,0 +1,257 @@
|
|||||||
|
# WS Support in wzp-relay — Implementation Spec
|
||||||
|
|
||||||
|
## Goal
|
||||||
|
|
||||||
|
Add WebSocket listener to `wzp-relay` so browsers connect directly, eliminating `wzp-web` bridge.
|
||||||
|
|
||||||
|
```
|
||||||
|
Before: Browser → WS → wzp-web → QUIC → wzp-relay
|
||||||
|
After: Browser → WS → wzp-relay (handles both WS + QUIC)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
wzp-relay
|
||||||
|
├── QUIC listener (:4433) — native clients, inter-relay
|
||||||
|
├── WS listener (:8080) — browsers via Caddy
|
||||||
|
│ ├── GET /ws/{room} — WebSocket upgrade
|
||||||
|
│ └── Auth: first msg = {"type":"auth","token":"..."}
|
||||||
|
└── Shared RoomManager — both transports in same rooms
|
||||||
|
```
|
||||||
|
|
||||||
|
## Key Changes
|
||||||
|
|
||||||
|
### 1. Abstract `Participant` over transport type
|
||||||
|
|
||||||
|
**File: `room.rs`**
|
||||||
|
|
||||||
|
Currently:
|
||||||
|
```rust
|
||||||
|
struct Participant {
|
||||||
|
id: ParticipantId,
|
||||||
|
_addr: std::net::SocketAddr,
|
||||||
|
transport: Arc<wzp_transport::QuinnTransport>,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Change to:
|
||||||
|
```rust
|
||||||
|
struct Participant {
|
||||||
|
id: ParticipantId,
|
||||||
|
_addr: std::net::SocketAddr,
|
||||||
|
sender: ParticipantSender,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// How to send a media packet to a participant.
|
||||||
|
enum ParticipantSender {
|
||||||
|
Quic(Arc<wzp_transport::QuinnTransport>),
|
||||||
|
WebSocket(tokio::sync::mpsc::Sender<bytes::Bytes>),
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The `others()` method returns `Vec<ParticipantSender>` instead of `Vec<Arc<QuinnTransport>>`.
|
||||||
|
|
||||||
|
`ParticipantSender` implements a `send_pcm(&self, data: &[u8])` method:
|
||||||
|
- **Quic**: wraps in `MediaPacket`, calls `transport.send_media()`
|
||||||
|
- **WebSocket**: sends raw binary frame via the mpsc channel
|
||||||
|
|
||||||
|
### 2. Add `join_ws()` to RoomManager
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub fn join_ws(
|
||||||
|
&mut self,
|
||||||
|
room_name: &str,
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
sender: tokio::sync::mpsc::Sender<bytes::Bytes>,
|
||||||
|
fingerprint: Option<&str>,
|
||||||
|
) -> Result<ParticipantId, String>
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Add WS listener in `main.rs`
|
||||||
|
|
||||||
|
New flag: `--ws-port 8080`
|
||||||
|
|
||||||
|
```rust
|
||||||
|
if let Some(ws_port) = config.ws_port {
|
||||||
|
let room_mgr = room_mgr.clone();
|
||||||
|
let auth_url = config.auth_url.clone();
|
||||||
|
let metrics = metrics.clone();
|
||||||
|
tokio::spawn(run_ws_server(ws_port, room_mgr, auth_url, metrics));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. WebSocket handler (`ws.rs` — new file)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use axum::{
|
||||||
|
extract::{ws::{Message, WebSocket}, Path, WebSocketUpgrade},
|
||||||
|
routing::get,
|
||||||
|
Router,
|
||||||
|
};
|
||||||
|
|
||||||
|
async fn ws_handler(
|
||||||
|
Path(room): Path<String>,
|
||||||
|
ws: WebSocketUpgrade,
|
||||||
|
/* state */
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
ws.on_upgrade(move |socket| handle_ws(socket, room, state))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_ws(mut socket: WebSocket, room: String, state: WsState) {
|
||||||
|
let addr = /* peer addr */;
|
||||||
|
|
||||||
|
// 1. Auth: first message must be {"type":"auth","token":"..."}
|
||||||
|
let fingerprint = if let Some(ref auth_url) = state.auth_url {
|
||||||
|
match socket.recv().await {
|
||||||
|
Some(Ok(Message::Text(text))) => {
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&text)?;
|
||||||
|
if parsed["type"] == "auth" {
|
||||||
|
let token = parsed["token"].as_str().unwrap();
|
||||||
|
let client = auth::validate_token(auth_url, token).await?;
|
||||||
|
Some(client.fingerprint)
|
||||||
|
} else { return; }
|
||||||
|
}
|
||||||
|
_ => return,
|
||||||
|
}
|
||||||
|
} else { None };
|
||||||
|
|
||||||
|
// 2. Create mpsc channel for outbound frames
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(64);
|
||||||
|
|
||||||
|
// 3. Join room
|
||||||
|
let participant_id = {
|
||||||
|
let mut mgr = state.room_mgr.lock().await;
|
||||||
|
mgr.join_ws(&room, addr, tx, fingerprint.as_deref())?
|
||||||
|
};
|
||||||
|
|
||||||
|
// 4. Run send/recv loops
|
||||||
|
let (mut ws_tx, mut ws_rx) = socket.split();
|
||||||
|
|
||||||
|
// Outbound: mpsc rx → WS send
|
||||||
|
let send_task = tokio::spawn(async move {
|
||||||
|
while let Some(data) = rx.recv().await {
|
||||||
|
if ws_tx.send(Message::Binary(data.to_vec())).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Inbound: WS recv → fan-out to room
|
||||||
|
loop {
|
||||||
|
match ws_rx.next().await {
|
||||||
|
Some(Ok(Message::Binary(data))) => {
|
||||||
|
// Raw PCM Int16 from browser — fan-out to all others
|
||||||
|
let others = {
|
||||||
|
let mgr = state.room_mgr.lock().await;
|
||||||
|
mgr.others(&room, participant_id)
|
||||||
|
};
|
||||||
|
for other in &others {
|
||||||
|
other.send_raw(&data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Ok(Message::Close(_))) | None => break,
|
||||||
|
_ => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Cleanup
|
||||||
|
send_task.abort();
|
||||||
|
let mut mgr = state.room_mgr.lock().await;
|
||||||
|
mgr.leave(&room, participant_id);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 5. Cross-transport fan-out
|
||||||
|
|
||||||
|
When a QUIC participant sends audio → WS participants receive raw PCM bytes.
|
||||||
|
When a WS participant sends audio → QUIC participants receive a `MediaPacket`.
|
||||||
|
|
||||||
|
The `ParticipantSender::send_raw()` method:
|
||||||
|
```rust
|
||||||
|
impl ParticipantSender {
|
||||||
|
async fn send_raw(&self, pcm_bytes: &[u8]) {
|
||||||
|
match self {
|
||||||
|
ParticipantSender::WebSocket(tx) => {
|
||||||
|
let _ = tx.try_send(bytes::Bytes::copy_from_slice(pcm_bytes));
|
||||||
|
}
|
||||||
|
ParticipantSender::Quic(transport) => {
|
||||||
|
// Wrap raw PCM in a MediaPacket
|
||||||
|
let pkt = MediaPacket {
|
||||||
|
header: MediaHeader::default_pcm(),
|
||||||
|
payload: bytes::Bytes::copy_from_slice(pcm_bytes),
|
||||||
|
quality_report: None,
|
||||||
|
};
|
||||||
|
let _ = transport.send_media(&pkt).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
For QUIC→WS direction, `run_participant` extracts `pkt.payload` bytes and sends to WS channels.
|
||||||
|
|
||||||
|
### 6. Dependencies to add
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# wzp-relay/Cargo.toml
|
||||||
|
axum = { version = "0.8", features = ["ws"] }
|
||||||
|
tokio = { version = "1", features = ["full"] } # already present
|
||||||
|
```
|
||||||
|
|
||||||
|
### 7. Config change
|
||||||
|
|
||||||
|
```rust
|
||||||
|
// config.rs
|
||||||
|
pub struct RelayConfig {
|
||||||
|
// ... existing fields ...
|
||||||
|
pub ws_port: Option<u16>,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 8. Docker compose change (featherChat side)
|
||||||
|
|
||||||
|
Remove `wzp-web` service entirely. Update Caddy to proxy `/audio/*` to relay's WS port:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# Before:
|
||||||
|
wzp-web:
|
||||||
|
entrypoint: ["wzp-web"]
|
||||||
|
command: ["--port", "8080", "--relay", "172.28.0.10:4433"]
|
||||||
|
|
||||||
|
# After: REMOVED. Relay handles WS directly.
|
||||||
|
|
||||||
|
wzp-relay:
|
||||||
|
command:
|
||||||
|
- "--listen"
|
||||||
|
- "0.0.0.0:4433"
|
||||||
|
- "--ws-port"
|
||||||
|
- "8080"
|
||||||
|
- "--auth-url"
|
||||||
|
- "http://warzone-server:7700/v1/auth/validate"
|
||||||
|
```
|
||||||
|
|
||||||
|
## What Stays the Same
|
||||||
|
|
||||||
|
- Browser's `startAudio()` — unchanged, still connects WS to `/audio/ws/ROOM`
|
||||||
|
- Caddy proxies `/audio/*` → relay:8080 (same path, different backend)
|
||||||
|
- Auth flow — same JSON token as first message
|
||||||
|
- PCM format — same Int16 binary frames
|
||||||
|
- QUIC clients — unchanged, still connect to :4433
|
||||||
|
- Room naming, ACL, session management — all unchanged
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
1. Start relay with `--ws-port 8080 --listen 0.0.0.0:4433`
|
||||||
|
2. Open browser, initiate call via featherChat
|
||||||
|
3. Verify audio flows (both directions)
|
||||||
|
4. Verify QUIC + WS clients can be in same room (mixed mode)
|
||||||
|
5. Verify auth works
|
||||||
|
6. Verify room cleanup on disconnect
|
||||||
|
|
||||||
|
## Migration Path
|
||||||
|
|
||||||
|
1. Implement WS in relay
|
||||||
|
2. Test with featherChat (no featherChat changes needed)
|
||||||
|
3. Remove wzp-web from Docker stack
|
||||||
|
4. Later: add WebTransport alongside WS
|
||||||
Reference in New Issue
Block a user