feat: Prometheus metrics on relay + web bridge, client JSONL export (T5-S1/S3/S4)

WZP-P2-T5-S1: Relay Prometheus /metrics
- RelayMetrics: active_sessions, active_rooms, packets/bytes_forwarded,
  auth_attempts (ok/fail), handshake_duration histogram
- --metrics-port flag spawns HTTP server
- Wired into auth, handshake, session, and packet forwarding paths
- 2 tests

WZP-P2-T5-S3: Web bridge Prometheus /metrics
- WebMetrics: active_connections, frames_bridged (up/down),
  auth_failures, handshake_latency histogram
- Added /metrics route to existing axum app
- Wired into WS connect/disconnect, auth, handshake, send/recv loops
- 2 tests

WZP-P2-T5-S4: Client --metrics-file JSONL
- ClientMetricsSnapshot with all telemetry fields
- MetricsWriter: writes one JSON line per second to file
- snapshot_from_stats() converts JitterStats to snapshot
- --metrics-file <path> flag
- 3 tests

223 tests passing across all crates.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-28 12:44:57 +04:00
parent 3f813cd510
commit 39f6908478
14 changed files with 645 additions and 12 deletions

89
Cargo.lock generated
View File

@@ -138,13 +138,43 @@ dependencies = [
"fs_extra",
]
[[package]]
name = "axum"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [
"axum-core",
"axum-core 0.5.6",
"base64",
"bytes",
"form_urlencoded",
@@ -155,7 +185,7 @@ dependencies = [
"hyper",
"hyper-util",
"itoa",
"matchit",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
@@ -174,6 +204,26 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.6"
@@ -1521,6 +1571,12 @@ dependencies = [
"libc",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
@@ -1888,6 +1944,27 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "quinn"
version = "0.11.9"
@@ -2591,7 +2668,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom 0.3.4",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
@@ -3722,6 +3799,7 @@ dependencies = [
"anyhow",
"async-trait",
"bytes",
"chrono",
"cpal",
"serde",
"serde_json",
@@ -3796,7 +3874,9 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"axum 0.7.9",
"bytes",
"prometheus",
"quinn",
"reqwest",
"rustls",
@@ -3834,10 +3914,11 @@ name = "wzp-web"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"axum 0.8.8",
"axum-server",
"bytes",
"futures",
"prometheus",
"rcgen",
"rustls",
"rustls-pemfile",

View File

@@ -20,6 +20,7 @@ bytes = { workspace = true }
anyhow = "1"
serde = { workspace = true }
serde_json = "1"
chrono = "0.4"
cpal = { version = "0.15", optional = true }
[features]

View File

@@ -46,6 +46,7 @@ struct CliArgs {
mnemonic: Option<String>,
room: Option<String>,
token: Option<String>,
metrics_file: Option<String>,
}
impl CliArgs {
@@ -86,6 +87,7 @@ fn parse_args() -> CliArgs {
let mut mnemonic = None;
let mut room = None;
let mut token = None;
let mut metrics_file = None;
let mut relay_str = None;
let mut i = 1;
@@ -132,6 +134,14 @@ fn parse_args() -> CliArgs {
i += 1;
token = Some(args.get(i).expect("--token requires a value").to_string());
}
"--metrics-file" => {
i += 1;
metrics_file = Some(
args.get(i)
.expect("--metrics-file requires a path")
.to_string(),
);
}
"--record" => {
i += 1;
record_file = Some(
@@ -174,6 +184,7 @@ fn parse_args() -> CliArgs {
eprintln!(" --mnemonic <words...> Identity seed as BIP39 mnemonic (24 words)");
eprintln!(" --room <name> Room name (hashed for privacy before sending)");
eprintln!(" --token <token> featherChat bearer token for relay auth");
eprintln!(" --metrics-file <path> Write JSONL telemetry to file (1 line/sec)");
eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)");
eprintln!();
eprintln!("Default relay: 127.0.0.1:4433");
@@ -209,6 +220,7 @@ fn parse_args() -> CliArgs {
mnemonic,
room,
token,
metrics_file,
}
}

View File

@@ -14,6 +14,7 @@ pub mod drift_test;
pub mod echo_test;
pub mod featherchat;
pub mod handshake;
pub mod metrics;
pub mod sweep;
#[cfg(feature = "audio")]

View File

@@ -0,0 +1,186 @@
//! Client-side JSONL metrics export.
//!
//! When `--metrics-file <path>` is passed, the client writes one JSON object
//! per second to the specified file. Each line is a self-contained JSON object
//! (JSONL format) containing jitter buffer stats, loss, and quality profile.
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::time::{Duration, Instant};
use serde::Serialize;
use wzp_proto::jitter::JitterStats;
/// A single metrics snapshot written as one JSONL line.
#[derive(Serialize)]
pub struct ClientMetricsSnapshot {
pub ts: String,
pub buffer_depth: usize,
pub underruns: u64,
pub overruns: u64,
pub loss_pct: f64,
pub rtt_ms: u64,
pub jitter_ms: u64,
pub frames_sent: u64,
pub frames_received: u64,
pub quality_profile: String,
}
/// Periodic JSONL writer that respects a configurable interval.
pub struct MetricsWriter {
file: File,
interval: Duration,
last_write: Instant,
}
impl MetricsWriter {
/// Create a new `MetricsWriter` that appends JSONL to the given path.
///
/// The file is created (or truncated) immediately.
pub fn new(path: &str, interval_secs: u64) -> Result<Self, anyhow::Error> {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)?;
Ok(Self {
file,
interval: Duration::from_secs(interval_secs),
// Set last_write far in the past so the first call writes immediately.
last_write: Instant::now() - Duration::from_secs(interval_secs + 1),
})
}
/// Write a JSONL line if the interval has elapsed since the last write.
///
/// Returns `Ok(true)` when a line was written, `Ok(false)` when skipped.
pub fn maybe_write(&mut self, snapshot: &ClientMetricsSnapshot) -> Result<bool, anyhow::Error> {
let now = Instant::now();
if now.duration_since(self.last_write) >= self.interval {
let line = serde_json::to_string(snapshot)?;
writeln!(self.file, "{}", line)?;
self.file.flush()?;
self.last_write = now;
Ok(true)
} else {
Ok(false)
}
}
}
/// Build a `ClientMetricsSnapshot` from jitter buffer stats and a quality profile name.
///
/// Fields not available from `JitterStats` alone (rtt_ms, jitter_ms, frames_sent)
/// are set to zero — the caller can override them if the data is available.
pub fn snapshot_from_stats(stats: &JitterStats, profile: &str) -> ClientMetricsSnapshot {
let loss_pct = if stats.packets_received > 0 {
(stats.packets_lost as f64 / stats.packets_received as f64) * 100.0
} else {
0.0
};
ClientMetricsSnapshot {
ts: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
buffer_depth: stats.current_depth,
underruns: stats.underruns,
overruns: stats.overruns,
loss_pct,
rtt_ms: 0,
jitter_ms: 0,
frames_sent: 0,
frames_received: stats.total_decoded,
quality_profile: profile.to_string(),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_test_stats() -> JitterStats {
JitterStats {
packets_received: 100,
packets_played: 95,
packets_lost: 5,
packets_late: 2,
packets_duplicate: 0,
current_depth: 8,
total_decoded: 93,
underruns: 1,
overruns: 0,
max_depth_seen: 12,
}
}
#[test]
fn snapshot_serializes_to_json() {
let stats = make_test_stats();
let snap = snapshot_from_stats(&stats, "GOOD");
let json = serde_json::to_string(&snap).unwrap();
// Verify expected fields are present in the JSON string.
assert!(json.contains("\"ts\""));
assert!(json.contains("\"buffer_depth\":8"));
assert!(json.contains("\"underruns\":1"));
assert!(json.contains("\"overruns\":0"));
assert!(json.contains("\"loss_pct\":5."));
assert!(json.contains("\"rtt_ms\":0"));
assert!(json.contains("\"jitter_ms\":0"));
assert!(json.contains("\"frames_sent\":0"));
assert!(json.contains("\"frames_received\":93"));
assert!(json.contains("\"quality_profile\":\"GOOD\""));
// Verify it round-trips as valid JSON.
let value: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(value["buffer_depth"], 8);
assert_eq!(value["quality_profile"], "GOOD");
}
#[test]
fn metrics_writer_creates_file() {
let dir = std::env::temp_dir();
let path = dir.join("wzp_metrics_test.jsonl");
let path_str = path.to_str().unwrap();
let mut writer = MetricsWriter::new(path_str, 1).unwrap();
let stats = make_test_stats();
let snap = snapshot_from_stats(&stats, "DEGRADED");
let wrote = writer.maybe_write(&snap).unwrap();
assert!(wrote, "first write should succeed immediately");
// Read the file back and verify it contains valid JSONL.
let contents = std::fs::read_to_string(&path).unwrap();
let lines: Vec<&str> = contents.lines().collect();
assert_eq!(lines.len(), 1, "should have exactly one JSONL line");
let value: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
assert_eq!(value["quality_profile"], "DEGRADED");
assert_eq!(value["buffer_depth"], 8);
// Clean up.
let _ = std::fs::remove_file(&path);
}
#[test]
fn metrics_writer_respects_interval() {
let dir = std::env::temp_dir();
let path = dir.join("wzp_metrics_interval_test.jsonl");
let path_str = path.to_str().unwrap();
let mut writer = MetricsWriter::new(path_str, 60).unwrap();
let stats = make_test_stats();
let snap = snapshot_from_stats(&stats, "GOOD");
// First write succeeds (last_write is set far in the past).
let first = writer.maybe_write(&snap).unwrap();
assert!(first, "first write should succeed");
// Immediate second write should be skipped (60s interval).
let second = writer.maybe_write(&snap).unwrap();
assert!(!second, "second write should be skipped — interval not elapsed");
// Clean up.
let _ = std::fs::remove_file(&path);
}
}

View File

@@ -24,6 +24,8 @@ reqwest = { version = "0.12", features = ["json"] }
serde_json = "1"
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
quinn = { workspace = true }
prometheus = "0.13"
axum = { version = "0.7", default-features = false, features = ["tokio", "http1"] }
[[bin]]
name = "wzp-relay"

View File

@@ -22,6 +22,9 @@ pub struct RelayConfig {
/// featherChat auth validation URL (e.g., "https://chat.example.com/v1/auth/validate").
/// If set, clients must present a valid token before joining rooms.
pub auth_url: Option<String>,
/// Port for the Prometheus metrics HTTP endpoint (e.g., 9090).
/// If None, the metrics endpoint is disabled.
pub metrics_port: Option<u16>,
}
impl Default for RelayConfig {
@@ -34,6 +37,7 @@ impl Default for RelayConfig {
jitter_max_depth: 250,
log_level: "info".to_string(),
auth_url: None,
metrics_port: None,
}
}
}

View File

@@ -10,6 +10,7 @@
pub mod auth;
pub mod config;
pub mod handshake;
pub mod metrics;
pub mod pipeline;
pub mod room;
pub mod session_mgr;

View File

@@ -17,6 +17,7 @@ use tracing::{error, info};
use wzp_proto::MediaTransport;
use wzp_relay::config::RelayConfig;
use wzp_relay::metrics::RelayMetrics;
use wzp_relay::pipeline::{PipelineConfig, RelayPipeline};
use wzp_relay::room::{self, RoomManager};
use wzp_relay::session_mgr::SessionManager;
@@ -45,14 +46,22 @@ fn parse_args() -> RelayConfig {
args.get(i).expect("--auth-url requires a URL").to_string(),
);
}
"--metrics-port" => {
i += 1;
config.metrics_port = Some(
args.get(i).expect("--metrics-port requires a port number")
.parse().expect("invalid --metrics-port number"),
);
}
"--help" | "-h" => {
eprintln!("Usage: wzp-relay [--listen <addr>] [--remote <addr>] [--auth-url <url>]");
eprintln!("Usage: wzp-relay [--listen <addr>] [--remote <addr>] [--auth-url <url>] [--metrics-port <port>]");
eprintln!();
eprintln!("Options:");
eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433)");
eprintln!(" --remote <addr> Remote relay for forwarding (disables room mode)");
eprintln!(" --auth-url <url> featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)");
eprintln!(" When set, clients must send a bearer token as first signal message.");
eprintln!(" --listen <addr> Listen address (default: 0.0.0.0:4433)");
eprintln!(" --remote <addr> Remote relay for forwarding (disables room mode)");
eprintln!(" --auth-url <url> featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)");
eprintln!(" When set, clients must send a bearer token as first signal message.");
eprintln!(" --metrics-port <port> Prometheus metrics HTTP port (e.g., 9090). Disabled if not set.");
eprintln!();
eprintln!("Room mode (default):");
eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU).");
@@ -141,6 +150,13 @@ async fn main() -> anyhow::Result<()> {
.install_default()
.expect("failed to install rustls crypto provider");
// Prometheus metrics
let metrics = Arc::new(RelayMetrics::new());
if let Some(port) = config.metrics_port {
let m = metrics.clone();
tokio::spawn(wzp_relay::metrics::serve_metrics(port, m));
}
// Generate ephemeral relay identity for crypto handshake
let relay_seed = wzp_crypto::Seed::generate();
let relay_fp = relay_seed.derive_identity().public_identity().fingerprint;
@@ -186,6 +202,7 @@ async fn main() -> anyhow::Result<()> {
let session_mgr = session_mgr.clone();
let auth_url = config.auth_url.clone();
let relay_seed_bytes = relay_seed.0;
let metrics = metrics.clone();
tokio::spawn(async move {
let addr = connection.remote_address();
@@ -208,6 +225,7 @@ async fn main() -> anyhow::Result<()> {
Ok(Some(wzp_proto::SignalMessage::AuthToken { token })) => {
match wzp_relay::auth::validate_token(url, &token).await {
Ok(client) => {
metrics.auth_attempts.with_label_values(&["ok"]).inc();
info!(
%addr,
fingerprint = %client.fingerprint,
@@ -217,6 +235,7 @@ async fn main() -> anyhow::Result<()> {
Some(client.fingerprint)
}
Err(e) => {
metrics.auth_attempts.with_label_values(&["fail"]).inc();
error!(%addr, "auth failed: {e}");
transport.close().await.ok();
return;
@@ -243,12 +262,15 @@ async fn main() -> anyhow::Result<()> {
};
// Crypto handshake: verify client identity + negotiate quality profile
let handshake_start = std::time::Instant::now();
let (_crypto_session, _chosen_profile) = match wzp_relay::handshake::accept_handshake(
&*transport,
&relay_seed_bytes,
).await {
Ok(result) => {
info!(%addr, "crypto handshake complete");
let elapsed = handshake_start.elapsed().as_secs_f64();
metrics.handshake_duration.observe(elapsed);
info!(%addr, elapsed_ms = %(elapsed * 1000.0), "crypto handshake complete");
result
}
Err(e) => {
@@ -302,13 +324,19 @@ async fn main() -> anyhow::Result<()> {
}
};
metrics.active_sessions.inc();
let participant_id = {
let mut mgr = room_mgr.lock().await;
match mgr.join(&room_name, addr, transport.clone(), authenticated_fp.as_deref()) {
Ok(id) => id,
Ok(id) => {
metrics.active_rooms.set(mgr.list().len() as i64);
id
}
Err(e) => {
error!(%addr, room = %room_name, "room join denied: {e}");
// Clean up the session we just created
metrics.active_sessions.dec();
let mut smgr = session_mgr.lock().await;
smgr.remove_session(session_id);
transport.close().await.ok();
@@ -322,9 +350,15 @@ async fn main() -> anyhow::Result<()> {
room_name,
participant_id,
transport.clone(),
metrics.clone(),
).await;
// Participant disconnected — clean up session
metrics.active_sessions.dec();
{
let mgr = room_mgr.lock().await;
metrics.active_rooms.set(mgr.list().len() as i64);
}
{
let mut smgr = session_mgr.lock().await;
smgr.remove_session(session_id);

View File

@@ -0,0 +1,147 @@
//! Prometheus metrics for the WZP relay daemon.
use prometheus::{
Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry,
TextEncoder,
};
use std::sync::Arc;
/// All relay-level Prometheus metrics.
#[derive(Clone)]
pub struct RelayMetrics {
pub active_sessions: IntGauge,
pub active_rooms: IntGauge,
pub packets_forwarded: IntCounter,
pub bytes_forwarded: IntCounter,
pub auth_attempts: IntCounterVec,
pub handshake_duration: Histogram,
registry: Registry,
}
impl RelayMetrics {
/// Create and register all relay metrics with a new registry.
pub fn new() -> Self {
let registry = Registry::new();
let active_sessions = IntGauge::with_opts(
Opts::new("wzp_relay_active_sessions", "Current active sessions"),
)
.expect("metric");
let active_rooms = IntGauge::with_opts(
Opts::new("wzp_relay_active_rooms", "Current active rooms"),
)
.expect("metric");
let packets_forwarded = IntCounter::with_opts(
Opts::new("wzp_relay_packets_forwarded_total", "Total packets forwarded"),
)
.expect("metric");
let bytes_forwarded = IntCounter::with_opts(
Opts::new("wzp_relay_bytes_forwarded_total", "Total bytes forwarded"),
)
.expect("metric");
let auth_attempts = IntCounterVec::new(
Opts::new("wzp_relay_auth_attempts_total", "Auth validation attempts"),
&["result"],
)
.expect("metric");
let handshake_duration = Histogram::with_opts(
HistogramOpts::new(
"wzp_relay_handshake_duration_seconds",
"Crypto handshake time",
)
.buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]),
)
.expect("metric");
registry.register(Box::new(active_sessions.clone())).expect("register");
registry.register(Box::new(active_rooms.clone())).expect("register");
registry.register(Box::new(packets_forwarded.clone())).expect("register");
registry.register(Box::new(bytes_forwarded.clone())).expect("register");
registry.register(Box::new(auth_attempts.clone())).expect("register");
registry.register(Box::new(handshake_duration.clone())).expect("register");
Self {
active_sessions,
active_rooms,
packets_forwarded,
bytes_forwarded,
auth_attempts,
handshake_duration,
registry,
}
}
/// Gather all metrics and encode them as Prometheus text format.
pub fn metrics_handler(&self) -> String {
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).expect("encode");
String::from_utf8(buffer).expect("utf8")
}
}
/// Start an HTTP server serving GET /metrics on the given port.
pub async fn serve_metrics(port: u16, metrics: Arc<RelayMetrics>) {
use axum::{routing::get, Router};
let app = Router::new().route(
"/metrics",
get(move || {
let m = metrics.clone();
async move { m.metrics_handler() }
}),
);
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("failed to bind metrics port");
tracing::info!(%addr, "metrics endpoint serving");
axum::serve(listener, app)
.await
.expect("metrics server error");
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn metrics_register() {
let m = RelayMetrics::new();
// Touch the CounterVec labels so they appear in output
m.auth_attempts.with_label_values(&["ok"]);
m.auth_attempts.with_label_values(&["fail"]);
let output = m.metrics_handler();
// Should contain all registered metric names (as HELP or TYPE lines)
assert!(output.contains("wzp_relay_active_sessions"));
assert!(output.contains("wzp_relay_active_rooms"));
assert!(output.contains("wzp_relay_packets_forwarded_total"));
assert!(output.contains("wzp_relay_bytes_forwarded_total"));
assert!(output.contains("wzp_relay_auth_attempts_total"));
assert!(output.contains("wzp_relay_handshake_duration_seconds"));
}
#[test]
fn metrics_increment() {
let m = RelayMetrics::new();
m.active_sessions.set(5);
m.active_rooms.set(2);
m.packets_forwarded.inc_by(100);
m.bytes_forwarded.inc_by(48000);
m.auth_attempts.with_label_values(&["ok"]).inc();
m.auth_attempts.with_label_values(&["fail"]).inc_by(3);
m.handshake_duration.observe(0.042);
let output = m.metrics_handler();
assert!(output.contains("wzp_relay_active_sessions 5"));
assert!(output.contains("wzp_relay_active_rooms 2"));
assert!(output.contains("wzp_relay_packets_forwarded_total 100"));
assert!(output.contains("wzp_relay_bytes_forwarded_total 48000"));
assert!(output.contains("wzp_relay_auth_attempts_total{result=\"ok\"} 1"));
assert!(output.contains("wzp_relay_auth_attempts_total{result=\"fail\"} 3"));
assert!(output.contains("wzp_relay_handshake_duration_seconds_count 1"));
}
}

View File

@@ -12,6 +12,8 @@ use tracing::{error, info, warn};
use wzp_proto::MediaTransport;
use crate::metrics::RelayMetrics;
/// Unique participant ID within a room.
pub type ParticipantId = u64;
@@ -176,6 +178,7 @@ pub async fn run_participant(
room_name: String,
participant_id: ParticipantId,
transport: Arc<wzp_transport::QuinnTransport>,
metrics: Arc<RelayMetrics>,
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
@@ -200,6 +203,7 @@ pub async fn run_participant(
};
// Forward to all others
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
// Best-effort: if one send fails, continue to others
if let Err(e) = other.send_media(&pkt).await {
@@ -208,6 +212,9 @@ pub async fn run_participant(
}
}
let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
if packets_forwarded % 500 == 0 {
let room_size = {

View File

@@ -29,6 +29,7 @@ rcgen = "0.13"
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
rustls-pki-types = "1"
tokio-rustls = "0.26"
prometheus = "0.13"
[[bin]]
name = "wzp-web"

View File

@@ -25,6 +25,9 @@ use tracing::{error, info, warn};
use wzp_client::call::{CallConfig, CallDecoder, CallEncoder};
use wzp_proto::MediaTransport;
mod metrics;
use metrics::WebMetrics;
const FRAME_SAMPLES: usize = 960;
#[derive(Clone)]
@@ -32,6 +35,7 @@ struct AppState {
relay_addr: SocketAddr,
rooms: Arc<Mutex<HashMap<String, RoomSlot>>>,
auth_url: Option<String>,
metrics: WebMetrics,
}
/// A waiting client in a room.
@@ -90,10 +94,12 @@ async fn main() -> anyhow::Result<()> {
info!(url, "auth enabled — browsers must send token as first WS message");
}
let web_metrics = WebMetrics::new();
let state = AppState {
relay_addr,
rooms: Arc::new(Mutex::new(HashMap::new())),
auth_url,
metrics: web_metrics,
};
let static_dir = if std::path::Path::new("crates/wzp-web/static").exists() {
@@ -106,6 +112,7 @@ async fn main() -> anyhow::Result<()> {
let app = Router::new()
.route("/ws/{room}", get(ws_handler))
.route("/metrics", get(metrics::metrics_handler))
.fallback_service(ServeDir::new(static_dir))
.with_state(state);
@@ -172,6 +179,8 @@ async fn ws_handler(
async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
info!(room = %room, "client joined room");
state.metrics.active_connections.inc();
let (mut ws_sender, mut ws_receiver) = socket.split();
// Auth: if --auth-url is set, expect a JSON auth message from the browser first
@@ -184,6 +193,8 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
let token = v.get("token").and_then(|t| t.as_str()).unwrap_or("").to_string();
if token.is_empty() {
error!(room = %room, "empty auth token");
state.metrics.auth_failures.inc();
state.metrics.active_connections.dec();
return;
}
// Validate against featherChat
@@ -194,6 +205,8 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
}
Err(e) => {
error!(room = %room, "browser auth failed: {e}");
state.metrics.auth_failures.inc();
state.metrics.active_connections.dec();
return;
}
}
@@ -202,12 +215,16 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
}
_ => {
error!(room = %room, "expected auth JSON, got: {text}");
state.metrics.auth_failures.inc();
state.metrics.active_connections.dec();
return;
}
}
}
_ => {
error!(room = %room, "no auth message from browser");
state.metrics.auth_failures.inc();
state.metrics.active_connections.dec();
return;
}
}
@@ -257,14 +274,18 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
}
// Crypto handshake with relay
let handshake_start = std::time::Instant::now();
let bridge_seed = wzp_crypto::Seed::generate();
match wzp_client::handshake::perform_handshake(&*transport, &bridge_seed.0).await {
Ok(_session) => {
info!(room = %room, "crypto handshake with relay complete");
let elapsed = handshake_start.elapsed().as_secs_f64();
state.metrics.handshake_latency.observe(elapsed);
info!(room = %room, elapsed_ms = %(elapsed * 1000.0), "crypto handshake with relay complete");
}
Err(e) => {
error!(room = %room, "relay handshake failed: {e}");
transport.close().await.ok();
state.metrics.active_connections.dec();
return;
}
}
@@ -277,6 +298,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
let send_transport = transport.clone();
let send_encoder = encoder.clone();
let send_room = room.clone();
let send_metrics = state.metrics.clone();
let send_task = tokio::spawn(async move {
let mut frames_sent = 0u64;
while let Some(Ok(msg)) = ws_receiver.next().await {
@@ -302,6 +324,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
return;
}
}
send_metrics.frames_bridged.with_label_values(&["up"]).inc();
frames_sent += 1;
if frames_sent % 500 == 0 {
info!(room = %send_room, frames_sent, "browser → relay");
@@ -318,6 +341,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
let recv_transport = transport.clone();
let recv_decoder = decoder.clone();
let recv_room = room.clone();
let recv_metrics = state.metrics.clone();
let recv_task = tokio::spawn(async move {
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
let mut frames_recv = 0u64;
@@ -336,6 +360,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
error!("ws send: {e}");
return;
}
recv_metrics.frames_bridged.with_label_values(&["down"]).inc();
frames_recv += 1;
if frames_recv % 500 == 0 {
info!(room = %recv_room, frames_recv, "relay → browser");
@@ -356,5 +381,6 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) {
}
transport.close().await.ok();
state.metrics.active_connections.dec();
info!(room = %room, "session ended");
}

View File

@@ -0,0 +1,130 @@
//! Prometheus metrics for the WZP web bridge.
use prometheus::{
Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry,
TextEncoder,
};
/// Holds all Prometheus metrics for the web bridge.
#[derive(Clone)]
pub struct WebMetrics {
pub active_connections: IntGauge,
pub frames_bridged: IntCounterVec,
pub auth_failures: IntCounter,
pub handshake_latency: Histogram,
registry: Registry,
}
impl WebMetrics {
/// Create and register all web bridge metrics.
pub fn new() -> Self {
let registry = Registry::new();
let active_connections = IntGauge::with_opts(
Opts::new("wzp_web_active_connections", "Current WebSocket connections"),
)
.expect("metric");
registry
.register(Box::new(active_connections.clone()))
.expect("register");
let frames_bridged = IntCounterVec::new(
Opts::new("wzp_web_frames_bridged_total", "Audio frames bridged"),
&["direction"],
)
.expect("metric");
registry
.register(Box::new(frames_bridged.clone()))
.expect("register");
let auth_failures = IntCounter::with_opts(
Opts::new("wzp_web_auth_failures_total", "Browser auth failures"),
)
.expect("metric");
registry
.register(Box::new(auth_failures.clone()))
.expect("register");
let handshake_latency = Histogram::with_opts(
HistogramOpts::new(
"wzp_web_handshake_latency_seconds",
"Relay handshake time",
)
.buckets(vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]),
)
.expect("metric");
registry
.register(Box::new(handshake_latency.clone()))
.expect("register");
Self {
active_connections,
frames_bridged,
auth_failures,
handshake_latency,
registry,
}
}
/// Encode all metrics as Prometheus text exposition format.
pub fn gather(&self) -> String {
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
let mut buf = Vec::new();
encoder.encode(&metric_families, &mut buf).unwrap();
String::from_utf8(buf).unwrap()
}
}
/// Axum handler that returns Prometheus text metrics.
pub async fn metrics_handler(
axum::extract::State(state): axum::extract::State<super::AppState>,
) -> String {
state.metrics.gather()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn web_metrics_register() {
let m = WebMetrics::new();
// Touch CounterVec labels so they appear in output
m.frames_bridged.with_label_values(&["up"]);
m.frames_bridged.with_label_values(&["down"]);
let output = m.gather();
assert!(
output.contains("wzp_web_active_connections"),
"missing active_connections"
);
assert!(
output.contains("wzp_web_frames_bridged_total"),
"missing frames_bridged"
);
assert!(
output.contains("wzp_web_auth_failures_total"),
"missing auth_failures"
);
assert!(
output.contains("wzp_web_handshake_latency_seconds"),
"missing handshake_latency"
);
}
#[test]
fn web_metrics_track_connections() {
let m = WebMetrics::new();
assert_eq!(m.active_connections.get(), 0);
m.active_connections.inc();
m.active_connections.inc();
assert_eq!(m.active_connections.get(), 2);
m.active_connections.dec();
assert_eq!(m.active_connections.get(), 1);
let output = m.gather();
assert!(output.contains("wzp_web_active_connections 1"));
}
}