9 Commits

Author SHA1 Message Date
Siavash Sameni
aa09275015 feat: WebSocket support in relay — browsers connect directly, no bridge
Implements WS_RELAY_SPEC.md: relay handles both QUIC and WebSocket clients
in shared rooms, eliminating the wzp-web bridge server.

Room abstraction (room.rs):
- New ParticipantSender enum: Quic(transport) | WebSocket(mpsc::Sender)
- send_raw() sends PCM bytes to either transport type
- join_ws() convenience method for WS clients
- Forwarding loops handle mixed QUIC+WS rooms:
  QUIC→QUIC: send_media (trunked if enabled)
  QUIC→WS: send_raw payload bytes
  WS→QUIC: send_raw wraps in MediaPacket
  WS→WS: send_raw binary
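
The forwarding matrix above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the relay's code: only the `ParticipantSender` variants and the `send_raw` name come from the commit message. `QuicStub`, `fan_out`, the all-zero placeholder header, and the use of `std::sync::mpsc` (the relay presumably uses an async channel) are hypothetical, and the trunked QUIC→QUIC `send_media` path is left out.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for a QUIC transport; it only records what was sent.
#[derive(Default)]
pub struct QuicStub {
    pub sent: Vec<Vec<u8>>,
}

/// One variant per transport, as named in the commit message.
pub enum ParticipantSender {
    Quic(QuicStub),
    WebSocket(Sender<Vec<u8>>),
}

/// Placeholder header length; 12 matches MediaHeader::WIRE_SIZE in the diff.
pub const HEADER_LEN: usize = 12;

impl ParticipantSender {
    /// Deliver raw payload bytes; returns false if the participant is gone.
    pub fn send_raw(&mut self, payload: &[u8]) -> bool {
        match self {
            // QUIC peers expect a media packet, so prepend a placeholder
            // header (all zeros here; the real header layout differs).
            ParticipantSender::Quic(q) => {
                let mut pkt = vec![0u8; HEADER_LEN];
                pkt.extend_from_slice(payload);
                q.sent.push(pkt);
                true
            }
            // WebSocket peers receive the bytes unchanged as one binary frame.
            ParticipantSender::WebSocket(tx) => tx.send(payload.to_vec()).is_ok(),
        }
    }
}

/// Fan one payload out to every participant except the sender.
/// Returns how many participants accepted it.
pub fn fan_out(room: &mut [ParticipantSender], from: usize, payload: &[u8]) -> usize {
    let mut delivered = 0;
    for (i, p) in room.iter_mut().enumerate() {
        if i != from && p.send_raw(payload) {
            delivered += 1;
        }
    }
    delivered
}
```

Keeping the transport choice inside the enum means the room loop does not need to know which kind of client it is forwarding to.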

WebSocket handler (ws.rs):
- GET /ws/{room} → WebSocket upgrade via axum
- Auth: first msg {"type":"auth","token":"..."} → validates against FC
- mpsc channel bridges room fan-out to WS binary frames
- Session + presence lifecycle matches QUIC path
- Optional static file serving via --static-dir (tower-http ServeDir)

Config: --ws-port 8080, --static-dir ./static
Proto: MediaHeader::default_pcm() for WS→QUIC wrapping

63 relay + 54 proto tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:38:33 +04:00
Siavash Sameni
59bf3f6587 docs: WS relay spec — add WebSocket listener to eliminate wzp-web bridge
Detailed implementation plan for adding WS support directly to wzp-relay:
- Abstract Participant over transport type (Quic + WebSocket enum)
- New --ws-port flag for browser connections
- Cross-transport fan-out (QUIC↔WS in same rooms)
- Auth, room management, session cleanup unchanged
- Eliminates wzp-web container entirely

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:27:52 +04:00
Siavash Sameni
4fb15fe7a3 feat: P3-T3 bandwidth estimation — GCC-style congestion control
BandwidthEstimator tracks available bandwidth using dual signals:

Delay-based: EMA of RTT vs baseline minimum. If RTT > 1.5x baseline
→ Overuse (congestion). If RTT < 1.1x baseline → Underuse (headroom).
Baseline slowly drifts up to handle route changes.
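
As a worked illustration of the delay signal, here is a condensed sketch of the detector. The constants (EMA alpha 0.3, drift 0.001, thresholds 1.5x and 1.1x) are taken from the diff further down; the type and method names `DelaySketch`, `State`, `new`, and `update` are hypothetical and the first-sample handling is simplified.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum State {
    Underuse,
    Normal,
    Overuse,
}

/// Condensed delay detector: a baseline RTT plus a smoothed RTT.
pub struct DelaySketch {
    pub baseline: f64,
    pub ema: f64,
}

impl DelaySketch {
    /// The first sample seeds both the baseline and the EMA.
    pub fn new(first_rtt_ms: f64) -> Self {
        DelaySketch { baseline: first_rtt_ms, ema: first_rtt_ms }
    }

    pub fn update(&mut self, rtt_ms: f64) -> State {
        if rtt_ms < self.baseline {
            // A new minimum replaces the baseline immediately.
            self.baseline = rtt_ms;
        } else {
            // Otherwise the baseline creeps toward the sample (0.1% per update).
            self.baseline += 0.001 * (rtt_ms - self.baseline);
        }
        // Exponential moving average with alpha = 0.3.
        self.ema = 0.3 * rtt_ms + 0.7 * self.ema;
        if self.ema > 1.5 * self.baseline {
            State::Overuse
        } else if self.ema < 1.1 * self.baseline {
            State::Underuse
        } else {
            State::Normal
        }
    }
}
```

With a 20 ms baseline, a single 200 ms sample moves the EMA to 0.3 * 200 + 0.7 * 20 = 74 ms while the baseline only drifts to 20.18 ms, so the detector reports overuse on that sample.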

Loss-based: sliding window of 10 loss samples. Average > 5% → congested.

Rate adaptation (AIMD):
- Overuse OR loss congested: decrease 15% (multiplicative)
- Underuse AND no loss: increase 5% (additive)
- Normal: hold steady
- Clamped to [min_bw, max_bw]
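
The rate rules above reduce to a small pure function. The sketch below is illustrative only: `DelaySignal` and `next_rate` are hypothetical names, and the increase is written as 5% of the current estimate because that is how the diff implements it.

```rust
#[derive(Clone, Copy, PartialEq)]
pub enum DelaySignal {
    Underuse,
    Normal,
    Overuse,
}

/// One adaptation step: returns the next bandwidth estimate in kbps.
pub fn next_rate(
    bw_kbps: f64,
    delay: DelaySignal,
    loss_congested: bool,
    min_bw: f64,
    max_bw: f64,
) -> f64 {
    let next = if delay == DelaySignal::Overuse || loss_congested {
        // Congestion from either signal: cut the estimate by 15%.
        bw_kbps * 0.85
    } else if delay == DelaySignal::Underuse {
        // Headroom and no loss congestion: grow by 5% of the current estimate.
        bw_kbps * 1.05
    } else {
        // Normal: hold steady.
        bw_kbps
    };
    next.clamp(min_bw, max_bw)
}
```

Starting from 50 kbps, one congested update gives 42.5 kbps and one underuse update gives 52.5 kbps; loss congestion takes priority over delay underuse.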

recommended_profile() maps bandwidth to quality tier:
- >= 25 kbps → GOOD (Opus 24k + 20% FEC)
- >= 8 kbps → DEGRADED (Opus 6k + 50% FEC)
- < 8 kbps → CATASTROPHIC (Codec2 1200 + 100% FEC)
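
The per-tier totals quoted elsewhere in this change (28.8, 9.0, and 2.4 kbps) follow from codec bitrate times (1 + FEC ratio). A small sketch of that arithmetic and of the threshold mapping, using hypothetical helper names `total_kbps` and `tier_for` and string labels in place of `QualityProfile`:

```rust
/// Total on-wire media bitrate for a codec bitrate and FEC ratio.
/// Ignores header and transport overhead.
pub fn total_kbps(codec_kbps: f64, fec_ratio: f64) -> f64 {
    codec_kbps * (1.0 + fec_ratio)
}

/// Map a bandwidth estimate to a tier label using the thresholds above.
pub fn tier_for(bw_kbps: f64) -> &'static str {
    if bw_kbps >= 25.0 {
        "GOOD"
    } else if bw_kbps >= 8.0 {
        "DEGRADED"
    } else {
        "CATASTROPHIC"
    }
}
```

For example, Opus 24k with 20% FEC is 24 * 1.2 = 28.8 kbps, and Codec2 1200 with 100% FEC is 1.2 * 2 = 2.4 kbps.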

from_quality_report() integrates with existing QualityReport packets.

54 proto tests passing (12 new bandwidth tests).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 18:51:08 +04:00
Siavash Sameni
e595fe6591 feat: P3-T6 per-session forwarding — relay links for hop-by-hop media
RelayLink: QUIC connection to peer relay (SNI "_relay") for forwarding
specific sessions. Methods: connect, forward, add/remove_session, is_idle.

RelayLinkManager: manages connections to multiple peers.
- get_or_connect: lazy connection establishment
- forward_to: send media packet to specific peer
- register/unregister_session: track which sessions use which links
- Auto-closes idle links on session unregister
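
The session bookkeeping behind "auto-closes idle links" could look something like the sketch below. It is an assumption-laden illustration: `LinkTable`, the `String` peer key, the `u64` session id, and the boolean return value are all hypothetical, and no actual QUIC connection is opened or closed.

```rust
use std::collections::{HashMap, HashSet};

/// Tracks which sessions use which peer link (peer address -> session ids).
#[derive(Default)]
pub struct LinkTable {
    links: HashMap<String, HashSet<u64>>,
}

impl LinkTable {
    /// Record that a session uses the link to `peer`, creating the entry lazily.
    pub fn register_session(&mut self, peer: &str, session_id: u64) {
        self.links.entry(peer.to_string()).or_default().insert(session_id);
    }

    /// Remove a session; returns true if the link became idle and was dropped.
    pub fn unregister_session(&mut self, peer: &str, session_id: u64) -> bool {
        let idle = match self.links.get_mut(peer) {
            Some(sessions) => {
                sessions.remove(&session_id);
                sessions.is_empty()
            }
            None => false,
        };
        if idle {
            // No sessions left on this link: drop it.
            self.links.remove(peer);
        }
        idle
    }

    pub fn is_connected(&self, peer: &str) -> bool {
        self.links.contains_key(peer)
    }
}
```

A link stays up while at least one session references it and is dropped when the last session is unregistered.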

Protocol: added SignalMessage::SessionForward { session_id,
target_fingerprint, source_relay } and SessionForwardAck { session_id,
room_name } for relay-link session setup signaling.

Building block for P3-T7 (call setup over mesh) which wires
route resolution + relay links + handshake into a complete flow.

62 relay tests + 42 proto tests passing (7 new relay_link tests).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 18:45:36 +04:00
Siavash Sameni
326aa491cc feat: P3-T5 route resolution — find relay path to any fingerprint
RouteResolver queries PresenceRegistry to determine how to reach a target:
- Route::Local — connected to this relay
- Route::DirectPeer(addr) — on a directly connected peer relay
- Route::Chain(addrs) — multi-hop (structure ready, single-hop for now)
- Route::NotFound — not in any known relay
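
A minimal sketch of how the four route variants might be derived from a presence lookup. The variant names come from the list above; the `Location` type, the `String` address type, and the `resolve` function are assumptions made for illustration, and `Chain` is never produced because the commit describes it as single-hop for now.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Clone)]
pub enum Route {
    Local,
    DirectPeer(String),
    Chain(Vec<String>),
    NotFound,
}

/// Hypothetical presence entry: where a fingerprint is connected.
pub enum Location {
    Local,
    Remote(String),
}

/// Resolve a fingerprint to a route using a presence table.
pub fn resolve(presence: &HashMap<String, Location>, fingerprint: &str) -> Route {
    match presence.get(fingerprint) {
        Some(Location::Local) => Route::Local,
        Some(Location::Remote(addr)) => Route::DirectPeer(addr.clone()),
        None => Route::NotFound,
    }
}
```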

Protocol: added SignalMessage::RouteQuery { fingerprint, ttl } and
RouteResponse { fingerprint, found, relay_chain } for peer-to-peer
route queries over probe connections.

HTTP API: GET /route/:fingerprint returns JSON with route type + chain.

Relay handles incoming RouteQuery on probe connections: looks up locally,
replies with RouteResponse. TTL decremented for future multi-hop forwarding.

55 relay tests + 42 proto tests passing (7 new route tests).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 18:38:24 +04:00
Siavash Sameni
464e95a4bd feat: P3-T4 relay presence registry — gossip fingerprints across relay mesh
PresenceRegistry tracks who is connected where:
- register_local/unregister_local for directly connected users
- update_peer for fingerprints reported by peer relays
- lookup returns Local or Remote(addr)
- expire_stale removes entries older than timeout
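
The registry operations listed above can be sketched as below. Method names follow the commit message, but the signatures are guesses: the `Option` return from `lookup`, the explicit `now` parameter (used to keep the example deterministic), and the string types are assumptions, and the timeout value is not given in the source.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Clone)]
pub enum Presence {
    Local,
    Remote(String),
}

#[derive(Default)]
pub struct Registry {
    local: HashSet<String>,
    /// fingerprint -> (peer relay address, last time it was reported)
    remote: HashMap<String, (String, Instant)>,
}

impl Registry {
    pub fn register_local(&mut self, fp: &str) {
        self.local.insert(fp.to_string());
    }

    pub fn unregister_local(&mut self, fp: &str) {
        self.local.remove(fp);
    }

    /// Record fingerprints reported by a peer relay in a gossip update.
    pub fn update_peer(&mut self, relay_addr: &str, fps: &[&str], now: Instant) {
        for fp in fps {
            self.remote.insert(fp.to_string(), (relay_addr.to_string(), now));
        }
    }

    /// Local entries take precedence over remote ones.
    pub fn lookup(&self, fp: &str) -> Option<Presence> {
        if self.local.contains(fp) {
            return Some(Presence::Local);
        }
        self.remote.get(fp).map(|(addr, _)| Presence::Remote(addr.clone()))
    }

    /// Drop remote entries not refreshed within `timeout`.
    pub fn expire_stale(&mut self, now: Instant, timeout: Duration) {
        self.remote.retain(|_, (_, seen)| now.duration_since(*seen) <= timeout);
    }
}
```

Only remote entries expire in this sketch; local entries are removed explicitly on disconnect.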

Gossip via probe connections:
- New SignalMessage::PresenceUpdate { fingerprints, relay_addr }
- Probes send local fingerprints every 10s alongside Ping/Pong
- Receiving relay updates its remote presence table

HTTP API on metrics port:
- GET /presence — all known fingerprints + locations
- GET /presence/:fingerprint — single lookup
- GET /peers — peer relays + their connected users

Wired into relay main:
- Registry created at startup
- register_local after auth+handshake
- unregister_local on disconnect
- Passed to probe mesh and metrics server

Also marks FC-10 as DONE in integration tracker.

48 relay tests + 42 proto tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:36:55 +04:00
Siavash Sameni
fd95167705 chore: update featherChat submodule to v0.0.38 (feature/wzp-call-infrastructure)
featherChat now implements:
- FC-2: Call state management (calls.rs, CallState, sled persistence)
- FC-3: WS call signal routing (Offer→Ringing, Answer→Active, Hangup→Ended)
- FC-5: Group-to-room mapping (hash_room_name — same convention as WZP)
- FC-6: Presence API (online/devices per fingerprint, batch query)
- FC-7: Missed call notifications (sled storage, retrieval endpoint)

Only FC-10 (web bridge shared auth) remains on FC side.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:21:55 +04:00
Siavash Sameni
9e7fea7633 test: P2-T1-S5 long-session regression — 60s call with drift/loss assertions
3 tests in crates/wzp-client/tests/long_session.rs:

1. long_session_no_drift — 3000 frames (60s) through full encoder/decoder
   pipeline, asserts >95% decoded, 0 overruns, 0 underruns

2. long_session_with_simulated_loss — drops every 20th packet + reorders,
   asserts >90% decoded, confirms PLC fills gaps (2999/3000)

3. long_session_stats_consistency — verifies stats.total_decoded matches
   actual decoded count over 60s (no accounting drift)

Completes P2-T1-S5. Phase 2 is now fully done.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 20:59:27 +04:00
Siavash Sameni
993cf9ab7f docs: full system architecture with Mermaid diagrams + project README
ARCHITECTURE.md covers the entire system with 13 Mermaid diagrams:
- System overview (send/recv pipeline, relay SFU)
- Crate dependency graph (8 crates + featherChat)
- Wire formats (MediaHeader, MiniHeader, TrunkFrame, QualityReport, SignalMessage)
- Quality profiles with adaptive switching thresholds
- Cryptographic handshake sequence (X25519 + Ed25519)
- Identity model (BIP39 seed → HKDF → Ed25519/X25519 → Fingerprint)
- Relay modes (Room SFU, Forward, Probe)
- Web bridge architecture (Browser ↔ WS ↔ QUIC)
- FEC protection pipeline (RaptorQ + interleaving)
- Telemetry stack (Prometheus → Grafana)
- Session state machine
- Audio processing detail (denoise → VAD → encode → FEC → encrypt)
- Adaptive jitter buffer flow
- Deployment topology (multi-region)
- featherChat integration sequence

README.md: quick start, feature list, documentation index, build instructions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 16:41:39 +04:00
22 changed files with 3366 additions and 320 deletions

47
Cargo.lock generated

@@ -169,6 +169,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core 0.4.5",
"base64",
"bytes",
"futures-util",
"http",
@@ -184,8 +185,10 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite 0.24.0",
"tower",
"tower-layer",
"tower-service",
@@ -220,7 +223,7 @@ dependencies = [
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tokio-tungstenite 0.28.0",
"tower",
"tower-layer",
"tower-service",
@@ -380,6 +383,12 @@ version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.11.1"
@@ -3140,6 +3149,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.24.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.28.0"
@@ -3149,7 +3170,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
"tungstenite 0.28.0",
]
[[package]]
@@ -3366,6 +3387,24 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror 1.0.69",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.28.0"
@@ -3507,7 +3546,7 @@ dependencies = [
[[package]]
name = "warzone-protocol"
-version = "0.0.21"
+version = "0.0.38"
dependencies = [
"base64",
"bincode",
@@ -4228,6 +4267,7 @@ dependencies = [
"async-trait",
"axum 0.7.9",
"bytes",
"futures-util",
"prometheus",
"quinn",
"reqwest",
@@ -4236,6 +4276,7 @@ dependencies = [
"serde_json",
"tokio",
"toml",
"tower-http",
"tracing",
"tracing-subscriber",
"wzp-client",

87
README.md Normal file

@@ -0,0 +1,87 @@
# WarzonePhone
Custom lossy VoIP protocol built in Rust. E2E encrypted, FEC-protected, adaptive quality, designed for hostile network conditions.
## Quick Start
```bash
# Build
cargo build --release
# Run relay
./target/release/wzp-relay --listen 0.0.0.0:4433
# Send a test tone
./target/release/wzp-client --send-tone 5 relay-addr:4433
# Web bridge (browser calls)
./target/release/wzp-web --port 8080 --relay 127.0.0.1:4433 --tls
# Open https://localhost:8080/room-name in two browser tabs
```
## Architecture
See [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md) for the full system architecture with Mermaid diagrams covering:
- System overview and data flow
- Crate dependency graph (8 crates)
- Wire formats (MediaHeader, MiniHeader, TrunkFrame, SignalMessage)
- Cryptographic handshake (X25519 + Ed25519 + ChaCha20-Poly1305)
- Identity model (BIP39 seed, featherChat compatible)
- Quality profiles (GOOD/DEGRADED/CATASTROPHIC)
- FEC protection (RaptorQ with interleaving)
- Adaptive jitter buffer (NetEq-inspired)
- Telemetry stack (Prometheus + Grafana)
- Deployment topology
## Features
- **3 quality tiers**: Opus 24k (28.8 kbps) / Opus 6k (9 kbps) / Codec2 1200 (2.4 kbps)
- **RaptorQ FEC**: 20-100% repair overhead depending on tier (20% GOOD, 50% DEGRADED, 100% CATASTROPHIC)
- **E2E encryption**: ChaCha20-Poly1305 with X25519 key exchange
- **Adaptive jitter buffer**: EMA-based playout delay tracking
- **Silence suppression**: VAD + comfort noise (~50% bandwidth savings)
- **ML noise removal**: RNNoise (nnnoiseless pure Rust port)
- **Mini-frames**: 67% header compression for steady-state packets
- **Trunking**: Multiplex sessions into batched datagrams
- **featherChat integration**: Shared BIP39 identity, token auth, call signaling
- **Prometheus metrics**: Relay, web bridge, inter-relay probes
- **Grafana dashboard**: Pre-built JSON with 18 panels
## Documentation
| Document | Description |
|----------|-------------|
| [ARCHITECTURE.md](docs/ARCHITECTURE.md) | Full system architecture with diagrams |
| [TELEMETRY.md](docs/TELEMETRY.md) | Prometheus metrics specification |
| [INTEGRATION_TASKS.md](docs/INTEGRATION_TASKS.md) | featherChat integration tracker |
| [WZP-FC-SHARED-CRATES.md](docs/WZP-FC-SHARED-CRATES.md) | Shared crate strategy |
| [grafana-dashboard.json](docs/grafana-dashboard.json) | Importable Grafana dashboard |
## Binaries
| Binary | Description |
|--------|-------------|
| `wzp-relay` | Relay daemon (SFU room mode, forward mode, probes) |
| `wzp-client` | CLI client (send-tone, record, live mic, echo-test, drift-test, sweep) |
| `wzp-web` | Browser bridge (HTTPS + WebSocket + AudioWorklet) |
| `wzp-bench` | Component benchmarks |
## Linux Build
```bash
./scripts/build-linux.sh --prepare # Create Hetzner VM + install deps
./scripts/build-linux.sh --build # Build release binaries
./scripts/build-linux.sh --transfer # Download to target/linux-x86_64/
./scripts/build-linux.sh --destroy # Delete VM
```
## Tests
```bash
cargo test --workspace # 272 tests
```
## License
MIT OR Apache-2.0


@@ -104,6 +104,11 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::Unmute => CallSignalType::Unmute,
SignalMessage::Transfer { .. } => CallSignalType::Transfer,
SignalMessage::TransferAck => CallSignalType::Offer, // reuse
SignalMessage::PresenceUpdate { .. } => CallSignalType::Offer, // relay-mesh signal; reuses Offer
SignalMessage::RouteQuery { .. } => CallSignalType::Offer, // relay-mesh signal; reuses Offer
SignalMessage::RouteResponse { .. } => CallSignalType::Offer, // relay-mesh signal; reuses Offer
SignalMessage::SessionForward { .. } => CallSignalType::Offer, // relay-mesh signal; reuses Offer
SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // relay-mesh signal; reuses Offer
}
}


@@ -0,0 +1,190 @@
//! WZP-P2-T1-S5: 60-second long-session regression tests.
//!
//! Verifies that the full codec + FEC + jitter buffer pipeline does not drift
//! or degrade over a sustained 60-second (3000-frame) session. Runs entirely
//! in-process with no network — packets flow directly from encoder to decoder.
use wzp_client::call::{CallConfig, CallDecoder, CallEncoder};
use wzp_proto::QualityProfile;
const FRAME_SAMPLES: usize = 960; // 20ms @ 48kHz
const SAMPLE_RATE: f32 = 48_000.0;
const TOTAL_FRAMES: u64 = 3_000; // 60 seconds at 50 fps
/// Build a CallConfig tuned for direct-loopback testing (no network).
///
/// Disables silence suppression and noise suppression (which would mangle
/// or squelch the synthetic tone) and uses a fixed (non-adaptive) jitter
/// buffer with `jitter_min: 1`, so packets are played out as soon as they arrive.
fn test_config() -> CallConfig {
CallConfig {
profile: QualityProfile::GOOD,
jitter_target: 4,
jitter_max: 500,
jitter_min: 1,
suppression_enabled: false,
noise_suppression: false,
adaptive_jitter: false,
..Default::default()
}
}
/// Generate a 20ms frame of 440 Hz sine tone.
fn sine_frame(frame_offset: u64) -> Vec<i16> {
let start_sample = frame_offset * FRAME_SAMPLES as u64;
(0..FRAME_SAMPLES)
.map(|i| {
let t = (start_sample + i as u64) as f32 / SAMPLE_RATE;
(f32::sin(2.0 * std::f32::consts::PI * 440.0 * t) * 16000.0) as i16
})
.collect()
}
/// 60-second session with a perfect (lossless, in-order) channel.
///
/// Encodes 3000 frames of 440 Hz tone, feeds every packet directly into the
/// decoder, and verifies:
/// - frame loss < 5% (>2850 of 3000 source frames decoded or PLC'd)
/// - no panics
///
/// Note: the encoder shares a single sequence counter between source and
/// repair packets. Since repair packets are NOT pushed into the jitter
/// buffer, each FEC block creates a gap in the playout sequence. GOOD
/// profile (5 frames/block, fec_ratio=0.2) generates 1 repair per block,
/// so every 6th seq number is a "phantom" Missing in the jitter buffer.
/// The jitter buffer correctly fills these gaps with PLC. We call
/// `decode_next` once per encode tick; the buffer stays shallow because
/// PLC frames consume the phantom seqs at the same rate they're created.
#[test]
fn long_session_no_drift() {
let config = test_config();
let mut encoder = CallEncoder::new(&config);
let mut decoder = CallDecoder::new(&config);
let mut frames_decoded = 0u64;
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
for i in 0..TOTAL_FRAMES {
let pcm = sine_frame(i);
let packets = encoder.encode_frame(&pcm).expect("encode should not fail");
for pkt in packets {
decoder.ingest(pkt);
}
// Decode one frame per tick (mirrors real-time 50 fps cadence).
if decoder.decode_next(&mut pcm_buf).is_some() {
frames_decoded += 1;
}
}
let stats = decoder.stats();
println!(
"long_session_no_drift: decoded={frames_decoded}/{TOTAL_FRAMES}, \
underruns={}, overruns={}, depth={}, max_depth={}, late={}, lost={}",
stats.underruns, stats.overruns, stats.current_depth, stats.max_depth_seen,
stats.packets_late, stats.packets_lost,
);
// With 1 decode per tick over 3000 ticks, we expect ~3000 decoded frames
// (some via PLC for repair-seq gaps). Allow up to 5% gap.
assert!(
frames_decoded > 2850,
"frame loss too high: decoded {frames_decoded}/3000 (need >2850 = <5% loss)"
);
}
/// 60-second session with simulated ~5% packet loss and reordering.
///
/// On every 20th frame the first packet of the batch is dropped if it is a
/// source (non-repair) packet; on every 7th frame the first two packets of the
/// batch are swapped when the batch holds at least two. Verifies that FEC +
/// jitter buffer recover gracefully:
/// - frame loss < 10% (FEC should recover some of the 5% artificial loss)
/// - no panics
#[test]
fn long_session_with_simulated_loss() {
let config = test_config();
let mut encoder = CallEncoder::new(&config);
let mut decoder = CallDecoder::new(&config);
let mut frames_decoded = 0u64;
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
for i in 0..TOTAL_FRAMES {
let pcm = sine_frame(i);
let packets = encoder.encode_frame(&pcm).expect("encode should not fail");
let mut batch: Vec<_> = packets.into_iter().collect();
// Simulate reordering: swap first two packets in the batch every 7 frames.
if i % 7 == 0 && batch.len() >= 2 {
batch.swap(0, 1);
}
for (j, pkt) in batch.into_iter().enumerate() {
// Drop every 20th *source* (non-repair) packet to simulate ~5% loss.
if !pkt.header.is_repair && i % 20 == 0 && j == 0 {
continue; // drop this packet
}
decoder.ingest(pkt);
}
if decoder.decode_next(&mut pcm_buf).is_some() {
frames_decoded += 1;
}
}
let stats = decoder.stats();
println!(
"long_session_with_simulated_loss: decoded={frames_decoded}/{TOTAL_FRAMES}, \
underruns={}, overruns={}, depth={}, max_depth={}, late={}, lost={}",
stats.underruns, stats.overruns, stats.current_depth, stats.max_depth_seen,
stats.packets_late, stats.packets_lost,
);
// With 5% artificial loss + FEC recovery + PLC, we should still get >90% decoded.
assert!(
frames_decoded > 2700,
"frame loss too high under simulated loss: decoded {frames_decoded}/3000 (need >2700 = <10%)"
);
}
/// Verify that the decoder's `stats.total_decoded` matches a manually
/// maintained count of decoded frames over a long session.
#[test]
fn long_session_stats_consistency() {
let config = test_config();
let mut encoder = CallEncoder::new(&config);
let mut decoder = CallDecoder::new(&config);
let mut frames_decoded = 0u64;
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
for i in 0..TOTAL_FRAMES {
let pcm = sine_frame(i);
let packets = encoder.encode_frame(&pcm).expect("encode");
for pkt in packets {
decoder.ingest(pkt);
}
if decoder.decode_next(&mut pcm_buf).is_some() {
frames_decoded += 1;
}
}
let stats = decoder.stats();
// total_decoded should match our manual counter.
assert_eq!(
stats.total_decoded, frames_decoded,
"stats.total_decoded ({}) != manually counted frames_decoded ({frames_decoded})",
stats.total_decoded,
);
// packets_received should be > 0.
assert!(
stats.packets_received > 0,
"stats.packets_received should be > 0"
);
}


@@ -0,0 +1,454 @@
//! GCC-style bandwidth estimation and congestion control.
//!
//! Tracks available bandwidth using delay-based and loss-based signals,
//! then adjusts the sending bitrate to avoid congestion. The estimator
//! cuts the estimate by 15% on congestion and raises it by 5% of the current
//! estimate during underuse, following the general shape of Google Congestion
//! Control (GCC). Both steps are proportional to the current estimate.
use std::collections::VecDeque;
use std::time::Instant;
use crate::packet::QualityReport;
use crate::QualityProfile;
/// Network congestion state derived from delay and loss signals.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CongestionState {
/// Network is fine, can increase bandwidth.
Underuse,
/// Normal operation.
Normal,
/// Congestion detected, should decrease bandwidth.
Overuse,
}
/// Detects congestion from increasing RTT using an exponential moving average.
///
/// Maintains a baseline RTT (minimum observed) and compares the smoothed RTT
/// against it. If `rtt_ema > baseline * threshold_ratio`, congestion is detected.
/// The baseline slowly drifts upward to handle route changes.
struct DelayBasedDetector {
/// Baseline RTT (minimum observed).
baseline_rtt_ms: f64,
/// EMA of recent RTT.
rtt_ema: f64,
/// EMA smoothing factor.
alpha: f64,
/// Threshold: if rtt_ema > baseline * threshold_ratio, congestion detected.
threshold_ratio: f64,
/// Current state.
state: CongestionState,
/// Whether we have received any RTT sample yet.
initialized: bool,
/// Drift factor: baseline slowly increases each update to track route changes.
baseline_drift: f64,
}
impl DelayBasedDetector {
fn new() -> Self {
Self {
baseline_rtt_ms: f64::MAX,
rtt_ema: 0.0,
alpha: 0.3,
threshold_ratio: 1.5,
state: CongestionState::Normal,
initialized: false,
baseline_drift: 0.001,
}
}
/// Update the detector with a new RTT sample.
fn update(&mut self, rtt_ms: f64) {
if !self.initialized {
self.baseline_rtt_ms = rtt_ms;
self.rtt_ema = rtt_ms;
self.initialized = true;
self.state = CongestionState::Normal;
return;
}
// Track minimum RTT as baseline.
if rtt_ms < self.baseline_rtt_ms {
self.baseline_rtt_ms = rtt_ms;
} else {
// Slowly drift baseline upward to handle route changes.
self.baseline_rtt_ms += self.baseline_drift * (rtt_ms - self.baseline_rtt_ms);
}
// Update EMA.
self.rtt_ema = self.alpha * rtt_ms + (1.0 - self.alpha) * self.rtt_ema;
// Determine state.
let overuse_threshold = self.baseline_rtt_ms * self.threshold_ratio;
let underuse_threshold = self.baseline_rtt_ms * 1.1;
if self.rtt_ema > overuse_threshold {
self.state = CongestionState::Overuse;
} else if self.rtt_ema < underuse_threshold {
self.state = CongestionState::Underuse;
} else {
self.state = CongestionState::Normal;
}
}
fn state(&self) -> CongestionState {
self.state
}
}
/// Detects congestion from packet loss using a sliding window average.
struct LossBasedDetector {
/// Recent loss percentages (sliding window).
loss_window: VecDeque<f64>,
/// Maximum window size.
window_size: usize,
/// Loss threshold for congestion (default 5%).
threshold_pct: f64,
}
impl LossBasedDetector {
fn new() -> Self {
Self {
loss_window: VecDeque::with_capacity(10),
window_size: 10,
threshold_pct: 5.0,
}
}
/// Add a loss percentage sample to the window.
fn update(&mut self, loss_pct: f64) {
if self.loss_window.len() >= self.window_size {
self.loss_window.pop_front();
}
self.loss_window.push_back(loss_pct);
}
/// Returns true if the average loss in the window exceeds the threshold.
fn is_congested(&self) -> bool {
if self.loss_window.is_empty() {
return false;
}
let avg = self.loss_window.iter().sum::<f64>() / self.loss_window.len() as f64;
avg > self.threshold_pct
}
}
// ─── BandwidthEstimator ─────────────────────────────────────────────────────
/// GCC-style bandwidth estimator that tracks available bandwidth using
/// delay-based and loss-based congestion signals.
///
/// # Algorithm
///
/// - **Overuse** (delay or loss): multiplicative decrease by 15%.
/// - **Underuse** (delay) with no loss congestion: increase by 5% of the current estimate.
/// - **Normal**: hold steady.
/// - Result is always clamped to `[min_bw_kbps, max_bw_kbps]`.
pub struct BandwidthEstimator {
/// Current estimated bandwidth in kbps.
estimated_bw_kbps: f64,
/// Minimum bandwidth floor (don't go below this).
min_bw_kbps: f64,
/// Maximum bandwidth ceiling.
max_bw_kbps: f64,
/// Delay-based detector state.
delay_detector: DelayBasedDetector,
/// Loss-based detector state.
loss_detector: LossBasedDetector,
/// Last update timestamp.
last_update: Option<Instant>,
}
/// Multiplicative decrease factor applied on congestion (15% reduction).
const DECREASE_FACTOR: f64 = 0.85;
/// Increase factor applied during underuse (5% of the current estimate per update).
const INCREASE_FACTOR: f64 = 0.05;
impl BandwidthEstimator {
/// Create a new bandwidth estimator.
///
/// - `initial_bw_kbps`: starting bandwidth estimate.
/// - `min`: minimum bandwidth floor in kbps.
/// - `max`: maximum bandwidth ceiling in kbps.
pub fn new(initial_bw_kbps: f64, min: f64, max: f64) -> Self {
Self {
estimated_bw_kbps: initial_bw_kbps,
min_bw_kbps: min,
max_bw_kbps: max,
delay_detector: DelayBasedDetector::new(),
loss_detector: LossBasedDetector::new(),
last_update: None,
}
}
/// Update the estimator with new network observations.
///
/// Returns the new estimated bandwidth in kbps.
///
/// - If delay overuse OR loss congested: decrease by 15% (multiplicative decrease).
/// - If delay underuse AND not loss congested: increase by 5% of the current estimate.
/// - If normal: hold steady.
/// - Result is clamped to `[min, max]`.
pub fn update(&mut self, rtt_ms: f64, loss_pct: f64, _jitter_ms: f64) -> f64 {
self.delay_detector.update(rtt_ms);
self.loss_detector.update(loss_pct);
self.last_update = Some(Instant::now());
let delay_state = self.delay_detector.state();
let loss_congested = self.loss_detector.is_congested();
if delay_state == CongestionState::Overuse || loss_congested {
// Multiplicative decrease.
self.estimated_bw_kbps *= DECREASE_FACTOR;
} else if delay_state == CongestionState::Underuse && !loss_congested {
// Proportional increase: add 5% of the current estimate.
self.estimated_bw_kbps += self.estimated_bw_kbps * INCREASE_FACTOR;
}
// Normal: hold steady — no change.
// Clamp to [min, max].
self.estimated_bw_kbps = self
.estimated_bw_kbps
.clamp(self.min_bw_kbps, self.max_bw_kbps);
self.estimated_bw_kbps
}
/// Current estimated bandwidth in kbps.
pub fn estimated_kbps(&self) -> f64 {
self.estimated_bw_kbps
}
/// Current congestion state, taken from the delay detector only
/// (loss-based congestion is not reflected here).
pub fn congestion_state(&self) -> CongestionState {
self.delay_detector.state()
}
/// Convenience method: update from a `QualityReport`.
///
/// Extracts RTT, loss, and jitter from the report and feeds them into
/// the estimator. Jitter is passed through but currently ignored by `update`.
pub fn from_quality_report(&mut self, report: &QualityReport) -> f64 {
let rtt_ms = report.rtt_ms() as f64;
let loss_pct = report.loss_percent() as f64;
let jitter_ms = report.jitter_ms as f64;
self.update(rtt_ms, loss_pct, jitter_ms)
}
/// Recommend a `QualityProfile` based on the current bandwidth estimate.
///
/// - bw >= 25 kbps -> GOOD (Opus 24k + 20% FEC = ~28.8 kbps total)
/// - bw >= 8 kbps -> DEGRADED (Opus 6k + 50% FEC = ~9.0 kbps)
/// - bw < 8 kbps -> CATASTROPHIC (Codec2 1.2k + 100% FEC = ~2.4 kbps)
pub fn recommended_profile(&self) -> QualityProfile {
if self.estimated_bw_kbps >= 25.0 {
QualityProfile::GOOD
} else if self.estimated_bw_kbps >= 8.0 {
QualityProfile::DEGRADED
} else {
QualityProfile::CATASTROPHIC
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn initial_bandwidth() {
let bwe = BandwidthEstimator::new(50.0, 2.0, 100.0);
assert!((bwe.estimated_kbps() - 50.0).abs() < f64::EPSILON);
}
#[test]
fn stable_network_holds_bandwidth() {
let mut bwe = BandwidthEstimator::new(50.0, 2.0, 100.0);
// Feed stable, low RTT and 0% loss — after initial sample sets baseline,
// subsequent identical RTT should be underuse (rtt_ema < baseline * 1.1),
// causing slow increases. The bandwidth should stay near initial or grow slightly.
let initial = bwe.estimated_kbps();
for _ in 0..20 {
bwe.update(30.0, 0.0, 5.0);
}
// Should not have decreased at all (the assertion below requires >= initial).
assert!(
bwe.estimated_kbps() >= initial,
"bandwidth should not decrease on stable network: got {} vs initial {}",
bwe.estimated_kbps(),
initial
);
}
#[test]
fn high_rtt_decreases_bandwidth() {
let mut bwe = BandwidthEstimator::new(50.0, 2.0, 100.0);
// Establish a low baseline.
for _ in 0..5 {
bwe.update(20.0, 0.0, 2.0);
}
let before = bwe.estimated_kbps();
// Now feed high RTT to trigger overuse.
for _ in 0..10 {
bwe.update(200.0, 0.0, 10.0);
}
assert!(
bwe.estimated_kbps() < before,
"bandwidth should decrease on high RTT: got {} vs before {}",
bwe.estimated_kbps(),
before
);
}
#[test]
fn high_loss_decreases_bandwidth() {
let mut bwe = BandwidthEstimator::new(50.0, 2.0, 100.0);
let before = bwe.estimated_kbps();
// Feed 10% loss repeatedly (above the 5% threshold).
for _ in 0..15 {
bwe.update(20.0, 10.0, 2.0);
}
assert!(
bwe.estimated_kbps() < before,
"bandwidth should decrease on high loss: got {} vs before {}",
bwe.estimated_kbps(),
before
);
}
#[test]
fn recovery_increases_bandwidth() {
let mut bwe = BandwidthEstimator::new(50.0, 2.0, 100.0);
// Drive bandwidth down with high RTT.
for _ in 0..5 {
bwe.update(20.0, 0.0, 2.0);
}
for _ in 0..20 {
bwe.update(200.0, 0.0, 10.0);
}
let low_bw = bwe.estimated_kbps();
assert!(low_bw < 50.0, "should have decreased");
// Now feed good conditions — low RTT should be underuse, causing increase.
// Reset the baseline by feeding very low RTT.
for _ in 0..30 {
bwe.update(10.0, 0.0, 1.0);
}
assert!(
bwe.estimated_kbps() > low_bw,
"bandwidth should recover: got {} vs low {}",
bwe.estimated_kbps(),
low_bw
);
}
#[test]
fn bandwidth_clamped_to_min() {
let mut bwe = BandwidthEstimator::new(10.0, 5.0, 100.0);
// Keep feeding congestion to drive bandwidth down.
for _ in 0..5 {
bwe.update(20.0, 0.0, 2.0);
}
for _ in 0..100 {
bwe.update(500.0, 50.0, 100.0);
}
assert!(
(bwe.estimated_kbps() - 5.0).abs() < f64::EPSILON,
"bandwidth should be clamped to min: got {}",
bwe.estimated_kbps()
);
}
#[test]
fn bandwidth_clamped_to_max() {
let mut bwe = BandwidthEstimator::new(90.0, 2.0, 100.0);
// Keep feeding great conditions to drive bandwidth up.
for _ in 0..200 {
bwe.update(5.0, 0.0, 1.0);
}
assert!(
bwe.estimated_kbps() <= 100.0,
"bandwidth should be clamped to max: got {}",
bwe.estimated_kbps()
);
}
#[test]
fn recommended_profile_thresholds() {
// At boundary: >= 25 kbps => GOOD
let bwe_good = BandwidthEstimator::new(25.0, 2.0, 100.0);
assert_eq!(bwe_good.recommended_profile(), QualityProfile::GOOD);
// Just below 25 => DEGRADED
let bwe_degraded = BandwidthEstimator::new(24.9, 2.0, 100.0);
assert_eq!(bwe_degraded.recommended_profile(), QualityProfile::DEGRADED);
// At boundary: >= 8 kbps => DEGRADED
let bwe_degraded2 = BandwidthEstimator::new(8.0, 2.0, 100.0);
assert_eq!(
bwe_degraded2.recommended_profile(),
QualityProfile::DEGRADED
);
// Below 8 => CATASTROPHIC
let bwe_cat = BandwidthEstimator::new(7.9, 2.0, 100.0);
assert_eq!(
bwe_cat.recommended_profile(),
QualityProfile::CATASTROPHIC
);
// High bandwidth
let bwe_high = BandwidthEstimator::new(80.0, 2.0, 100.0);
assert_eq!(bwe_high.recommended_profile(), QualityProfile::GOOD);
}
#[test]
fn from_quality_report_integration() {
let mut bwe = BandwidthEstimator::new(50.0, 2.0, 100.0);
// Build a QualityReport with moderate loss and RTT.
let report = QualityReport {
loss_pct: (10.0_f32 / 100.0 * 255.0) as u8, // ~10% loss
rtt_4ms: 25, // 100ms RTT
jitter_ms: 10,
bitrate_cap_kbps: 200,
};
let new_bw = bwe.from_quality_report(&report);
// Should return a valid bandwidth value.
assert!(new_bw > 0.0);
assert!(new_bw <= 100.0);
// The estimator should have been updated.
assert!((bwe.estimated_kbps() - new_bw).abs() < f64::EPSILON);
}
// ── Additional detector unit tests ──────────────────────────────────
#[test]
fn delay_detector_starts_normal() {
let det = DelayBasedDetector::new();
assert_eq!(det.state(), CongestionState::Normal);
}
#[test]
fn loss_detector_below_threshold() {
let mut det = LossBasedDetector::new();
for _ in 0..10 {
det.update(2.0); // 2% loss, well below 5% threshold
}
assert!(!det.is_congested());
}
#[test]
fn loss_detector_above_threshold() {
let mut det = LossBasedDetector::new();
for _ in 0..10 {
det.update(8.0); // 8% loss, above 5% threshold
}
assert!(det.is_congested());
}
}
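
The threshold and report tests above imply a simple mapping. Here is a minimal sketch, assuming the boundaries those tests assert (25 kbps and 8 kbps) and the units noted in the test comments (`loss_pct` scaled to 0-255, `rtt_4ms` in 4 ms steps). The `Profile` enum, the function names, and the `u8` type for `rtt_4ms` are hypothetical stand-ins, not the crate's `QualityProfile` or `BandwidthEstimator` API.

```rust
/// Hypothetical stand-in for the crate's `QualityProfile` constants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Profile {
    Good,
    Degraded,
    Catastrophic,
}

/// Map an estimated bandwidth (kbps) to a profile, using the boundaries
/// asserted in `recommended_profile_thresholds`.
pub fn profile_for_kbps(kbps: f64) -> Profile {
    if kbps >= 25.0 {
        Profile::Good
    } else if kbps >= 8.0 {
        Profile::Degraded
    } else {
        Profile::Catastrophic
    }
}

/// Encode a loss percentage (0-100) into a 0-255 byte, as the test does.
pub fn encode_loss_pct(percent: f32) -> u8 {
    (percent.clamp(0.0, 100.0) / 100.0 * 255.0) as u8
}

/// Decode the byte back to a percentage. The cast above truncates, so the
/// round trip is lossy (10% comes back as roughly 9.8%).
pub fn decode_loss_pct(raw: u8) -> f32 {
    raw as f32 / 255.0 * 100.0
}

/// Convert an RTT expressed in 4 ms units to milliseconds.
/// The `u8` width is an assumption; the field type is not shown in the diff.
pub fn rtt_ms_from_4ms_units(rtt_4ms: u8) -> u32 {
    rtt_4ms as u32 * 4
}
```

Because the loss byte is truncated rather than rounded, a receiver comparing against a 5% threshold sees slightly less loss than the sender measured.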


@@ -12,6 +12,7 @@
//! - Identity = 32-byte seed → HKDF → Ed25519 (signing) + X25519 (encryption)
//! - Fingerprint = SHA-256(Ed25519 pub)[:16]
pub mod bandwidth;
pub mod codec_id;
pub mod error;
pub mod jitter;
@@ -27,6 +28,7 @@ pub use packet::{
HangupReason, MediaHeader, MediaPacket, MiniFrameContext, MiniHeader, QualityReport,
SignalMessage, TrunkEntry, TrunkFrame, FRAME_TYPE_FULL, FRAME_TYPE_MINI,
};
pub use bandwidth::{BandwidthEstimator, CongestionState};
pub use quality::{AdaptiveQualityController, Tier};
pub use session::{Session, SessionEvent, SessionState};
pub use traits::*;


@@ -46,6 +46,23 @@ impl MediaHeader {
/// Header size in bytes on the wire.
pub const WIRE_SIZE: usize = 12;
/// Create a default header for wrapping raw PCM from WebSocket clients
/// into a `MediaPacket` for QUIC participants.
pub fn default_pcm() -> Self {
Self {
version: 0,
is_repair: false,
codec_id: CodecId::Opus24k,
has_quality_report: false,
fec_ratio_encoded: 0,
seq: 0,
timestamp: 0,
fec_block: 0,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
}
}
/// Encode the FEC ratio float (0.0-2.0+) to a 7-bit value (0-127).
pub fn encode_fec_ratio(ratio: f32) -> u8 {
// Map 0.0-2.0 to 0-127, clamping at 127
@@ -591,6 +608,43 @@ pub enum SignalMessage {
},
/// Acknowledge a transfer request.
TransferAck,
/// Presence update from a peer relay (gossip protocol).
/// Sent periodically over probe connections to share which fingerprints
/// are connected to the sending relay.
PresenceUpdate {
/// Fingerprints currently connected to the sending relay.
fingerprints: Vec<String>,
/// Address of the sending relay (e.g., "192.168.1.10:4433").
relay_addr: String,
},
/// Ask a peer relay to look up a fingerprint in its registry.
RouteQuery {
fingerprint: String,
ttl: u8,
},
/// Response to a route query.
RouteResponse {
fingerprint: String,
found: bool,
relay_chain: Vec<String>,
},
/// Request to set up a forwarding session for a specific fingerprint.
/// Sent over a relay link (`_relay` SNI) to ask the peer relay to
/// create a room and forward media for the given session.
SessionForward {
session_id: String,
target_fingerprint: String,
source_relay: String,
},
/// Confirm that the forwarding session has been set up on the peer relay.
/// The `room_name` tells the source relay which room to address media to.
SessionForwardAck {
session_id: String,
room_name: String,
},
}
/// Reasons for ending a call.
@@ -776,6 +830,40 @@ mod tests {
assert!(matches!(decoded, SignalMessage::TransferAck));
}
#[test]
fn presence_update_signal_roundtrip() {
let msg = SignalMessage::PresenceUpdate {
fingerprints: vec!["aabb".to_string(), "ccdd".to_string()],
relay_addr: "10.0.0.1:4433".to_string(),
};
let json = serde_json::to_string(&msg).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::PresenceUpdate { fingerprints, relay_addr } => {
assert_eq!(fingerprints.len(), 2);
assert!(fingerprints.contains(&"aabb".to_string()));
assert!(fingerprints.contains(&"ccdd".to_string()));
assert_eq!(relay_addr, "10.0.0.1:4433");
}
_ => panic!("expected PresenceUpdate variant"),
}
// Empty fingerprints list
let msg_empty = SignalMessage::PresenceUpdate {
fingerprints: vec![],
relay_addr: "10.0.0.2:4433".to_string(),
};
let json = serde_json::to_string(&msg_empty).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::PresenceUpdate { fingerprints, relay_addr } => {
assert!(fingerprints.is_empty());
assert_eq!(relay_addr, "10.0.0.2:4433");
}
_ => panic!("expected PresenceUpdate variant"),
}
}
#[test]
fn fec_ratio_encode_decode() {
let ratio = 0.5;
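
The hunk shows only the doc comment for `encode_fec_ratio` ("map 0.0-2.0 to 0-127, clamping at 127"), not its body. The following is a sketch of one mapping that satisfies that description. The `_sketch` function names and the choice to round rather than truncate are assumptions, and the real implementation may differ.

```rust
/// Sketch: map a FEC ratio in 0.0..=2.0 onto a 7-bit value (0..=127).
/// Ratios above 2.0 clamp to 127; negative ratios clamp to 0.
pub fn encode_fec_ratio_sketch(ratio: f32) -> u8 {
    let scaled = (ratio / 2.0 * 127.0).round();
    scaled.clamp(0.0, 127.0) as u8
}

/// Sketch: inverse mapping. Only the low 7 bits are considered.
pub fn decode_fec_ratio_sketch(encoded: u8) -> f32 {
    (encoded & 0x7F) as f32 / 127.0 * 2.0
}
```

With 7 bits over a 0.0-2.0 range, the quantization step is 2/127 (about 0.016), so a round-trip test should compare with a tolerance rather than exact equality.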


@@ -25,7 +25,9 @@ serde_json = "1"
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
quinn = { workspace = true }
prometheus = "0.13"
-axum = { version = "0.7", default-features = false, features = ["tokio", "http1"] }
+axum = { version = "0.7", default-features = false, features = ["tokio", "http1", "ws"] }
+tower-http = { version = "0.6", features = ["fs"] }
+futures-util = "0.3"
[[bin]]
name = "wzp-relay"


@@ -39,6 +39,11 @@ pub struct RelayConfig {
/// reducing per-packet QUIC datagram overhead.
#[serde(default)]
pub trunking_enabled: bool,
/// Port for the WebSocket listener (browser clients connect here).
/// If None, WebSocket support is disabled.
pub ws_port: Option<u16>,
/// Directory to serve static files from (HTML/JS/WASM for web clients).
pub static_dir: Option<String>,
}
impl Default for RelayConfig {
@@ -55,6 +60,8 @@ impl Default for RelayConfig {
probe_targets: Vec::new(),
probe_mesh: false,
trunking_enabled: false,
ws_port: None,
static_dir: None,
}
}
}
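
Both new fields default to `None`, so WebSocket support stays off unless `--ws-port` is passed. Below is a minimal sketch of parsing the two flags named in the commit message (`--ws-port`, `--static-dir`) that returns an error instead of panicking. `WsOptions` and `parse_ws_options` are hypothetical names used only for illustration and are not part of `RelayConfig`.

```rust
/// Hypothetical subset of the relay configuration touched by the new flags.
#[derive(Debug, Default, PartialEq)]
pub struct WsOptions {
    pub ws_port: Option<u16>,
    pub static_dir: Option<String>,
}

/// Parse `--ws-port <port>` and `--static-dir <dir>`, ignoring other flags.
pub fn parse_ws_options(args: &[&str]) -> Result<WsOptions, String> {
    let mut opts = WsOptions::default();
    let mut iter = args.iter();
    while let Some(arg) = iter.next() {
        match *arg {
            "--ws-port" => {
                let raw = iter.next().ok_or("--ws-port requires a port number")?;
                let port = raw
                    .parse::<u16>()
                    .map_err(|e| format!("invalid --ws-port number: {e}"))?;
                opts.ws_port = Some(port);
            }
            "--static-dir" => {
                let dir = iter.next().ok_or("--static-dir requires a directory path")?;
                opts.static_dir = Some(dir.to_string());
            }
            // Other relay flags are out of scope for this sketch.
            _ => {}
        }
    }
    Ok(opts)
}
```

Returning `Result` lets the caller print the usage text on a bad value rather than aborting with a panic message.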


@@ -12,10 +12,14 @@ pub mod config;
pub mod handshake;
pub mod metrics;
pub mod pipeline;
pub mod presence;
pub mod probe;
pub mod relay_link;
pub mod room;
pub mod route;
pub mod session_mgr;
pub mod trunk;
pub mod ws;
pub use config::RelayConfig;
pub use handshake::accept_handshake;


@@ -19,6 +19,7 @@ use wzp_proto::MediaTransport;
use wzp_relay::config::RelayConfig;
use wzp_relay::metrics::RelayMetrics;
use wzp_relay::pipeline::{PipelineConfig, RelayPipeline};
use wzp_relay::presence::PresenceRegistry;
use wzp_relay::room::{self, RoomManager};
use wzp_relay::session_mgr::SessionManager;
@@ -67,6 +68,19 @@ fn parse_args() -> RelayConfig {
"--trunking" => {
config.trunking_enabled = true;
}
"--ws-port" => {
i += 1;
config.ws_port = Some(
args.get(i).expect("--ws-port requires a port number")
.parse().expect("invalid --ws-port number"),
);
}
"--static-dir" => {
i += 1;
config.static_dir = Some(
args.get(i).expect("--static-dir requires a directory path").to_string(),
);
}
"--mesh-status" => {
// Print mesh table from a fresh registry and exit.
// In practice this is useful after the relay has been running;
@@ -88,6 +102,8 @@ fn parse_args() -> RelayConfig {
eprintln!(" --probe-mesh Enable mesh mode (mark config flag, probes all --probe targets).");
eprintln!(" --mesh-status Print mesh health table and exit (diagnostic).");
eprintln!(" --trunking Enable trunk batching for outgoing media in room mode.");
eprintln!(" --ws-port <port> WebSocket listener port for browser clients (e.g., 8080).");
eprintln!(" --static-dir <dir> Directory to serve static files from (HTML/JS/WASM).");
eprintln!();
eprintln!("Room mode (default):");
eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU).");
@@ -176,11 +192,19 @@ async fn main() -> anyhow::Result<()> {
.install_default()
.expect("failed to install rustls crypto provider");
// Presence registry
let presence = Arc::new(Mutex::new(PresenceRegistry::new()));
// Route resolver
let route_resolver = Arc::new(wzp_relay::route::RouteResolver::new(config.listen_addr));
// Prometheus metrics
let metrics = Arc::new(RelayMetrics::new());
if let Some(port) = config.metrics_port {
let m = metrics.clone();
-tokio::spawn(wzp_relay::metrics::serve_metrics(port, m));
+let p = Some(presence.clone());
+let rr = Some(route_resolver.clone());
+tokio::spawn(wzp_relay::metrics::serve_metrics(port, m, p, rr));
}
// Generate ephemeral relay identity for crypto handshake
@@ -214,6 +238,7 @@ async fn main() -> anyhow::Result<()> {
let mesh = wzp_relay::probe::ProbeMesh::new(
config.probe_targets.clone(),
metrics.registry(),
Some(presence.clone()),
);
info!(
targets = mesh.target_count(),
@@ -223,6 +248,20 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(async move { mesh.run_all().await });
}
// WebSocket server for browser clients
if let Some(ws_port) = config.ws_port {
let ws_state = wzp_relay::ws::WsState {
room_mgr: room_mgr.clone(),
session_mgr: session_mgr.clone(),
auth_url: config.auth_url.clone(),
metrics: metrics.clone(),
presence: presence.clone(),
};
let static_dir = config.static_dir.clone();
tokio::spawn(wzp_relay::ws::run_ws_server(ws_port, ws_state, static_dir));
info!(ws_port, "WebSocket listener enabled for browser clients");
}
if let Some(ref url) = config.auth_url {
info!(url, "auth enabled — clients must present featherChat token");
} else {
@@ -244,6 +283,8 @@ async fn main() -> anyhow::Result<()> {
let relay_seed_bytes = relay_seed.0;
let metrics = metrics.clone();
let trunking_enabled = config.trunking_enabled;
let presence = presence.clone();
let route_resolver = route_resolver.clone();
tokio::spawn(async move {
let addr = connection.remote_address();
@@ -259,9 +300,9 @@ async fn main() -> anyhow::Result<()> {
let transport = Arc::new(wzp_transport::QuinnTransport::new(connection));
// Probe connections use SNI "_probe" to identify themselves.
-// They skip auth + handshake and just do Ping->Pong.
+// They skip auth + handshake and just do Ping->Pong + presence gossip.
if room_name == "_probe" {
-info!(%addr, "probe connection detected, entering Ping/Pong responder");
+info!(%addr, "probe connection detected, entering Ping/Pong + presence responder");
loop {
match transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::Ping { timestamp_ms })) => {
@@ -272,8 +313,63 @@ async fn main() -> anyhow::Result<()> {
break;
}
}
Ok(Some(wzp_proto::SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => {
// A peer relay is telling us which fingerprints it has
let peer_addr: std::net::SocketAddr = relay_addr.parse().unwrap_or(addr);
let fps: std::collections::HashSet<String> = fingerprints.into_iter().collect();
{
let mut reg = presence.lock().await;
reg.update_peer(peer_addr, fps);
}
// Reply with our own local fingerprints
let local_fps: Vec<String> = {
let reg = presence.lock().await;
reg.local_fingerprints().into_iter().collect()
};
let reply = wzp_proto::SignalMessage::PresenceUpdate {
fingerprints: local_fps,
relay_addr: addr.to_string(),
};
if let Err(e) = transport.send_signal(&reply).await {
error!(%addr, "presence reply send error: {e}");
break;
}
}
Ok(Some(wzp_proto::SignalMessage::RouteQuery { fingerprint, ttl })) => {
// Look up the fingerprint in our local registry
let reg = presence.lock().await;
let route = route_resolver.resolve(&reg, &fingerprint);
drop(reg);
let (found, relay_chain) = match route {
wzp_relay::route::Route::Local => {
(true, vec![route_resolver.local_addr().to_string()])
}
wzp_relay::route::Route::DirectPeer(peer_addr) => {
(true, vec![route_resolver.local_addr().to_string(), peer_addr.to_string()])
}
_ => {
// Not found locally; if ttl > 0 we could forward
// to other peers (future multi-hop). For now, reply not found.
if ttl > 0 {
// TODO: forward RouteQuery to other peers with ttl-1
}
(false, vec![])
}
};
let reply = wzp_proto::SignalMessage::RouteResponse {
fingerprint,
found,
relay_chain,
};
if let Err(e) = transport.send_signal(&reply).await {
error!(%addr, "route response send error: {e}");
break;
}
}
Ok(Some(_)) => {
-// Ignore non-Ping signals on probe connections
+// Ignore other signals on probe connections
}
Ok(None) => {
info!(%addr, "probe connection closed");
@@ -352,6 +448,12 @@ async fn main() -> anyhow::Result<()> {
}
};
// Register in presence registry
if let Some(ref fp) = authenticated_fp {
let mut reg = presence.lock().await;
reg.register_local(fp, None, Some(room_name.clone()));
}
info!(%addr, room = %room_name, "client joining");
if let Some(remote) = remote_transport {
@@ -400,7 +502,7 @@ async fn main() -> anyhow::Result<()> {
let participant_id = {
let mut mgr = room_mgr.lock().await;
-match mgr.join(&room_name, addr, transport.clone(), authenticated_fp.as_deref()) {
+match mgr.join(&room_name, addr, room::ParticipantSender::Quic(transport.clone()), authenticated_fp.as_deref()) {
Ok(id) => {
metrics.active_rooms.set(mgr.list().len() as i64);
id
@@ -431,7 +533,11 @@ async fn main() -> anyhow::Result<()> {
trunking_enabled,
).await;
-// Participant disconnected — clean up per-session metrics
+// Participant disconnected — clean up presence + per-session metrics
if let Some(ref fp) = authenticated_fp {
let mut reg = presence.lock().await;
reg.unregister_local(fp);
}
metrics.remove_session_metrics(&session_id_str);
metrics.active_sessions.dec();
{
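
The `RouteQuery` branch of the probe responder above turns a resolved route into the `(found, relay_chain)` pair carried by `RouteResponse`. Here is a self-contained sketch of that mapping. `RouteSketch` and `route_reply` are hypothetical names; the real `wzp_relay::route::Route` enum may have further variants that the source folds into the `_` arm.

```rust
use std::net::SocketAddr;

/// Hypothetical mirror of the route outcomes matched in the probe responder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteSketch {
    /// The fingerprint is connected to this relay.
    Local,
    /// The fingerprint was reported by a directly known peer relay.
    DirectPeer(SocketAddr),
    /// Anything else; multi-hop forwarding is still a TODO in the source.
    NotFound,
}

/// Build the `(found, relay_chain)` pair for a route response.
pub fn route_reply(local: SocketAddr, route: RouteSketch) -> (bool, Vec<String>) {
    match route {
        RouteSketch::Local => (true, vec![local.to_string()]),
        RouteSketch::DirectPeer(peer) => {
            (true, vec![local.to_string(), peer.to_string()])
        }
        RouteSketch::NotFound => (false, Vec::new()),
    }
}
```

The chain is ordered from the answering relay outward, so a caller can read the last element as the relay that actually hosts the fingerprint.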


@@ -201,11 +201,21 @@ impl RelayMetrics {
}
}
-/// Start an HTTP server serving GET /metrics and GET /mesh on the given port.
-pub async fn serve_metrics(port: u16, metrics: Arc<RelayMetrics>) {
-use axum::{routing::get, Router};
+/// Start an HTTP server serving GET /metrics, GET /mesh, presence, and route endpoints on the given port.
+pub async fn serve_metrics(
+port: u16,
+metrics: Arc<RelayMetrics>,
+presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
+route_resolver: Option<Arc<crate::route::RouteResolver>>,
+) {
+use axum::{extract::Path, routing::get, Router};
let metrics_clone = metrics.clone();
let presence_all = presence.clone();
let presence_lookup = presence.clone();
let presence_peers = presence.clone();
let presence_route = presence;
let app = Router::new()
.route(
"/metrics",
@@ -220,6 +230,92 @@ pub async fn serve_metrics(port: u16, metrics: Arc<RelayMetrics>) {
let m = metrics_clone.clone();
async move { crate::probe::mesh_summary(m.registry()) }
}),
)
.route(
"/presence",
get(move || {
let reg = presence_all.clone();
async move {
match reg {
Some(r) => {
let r = r.lock().await;
let entries: Vec<serde_json::Value> = r.all_known().into_iter().map(|(fp, loc)| {
serde_json::json!({ "fingerprint": fp, "location": loc })
}).collect();
serde_json::to_string_pretty(&entries).unwrap_or_else(|_| "[]".to_string())
}
None => "[]".to_string(),
}
}
}),
)
.route(
"/presence/:fingerprint",
get(move |Path(fingerprint): Path<String>| {
let reg = presence_lookup.clone();
async move {
match reg {
Some(r) => {
let r = r.lock().await;
match r.lookup(&fingerprint) {
Some(loc) => serde_json::to_string_pretty(
&serde_json::json!({ "fingerprint": fingerprint, "location": loc })
).unwrap_or_else(|_| "{}".to_string()),
None => serde_json::json!({ "fingerprint": fingerprint, "location": null }).to_string(),
}
}
None => serde_json::json!({ "fingerprint": fingerprint, "location": null }).to_string(),
}
}
}),
)
.route(
"/peers",
get(move || {
let reg = presence_peers.clone();
async move {
match reg {
Some(r) => {
let r = r.lock().await;
let peers: Vec<serde_json::Value> = r.peers().iter().map(|(addr, peer)| {
serde_json::json!({
"addr": addr.to_string(),
"fingerprints": peer.fingerprints.iter().collect::<Vec<_>>(),
"rtt_ms": peer.rtt_ms,
})
}).collect();
serde_json::to_string_pretty(&peers).unwrap_or_else(|_| "[]".to_string())
}
None => "[]".to_string(),
}
}
}),
)
.route(
"/route/:fingerprint",
get(move |Path(fingerprint): Path<String>| {
let reg = presence_route.clone();
let resolver = route_resolver.clone();
async move {
match (reg, resolver) {
(Some(r), Some(res)) => {
let r = r.lock().await;
let route = res.resolve(&r, &fingerprint);
let json = res.route_json(&fingerprint, &route);
serde_json::to_string_pretty(&json)
.unwrap_or_else(|_| "{}".to_string())
}
_ => {
serde_json::json!({
"fingerprint": fingerprint,
"route": "not_found",
"relay_chain": [],
})
.to_string()
}
}
}
}),
);
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));


@@ -0,0 +1,333 @@
//! Presence registry — tracks which fingerprints are connected to this relay
//! and to peer relays (via gossip over probe connections).
//!
//! This enables route resolution: given a fingerprint, determine whether the
//! user is local, on a known peer relay, or unknown.
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use serde::Serialize;
// ---------------------------------------------------------------------------
// Data structures
// ---------------------------------------------------------------------------
/// Where a fingerprint is connected.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub enum PresenceLocation {
/// Connected directly to this relay.
Local,
/// Connected to a peer relay at the given address.
Remote(SocketAddr),
}
/// Presence entry for a fingerprint connected directly to this relay.
#[derive(Clone, Debug)]
pub struct LocalPresence {
pub fingerprint: String,
pub alias: Option<String>,
pub connected_at: Instant,
pub room: Option<String>,
}
/// Presence entry for a fingerprint reported by a peer relay.
#[derive(Clone, Debug)]
pub struct RemotePresence {
pub fingerprint: String,
pub relay_addr: SocketAddr,
pub last_seen: Instant,
}
/// Known peer relay and its reported fingerprints.
#[derive(Clone, Debug)]
pub struct PeerRelay {
pub addr: SocketAddr,
pub fingerprints: HashSet<String>,
pub last_update: Instant,
pub rtt_ms: Option<f64>,
}
// ---------------------------------------------------------------------------
// Registry
// ---------------------------------------------------------------------------
/// Central presence registry tracking local and remote fingerprints.
pub struct PresenceRegistry {
/// Fingerprints connected directly to THIS relay.
local: HashMap<String, LocalPresence>,
/// Fingerprints reported by peer relays (via gossip).
remote: HashMap<String, RemotePresence>,
/// Known peer relays and their status.
peers: HashMap<SocketAddr, PeerRelay>,
}
impl PresenceRegistry {
/// Create an empty registry.
pub fn new() -> Self {
Self {
local: HashMap::new(),
remote: HashMap::new(),
peers: HashMap::new(),
}
}
/// Register a fingerprint as locally connected (called after auth + handshake).
pub fn register_local(&mut self, fingerprint: &str, alias: Option<String>, room: Option<String>) {
self.local.insert(fingerprint.to_string(), LocalPresence {
fingerprint: fingerprint.to_string(),
alias,
connected_at: Instant::now(),
room,
});
}
/// Unregister a locally connected fingerprint (called on disconnect).
pub fn unregister_local(&mut self, fingerprint: &str) {
self.local.remove(fingerprint);
}
/// Update the fingerprints reported by a peer relay.
/// Replaces the previous set for that peer.
pub fn update_peer(&mut self, addr: SocketAddr, fingerprints: HashSet<String>) {
let now = Instant::now();
// Remove old remote entries that belonged to this peer
self.remote.retain(|_, rp| rp.relay_addr != addr);
// Insert new remote entries
for fp in &fingerprints {
self.remote.insert(fp.clone(), RemotePresence {
fingerprint: fp.clone(),
relay_addr: addr,
last_seen: now,
});
}
// Update the peer record
let peer = self.peers.entry(addr).or_insert_with(|| PeerRelay {
addr,
fingerprints: HashSet::new(),
last_update: now,
rtt_ms: None,
});
peer.fingerprints = fingerprints;
peer.last_update = now;
}
/// Look up where a fingerprint is connected.
/// Local presence takes priority over remote.
pub fn lookup(&self, fingerprint: &str) -> Option<PresenceLocation> {
if self.local.contains_key(fingerprint) {
return Some(PresenceLocation::Local);
}
if let Some(rp) = self.remote.get(fingerprint) {
return Some(PresenceLocation::Remote(rp.relay_addr));
}
None
}
/// Return all fingerprints connected directly to this relay.
pub fn local_fingerprints(&self) -> HashSet<String> {
self.local.keys().cloned().collect()
}
/// Return a full dump of every known fingerprint and its location.
pub fn all_known(&self) -> Vec<(String, PresenceLocation)> {
let mut out = Vec::new();
for fp in self.local.keys() {
out.push((fp.clone(), PresenceLocation::Local));
}
for (fp, rp) in &self.remote {
// Skip if also local (local wins)
if !self.local.contains_key(fp) {
out.push((fp.clone(), PresenceLocation::Remote(rp.relay_addr)));
}
}
out
}
/// Remove remote entries older than `timeout`.
pub fn expire_stale(&mut self, timeout: Duration) {
let cutoff = Instant::now() - timeout;
// Expire remote presence entries
self.remote.retain(|_, rp| rp.last_seen > cutoff);
// Expire peer relay records and their fingerprint sets
let stale_peers: Vec<SocketAddr> = self.peers
.iter()
.filter(|(_, p)| p.last_update <= cutoff)
.map(|(addr, _)| *addr)
.collect();
for addr in stale_peers {
self.peers.remove(&addr);
}
}
/// Return a reference to the peer relay map (for HTTP API).
pub fn peers(&self) -> &HashMap<SocketAddr, PeerRelay> {
&self.peers
}
/// Return a reference to the local presence map (for HTTP API).
pub fn local_entries(&self) -> &HashMap<String, LocalPresence> {
&self.local
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
fn addr(s: &str) -> SocketAddr {
s.parse().unwrap()
}
#[test]
fn register_and_lookup_local() {
let mut reg = PresenceRegistry::new();
reg.register_local("aabbccdd", Some("alice".into()), Some("room1".into()));
assert_eq!(reg.lookup("aabbccdd"), Some(PresenceLocation::Local));
// Unknown fingerprint returns None
assert_eq!(reg.lookup("00000000"), None);
}
#[test]
fn unregister_removes() {
let mut reg = PresenceRegistry::new();
reg.register_local("aabbccdd", None, None);
assert_eq!(reg.lookup("aabbccdd"), Some(PresenceLocation::Local));
reg.unregister_local("aabbccdd");
assert_eq!(reg.lookup("aabbccdd"), None);
}
#[test]
fn update_peer_and_lookup() {
let mut reg = PresenceRegistry::new();
let peer = addr("10.0.0.2:4433");
let mut fps = HashSet::new();
fps.insert("deadbeef".to_string());
fps.insert("cafebabe".to_string());
reg.update_peer(peer, fps);
assert_eq!(reg.lookup("deadbeef"), Some(PresenceLocation::Remote(peer)));
assert_eq!(reg.lookup("cafebabe"), Some(PresenceLocation::Remote(peer)));
assert_eq!(reg.lookup("unknown"), None);
}
#[test]
fn expire_stale_removes_old() {
let mut reg = PresenceRegistry::new();
let peer = addr("10.0.0.3:4433");
let mut fps = HashSet::new();
fps.insert("olduser".to_string());
reg.update_peer(peer, fps);
// Verify it's there
assert_eq!(reg.lookup("olduser"), Some(PresenceLocation::Remote(peer)));
// Manually backdate the last_seen and last_update
if let Some(rp) = reg.remote.get_mut("olduser") {
rp.last_seen = Instant::now() - Duration::from_secs(120);
}
if let Some(p) = reg.peers.get_mut(&peer) {
p.last_update = Instant::now() - Duration::from_secs(120);
}
// Expire with 60s timeout — should remove the 120s-old entries
reg.expire_stale(Duration::from_secs(60));
assert_eq!(reg.lookup("olduser"), None);
assert!(reg.peers.get(&peer).is_none());
}
#[test]
fn local_fingerprints_list() {
let mut reg = PresenceRegistry::new();
reg.register_local("fp1", None, None);
reg.register_local("fp2", Some("bob".into()), Some("room-a".into()));
reg.register_local("fp3", None, None);
let fps = reg.local_fingerprints();
assert_eq!(fps.len(), 3);
assert!(fps.contains("fp1"));
assert!(fps.contains("fp2"));
assert!(fps.contains("fp3"));
}
#[test]
fn all_known_includes_local_and_remote() {
let mut reg = PresenceRegistry::new();
reg.register_local("local1", None, None);
let peer = addr("10.0.0.5:4433");
let mut fps = HashSet::new();
fps.insert("remote1".to_string());
reg.update_peer(peer, fps);
let all = reg.all_known();
assert_eq!(all.len(), 2);
let local_entries: Vec<_> = all.iter()
.filter(|(_, loc)| *loc == PresenceLocation::Local)
.collect();
assert_eq!(local_entries.len(), 1);
assert_eq!(local_entries[0].0, "local1");
let remote_entries: Vec<_> = all.iter()
.filter(|(_, loc)| matches!(loc, PresenceLocation::Remote(_)))
.collect();
assert_eq!(remote_entries.len(), 1);
assert_eq!(remote_entries[0].0, "remote1");
}
#[test]
fn local_overrides_remote_in_lookup() {
let mut reg = PresenceRegistry::new();
let peer = addr("10.0.0.6:4433");
// Register as remote first
let mut fps = HashSet::new();
fps.insert("dupfp".to_string());
reg.update_peer(peer, fps);
assert_eq!(reg.lookup("dupfp"), Some(PresenceLocation::Remote(peer)));
// Now register locally — local should win
reg.register_local("dupfp", None, None);
assert_eq!(reg.lookup("dupfp"), Some(PresenceLocation::Local));
}
#[test]
fn update_peer_replaces_old_fingerprints() {
let mut reg = PresenceRegistry::new();
let peer = addr("10.0.0.7:4433");
let mut fps1 = HashSet::new();
fps1.insert("user_a".to_string());
fps1.insert("user_b".to_string());
reg.update_peer(peer, fps1);
assert_eq!(reg.lookup("user_a"), Some(PresenceLocation::Remote(peer)));
assert_eq!(reg.lookup("user_b"), Some(PresenceLocation::Remote(peer)));
// Update with only user_b — user_a should be gone
let mut fps2 = HashSet::new();
fps2.insert("user_b".to_string());
reg.update_peer(peer, fps2);
assert_eq!(reg.lookup("user_a"), None);
assert_eq!(reg.lookup("user_b"), Some(PresenceLocation::Remote(peer)));
}
}
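
`expire_stale` computes its cutoff with `Instant::now() - timeout`. Subtracting a `Duration` from an `Instant` panics when the result cannot be represented, which can happen with a very large timeout. Here is a minimal sketch of the same retention rule using `Instant::checked_sub`. The helper name and the bare `HashMap<String, Instant>` are illustrative assumptions, not part of `PresenceRegistry`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Remove entries whose timestamp is at or before `now - timeout`.
/// Returns how many entries were removed.
pub fn expire_older_than(
    entries: &mut HashMap<String, Instant>,
    timeout: Duration,
) -> usize {
    // If the cutoff is not representable, nothing can be older than it.
    let cutoff = match Instant::now().checked_sub(timeout) {
        Some(c) => c,
        None => return 0,
    };
    let before = entries.len();
    // Same comparison as the registry: keep only strictly newer entries.
    entries.retain(|_, last_seen| *last_seen > cutoff);
    before - entries.len()
}
```

The same guard would apply to tests that backdate timestamps with `Instant::now() - Duration::from_secs(120)`, since that subtraction has the same failure mode.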


@@ -156,14 +156,19 @@ impl SlidingWindow {
pub struct ProbeRunner {
config: ProbeConfig,
metrics: ProbeMetrics,
presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
}
impl ProbeRunner {
/// Create a new probe runner, registering metrics with the given registry.
-pub fn new(config: ProbeConfig, registry: &Registry) -> Self {
+pub fn new(
+config: ProbeConfig,
+registry: &Registry,
+presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
+) -> Self {
let target_str = config.target.to_string();
let metrics = ProbeMetrics::register(&target_str, registry);
-Self { config, metrics }
+Self { config, metrics, presence }
}
/// Run the probe forever. This function never returns under normal operation.
@@ -215,6 +220,8 @@ impl ProbeRunner {
let jitter_gauge = self.metrics.jitter_ms.clone();
let up_gauge = self.metrics.up.clone();
let recv_presence = self.presence.clone();
let recv_target = self.config.target;
let recv_handle = tokio::spawn(async move {
loop {
match recv_transport.recv_signal().await {
@@ -230,8 +237,17 @@ impl ProbeRunner {
loss_gauge.set(w.loss_pct());
jitter_gauge.set(w.jitter_ms());
}
Ok(Some(SignalMessage::PresenceUpdate { fingerprints, relay_addr })) => {
if let Some(ref reg) = recv_presence {
// Parse the relay_addr; fall back to the connection target
let addr = relay_addr.parse().unwrap_or(recv_target);
let fps: std::collections::HashSet<String> = fingerprints.into_iter().collect();
let mut r = reg.lock().await;
r.update_peer(addr, fps);
}
}
Ok(Some(_)) => {
-// Ignore non-Pong signals
+// Ignore other signals
}
Ok(None) => {
info!("probe recv: connection closed");
@@ -247,8 +263,9 @@ impl ProbeRunner {
}
});
-// Send ping loop
+// Send ping loop (+ presence gossip every 10 pings)
let mut interval = tokio::time::interval(self.config.interval);
let mut ping_count: u64 = 0;
loop {
interval.tick().await;
@@ -275,6 +292,24 @@ impl ProbeRunner {
recv_handle.abort();
return Err(e.into());
}
// Send presence update every 10 pings (~10 seconds)
ping_count += 1;
if ping_count % 10 == 0 {
if let Some(ref reg) = self.presence {
let fps: Vec<String> = {
let r = reg.lock().await;
r.local_fingerprints().into_iter().collect()
};
let msg = SignalMessage::PresenceUpdate {
fingerprints: fps,
relay_addr: self.config.target.to_string(),
};
if let Err(e) = transport.send_signal(&msg).await {
warn!(target = %self.config.target, "presence update send error: {e}");
}
}
}
}
}
}
@@ -289,12 +324,16 @@ pub struct ProbeMesh {
impl ProbeMesh {
/// Create a new mesh coordinator, registering metrics for every target.
-pub fn new(targets: Vec<SocketAddr>, registry: &Registry) -> Self {
+pub fn new(
+targets: Vec<SocketAddr>,
+registry: &Registry,
+presence: Option<Arc<tokio::sync::Mutex<crate::presence::PresenceRegistry>>>,
+) -> Self {
let runners = targets
.into_iter()
.map(|addr| {
let config = ProbeConfig::new(addr);
-ProbeRunner::new(config, registry)
+ProbeRunner::new(config, registry, presence.clone())
})
.collect();
Self { runners }
@@ -409,6 +448,7 @@ mod tests {
fn probe_metrics_register() {
let registry = Registry::new();
let _metrics = ProbeMetrics::register("127.0.0.1:4433", &registry);
// (ProbeRunner::new signature changed but this test only checks ProbeMetrics)
let encoder = prometheus::TextEncoder::new();
let families = registry.gather();
@@ -526,7 +566,7 @@ mod tests {
"127.0.0.2:4433".parse().unwrap(),
"127.0.0.3:4433".parse().unwrap(),
];
-let mesh = ProbeMesh::new(targets, &registry);
+let mesh = ProbeMesh::new(targets, &registry, None);
assert_eq!(mesh.target_count(), 3);
// Verify metrics were registered for each target
@@ -586,7 +626,7 @@ mod tests {
#[test]
fn mesh_zero_targets() {
let registry = Registry::new();
-let mesh = ProbeMesh::new(vec![], &registry);
+let mesh = ProbeMesh::new(vec![], &registry, None);
assert_eq!(mesh.target_count(), 0);
}
}
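
The probe loop above sends a presence update on every tenth ping, and the receive side falls back to the connection target when the advertised `relay_addr` does not parse. Both rules can be sketched in isolation with the standard library only. The constant and function names below are hypothetical and do not exist in `ProbeRunner`.

```rust
use std::net::SocketAddr;

/// Gossip cadence used by the ping loop (every 10th ping).
pub const GOSSIP_EVERY_N_PINGS: u64 = 10;

/// Increment the ping counter and report whether a presence update
/// should be sent after this ping.
pub fn tick_and_check(ping_count: &mut u64) -> bool {
    *ping_count += 1;
    *ping_count % GOSSIP_EVERY_N_PINGS == 0
}

/// Parse the advertised relay address, falling back to a known address
/// (the probe target in the source) when it is malformed.
pub fn peer_addr_or(relay_addr: &str, fallback: SocketAddr) -> SocketAddr {
    relay_addr.parse().unwrap_or(fallback)
}
```

Tying gossip to the ping counter means the gossip period scales with the configured probe interval, so the "~10 seconds" in the source comment holds only for a one-second interval.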


@@ -0,0 +1,483 @@
//! Per-session relay forwarding — connect to a peer relay and forward only
//! specific sessions' media packets there.
//!
//! This is the building block for relay chaining (multi-hop calls). Instead
//! of forwarding ALL traffic to a single hardcoded relay (forward mode) or
//! to everyone in a room (SFU mode), a `RelayLink` represents a QUIC
//! connection to one peer relay used for forwarding a specific set of
//! sessions.
//!
//! `RelayLinkManager` tracks all active relay links and their session
//! assignments, providing get-or-connect semantics and idle cleanup.
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{debug, info, warn};
use wzp_proto::MediaPacket;
use wzp_proto::MediaTransport;
/// A connection to a peer relay for forwarding specific sessions.
///
/// Each `RelayLink` holds a QUIC transport to one peer relay and tracks
/// which session IDs are being forwarded through it. When all sessions
/// are removed the link is considered idle and can be cleaned up.
pub struct RelayLink {
target_addr: SocketAddr,
/// The underlying QUIC transport. `None` only in unit-test stubs where
/// no real connection is established.
transport: Option<Arc<wzp_transport::QuinnTransport>>,
active_sessions: HashSet<String>,
}
impl RelayLink {
/// Connect to a peer relay at `target`.
///
/// Uses the `"_relay"` SNI to signal that this is a relay-to-relay
/// connection (similar to `"_probe"` for health checks). The peer
/// should skip normal client auth/handshake for relay-SNI connections.
pub async fn connect(target: SocketAddr) -> Result<Self, anyhow::Error> {
// Create a client-only endpoint on an OS-assigned port.
let endpoint = wzp_transport::create_endpoint(
"0.0.0.0:0".parse().unwrap(),
None,
)?;
let client_cfg = wzp_transport::client_config();
let conn = wzp_transport::connect(&endpoint, target, "_relay", client_cfg).await?;
let transport = Arc::new(wzp_transport::QuinnTransport::new(conn));
info!(%target, "relay link established");
Ok(Self {
target_addr: target,
transport: Some(transport),
active_sessions: HashSet::new(),
})
}
/// Create a `RelayLink` from an existing transport (useful when the
/// connection was established through other means).
pub fn from_transport(
target_addr: SocketAddr,
transport: Arc<wzp_transport::QuinnTransport>,
) -> Self {
Self {
target_addr,
transport: Some(transport),
active_sessions: HashSet::new(),
}
}
/// Create a stub `RelayLink` with no transport — for unit tests that
/// only exercise session-tracking / management logic.
#[cfg(test)]
fn stub(target_addr: SocketAddr) -> Self {
Self {
target_addr,
transport: None,
active_sessions: HashSet::new(),
}
}
/// Forward a media packet to this peer relay.
pub async fn forward(&self, pkt: &MediaPacket) -> Result<(), anyhow::Error> {
match &self.transport {
Some(t) => t
.send_media(pkt)
.await
.map_err(|e| anyhow::anyhow!("relay link forward to {}: {e}", self.target_addr)),
None => Err(anyhow::anyhow!(
"relay link to {} has no transport (stub)",
self.target_addr
)),
}
}
/// The address of the peer relay this link connects to.
pub fn target_addr(&self) -> SocketAddr {
self.target_addr
}
/// A reference to the underlying QUIC transport (if connected).
pub fn transport(&self) -> Option<&Arc<wzp_transport::QuinnTransport>> {
self.transport.as_ref()
}
/// Add a session to be forwarded through this link.
pub fn add_session(&mut self, session_id: &str) {
if self.active_sessions.insert(session_id.to_string()) {
debug!(
target_relay = %self.target_addr,
session = session_id,
count = self.active_sessions.len(),
"session added to relay link"
);
}
}
/// Remove a session from this link.
pub fn remove_session(&mut self, session_id: &str) {
if self.active_sessions.remove(session_id) {
debug!(
target_relay = %self.target_addr,
session = session_id,
count = self.active_sessions.len(),
"session removed from relay link"
);
}
}
/// Check if this link is forwarding any sessions.
pub fn is_idle(&self) -> bool {
self.active_sessions.is_empty()
}
/// Number of sessions being forwarded through this link.
pub fn session_count(&self) -> usize {
self.active_sessions.len()
}
/// Check if a specific session is being forwarded through this link.
pub fn has_session(&self, session_id: &str) -> bool {
self.active_sessions.contains(session_id)
}
/// Close the underlying QUIC connection (no-op if no transport).
pub async fn close(&self) {
info!(target_relay = %self.target_addr, "closing relay link");
if let Some(ref t) = self.transport {
let _ = t.close().await;
}
}
}
// ---------------------------------------------------------------------------
// RelayLinkManager
// ---------------------------------------------------------------------------
/// Manages connections to multiple peer relays for per-session forwarding.
///
/// Each peer relay gets at most one `RelayLink`. Sessions are registered
/// on specific links, and idle links (no sessions) can be cleaned up.
pub struct RelayLinkManager {
links: HashMap<SocketAddr, RelayLink>,
}
impl RelayLinkManager {
/// Create an empty link manager.
pub fn new() -> Self {
Self {
links: HashMap::new(),
}
}
/// Get or create a link to a peer relay.
///
/// If a link already exists it is returned. Otherwise a new QUIC
/// connection is established using `RelayLink::connect`.
pub async fn get_or_connect(
&mut self,
target: SocketAddr,
) -> Result<&RelayLink, anyhow::Error> {
if !self.links.contains_key(&target) {
let link = RelayLink::connect(target).await?;
self.links.insert(target, link);
}
Ok(self.links.get(&target).unwrap())
}
/// Get a mutable reference to an existing link (if any).
pub fn get_mut(&mut self, target: &SocketAddr) -> Option<&mut RelayLink> {
self.links.get_mut(target)
}
/// Get a reference to an existing link (if any).
pub fn get(&self, target: &SocketAddr) -> Option<&RelayLink> {
self.links.get(target)
}
/// Forward a packet for a specific session to the appropriate relay.
///
/// The link must already exist (created via `get_or_connect`).
pub async fn forward_to(
&self,
target: SocketAddr,
pkt: &MediaPacket,
) -> Result<(), anyhow::Error> {
match self.links.get(&target) {
Some(link) => link.forward(pkt).await,
None => Err(anyhow::anyhow!(
"no relay link to {target} — call get_or_connect first"
)),
}
}
/// Register a session on a specific link.
///
/// The link must already exist. If it does not, a warning is logged
/// and the registration is skipped.
pub fn register_session(&mut self, target: SocketAddr, session_id: &str) {
match self.links.get_mut(&target) {
Some(link) => link.add_session(session_id),
None => {
warn!(
%target,
session = session_id,
"cannot register session — no link to target"
);
}
}
}
/// Unregister a session. If the link becomes idle, close and remove it.
pub async fn unregister_session(&mut self, target: SocketAddr, session_id: &str) {
let should_remove = if let Some(link) = self.links.get_mut(&target) {
link.remove_session(session_id);
if link.is_idle() {
link.close().await;
true
} else {
false
}
} else {
false
};
if should_remove {
self.links.remove(&target);
info!(%target, "idle relay link removed");
}
}
/// Close all links and clear the manager.
pub async fn close_all(&mut self) {
for (addr, link) in self.links.drain() {
info!(%addr, "closing relay link (shutdown)");
link.close().await;
}
}
/// Number of active links.
pub fn link_count(&self) -> usize {
self.links.len()
}
/// Total number of sessions being forwarded across all links.
pub fn session_count(&self) -> usize {
self.links.values().map(|l| l.session_count()).sum()
}
/// Insert a pre-built relay link (for testing or manual setup).
pub fn insert(&mut self, link: RelayLink) {
self.links.insert(link.target_addr(), link);
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
fn addr(s: &str) -> SocketAddr {
s.parse().unwrap()
}
// ---------- RelayLink session tracking ----------
#[test]
fn link_manager_tracks_sessions() {
let mut mgr = RelayLinkManager::new();
let target1 = addr("10.0.0.2:4433");
let mut link = RelayLink::stub(target1);
link.add_session("session-aaa");
link.add_session("session-bbb");
mgr.insert(link);
assert_eq!(mgr.link_count(), 1);
assert_eq!(mgr.session_count(), 2);
// Register another session on the same link
mgr.register_session(target1, "session-ccc");
assert_eq!(mgr.session_count(), 3);
// Verify individual link
let link_ref = mgr.get(&target1).unwrap();
assert!(link_ref.has_session("session-aaa"));
assert!(link_ref.has_session("session-bbb"));
assert!(link_ref.has_session("session-ccc"));
assert!(!link_ref.has_session("unknown"));
}
#[test]
fn link_manager_idle_detection() {
let mut link = RelayLink::stub(addr("10.0.0.3:4433"));
// Empty link is idle
assert!(link.is_idle());
assert_eq!(link.session_count(), 0);
// Add a session — no longer idle
link.add_session("sess-1");
assert!(!link.is_idle());
assert_eq!(link.session_count(), 1);
// Remove it — idle again
link.remove_session("sess-1");
assert!(link.is_idle());
assert_eq!(link.session_count(), 0);
}
#[test]
fn session_forward_signal_roundtrip() {
use wzp_proto::SignalMessage;
// SessionForward roundtrip
let msg = SignalMessage::SessionForward {
session_id: "abcd1234".to_string(),
target_fingerprint: "deadbeef".to_string(),
source_relay: "10.0.0.1:4433".to_string(),
};
let json = serde_json::to_string(&msg).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::SessionForward {
session_id,
target_fingerprint,
source_relay,
} => {
assert_eq!(session_id, "abcd1234");
assert_eq!(target_fingerprint, "deadbeef");
assert_eq!(source_relay, "10.0.0.1:4433");
}
_ => panic!("expected SessionForward variant"),
}
// SessionForwardAck roundtrip
let ack = SignalMessage::SessionForwardAck {
session_id: "abcd1234".to_string(),
room_name: "relay-room-42".to_string(),
};
let json = serde_json::to_string(&ack).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
match decoded {
SignalMessage::SessionForwardAck {
session_id,
room_name,
} => {
assert_eq!(session_id, "abcd1234");
assert_eq!(room_name, "relay-room-42");
}
_ => panic!("expected SessionForwardAck variant"),
}
}
#[test]
fn link_manager_multi_target() {
let mut mgr = RelayLinkManager::new();
let target_a = addr("10.0.0.2:4433");
let target_b = addr("10.0.0.3:4433");
let target_c = addr("10.0.0.4:4433");
for (target, sessions) in [
(target_a, vec!["s1", "s2"]),
(target_b, vec!["s3"]),
(target_c, vec!["s4", "s5", "s6"]),
] {
let mut link = RelayLink::stub(target);
for s in sessions {
link.add_session(s);
}
mgr.insert(link);
}
assert_eq!(mgr.link_count(), 3);
assert_eq!(mgr.session_count(), 6); // 2 + 1 + 3
assert_eq!(mgr.get(&target_a).unwrap().session_count(), 2);
assert_eq!(mgr.get(&target_b).unwrap().session_count(), 1);
assert_eq!(mgr.get(&target_c).unwrap().session_count(), 3);
}
#[test]
fn link_manager_cleanup() {
let mut mgr = RelayLinkManager::new();
let target = addr("10.0.0.5:4433");
let mut link = RelayLink::stub(target);
link.add_session("s1");
link.add_session("s2");
link.add_session("s3");
mgr.insert(link);
assert_eq!(mgr.link_count(), 1);
assert_eq!(mgr.session_count(), 3);
// Remove sessions one by one via the manager's mutable access.
// `unregister_session` is async and this test runs without a runtime,
// so we exercise the synchronous management path directly.
{
let link = mgr.get_mut(&target).unwrap();
link.remove_session("s1");
assert!(!link.is_idle());
link.remove_session("s2");
assert!(!link.is_idle());
link.remove_session("s3");
assert!(link.is_idle());
}
// All sessions removed — link is idle
assert_eq!(mgr.session_count(), 0);
assert!(mgr.get(&target).unwrap().is_idle());
// Simulate what unregister_session does: remove the idle link
mgr.links.remove(&target);
assert_eq!(mgr.link_count(), 0);
}
#[test]
fn register_session_on_nonexistent_link_is_noop() {
let mut mgr = RelayLinkManager::new();
// Should not panic, just warn
mgr.register_session(addr("10.0.0.99:4433"), "orphan-session");
assert_eq!(mgr.link_count(), 0);
assert_eq!(mgr.session_count(), 0);
}
#[test]
fn forward_to_nonexistent_link_errors() {
let mgr = RelayLinkManager::new();
let target = addr("10.0.0.99:4433");
let pkt = MediaPacket {
header: wzp_proto::packet::MediaHeader {
version: 0,
is_repair: false,
codec_id: wzp_proto::CodecId::Opus16k,
has_quality_report: false,
fec_ratio_encoded: 0,
seq: 1,
timestamp: 100,
fec_block: 0,
fec_symbol: 0,
reserved: 0,
csrc_count: 0,
},
payload: bytes::Bytes::from_static(b"test"),
quality_report: None,
};
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let result = rt.block_on(mgr.forward_to(target, &pkt));
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("no relay link"));
}
}
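
The bookkeeping rule the manager follows (at most one link per peer, dropped once its last session is unregistered) can be sketched without any QUIC transport. This is a minimal, std-only sketch under stated assumptions: `LinkTable` and its methods are hypothetical names, not part of the crate, and unlike `register_session` it creates the entry on first use instead of warning.

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Hypothetical stand-in for `RelayLinkManager`: session tracking only,
/// with no transport behind each link.
#[derive(Default)]
pub struct LinkTable {
    links: HashMap<SocketAddr, HashSet<String>>,
}

impl LinkTable {
    /// Register a session on a peer, creating the link entry if needed.
    pub fn register(&mut self, target: SocketAddr, session_id: &str) {
        self.links
            .entry(target)
            .or_default()
            .insert(session_id.to_string());
    }

    /// Unregister a session. Returns `true` if the link became idle
    /// and was removed from the table.
    pub fn unregister(&mut self, target: SocketAddr, session_id: &str) -> bool {
        let idle = match self.links.get_mut(&target) {
            Some(sessions) => {
                sessions.remove(session_id);
                sessions.is_empty()
            }
            None => false,
        };
        if idle {
            self.links.remove(&target);
        }
        idle
    }

    /// Number of peers that currently have a link entry.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }

    /// Total sessions across all links.
    pub fn session_count(&self) -> usize {
        self.links.values().map(|s| s.len()).sum()
    }
}
```

Returning a `bool` from `unregister` lets a caller decide when to close the real connection, which is the point where the actual manager awaits `close()`.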


@@ -27,11 +27,51 @@ fn next_id() -> ParticipantId {
NEXT_PARTICIPANT_ID.fetch_add(1, Ordering::Relaxed)
}
/// How to send data to a participant — either via QUIC transport or WebSocket channel.
#[derive(Clone)]
pub enum ParticipantSender {
Quic(Arc<wzp_transport::QuinnTransport>),
WebSocket(tokio::sync::mpsc::Sender<Bytes>),
}
impl ParticipantSender {
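/// Send raw bytes to this participant.
///
/// WebSocket sends use `try_send` and never wait: if the channel is full
/// or closed, the frame is dropped and an error is returned. QUIC sends
/// wrap the bytes in a `MediaPacket` with a default PCM header.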
pub async fn send_raw(&self, data: &[u8]) -> Result<(), String> {
match self {
ParticipantSender::WebSocket(tx) => {
tx.try_send(Bytes::copy_from_slice(data))
.map_err(|e| format!("ws send: {e}"))
}
ParticipantSender::Quic(transport) => {
let pkt = wzp_proto::MediaPacket {
header: wzp_proto::packet::MediaHeader::default_pcm(),
payload: Bytes::copy_from_slice(data),
quality_report: None,
};
transport.send_media(&pkt).await.map_err(|e| format!("quic send: {e}"))
}
}
}
/// Check if this is a QUIC participant.
pub fn is_quic(&self) -> bool {
matches!(self, ParticipantSender::Quic(_))
}
/// Get the QUIC transport if this is a QUIC participant.
pub fn as_quic(&self) -> Option<&Arc<wzp_transport::QuinnTransport>> {
match self {
ParticipantSender::Quic(t) => Some(t),
_ => None,
}
}
}
/// A participant in a room.
struct Participant {
id: ParticipantId,
_addr: std::net::SocketAddr,
transport: Arc<wzp_transport::QuinnTransport>,
sender: ParticipantSender,
}
/// A room holding multiple participants.
@@ -46,10 +86,10 @@ impl Room {
}
}
fn add(&mut self, addr: std::net::SocketAddr, transport: Arc<wzp_transport::QuinnTransport>) -> ParticipantId {
fn add(&mut self, addr: std::net::SocketAddr, sender: ParticipantSender) -> ParticipantId {
let id = next_id();
info!(room_size = self.participants.len() + 1, participant = id, %addr, "joined room");
self.participants.push(Participant { id, _addr: addr, transport });
self.participants.push(Participant { id, _addr: addr, sender });
id
}
@@ -58,11 +98,11 @@ impl Room {
info!(room_size = self.participants.len(), participant = id, "left room");
}
fn others(&self, exclude_id: ParticipantId) -> Vec<Arc<wzp_transport::QuinnTransport>> {
fn others(&self, exclude_id: ParticipantId) -> Vec<ParticipantSender> {
self.participants
.iter()
.filter(|p| p.id != exclude_id)
.map(|p| p.transport.clone())
.map(|p| p.sender.clone())
.collect()
}
@@ -130,7 +170,7 @@ impl RoomManager {
&mut self,
room_name: &str,
addr: std::net::SocketAddr,
transport: Arc<wzp_transport::QuinnTransport>,
sender: ParticipantSender,
fingerprint: Option<&str>,
) -> Result<ParticipantId, String> {
if !self.is_authorized(room_name, fingerprint) {
@@ -138,7 +178,18 @@ impl RoomManager {
return Err("not authorized for this room".to_string());
}
let room = self.rooms.entry(room_name.to_string()).or_insert_with(Room::new);
Ok(room.add(addr, transport))
Ok(room.add(addr, sender))
}
/// Join a room via WebSocket. Convenience wrapper around `join()`.
pub fn join_ws(
&mut self,
room_name: &str,
addr: std::net::SocketAddr,
sender: tokio::sync::mpsc::Sender<Bytes>,
fingerprint: Option<&str>,
) -> Result<ParticipantId, String> {
self.join(room_name, addr, ParticipantSender::WebSocket(sender), fingerprint)
}
/// Leave a room. Removes the room if empty.
@@ -152,12 +203,12 @@ impl RoomManager {
}
}
/// Get transports for all OTHER participants in a room.
/// Get senders for all OTHER participants in a room.
pub fn others(
&self,
room_name: &str,
participant_id: ParticipantId,
) -> Vec<Arc<wzp_transport::QuinnTransport>> {
) -> Vec<ParticipantSender> {
self.rooms
.get(room_name)
.map(|r| r.others(participant_id))
@@ -305,10 +356,14 @@ async fn run_participant_plain(
// Forward to all others
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
// Best-effort: if one send fails, continue to others
if let Err(e) = other.send_media(&pkt).await {
// Don't log every failure — they'll be cleaned up when their recv loop breaks
let _ = e;
match other {
ParticipantSender::Quic(t) => {
let _ = t.send_media(&pkt).await;
}
ParticipantSender::WebSocket(_) => {
// WS clients receive raw payload bytes
let _ = other.send_raw(&pkt.payload).await;
}
}
}
@@ -390,14 +445,22 @@ async fn run_participant_trunked(
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
let peer_addr = other.connection().remote_address();
match other {
ParticipantSender::Quic(t) => {
let peer_addr = t.connection().remote_address();
let fwd = forwarders
.entry(peer_addr)
.or_insert_with(|| TrunkedForwarder::new(other.clone(), sid_bytes));
.or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes));
if let Err(e) = fwd.send(&pkt).await {
let _ = e;
}
}
ParticipantSender::WebSocket(_) => {
// WS clients bypass trunking — send raw payload directly
let _ = other.send_raw(&pkt.payload).await;
}
}
}
let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out);
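
WebSocket participants are fed through a bounded channel with `try_send`, so a slow browser loses frames rather than stalling the fan-out loop. The sketch below illustrates that behaviour. As an assumption made to keep it dependency-free, the standard library's `sync_channel` stands in for `tokio::sync::mpsc`, and `offer` is a hypothetical helper.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Try to queue a frame without blocking.
/// A full buffer or a dropped receiver is reported as an error string,
/// and the frame is discarded in both cases.
pub fn offer(tx: &SyncSender<Vec<u8>>, frame: &[u8]) -> Result<(), String> {
    tx.try_send(frame.to_vec()).map_err(|e| match e {
        TrySendError::Full(_) => "ws send: channel full, frame dropped".to_string(),
        TrySendError::Disconnected(_) => "ws send: receiver gone".to_string(),
    })
}
```

With a capacity of 2, the third `offer` fails until the receiver drains a frame. For real-time audio that is usually preferable to queueing stale frames.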


@@ -0,0 +1,265 @@
//! Route resolution — given a target fingerprint, find the relay chain
//! needed to reach that user.
//!
//! Uses the [`PresenceRegistry`] as its data source. Currently supports
//! single-hop resolution (local or direct peer). The `resolve_multi_hop`
//! method has the signature for future multi-hop expansion but falls back
//! to single-hop for now.
use std::net::SocketAddr;
use serde::Serialize;
use crate::presence::{PresenceLocation, PresenceRegistry};
// ---------------------------------------------------------------------------
// Route type
// ---------------------------------------------------------------------------
/// The resolved route to a target fingerprint.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub enum Route {
/// Target is connected to this relay directly.
Local,
/// Target is on a directly connected peer relay.
DirectPeer(SocketAddr),
/// Target is reachable via a chain of relays (multi-hop).
Chain(Vec<SocketAddr>),
/// Target not found in any known relay.
NotFound,
}
impl std::fmt::Display for Route {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Route::Local => write!(f, "local"),
Route::DirectPeer(addr) => write!(f, "direct_peer({})", addr),
Route::Chain(chain) => {
let addrs: Vec<String> = chain.iter().map(|a| a.to_string()).collect();
write!(f, "chain({})", addrs.join(" -> "))
}
Route::NotFound => write!(f, "not_found"),
}
}
}
// ---------------------------------------------------------------------------
// RouteResolver
// ---------------------------------------------------------------------------
/// Resolves fingerprints to relay routes using the presence registry.
pub struct RouteResolver {
/// Our own relay address (how peers know us).
local_addr: SocketAddr,
}
impl RouteResolver {
/// Create a new route resolver for the relay at `local_addr`.
pub fn new(local_addr: SocketAddr) -> Self {
Self { local_addr }
}
/// Our local relay address.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
/// Look up a fingerprint in the registry and return the route.
///
/// - If `registry.lookup()` returns `Local` -> `Route::Local`
/// - If returns `Remote(addr)` -> `Route::DirectPeer(addr)`
/// - If not found -> `Route::NotFound`
pub fn resolve(&self, registry: &PresenceRegistry, target_fingerprint: &str) -> Route {
match registry.lookup(target_fingerprint) {
Some(PresenceLocation::Local) => Route::Local,
Some(PresenceLocation::Remote(addr)) => Route::DirectPeer(addr),
None => Route::NotFound,
}
}
/// Multi-hop route resolution (future expansion).
///
/// For now this is equivalent to `resolve()` — single-hop only.
/// When multi-hop is implemented, this will query peers transitively
/// up to `max_hops` relays deep, using `RouteQuery` / `RouteResponse`
/// signals over probe connections.
pub fn resolve_multi_hop(
&self,
registry: &PresenceRegistry,
target: &str,
_max_hops: usize,
) -> Route {
// Phase 1: single-hop only (same as resolve).
// Future: if resolve returns NotFound and max_hops > 0,
// send RouteQuery to each known peer with ttl = max_hops - 1,
// collect RouteResponse, and build a Chain.
self.resolve(registry, target)
}
/// Build a JSON-serializable route response for the HTTP API.
pub fn route_json(
&self,
fingerprint: &str,
route: &Route,
) -> serde_json::Value {
let (route_type, relay_chain) = match route {
Route::Local => ("local", vec![self.local_addr.to_string()]),
Route::DirectPeer(addr) => ("direct_peer", vec![self.local_addr.to_string(), addr.to_string()]),
Route::Chain(chain) => {
let mut addrs = vec![self.local_addr.to_string()];
addrs.extend(chain.iter().map(|a| a.to_string()));
("chain", addrs)
}
Route::NotFound => ("not_found", vec![]),
};
serde_json::json!({
"fingerprint": fingerprint,
"route": route_type,
"relay_chain": relay_chain,
})
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::net::SocketAddr;
fn addr(s: &str) -> SocketAddr {
s.parse().unwrap()
}
fn make_resolver() -> RouteResolver {
RouteResolver::new(addr("10.0.0.1:4433"))
}
#[test]
fn resolve_local() {
let resolver = make_resolver();
let mut reg = PresenceRegistry::new();
reg.register_local("aabbccdd", Some("alice".into()), Some("room1".into()));
let route = resolver.resolve(&reg, "aabbccdd");
assert_eq!(route, Route::Local);
}
#[test]
fn resolve_direct_peer() {
let resolver = make_resolver();
let mut reg = PresenceRegistry::new();
let peer = addr("10.0.0.2:4433");
let mut fps = HashSet::new();
fps.insert("deadbeef".to_string());
reg.update_peer(peer, fps);
let route = resolver.resolve(&reg, "deadbeef");
assert_eq!(route, Route::DirectPeer(peer));
}
#[test]
fn resolve_not_found() {
let resolver = make_resolver();
let reg = PresenceRegistry::new();
let route = resolver.resolve(&reg, "unknown_fp");
assert_eq!(route, Route::NotFound);
}
#[test]
fn resolve_multi_hop_fallback() {
// multi-hop currently falls back to single-hop behavior
let resolver = make_resolver();
let mut reg = PresenceRegistry::new();
reg.register_local("local_fp", None, None);
let peer = addr("10.0.0.3:4433");
let mut fps = HashSet::new();
fps.insert("remote_fp".to_string());
reg.update_peer(peer, fps);
// Local lookup works via multi-hop
assert_eq!(resolver.resolve_multi_hop(&reg, "local_fp", 3), Route::Local);
// Remote lookup works via multi-hop
assert_eq!(
resolver.resolve_multi_hop(&reg, "remote_fp", 3),
Route::DirectPeer(peer)
);
// Not-found works via multi-hop
assert_eq!(
resolver.resolve_multi_hop(&reg, "nobody", 3),
Route::NotFound
);
}
#[test]
fn route_query_signal_roundtrip() {
use wzp_proto::SignalMessage;
let query = SignalMessage::RouteQuery {
fingerprint: "aabbccdd".to_string(),
ttl: 3,
};
let json = serde_json::to_string(&query).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(
decoded,
SignalMessage::RouteQuery { ref fingerprint, ttl }
if fingerprint == "aabbccdd" && ttl == 3
));
let response = SignalMessage::RouteResponse {
fingerprint: "aabbccdd".to_string(),
found: true,
relay_chain: vec!["10.0.0.1:4433".to_string(), "10.0.0.2:4433".to_string()],
};
let json = serde_json::to_string(&response).unwrap();
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(
decoded,
SignalMessage::RouteResponse { ref fingerprint, found, ref relay_chain }
if fingerprint == "aabbccdd" && found && relay_chain.len() == 2
));
}
#[test]
fn route_display() {
assert_eq!(Route::Local.to_string(), "local");
assert_eq!(
Route::DirectPeer(addr("10.0.0.2:4433")).to_string(),
"direct_peer(10.0.0.2:4433)"
);
assert_eq!(
Route::Chain(vec![addr("10.0.0.2:4433"), addr("10.0.0.3:4433")]).to_string(),
"chain(10.0.0.2:4433 -> 10.0.0.3:4433)"
);
assert_eq!(Route::NotFound.to_string(), "not_found");
// Debug is also useful
let debug = format!("{:?}", Route::Local);
assert!(debug.contains("Local"));
}
#[test]
fn route_json_output() {
let resolver = make_resolver();
let json = resolver.route_json("fp1", &Route::Local);
assert_eq!(json["route"], "local");
assert_eq!(json["fingerprint"], "fp1");
assert_eq!(json["relay_chain"].as_array().unwrap().len(), 1);
let json = resolver.route_json("fp2", &Route::DirectPeer(addr("10.0.0.2:4433")));
assert_eq!(json["route"], "direct_peer");
assert_eq!(json["relay_chain"].as_array().unwrap().len(), 2);
let json = resolver.route_json("fp3", &Route::NotFound);
assert_eq!(json["route"], "not_found");
assert_eq!(json["relay_chain"].as_array().unwrap().len(), 0);
}
}
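
The doc comment on `resolve_multi_hop` describes a future search that queries peers transitively with a decreasing TTL. One way such a search could look is a breadth-first walk bounded by `max_hops`. This sketch walks a static peer map instead of exchanging live `RouteQuery` / `RouteResponse` signals; the `Topology` type and `find_chain` function are hypothetical and only illustrate the idea.

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::SocketAddr;

/// Hypothetical static view of the relay mesh: who peers with whom,
/// and which fingerprints each relay hosts.
pub struct Topology {
    pub peers: HashMap<SocketAddr, Vec<SocketAddr>>,
    pub hosted: HashMap<SocketAddr, HashSet<String>>,
}

/// Breadth-first search from `start`, visiting relays at most `max_hops`
/// links away. Returns the relays after `start` on a shortest path to the
/// relay hosting `target`, or `None` if it is out of reach.
/// An empty chain means the target is hosted on `start` itself.
pub fn find_chain(
    topo: &Topology,
    start: SocketAddr,
    target: &str,
    max_hops: usize,
) -> Option<Vec<SocketAddr>> {
    let mut seen = HashSet::from([start]);
    let mut queue = VecDeque::from([(start, Vec::new())]);
    while let Some((relay, path)) = queue.pop_front() {
        if topo.hosted.get(&relay).map_or(false, |fps| fps.contains(target)) {
            return Some(path);
        }
        if path.len() == max_hops {
            continue; // hop budget exhausted on this branch
        }
        for &next in topo.peers.get(&relay).into_iter().flatten() {
            // `seen` prevents loops when relays peer with each other.
            if seen.insert(next) {
                let mut extended = path.clone();
                extended.push(next);
                queue.push_back((next, extended));
            }
        }
    }
    None
}
```

A non-empty result maps naturally onto `Route::Chain`, while an empty one corresponds to `Route::Local`.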

crates/wzp-relay/src/ws.rs Normal file

@@ -0,0 +1,243 @@
//! WebSocket transport for browser clients.
//!
//! Browsers connect via `GET /ws/{room}` → WebSocket upgrade.
//! First message must be auth JSON (if auth is enabled).
//! Subsequent messages are binary PCM frames forwarded to/from the room.
use std::net::SocketAddr;
use std::sync::Arc;
use axum::{
extract::{
ws::{Message, WebSocket},
Path, State, WebSocketUpgrade,
},
response::IntoResponse,
routing::get,
Router,
};
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tower_http::services::ServeDir;
use tracing::{error, info, warn};
use crate::auth;
use crate::metrics::RelayMetrics;
use crate::presence::PresenceRegistry;
use crate::room::RoomManager;
use crate::session_mgr::SessionManager;
/// Shared state for WebSocket handlers.
#[derive(Clone)]
pub struct WsState {
pub room_mgr: Arc<Mutex<RoomManager>>,
pub session_mgr: Arc<Mutex<SessionManager>>,
pub auth_url: Option<String>,
pub metrics: Arc<RelayMetrics>,
pub presence: Arc<Mutex<PresenceRegistry>>,
}
/// Start the WebSocket + static file server.
pub async fn run_ws_server(port: u16, state: WsState, static_dir: Option<String>) {
let mut app = Router::new()
.route("/ws/{room}", get(ws_upgrade_handler))
.with_state(state);
if let Some(dir) = static_dir {
info!(dir = %dir, "serving static files");
app = app.fallback_service(ServeDir::new(dir));
}
let addr: SocketAddr = ([0, 0, 0, 0], port).into();
info!(%addr, "WebSocket server listening");
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("failed to bind WS listener");
axum::serve(listener, app).await.expect("WS server failed");
}
async fn ws_upgrade_handler(
Path(room): Path<String>,
State(state): State<WsState>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_ws_connection(socket, room, state))
}
async fn handle_ws_connection(socket: WebSocket, room: String, state: WsState) {
let (mut ws_tx, mut ws_rx) = socket.split();
// 1. Auth: if auth_url is set, first message must be {"type":"auth","token":"..."}
let fingerprint: Option<String> = if let Some(ref auth_url) = state.auth_url {
match ws_rx.next().await {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<serde_json::Value>(&text) {
Ok(parsed) if parsed["type"] == "auth" => {
if let Some(token) = parsed["token"].as_str() {
match auth::validate_token(auth_url, token).await {
Ok(client) => {
state.metrics.auth_attempts.with_label_values(&["ok"]).inc();
info!(fingerprint = %client.fingerprint, "WS authenticated");
let _ = ws_tx
.send(Message::Text(r#"{"type":"auth_ok"}"#.into()))
.await;
Some(client.fingerprint)
}
Err(e) => {
state
.metrics
.auth_attempts
.with_label_values(&["fail"])
.inc();
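// Build the reply with serde_json so the error text is escaped
// and the frame stays valid JSON even if it contains quotes.
let reply = serde_json::json!({
"type": "auth_error",
"error": e.to_string(),
});
let _ = ws_tx
.send(Message::Text(reply.to_string().into()))
.await;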
warn!("WS auth failed: {e}");
return;
}
}
} else {
warn!("WS auth: missing token field");
return;
}
}
_ => {
warn!("WS: expected auth message as first frame");
return;
}
}
}
_ => {
warn!("WS: first frame was not a text auth message, or the connection closed");
return;
}
}
} else {
let _ = ws_tx
.send(Message::Text(r#"{"type":"auth_ok"}"#.into()))
.await;
None
};
// 2. Create mpsc channel for outbound frames (room → browser)
let (tx, mut rx) = mpsc::channel::<Bytes>(64);
// 3. Create session
let session_id = {
let mut smgr = state.session_mgr.lock().await;
match smgr.create_session(&room, fingerprint.clone()) {
Ok(id) => id,
Err(e) => {
error!(room = %room, "WS session rejected: {e}");
return;
}
}
};
state.metrics.active_sessions.inc();
// 4. Join room with WS sender
// Placeholder peer address: the client's real remote address is not captured on this path.
let addr: SocketAddr = ([0, 0, 0, 0], 0).into();
let participant_id = {
let mut mgr = state.room_mgr.lock().await;
match mgr.join_ws(&room, addr, tx, fingerprint.as_deref()) {
Ok(id) => {
state.metrics.active_rooms.set(mgr.list().len() as i64);
id
}
Err(e) => {
error!(room = %room, "WS room join denied: {e}");
state.metrics.active_sessions.dec();
let mut smgr = state.session_mgr.lock().await;
smgr.remove_session(session_id);
return;
}
}
};
// 5. Register presence
if let Some(ref fp) = fingerprint {
let mut reg = state.presence.lock().await;
reg.register_local(fp, None, Some(room.clone()));
}
info!(room = %room, participant = participant_id, "WS client joined");
// 6. Outbound task: mpsc rx → WS binary frames
let send_task = tokio::spawn(async move {
while let Some(data) = rx.recv().await {
if ws_tx
.send(Message::Binary(data.to_vec().into()))
.await
.is_err()
{
break;
}
}
});
// 7. Inbound: WS recv → fan-out to room
loop {
match ws_rx.next().await {
Some(Ok(Message::Binary(data))) => {
let others = {
let mgr = state.room_mgr.lock().await;
mgr.others(&room, participant_id)
};
for other in &others {
let _ = other.send_raw(&data).await;
}
state
.metrics
.packets_forwarded
.inc_by(others.len() as u64);
state
.metrics
.bytes_forwarded
.inc_by(data.len() as u64 * others.len() as u64);
}
Some(Ok(Message::Close(_))) | None => break,
_ => continue,
}
}
// 8. Cleanup
send_task.abort();
info!(room = %room, participant = participant_id, "WS client disconnected");
if let Some(ref fp) = fingerprint {
let mut reg = state.presence.lock().await;
reg.unregister_local(fp);
}
{
let mut mgr = state.room_mgr.lock().await;
mgr.leave(&room, participant_id);
state.metrics.active_rooms.set(mgr.list().len() as i64);
}
let session_id_str: String = session_id.iter().map(|b| format!("{b:02x}")).collect();
state.metrics.remove_session_metrics(&session_id_str);
state.metrics.active_sessions.dec();
{
let mut smgr = state.session_mgr.lock().await;
smgr.remove_session(session_id);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ws_state_is_clone() {
// WsState must be Clone for axum's State extractor
fn assert_clone<T: Clone>() {}
assert_clone::<WsState>();
}
}
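
The `auth_error` reply embeds an arbitrary error string, so whatever builds it has to escape quotes, backslashes, and control characters to keep the frame valid JSON. A dependency-free sketch of that escaping follows; in the relay itself `serde_json` would do the same job, and `json_escape` and `auth_error_frame` are hypothetical helpers.

```rust
/// Escape a string for embedding inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build the text frame sent to the browser when token validation fails.
pub fn auth_error_frame(err: &str) -> String {
    format!(r#"{{"type":"auth_error","error":"{}"}}"#, json_escape(err))
}
```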


@@ -1,329 +1,607 @@
# WarzonePhone Protocol Design & Architecture
# WarzonePhone Architecture
## Network Topology
> Custom lossy VoIP protocol built in Rust. E2E encrypted, FEC-protected, adaptive quality, designed for hostile network conditions.
```
Lossy / censored link
◄──────────────────────►
┌────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐
│ Client │─QUIC─│ Relay A │─QUIC─│ Relay B │─QUIC─│ Destination │
└────────┘ └─────────┘ └─────────┘ └─────────────┘
│ │ │ │
Encode Forward Forward Decode
FEC FEC FEC FEC
Encrypt (opaque) (opaque) Decrypt
## System Overview
```mermaid
graph TB
subgraph "Client A (Browser/CLI)"
MIC[Microphone] --> DN[NoiseSupressor<br/>RNNoise ML]
DN --> SD[SilenceDetector<br/>VAD + Hangover]
SD --> ENC[CallEncoder<br/>Opus/Codec2]
ENC --> FEC_E[FEC Encoder<br/>RaptorQ]
FEC_E --> CRYPT_E[ChaCha20-Poly1305<br/>Encrypt]
CRYPT_E --> QUIC_S[QUIC Datagram<br/>Send]
QUIC_R[QUIC Datagram<br/>Recv] --> CRYPT_D[ChaCha20-Poly1305<br/>Decrypt]
CRYPT_D --> FEC_D[FEC Decoder<br/>RaptorQ]
FEC_D --> JIT[JitterBuffer<br/>Adaptive Playout]
JIT --> DEC[CallDecoder<br/>Opus/Codec2]
DEC --> SPK[Speaker]
end
subgraph "Relay (SFU)"
ACCEPT[Accept QUIC] --> AUTH{Auth?}
AUTH -->|token| VALIDATE[POST /v1/auth/validate]
AUTH -->|no auth| HS
VALIDATE --> HS[Crypto Handshake<br/>X25519 + Ed25519]
HS --> ROOM[Room Manager<br/>Named Rooms via SNI]
ROOM --> FWD[Forward to<br/>Other Participants]
end
subgraph "Client B"
B_SPK[Speaker]
B_MIC[Microphone]
end
QUIC_S -->|UDP/QUIC| ACCEPT
FWD -->|UDP/QUIC| QUIC_R
B_MIC -.->|same pipeline| ACCEPT
FWD -.->|same pipeline| B_SPK
style MIC fill:#4a9eff
style SPK fill:#4a9eff
style B_MIC fill:#4a9eff
style B_SPK fill:#4a9eff
style ROOM fill:#ff9f43
style CRYPT_E fill:#ee5a24
style CRYPT_D fill:#ee5a24
```
In the simplest deployment a single relay serves as the meeting point (room mode, SFU). Clients connect directly to one relay, which forwards media to all other participants in the same room. For censorship-resistant links, two relays can be chained: a client-facing relay forwards all traffic to a remote relay via QUIC.
## Crate Dependency Graph
Room names are carried in the QUIC SNI field during the TLS handshake, so a single relay can host many independent rooms without additional signaling.
```mermaid
graph TD
PROTO[wzp-proto<br/>Types, Traits, Wire Format]
## Protocol Stack
CODEC[wzp-codec<br/>Opus + Codec2 + RNNoise]
FEC[wzp-fec<br/>RaptorQ FEC]
CRYPTO[wzp-crypto<br/>ChaCha20 + Identity]
TRANSPORT[wzp-transport<br/>QUIC/Quinn]
```
┌──────────────────────────────────────────────┐
│ Application (Opus / Codec2 audio)            │ wzp-codec
├──────────────────────────────────────────────┤
│ Redundancy (RaptorQ FEC + interleaving)      │ wzp-fec
├──────────────────────────────────────────────┤
│ Crypto (ChaCha20-Poly1305 + AEAD)            │ wzp-crypto
├──────────────────────────────────────────────┤
│ Transport (QUIC DATAGRAM + reliable stream)  │ wzp-transport
├──────────────────────────────────────────────┤
│ Obfuscation (Phase 2 - trait defined)        │ wzp-proto::ObfuscationLayer
└──────────────────────────────────────────────┘
RELAY[wzp-relay<br/>Relay Daemon]
CLIENT[wzp-client<br/>CLI + Call Engine]
WEB[wzp-web<br/>Browser Bridge]
PROTO --> CODEC
PROTO --> FEC
PROTO --> CRYPTO
PROTO --> TRANSPORT
CODEC --> CLIENT
FEC --> CLIENT
CRYPTO --> CLIENT
TRANSPORT --> CLIENT
CODEC --> RELAY
FEC --> RELAY
CRYPTO --> RELAY
TRANSPORT --> RELAY
CLIENT --> WEB
TRANSPORT --> WEB
CRYPTO --> WEB
FC[warzone-protocol<br/>featherChat Identity] -.->|path dep| CRYPTO
style PROTO fill:#6c5ce7
style RELAY fill:#ff9f43
style CLIENT fill:#00b894
style WEB fill:#0984e3
style FC fill:#fd79a8
```
Audio and FEC are end-to-end between caller and callee. The relay operates on opaque, encrypted, FEC-protected packets. Crypto keys are never shared with relays.
## Wire Format
## Wire Formats
### MediaHeader (12 bytes)
```
Byte 0: [V:1][T:1][CodecID:4][Q:1][FecRatioHi:1]
Byte 1: [FecRatioLo:6][unused:2]
Byte 2-3: Sequence number (big-endian u16)
Byte 4-7: Timestamp in ms since session start (big-endian u32)
Byte 8: FEC block ID (wrapping u8)
Byte 9: FEC symbol index within block
Byte 10: Reserved / flags
Byte 11: CSRC count (for future mixing)
Byte 0: [V:1][T:1][CodecID:4][Q:1][FecHi:1]
Byte 1: [FecLo:6][unused:2]
Bytes 2-3: sequence (u16 BE)
Bytes 4-7: timestamp_ms (u32 BE)
Byte 8: fec_block_id (u8)
Byte 9: fec_symbol_idx (u8)
Byte 10: reserved
Byte 11: csrc_count
V = version (0), T = is_repair, CodecID = codec, Q = quality_report appended
```
Field details:
| Field | Bits | Description |
|-------|------|-------------|
| V | 1 | Protocol version (0 = v1) |
| T | 1 | 1 = FEC repair packet, 0 = source media |
| CodecID | 4 | Codec identifier (0=Opus24k, 1=Opus16k, 2=Opus6k, 3=Codec2_3200, 4=Codec2_1200) |
| Q | 1 | QualityReport trailer appended |
| FecRatio | 7 | FEC ratio encoded as 7-bit value (0-127 maps to 0.0-2.0) |
| Seq | 16 | Wrapping packet sequence number |
| Timestamp | 32 | Milliseconds since session start |
| FEC block | 8 | Source block ID (wrapping) |
| FEC symbol | 8 | Symbol index within the FEC block |
| Reserved | 8 | Reserved flags |
| CSRC count | 8 | Contributing source count (future) |
Defined in `crates/wzp-proto/src/packet.rs` as `MediaHeader`.
### QualityReport (4 bytes)
Appended to a media packet when the Q flag is set.
### MiniHeader (4 bytes, compressed)
```
Byte 0: loss_pct — 0-255 maps to 0-100% loss
Byte 1: rtt_4ms — RTT in 4ms units (0-255 = 0-1020ms)
Byte 2: jitter_ms — Jitter in milliseconds
Byte 3: bitrate_cap — Max receive bitrate in kbps
Bytes 0-1: timestamp_delta_ms (u16 BE)
Bytes 2-3: payload_len (u16 BE)
Preceded by FRAME_TYPE_MINI (0x01). Full header every 50 frames (~1s).
Saves 8 bytes/packet (67% header reduction).
```
Defined in `crates/wzp-proto/src/packet.rs` as `QualityReport`.
### MediaPacket
A complete media packet on the wire:
### TrunkFrame (batched datagrams)
```
[MediaHeader: 12 bytes][Payload: variable][QualityReport: 4 bytes if Q=1]
[count:u16]
[session_id:2][len:u16][payload:len] x count
Packs multiple session packets into one QUIC datagram.
Max 10 entries or 1200 bytes, flushed every 5ms.
```
Defined in `crates/wzp-proto/src/packet.rs` as `MediaPacket`.
### SignalMessage (reliable stream)
Signaling uses length-prefixed JSON over reliable QUIC bidirectional streams. Each message opens a new bidi stream, writes a 4-byte big-endian length prefix followed by the JSON payload, then finishes the send side.
Variants defined in `crates/wzp-proto/src/packet.rs`:
- `CallOffer` — identity_pub, ephemeral_pub, signature, supported_profiles
- `CallAnswer` — identity_pub, ephemeral_pub, signature, chosen_profile
- `IceCandidate` — NAT traversal candidate string
- `Rekey` — new_ephemeral_pub, signature
- `QualityUpdate` — report, recommended_profile
- `Ping` / `Pong` — timestamp_ms for RTT measurement
- `Hangup` — reason (Normal, Busy, Declined, Timeout, Error)
## FEC Strategy
WarzonePhone uses **RaptorQ fountain codes** (via the `raptorq` crate) for forward error correction. This is implemented in `crates/wzp-fec/`.
### Block Structure
Audio frames are grouped into FEC blocks. Each block contains a fixed number of source symbols (configured per quality profile). Each source symbol is a single encoded audio frame, zero-padded to a uniform 256-byte symbol size with a 2-byte little-endian length prefix.
### Encoding Process
1. Audio frames are added to the encoder as source symbols
2. When a block is full (`frames_per_block` symbols), repair symbols are generated
3. The repair ratio determines how many repair symbols: `ceil(num_source * ratio)`
4. Both source and repair packets are transmitted with the block ID and symbol index in the header
### Decoding Process
1. Received symbols (source or repair) are fed to the decoder keyed by block ID
2. The decoder attempts reconstruction when sufficient symbols arrive
3. RaptorQ can recover the full block from any `K` symbols out of `K + R` total (where K = source count, R = repair count)
4. Old blocks are expired via wrapping u8 distance
### Interleaving
The `Interleaver` spreads symbols from multiple FEC blocks across transmission slots in round-robin fashion. With depth=3, a burst loss of 6 consecutive packets damages at most 2 symbols per block instead of 6 symbols in one block.
### FEC Configuration by Quality Tier
| Tier | Frames/Block | Repair Ratio | Total Bandwidth Overhead |
|------|-------------|-------------|-------------------------|
| GOOD | 5 | 0.2 (20%) | 1.2x |
| DEGRADED | 10 | 0.5 (50%) | 1.5x |
| CATASTROPHIC | 8 | 1.0 (100%) | 2.0x |
## Adaptive Quality
Three quality tiers drive codec and FEC selection. The controller is implemented in `crates/wzp-proto/src/quality.rs` as `AdaptiveQualityController`.
### Tier Thresholds
| Tier | Loss | RTT | Codec | FEC Ratio |
|------|------|-----|-------|-----------|
| GOOD | < 10% | < 400ms | Opus 24kbps, 20ms frames | 0.2 |
| DEGRADED | 10-40% or 400-600ms | | Opus 6kbps, 40ms frames | 0.5 |
| CATASTROPHIC | > 40% or > 600ms | | Codec2 1200bps, 40ms frames | 1.0 |
### Hysteresis
- **Downgrade**: Triggers after 3 consecutive reports in a worse tier (fast reaction)
- **Upgrade**: Triggers after 10 consecutive reports in a better tier (slow, cautious)
- **Step limit**: Upgrades move only one tier at a time (Catastrophic -> Degraded -> Good)
- **History**: A sliding window of 20 recent reports is maintained for smoothing
- **Force mode**: Manual `force_profile()` disables adaptive logic entirely
### QualityProfile Constants
```rust
GOOD: Opus24k, fec=0.2, 20ms, 5 frames/block 28.8 kbps total
DEGRADED: Opus6k, fec=0.5, 40ms, 10 frames/block 9.0 kbps total
CATASTROPHIC: Codec2_1200, fec=1.0, 40ms, 8 frames/block 2.4 kbps total
```
## Encryption
Implemented in `crates/wzp-crypto/`.
### Identity Model (Warzone-Compatible)
- **Seed**: 32-byte random value (BIP39 mnemonic for backup)
- **Ed25519**: Derived via `HKDF(seed, "warzone-ed25519-identity")` -- signing/identity
- **X25519**: Derived via `HKDF(seed, "warzone-x25519-identity")` -- encryption
- **Fingerprint**: `SHA-256(Ed25519_pub)[:16]` -- 128-bit identifier
### Per-Call Key Exchange
1. Each side generates an ephemeral X25519 keypair
2. Ephemeral public keys are exchanged via `CallOffer`/`CallAnswer` signaling
3. Signatures are computed: `Ed25519_sign(ephemeral_pub || context_string)`
4. Shared secret: `X25519_DH(our_ephemeral_secret, peer_ephemeral_pub)`
5. Session key: `HKDF(shared_secret, "warzone-session-key")` -> 32 bytes
### Nonce Construction (12 bytes, not transmitted)
### QualityReport (4 bytes, optional)
```
session_id[0..4] || sequence_number (u32 BE) || direction (1 byte) || padding (3 bytes zero)
Byte 0: loss_pct (0-255 maps to 0-100%)
Byte 1: rtt_4ms (0-255 maps to 0-1020ms)
Byte 2: jitter_ms
Byte 3: bitrate_cap_kbps
```
- `session_id`: First 4 bytes of `SHA-256(session_key)`
- `direction`: 0 = Send, 1 = Recv
- Nonces are derived deterministically, saving 12 bytes per packet
### AEAD Encryption
- Algorithm: ChaCha20-Poly1305
- AAD: The 12-byte MediaHeader (authenticated but not encrypted)
- Tag: 16 bytes appended to ciphertext
- Overhead per packet: 16 bytes
### Rekeying
- Trigger: Every 2^16 packets (65536)
- Process: New ephemeral X25519 exchange, mixed with old key via HKDF
- Key evolution: `HKDF(old_key as salt, new_DH_result, "warzone-rekey")`
- Old key is zeroized after derivation (forward secrecy)
- Sequence counters reset to 0 after rekey
### Anti-Replay
- Sliding window of 1024 packets using a bitmap
- Sequence numbers too old (> 1024 behind highest seen) are rejected
- Handles u16 wrapping correctly (RFC 1982 serial number arithmetic)
- Implemented in `crates/wzp-crypto/src/anti_replay.rs` as `AntiReplayWindow`
## Jitter Buffer
Implemented in `crates/wzp-proto/src/jitter.rs` as `JitterBuffer`.
- **Structure**: BTreeMap keyed by sequence number for ordered playout
- **Target depth**: 50 packets (1 second) default
- **Max depth**: 250 packets (5 seconds at 20ms/frame)
- **Min depth**: 25 packets (0.5 seconds) before playout begins
- **Sequence wrapping**: RFC 1982 serial number arithmetic for u16
- **Duplicate handling**: Silently dropped
- **Late packets**: Packets arriving after their sequence has been played out are dropped
- **Overflow**: When buffer exceeds max depth, oldest packets are evicted
### Playout Results
- `Packet(MediaPacket)` -- normal delivery
- `Missing { seq }` -- gap detected, decoder should generate PLC
- `NotReady` -- buffer not yet filled to minimum depth
### Known Limitations
- No adaptive depth adjustment based on observed jitter (target_depth is configurable but not self-tuning in the current implementation)
- No timestamp-based playout scheduling (uses sequence-number ordering only)
- Jitter buffer drift has been observed during long echo tests
## Session State Machine
Defined in `crates/wzp-proto/src/session.rs`:
### SignalMessage (JSON over reliable QUIC stream)
```
Idle -> Connecting -> Handshaking -> Active <-> Rekeying -> Active
|
Closed
[4-byte length prefix][serde_json payload]
Variants:
CallOffer { identity_pub, ephemeral_pub, signature, supported_profiles }
CallAnswer { identity_pub, ephemeral_pub, signature, chosen_profile }
IceCandidate { candidate }
Hangup { reason: Normal|Busy|Declined|Timeout|Error }
AuthToken { token }
Hold, Unhold, Mute, Unmute
Transfer { target_fingerprint, relay_addr }
TransferAck
Rekey { new_ephemeral_pub, signature }
QualityUpdate { report, recommended_profile }
Ping/Pong { timestamp_ms }
```
- Media flows during both `Active` and `Rekeying` states
- Any state can transition to `Closed` via `Terminate` or `ConnectionLost`
- Invalid transitions produce a `TransitionError`
## Quality Profiles
```mermaid
graph LR
subgraph GOOD ["GOOD (28.8 kbps)"]
G_C[Opus 24kbps]
G_F[FEC 20%]
G_FR[20ms frames]
end
subgraph DEGRADED ["DEGRADED (9.0 kbps)"]
D_C[Opus 6kbps]
D_F[FEC 50%]
D_FR[40ms frames]
end
subgraph CATASTROPHIC ["CATASTROPHIC (2.4 kbps)"]
C_C[Codec2 1200bps]
C_F[FEC 100%]
C_FR[40ms frames]
end
GOOD -->|"loss>5% or RTT>100ms<br/>3 consecutive reports"| DEGRADED
DEGRADED -->|"loss>15% or RTT>200ms<br/>3 consecutive"| CATASTROPHIC
CATASTROPHIC -->|"loss<5% and RTT<100ms<br/>3 consecutive"| DEGRADED
DEGRADED -->|"loss<5% and RTT<100ms<br/>3 consecutive"| GOOD
style GOOD fill:#00b894
style DEGRADED fill:#fdcb6e
style CATASTROPHIC fill:#e17055
```
## Cryptographic Handshake
```mermaid
sequenceDiagram
participant C as Caller
participant R as Relay/Callee
Note over C: Derive identity from seed<br/>Ed25519 + X25519 via HKDF
C->>C: Generate ephemeral X25519
C->>C: Sign(ephemeral_pub || "call-offer")
C->>R: CallOffer { identity_pub, ephemeral_pub, signature, profiles }
R->>R: Verify Ed25519 signature
R->>R: Generate ephemeral X25519
R->>R: shared_secret = DH(eph_b, eph_a)
R->>R: session_key = HKDF(shared_secret, "warzone-session-key")
R->>R: Sign(ephemeral_pub || "call-answer")
R->>C: CallAnswer { identity_pub, ephemeral_pub, signature, chosen_profile }
C->>C: Verify signature
C->>C: shared_secret = DH(eph_a, eph_b)
C->>C: session_key = HKDF(shared_secret)
Note over C,R: Both have identical ChaCha20-Poly1305 session key
C->>R: Encrypted media (QUIC datagrams)
R->>C: Encrypted media (QUIC datagrams)
Note over C,R: Rekey every 65,536 packets<br/>New ephemeral DH + HKDF mix
```
## Identity Model (featherChat Compatible)
```mermaid
graph TD
SEED[32-byte Seed<br/>BIP39 Mnemonic 24 words] --> HKDF1[HKDF<br/>salt=None<br/>info=warzone-ed25519]
SEED --> HKDF2[HKDF<br/>salt=None<br/>info=warzone-x25519]
HKDF1 --> ED[Ed25519 SigningKey<br/>Digital Signatures]
HKDF2 --> X25519[X25519 StaticSecret<br/>Key Agreement]
ED --> VKEY[Ed25519 VerifyingKey<br/>Public]
X25519 --> XPUB[X25519 PublicKey<br/>Public]
VKEY --> FP[Fingerprint<br/>SHA-256 pubkey truncated 16 bytes<br/>xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx]
style SEED fill:#6c5ce7
style FP fill:#fd79a8
style ED fill:#ee5a24
style X25519 fill:#00b894
```
## Relay Modes
### Room Mode (Default, SFU)
```mermaid
graph TB
subgraph "Room Mode (Default SFU)"
C1[Client 1] -->|QUIC SNI=room-hash| RM[Room Manager]
C2[Client 2] -->|QUIC SNI=room-hash| RM
C3[Client 3] -->|QUIC SNI=room-hash| RM
RM --> R1[Room abc123]
R1 -->|fan-out| C1
R1 -->|fan-out| C2
R1 -->|fan-out| C3
end
- Clients join named rooms via QUIC SNI
- When a participant sends a packet, the relay forwards it to all other participants
- No transcoding -- packets are forwarded opaquely
- Rooms are auto-created when the first participant joins and auto-deleted when empty
- Managed by `RoomManager` in `crates/wzp-relay/src/room.rs`
subgraph "Forward Mode with --remote"
C4[Client] -->|QUIC| RA[Relay A]
RA -->|FEC decode then jitter then FEC encode| RB[Relay B]
RB -->|QUIC| C5[Client]
end
### Forward Mode (`--remote`)
subgraph "Probe Mode with --probe"
PA[Relay A] -->|Ping 1/s ~50 bytes| PB[Relay B]
PB -->|Pong| PA
PA --> PM[Prometheus<br/>RTT Loss Jitter Up/Down]
end
- All incoming traffic is forwarded to a remote relay via QUIC
- Two-pipeline architecture: upstream (client->remote) and downstream (remote->client)
- Each direction has its own `RelayPipeline` with FEC decode/encode and jitter buffering
- Intended for chaining relays across censored/lossy boundaries
### Relay Pipeline (Forward Mode)
Implemented in `crates/wzp-relay/src/pipeline.rs` as `RelayPipeline`:
```
Inbound: recv -> FEC decode -> jitter buffer -> pop
Outbound: packet -> assign seq -> FEC encode -> repair packets -> send
style RM fill:#ff9f43
style R1 fill:#fdcb6e
style PM fill:#0984e3
```
The pipeline does NOT decode/re-encode audio. It operates on FEC-protected packets, managing loss recovery and re-FEC-encoding for the next hop.
## Web Bridge Architecture
## Transport
```mermaid
sequenceDiagram
participant B as Browser
participant W as wzp-web
participant R as wzp-relay
Implemented in `crates/wzp-transport/` using QUIC via the `quinn` crate.
B->>W: HTTPS GET /room-name
W->>B: index.html (SPA)
### QUIC Configuration
B->>W: WebSocket /ws/room-name
Note over B,W: Optional auth JSON message
- ALPN protocol: `wzp`
- Idle timeout: 30 seconds
- Keep-alive interval: 5 seconds
- DATAGRAM extension enabled (for unreliable media)
- Datagram receive buffer: 64 KB
- Receive window: 256 KB
- Send window: 128 KB
- Stream receive window: 64 KB per stream
- Initial RTT estimate: 300ms (tuned for high-latency links)
W->>R: QUIC connect (SNI = hashed room name)
Note over W,R: AuthToken then Handshake then Join Room
### Media Transport
loop Every 20ms
B->>W: WS Binary Int16 x 960 PCM
W->>W: CallEncoder Opus + FEC
W->>R: QUIC Datagram encrypted
end
- **Unreliable media**: QUIC DATAGRAM frames (no retransmission, no head-of-line blocking)
- **Reliable signaling**: QUIC bidirectional streams with length-prefixed JSON framing
loop Incoming audio
R->>W: QUIC Datagram
W->>W: CallDecoder FEC + Opus
W->>B: WS Binary Int16 x 960 PCM
end
### Path Quality Monitoring
Note over B: AudioWorklet<br/>WZPCaptureProcessor mic to 960 frames<br/>WZPPlaybackProcessor ring buffer to speaker
```
`PathMonitor` in `crates/wzp-transport/src/path_monitor.rs` tracks:
## FEC Protection (RaptorQ)
- **Loss**: EWMA-smoothed percentage from sent/received packet counts
- **RTT**: EWMA-smoothed round-trip time (alpha=0.1)
- **Jitter**: EWMA of RTT variance (|current_rtt - previous_rtt|)
- **Bandwidth**: Estimated from bytes received over elapsed time
```mermaid
graph LR
subgraph "Encoder"
F1[Frame 1] --> BLK[Source Block<br/>5-10 frames]
F2[Frame 2] --> BLK
F3[Frame 3] --> BLK
F4[Frame 4] --> BLK
F5[Frame 5] --> BLK
BLK --> SRC[5 Source Symbols]
BLK --> REP[1-10 Repair Symbols<br/>ratio dependent]
SRC --> INT[Interleaver<br/>depth=3]
REP --> INT
end
### Codec Selection by Tier
subgraph "Network"
INT --> LOSS{Packet Loss}
LOSS -->|some lost| RCV[Received Symbols]
end
| Codec | Sample Rate | Frame Duration | Bitrate | Use Case |
|-------|------------|----------------|---------|----------|
| Opus24k | 48 kHz | 20ms (960 samples) | 24 kbps | Good conditions |
| Opus16k | 48 kHz | 20ms | 16 kbps | Moderate conditions |
| Opus6k | 48 kHz | 40ms (1920 samples) | 6 kbps | Degraded conditions |
| Codec2_3200 | 8 kHz | 20ms (160 samples) | 3.2 kbps | Poor conditions |
| Codec2_1200 | 8 kHz | 40ms (320 samples) | 1.2 kbps | Catastrophic conditions |
subgraph "Decoder"
RCV --> DEINT[De-interleaver]
DEINT --> RAPTORQ[RaptorQ Decoder<br/>Reconstruct from<br/>any K of K+R symbols]
RAPTORQ --> OUT[Original Frames]
end
Opus operates at 48 kHz natively. When Codec2 is selected, the adaptive codec layer handles 48 kHz <-> 8 kHz resampling transparently using a simple linear resampler (6:1 decimation/interpolation).
style LOSS fill:#e17055
style RAPTORQ fill:#00b894
```
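The repair-symbol arithmetic behind this diagram can be sketched as follows. This is a minimal illustration, assuming the per-tier values from the FEC configuration table and the `ceil(num_source * ratio)` rule; the `FecTier` type and its method names are hypothetical and are not taken from `wzp-fec`. The recoverability check is the idealised "any K of K+R" reading, not a model of the real RaptorQ decoder.

```rust
/// FEC parameters for one quality tier (values from the tier table).
/// Hypothetical type for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FecTier {
    /// K: audio frames (source symbols) per block.
    pub source_symbols: u32,
    /// Repair ratio expressed as a percentage (0.2 -> 20).
    pub repair_pct: u32,
}

pub const GOOD: FecTier = FecTier { source_symbols: 5, repair_pct: 20 };
pub const DEGRADED: FecTier = FecTier { source_symbols: 10, repair_pct: 50 };
pub const CATASTROPHIC: FecTier = FecTier { source_symbols: 8, repair_pct: 100 };

impl FecTier {
    /// R = ceil(K * ratio), computed in integers to avoid float rounding.
    pub fn repair_symbols(&self) -> u32 {
        (self.source_symbols * self.repair_pct + 99) / 100
    }

    /// Total symbols transmitted per block (K + R).
    pub fn total_symbols(&self) -> u32 {
        self.source_symbols + self.repair_symbols()
    }

    /// Idealised check: the block is recoverable when at least K of the
    /// K + R symbols arrive, i.e. when at most R symbols are lost.
    pub fn recoverable(&self, lost: u32) -> bool {
        lost <= self.repair_symbols()
    }
}
```

Under these assumptions a GOOD block survives the loss of one symbol out of six, while a CATASTROPHIC block survives the loss of up to eight out of sixteen.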
## Telemetry Stack
```mermaid
graph TB
subgraph "Relay"
RM[RelayMetrics<br/>sessions rooms packets]
SM[SessionMetrics<br/>per-session jitter loss RTT]
PM[ProbeMetrics<br/>inter-relay RTT loss]
RM --> PROM1[GET /metrics :9090]
SM --> PROM1
PM --> PROM1
end
subgraph "Web Bridge"
WM[WebMetrics<br/>connections frames latency]
WM --> PROM2[GET /metrics :8080]
end
subgraph "Client"
CM[JitterStats + QualityAdapter]
CM --> JSONL[--metrics-file<br/>JSONL 1 line/sec]
end
PROM1 --> GRAF[Grafana Dashboard<br/>4 rows 18 panels]
PROM2 --> GRAF
JSONL --> ANALYSIS[Offline Analysis]
style GRAF fill:#ff6b6b
style PROM1 fill:#0984e3
style PROM2 fill:#0984e3
```
## Session State Machine
```mermaid
stateDiagram-v2
[*] --> Idle
Idle --> Connecting: connect
Connecting --> Handshaking: QUIC established
Handshaking --> Active: CallOffer/Answer complete
Active --> Rekeying: 65536 packets
Rekeying --> Active: new key derived
Active --> Closed: Hangup/Error/Timeout
Rekeying --> Closed: Error
Connecting --> Closed: Timeout
Handshaking --> Closed: Signature fail
note right of Active: Media flows
note right of Rekeying: Media continues while rekeying
```
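The transitions in the diagram can be written as a small transition function. The sketch below is an assumption-laden illustration: the `State`, `Event`, and `InvalidTransition` names are hypothetical and do not claim to match the types in `crates/wzp-proto/src/session.rs`, and all close reasons (hangup, error, timeout, signature failure) are collapsed into a single `Close` event.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State { Idle, Connecting, Handshaking, Active, Rekeying, Closed }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    Connect,           // Idle -> Connecting
    QuicEstablished,   // Connecting -> Handshaking
    HandshakeComplete, // Handshaking -> Active (CallOffer/Answer done)
    RekeyThreshold,    // Active -> Rekeying (65536 packets)
    NewKeyDerived,     // Rekeying -> Active
    Close,             // hangup, error, timeout, or signature failure
}

/// Returned when an event is not valid in the current state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidTransition { pub from: State, pub event: Event }

pub fn next_state(state: State, event: Event) -> Result<State, InvalidTransition> {
    use Event::*;
    use State::*;
    match (state, event) {
        (Idle, Connect) => Ok(Connecting),
        (Connecting, QuicEstablished) => Ok(Handshaking),
        (Handshaking, HandshakeComplete) => Ok(Active),
        (Active, RekeyThreshold) => Ok(Rekeying),
        (Rekeying, NewKeyDerived) => Ok(Active),
        // The diagram draws Closed edges from these four states only.
        (Connecting | Handshaking | Active | Rekeying, Close) => Ok(Closed),
        (from, event) => Err(InvalidTransition { from, event }),
    }
}

/// Media flows in Active and keeps flowing while Rekeying.
pub fn media_allowed(state: State) -> bool {
    matches!(state, State::Active | State::Rekeying)
}
```

Keeping the transition table in one `match` makes every edge of the diagram visible in one place, and any pair not listed falls through to the error arm.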
## Audio Processing Pipeline Detail
```mermaid
graph TD
subgraph "Capture 20ms at 48kHz = 960 samples"
MIC[Microphone / AudioWorklet] --> PCM[PCM i16 x 960]
PCM --> RNN[RNNoise Denoise<br/>2 x 480 samples]
RNN --> VAD{Silent?}
VAD -->|Yes over 100ms| CN[ComfortNoise packet<br/>every 200ms]
VAD -->|No or Hangover| OPUS[Opus/Codec2 Encode]
end
subgraph "FEC + Crypto"
OPUS --> SYMBOL[Pad to 256-byte symbol]
CN --> SYMBOL
SYMBOL --> BLOCK[Accumulate block<br/>5-10 symbols]
BLOCK --> RAPTOR[RaptorQ encode<br/>+ repair symbols]
RAPTOR --> INTERLEAVE[Interleave depth=3]
INTERLEAVE --> HDR[Add MediaHeader<br/>or MiniHeader]
HDR --> ENCRYPT[ChaCha20-Poly1305<br/>header=AAD payload=encrypted]
ENCRYPT --> QUIC[QUIC Datagram]
end
style RNN fill:#a29bfe
style ENCRYPT fill:#ee5a24
style RAPTOR fill:#00b894
```
## Adaptive Jitter Buffer
```mermaid
graph TD
PKT[Incoming Packet] --> SEQ{Sequence Check}
SEQ -->|Duplicate| DROP[Drop + AntiReplay]
SEQ -->|Valid| BUF[BTreeMap Buffer<br/>ordered by seq]
BUF --> ADAPT[AdaptivePlayoutDelay<br/>EMA jitter tracking]
ADAPT --> TARGET[target_delay =<br/>ceil jitter_ema/20ms + 2]
BUF --> READY{depth >= target?}
READY -->|No| WAIT[Wait / Underrun++]
READY -->|Yes| POP[Pop lowest seq]
POP --> DECODE[Decode to PCM]
DECODE --> PLAY[Playout]
BUF --> OVERFLOW{depth > max?}
OVERFLOW -->|Yes| EVICT[Drop oldest<br/>Overrun++]
style ADAPT fill:#fdcb6e
style DROP fill:#e17055
style EVICT fill:#e17055
```
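The `target_delay = ceil(jitter_ema / 20ms) + 2` rule from the diagram can be sketched as below. This is only an illustration: the struct name, the `observe` method, and the smoothing factor passed to `new` are assumptions, since the diagram states that jitter is tracked with an EMA but does not give the constant used by `AdaptivePlayoutDelay`.

```rust
/// Frame duration assumed by the diagram's formula, in milliseconds.
pub const FRAME_MS: f64 = 20.0;
/// Safety margin added on top of the jitter-derived depth, in packets.
pub const MARGIN_PACKETS: u32 = 2;

/// Hypothetical stand-in for the adaptive playout delay tracker.
pub struct PlayoutDelaySketch {
    jitter_ema_ms: f64,
    alpha: f64, // smoothing factor; an assumed parameter
}

impl PlayoutDelaySketch {
    pub fn new(alpha: f64) -> Self {
        Self { jitter_ema_ms: 0.0, alpha }
    }

    /// Fold one jitter sample (ms) into the exponential moving average.
    pub fn observe(&mut self, jitter_sample_ms: f64) {
        self.jitter_ema_ms =
            self.alpha * jitter_sample_ms + (1.0 - self.alpha) * self.jitter_ema_ms;
    }

    /// target_delay = ceil(jitter_ema / 20 ms) + 2, in packets.
    pub fn target_delay_packets(&self) -> u32 {
        (self.jitter_ema_ms / FRAME_MS).ceil() as u32 + MARGIN_PACKETS
    }
}
```

With no observed jitter the target is the two-packet margin alone; each additional 20 ms of smoothed jitter adds one packet of buffering.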
## Deployment Topology
```mermaid
graph TB
subgraph "Region A"
RA[wzp-relay A<br/>:4433 UDP]
WA[wzp-web A<br/>:8080 HTTPS]
WA --> RA
end
subgraph "Region B"
RB[wzp-relay B<br/>:4433 UDP]
WB[wzp-web B<br/>:8080 HTTPS]
WB --> RB
end
RA <-->|Probe 1/s| RB
BA[Browser A] -->|WSS| WA
BB[Browser B] -->|WSS| WB
CA[CLI Client] -->|QUIC| RA
PROM[Prometheus] -->|scrape| RA
PROM -->|scrape| RB
PROM -->|scrape| WA
PROM --> GRAF[Grafana]
FC[featherChat Server] -->|auth validate| RA
FC -->|auth validate| RB
style RA fill:#ff9f43
style RB fill:#ff9f43
style GRAF fill:#ff6b6b
style FC fill:#fd79a8
```
## featherChat Integration Flow
```mermaid
sequenceDiagram
participant A as User A WZP Client
participant FC as featherChat Server
participant R as WZP Relay
participant B as User B WZP Client
Note over A,B: Both users share BIP39 seed = same identity
A->>FC: WS CallSignal Offer payload=JSON SignalMessage
FC->>B: WS CallSignal Offer payload + relay_addr + room
B->>R: QUIC connect SNI = hashed room
B->>R: AuthToken fc_bearer_token
R->>FC: POST /v1/auth/validate token
FC->>R: valid true fingerprint ...
B->>R: CallOffer then CallAnswer handshake
A->>R: QUIC connect same room
A->>R: AuthToken + Handshake
Note over A,B: Both in same room media flows E2E encrypted
A->>R: Encrypted media
R->>B: Forward SFU no decryption
B->>R: Encrypted media
R->>A: Forward
```
## Bandwidth Usage
| Profile | Audio | FEC Overhead | Total | Use Case |
|---------|-------|-------------|-------|----------|
| **GOOD** | 24 kbps (Opus) | 20% = 4.8 kbps | **28.8 kbps** | WiFi, LTE, good links |
| **DEGRADED** | 6 kbps (Opus) | 50% = 3 kbps | **9.0 kbps** | 3G, congested WiFi |
| **CATASTROPHIC** | 1.2 kbps (Codec2) | 100% = 1.2 kbps | **2.4 kbps** | Satellite, extreme loss |
With silence suppression: ~50% savings in typical conversations.
With mini-frames: 8 bytes/packet saved (67% header reduction).
With trunking: shared QUIC overhead across multiplexed sessions.
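The totals in the table follow directly from the audio bitrate and the FEC ratio, and the mini-frame saving from the two header sizes. A small sketch of that arithmetic, with hypothetical function names, is:

```rust
/// Size of the full MediaHeader and of the compressed MiniHeader, in bytes.
pub const FULL_HEADER_BYTES: u32 = 12;
pub const MINI_HEADER_BYTES: u32 = 4;

/// Total bitrate in bits per second: audio plus FEC repair overhead.
/// `fec_pct` is the repair ratio as a percentage (0.2 -> 20).
pub fn total_bps(audio_bps: u32, fec_pct: u32) -> u32 {
    audio_bps + audio_bps * fec_pct / 100
}

/// Header bytes saved per packet when a MiniHeader replaces a full header.
pub fn header_bytes_saved() -> u32 {
    FULL_HEADER_BYTES - MINI_HEADER_BYTES
}

/// The same saving as a percentage of the full header, rounded to nearest.
pub fn header_saving_pct() -> u32 {
    (header_bytes_saved() * 100 + FULL_HEADER_BYTES / 2) / FULL_HEADER_BYTES
}
```

For example, `total_bps(24_000, 20)` gives 28 800 bps for the GOOD profile. These figures cover codec payload and FEC only; header, AEAD tag, and QUIC overhead are not included in the table's totals.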
## Project Structure
```
warzonePhone/
├── Cargo.toml # Workspace root
├── crates/
│ ├── wzp-proto/ # Protocol types, traits, wire format
│ │ └── src/
│ │ ├── codec_id.rs # CodecId, QualityProfile
│ │ ├── error.rs # Error types
│ │ ├── jitter.rs # JitterBuffer, AdaptivePlayoutDelay
│ │ ├── packet.rs # MediaHeader, MiniHeader, TrunkFrame, SignalMessage
│ │ ├── quality.rs # Tier, AdaptiveQualityController
│ │ ├── session.rs # SessionState machine
│ │ └── traits.rs # AudioEncoder, FecEncoder, CryptoSession, etc.
│ ├── wzp-codec/ # Audio codecs
│ │ └── src/
│ │ ├── adaptive.rs # AdaptiveEncoder/Decoder (Opus + Codec2)
│ │ ├── denoise.rs # NoiseSupressor (RNNoise/nnnoiseless)
│ │ └── silence.rs # SilenceDetector, ComfortNoise
│ ├── wzp-fec/ # Forward error correction
│ │ └── src/
│ │ ├── encoder.rs # RaptorQFecEncoder
│ │ ├── decoder.rs # RaptorQFecDecoder
│ │ └── interleave.rs # Interleaver (burst protection)
│ ├── wzp-crypto/ # Cryptography + identity
│ │ └── src/
│ │ ├── identity.rs # Seed, Fingerprint, hash_room_name
│ │ ├── handshake.rs # WarzoneKeyExchange (X25519 + Ed25519)
│ │ ├── session.rs # ChaChaSession (ChaCha20-Poly1305)
│ │ ├── nonce.rs # Deterministic nonce construction
│ │ ├── anti_replay.rs # Sliding window replay protection
│ │ └── rekey.rs # Forward secrecy rekeying
│ ├── wzp-transport/ # QUIC transport layer
│ │ └── src/lib.rs # QuinnTransport, send/recv media/signal/trunk
│ ├── wzp-relay/ # Relay daemon
│ │ └── src/
│ │ ├── main.rs # CLI, connection loop, auth + handshake
│ │ ├── room.rs # RoomManager, TrunkedForwarder
│ │ ├── pipeline.rs # RelayPipeline (forward mode)
│ │ ├── session_mgr.rs # SessionManager (limits, lifecycle)
│ │ ├── auth.rs # featherChat token validation
│ │ ├── handshake.rs # Relay-side accept_handshake
│ │ ├── metrics.rs # Prometheus RelayMetrics + per-session
│ │ ├── probe.rs # Inter-relay probes + ProbeMesh
│ │ └── trunk.rs # TrunkBatcher
│ ├── wzp-client/ # Call engine + CLI
│ │ └── src/
│ │ ├── cli.rs # CLI arg parsing + main
│ │ ├── call.rs # CallEncoder, CallDecoder, QualityAdapter
│ │ ├── handshake.rs # Client-side perform_handshake
│ │ ├── featherchat.rs # CallSignal bridge
│ │ ├── echo_test.rs # Automated echo quality test
│ │ ├── drift_test.rs # Clock drift measurement
│ │ ├── sweep.rs # Jitter buffer parameter sweep
│ │ ├── metrics.rs # JSONL telemetry writer
│ │ └── bench.rs # Component benchmarks
│ └── wzp-web/ # Browser bridge
│ ├── src/
│ │ ├── main.rs # Axum server, WS handler, TLS
│ │ └── metrics.rs # Prometheus WebMetrics
│ └── static/
│ ├── index.html # SPA UI (room, PTT, level meter)
│ └── audio-processor.js # AudioWorklet (capture + playback)
├── deps/featherchat/ # Git submodule
├── docs/
│ ├── ARCHITECTURE.md # This file
│ ├── TELEMETRY.md # Metrics specification
│ ├── INTEGRATION_TASKS.md # featherChat task tracker
│ ├── WZP-FC-SHARED-CRATES.md # Shared crate strategy
│ └── grafana-dashboard.json # Pre-built Grafana dashboard
└── scripts/
└── build-linux.sh # Hetzner VM build
```
## Test Coverage
272 tests across all crates, 0 failures.
| Crate | Tests | Key Coverage |
|-------|-------|-------------|
| wzp-proto | 41 | Wire format, jitter buffer, quality tiers, mini-frames, trunking |
| wzp-codec | 31 | Opus/Codec2 roundtrip, silence detection, noise suppression |
| wzp-fec | 22 | RaptorQ encode/decode, loss recovery, interleaving |
| wzp-crypto | 34 + 28 compat | Encrypt/decrypt, handshake, anti-replay, featherChat identity compat |
| wzp-transport | 2 | QUIC connection setup |
| wzp-relay | 40 + 4 integration | Room ACL, session mgmt, metrics, probes, mesh, trunking |
| wzp-client | 30 + 2 integration | Encoder/decoder, quality adapter, silence, drift, sweep |
| wzp-web | 2 | Metrics |


@@ -77,7 +77,9 @@ Based on featherChat commit 65f6390 — FUTURE_TASKS.md with WZP integration ite
### WZP-FC-7. Missed call notifications — TODO (0.5d)
### WZP-FC-8. Cross-project identity verification — DONE (15 tests, 26dc848)
### WZP-FC-9. HKDF salt investigation — DONE (no mismatch)
### WZP-FC-10. Web bridge shared auth — TODO (1-2d)
### WZP-FC-10. Web bridge shared auth — DONE
- FC: GET /v1/wzp/relay-config, CORS layer, service token
- WZP: web bridge --auth-url validates browser tokens via FC
### FC-CRATE-1. Standalone warzone-protocol — DONE (v0.0.21, 4a4fa9f)
---

257
docs/WS_RELAY_SPEC.md Normal file

@@ -0,0 +1,257 @@
# WS Support in wzp-relay — Implementation Spec
## Goal
Add a WebSocket listener to `wzp-relay` so browsers connect directly, eliminating the `wzp-web` bridge.
```
Before: Browser → WS → wzp-web → QUIC → wzp-relay
After: Browser → WS → wzp-relay (handles both WS + QUIC)
```
## Architecture
```
wzp-relay
├── QUIC listener (:4433) — native clients, inter-relay
├── WS listener (:8080) — browsers via Caddy
│ ├── GET /ws/{room} — WebSocket upgrade
│ └── Auth: first msg = {"type":"auth","token":"..."}
└── Shared RoomManager — both transports in same rooms
```
## Key Changes
### 1. Abstract `Participant` over transport type
**File: `room.rs`**
Currently:
```rust
struct Participant {
id: ParticipantId,
_addr: std::net::SocketAddr,
transport: Arc<wzp_transport::QuinnTransport>,
}
```
Change to:
```rust
struct Participant {
id: ParticipantId,
_addr: std::net::SocketAddr,
sender: ParticipantSender,
}
/// How to send a media packet to a participant.
enum ParticipantSender {
Quic(Arc<wzp_transport::QuinnTransport>),
WebSocket(tokio::sync::mpsc::Sender<bytes::Bytes>),
}
```
The `others()` method returns `Vec<ParticipantSender>` instead of `Vec<Arc<QuinnTransport>>`.
`ParticipantSender` implements a `send_raw(&self, data: &[u8])` method:
- **Quic**: wraps in `MediaPacket`, calls `transport.send_media()`
- **WebSocket**: sends raw binary frame via the mpsc channel
### 2. Add `join_ws()` to RoomManager
```rust
pub fn join_ws(
&mut self,
room_name: &str,
addr: std::net::SocketAddr,
sender: tokio::sync::mpsc::Sender<bytes::Bytes>,
fingerprint: Option<&str>,
) -> Result<ParticipantId, String>
```
### 3. Add WS listener in `main.rs`
New flag: `--ws-port 8080`
```rust
if let Some(ws_port) = config.ws_port {
let room_mgr = room_mgr.clone();
let auth_url = config.auth_url.clone();
let metrics = metrics.clone();
tokio::spawn(run_ws_server(ws_port, room_mgr, auth_url, metrics));
}
```
### 4. WebSocket handler (`ws.rs` — new file)
```rust
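use axum::{
extract::{ws::{Message, WebSocket}, Path, WebSocketUpgrade},
response::IntoResponse,
routing::get,
Router,
};
// `split()`, `next()` and `send()` on the socket halves come from these traits.
use futures_util::{SinkExt, StreamExt};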
async fn ws_handler(
Path(room): Path<String>,
ws: WebSocketUpgrade,
/* state */
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_ws(socket, room, state))
}
async fn handle_ws(mut socket: WebSocket, room: String, state: WsState) {
let addr = /* peer addr */;
// 1. Auth: first message must be {"type":"auth","token":"..."}
let fingerprint = if let Some(ref auth_url) = state.auth_url {
// Anything other than a well-formed auth message ends the session;
// this function returns (), so failures return early instead of using `?`.
let Some(Ok(Message::Text(text))) = socket.recv().await else { return };
let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) else { return };
if parsed["type"] != "auth" { return; }
let Some(token) = parsed["token"].as_str() else { return };
match auth::validate_token(auth_url, token).await {
Ok(client) => Some(client.fingerprint),
Err(_) => return,
}
} else { None };
// 2. Create mpsc channel for outbound frames
let (tx, mut rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(64);
// 3. Join room
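let participant_id = {
let mut mgr = state.room_mgr.lock().await;
// join_ws returns Result<ParticipantId, String>; a rejected join ends the session.
match mgr.join_ws(&room, addr, tx, fingerprint.as_deref()) {
Ok(id) => id,
Err(_) => return,
}
};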
// 4. Run send/recv loops
let (mut ws_tx, mut ws_rx) = socket.split();
// Outbound: mpsc rx → WS send
let send_task = tokio::spawn(async move {
while let Some(data) = rx.recv().await {
// axum 0.8 carries binary frames as `Bytes`, so `data` is passed through without copying.
if ws_tx.send(Message::Binary(data)).await.is_err() {
break;
}
}
});
// Inbound: WS recv → fan-out to room
loop {
match ws_rx.next().await {
Some(Ok(Message::Binary(data))) => {
// Raw PCM Int16 from browser — fan-out to all others
let others = {
let mgr = state.room_mgr.lock().await;
mgr.others(&room, participant_id)
};
for other in &others {
// send_raw is async; without .await the send never runs.
other.send_raw(&data).await;
}
}
Some(Ok(Message::Close(_))) | None => break,
_ => continue,
}
}
// 5. Cleanup
send_task.abort();
let mut mgr = state.room_mgr.lock().await;
mgr.leave(&room, participant_id);
}
```
### 5. Cross-transport fan-out
When a QUIC participant sends audio → WS participants receive raw PCM bytes.
When a WS participant sends audio → QUIC participants receive a `MediaPacket`.
The `ParticipantSender::send_raw()` method:
```rust
impl ParticipantSender {
async fn send_raw(&self, pcm_bytes: &[u8]) {
match self {
ParticipantSender::WebSocket(tx) => {
let _ = tx.try_send(bytes::Bytes::copy_from_slice(pcm_bytes));
}
ParticipantSender::Quic(transport) => {
// Wrap raw PCM in a MediaPacket
let pkt = MediaPacket {
header: MediaHeader::default_pcm(),
payload: bytes::Bytes::copy_from_slice(pcm_bytes),
quality_report: None,
};
let _ = transport.send_media(&pkt).await;
}
}
}
}
```
For QUIC→WS direction, `run_participant` extracts `pkt.payload` bytes and sends to WS channels.
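Because the WebSocket arm of `send_raw()` uses `try_send`, a slow browser whose outbound queue is full loses frames instead of stalling the fan-out loop for everyone else. The sketch below illustrates that drop-on-full behaviour under stated assumptions: it uses `std::sync::mpsc::sync_channel` as a stand-in for the bounded `tokio::sync::mpsc` channel, `Vec<u8>` instead of `bytes::Bytes`, and a hypothetical `WsQueueSketch` type with a `dropped` counter that the spec does not define.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for the WebSocket side of `ParticipantSender`.
pub struct WsQueueSketch {
    tx: SyncSender<Vec<u8>>,
    /// Frames discarded because the queue was full or the receiver was gone.
    pub dropped: u64,
}

impl WsQueueSketch {
    /// `capacity` plays the role of the channel size (64 in the spec).
    pub fn new(capacity: usize) -> (Self, Receiver<Vec<u8>>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: 0 }, rx)
    }

    /// Non-blocking send: returns true if the frame was queued,
    /// false if it was dropped.
    pub fn send_raw(&mut self, pcm_bytes: &[u8]) -> bool {
        match self.tx.try_send(pcm_bytes.to_vec()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped += 1;
                false
            }
        }
    }
}
```

For real-time audio, dropping a stale frame is usually preferable to queueing it, since a late frame would be discarded at playout anyway. Counting the drops, as this sketch does, would give a per-participant signal for metrics.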
### 6. Dependencies to add
```toml
# wzp-relay/Cargo.toml
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] } # already present
```
### 7. Config change
```rust
// config.rs
pub struct RelayConfig {
// ... existing fields ...
pub ws_port: Option<u16>,
}
```
### 8. Docker compose change (featherChat side)
Remove `wzp-web` service entirely. Update Caddy to proxy `/audio/*` to relay's WS port:
```yaml
# Before:
wzp-web:
entrypoint: ["wzp-web"]
command: ["--port", "8080", "--relay", "172.28.0.10:4433"]
# After: REMOVED. Relay handles WS directly.
wzp-relay:
command:
- "--listen"
- "0.0.0.0:4433"
- "--ws-port"
- "8080"
- "--auth-url"
- "http://warzone-server:7700/v1/auth/validate"
```
## What Stays the Same
- Browser's `startAudio()` — unchanged, still connects WS to `/audio/ws/ROOM`
- Caddy proxies `/audio/*` → relay:8080 (same path, different backend)
- Auth flow — same JSON token as first message
- PCM format — same Int16 binary frames
- QUIC clients — unchanged, still connect to :4433
- Room naming, ACL, session management — all unchanged
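Since the PCM format is unchanged, each binary WebSocket frame still carries one 20 ms block of 960 Int16 samples (1920 bytes). A minimal sketch of converting between that byte layout and `i16` samples is shown below. Little-endian byte order is an assumption here (it is what browser typed arrays produce on common platforms), and the function names are hypothetical.

```rust
/// One 20 ms frame at 48 kHz.
pub const SAMPLES_PER_FRAME: usize = 960;
/// Two bytes per Int16 sample.
pub const FRAME_BYTES: usize = SAMPLES_PER_FRAME * 2;

/// Decode one binary WS frame into samples.
/// Returns None when the frame is not exactly 1920 bytes.
/// Assumes little-endian samples.
pub fn decode_pcm_frame(bytes: &[u8]) -> Option<Vec<i16>> {
    if bytes.len() != FRAME_BYTES {
        return None;
    }
    Some(
        bytes
            .chunks_exact(2)
            .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
            .collect(),
    )
}

/// Encode samples back into the little-endian byte layout.
pub fn encode_pcm_frame(samples: &[i16]) -> Vec<u8> {
    samples.iter().flat_map(|s| s.to_le_bytes()).collect()
}
```

Rejecting frames of unexpected length at the relay boundary keeps malformed browser input out of the room fan-out.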
## Testing
1. Start relay with `--ws-port 8080 --listen 0.0.0.0:4433`
2. Open browser, initiate call via featherChat
3. Verify audio flows (both directions)
4. Verify QUIC + WS clients can be in same room (mixed mode)
5. Verify auth works
6. Verify room cleanup on disconnect
## Migration Path
1. Implement WS in relay
2. Test with featherChat (no featherChat changes needed)
3. Remove wzp-web from Docker stack
4. Later: add WebTransport alongside WS