diff --git a/Cargo.lock b/Cargo.lock index bcd4183..9e55074 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,13 +138,43 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "axum" version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ - "axum-core", + "axum-core 0.5.6", "base64", "bytes", "form_urlencoded", @@ -155,7 +185,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -174,6 +204,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.5.6" @@ -1521,6 +1571,12 @@ dependencies = [ "libc", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -1888,6 +1944,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "quinn" version = "0.11.9" @@ -2591,7 +2668,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.2", "once_cell", "rustix", "windows-sys 0.61.2", @@ -3722,6 +3799,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "chrono", "cpal", "serde", "serde_json", @@ -3796,7 +3874,9 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "axum 0.7.9", "bytes", + "prometheus", "quinn", "reqwest", "rustls", @@ -3834,10 +3914,11 @@ name = "wzp-web" version = "0.1.0" dependencies = [ "anyhow", - "axum", + "axum 0.8.8", "axum-server", "bytes", "futures", + "prometheus", "rcgen", "rustls", "rustls-pemfile", diff --git a/crates/wzp-client/Cargo.toml b/crates/wzp-client/Cargo.toml index ec7c152..6726cb2 100644 --- a/crates/wzp-client/Cargo.toml +++ b/crates/wzp-client/Cargo.toml @@ -20,6 +20,7 @@ bytes = { workspace = true } anyhow = "1" serde = { workspace = true } serde_json = "1" +chrono = "0.4" cpal = { version = "0.15", optional = true } [features] diff --git a/crates/wzp-client/src/cli.rs b/crates/wzp-client/src/cli.rs index c4f3102..727d0e7 100644 --- a/crates/wzp-client/src/cli.rs +++ b/crates/wzp-client/src/cli.rs @@ -46,6 +46,7 @@ struct CliArgs { mnemonic: Option, room: Option, token: Option, + metrics_file: Option, } impl CliArgs { @@ -86,6 +87,7 @@ fn parse_args() -> CliArgs { let mut mnemonic = None; let mut room = None; let mut token = None; + let mut metrics_file = None; let mut relay_str = None; let mut i = 1; @@ -132,6 +134,14 @@ fn parse_args() -> CliArgs { i += 1; token = Some(args.get(i).expect("--token requires a value").to_string()); } + "--metrics-file" => { + i += 1; + metrics_file = Some( + args.get(i) + .expect("--metrics-file requires a path") + .to_string(), + ); + } "--record" => { i += 1; record_file = Some( @@ -174,6 +184,7 @@ fn parse_args() -> CliArgs { eprintln!(" --mnemonic Identity seed as BIP39 mnemonic (24 words)"); eprintln!(" --room Room name (hashed for privacy before sending)"); eprintln!(" --token featherChat bearer token for relay auth"); + eprintln!(" --metrics-file Write JSONL telemetry to file (1 line/sec)"); eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)"); eprintln!(); eprintln!("Default relay: 127.0.0.1:4433"); @@ -209,6 +220,7 @@ fn parse_args() -> CliArgs { mnemonic, room, token, + metrics_file, } } diff --git a/crates/wzp-client/src/lib.rs b/crates/wzp-client/src/lib.rs index 05d83d4..8afe631 100644 --- a/crates/wzp-client/src/lib.rs +++ b/crates/wzp-client/src/lib.rs @@ -14,6 +14,7 @@ pub mod drift_test; pub mod echo_test; pub mod featherchat; pub mod handshake; +pub mod metrics; pub mod sweep; #[cfg(feature = "audio")] diff --git a/crates/wzp-client/src/metrics.rs b/crates/wzp-client/src/metrics.rs new file mode 100644 index 0000000..848197c --- /dev/null +++ b/crates/wzp-client/src/metrics.rs @@ -0,0 +1,186 @@ +//! Client-side JSONL metrics export. +//! +//! When `--metrics-file ` is passed, the client writes one JSON object +//! per second to the specified file. Each line is a self-contained JSON object +//! (JSONL format) containing jitter buffer stats, loss, and quality profile. + +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::time::{Duration, Instant}; + +use serde::Serialize; + +use wzp_proto::jitter::JitterStats; + +/// A single metrics snapshot written as one JSONL line. +#[derive(Serialize)] +pub struct ClientMetricsSnapshot { + pub ts: String, + pub buffer_depth: usize, + pub underruns: u64, + pub overruns: u64, + pub loss_pct: f64, + pub rtt_ms: u64, + pub jitter_ms: u64, + pub frames_sent: u64, + pub frames_received: u64, + pub quality_profile: String, +} + +/// Periodic JSONL writer that respects a configurable interval. +pub struct MetricsWriter { + file: File, + interval: Duration, + last_write: Instant, +} + +impl MetricsWriter { + /// Create a new `MetricsWriter` that appends JSONL to the given path. + /// + /// The file is created (or truncated) immediately. + pub fn new(path: &str, interval_secs: u64) -> Result { + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path)?; + Ok(Self { + file, + interval: Duration::from_secs(interval_secs), + // Set last_write far in the past so the first call writes immediately. + last_write: Instant::now() - Duration::from_secs(interval_secs + 1), + }) + } + + /// Write a JSONL line if the interval has elapsed since the last write. + /// + /// Returns `Ok(true)` when a line was written, `Ok(false)` when skipped. + pub fn maybe_write(&mut self, snapshot: &ClientMetricsSnapshot) -> Result { + let now = Instant::now(); + if now.duration_since(self.last_write) >= self.interval { + let line = serde_json::to_string(snapshot)?; + writeln!(self.file, "{}", line)?; + self.file.flush()?; + self.last_write = now; + Ok(true) + } else { + Ok(false) + } + } +} + +/// Build a `ClientMetricsSnapshot` from jitter buffer stats and a quality profile name. +/// +/// Fields not available from `JitterStats` alone (rtt_ms, jitter_ms, frames_sent) +/// are set to zero — the caller can override them if the data is available. +pub fn snapshot_from_stats(stats: &JitterStats, profile: &str) -> ClientMetricsSnapshot { + let loss_pct = if stats.packets_received > 0 { + (stats.packets_lost as f64 / stats.packets_received as f64) * 100.0 + } else { + 0.0 + }; + ClientMetricsSnapshot { + ts: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true), + buffer_depth: stats.current_depth, + underruns: stats.underruns, + overruns: stats.overruns, + loss_pct, + rtt_ms: 0, + jitter_ms: 0, + frames_sent: 0, + frames_received: stats.total_decoded, + quality_profile: profile.to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_test_stats() -> JitterStats { + JitterStats { + packets_received: 100, + packets_played: 95, + packets_lost: 5, + packets_late: 2, + packets_duplicate: 0, + current_depth: 8, + total_decoded: 93, + underruns: 1, + overruns: 0, + max_depth_seen: 12, + } + } + + #[test] + fn snapshot_serializes_to_json() { + let stats = make_test_stats(); + let snap = snapshot_from_stats(&stats, "GOOD"); + let json = serde_json::to_string(&snap).unwrap(); + + // Verify expected fields are present in the JSON string. + assert!(json.contains("\"ts\"")); + assert!(json.contains("\"buffer_depth\":8")); + assert!(json.contains("\"underruns\":1")); + assert!(json.contains("\"overruns\":0")); + assert!(json.contains("\"loss_pct\":5.")); + assert!(json.contains("\"rtt_ms\":0")); + assert!(json.contains("\"jitter_ms\":0")); + assert!(json.contains("\"frames_sent\":0")); + assert!(json.contains("\"frames_received\":93")); + assert!(json.contains("\"quality_profile\":\"GOOD\"")); + + // Verify it round-trips as valid JSON. + let value: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(value["buffer_depth"], 8); + assert_eq!(value["quality_profile"], "GOOD"); + } + + #[test] + fn metrics_writer_creates_file() { + let dir = std::env::temp_dir(); + let path = dir.join("wzp_metrics_test.jsonl"); + let path_str = path.to_str().unwrap(); + + let mut writer = MetricsWriter::new(path_str, 1).unwrap(); + let stats = make_test_stats(); + let snap = snapshot_from_stats(&stats, "DEGRADED"); + + let wrote = writer.maybe_write(&snap).unwrap(); + assert!(wrote, "first write should succeed immediately"); + + // Read the file back and verify it contains valid JSONL. + let contents = std::fs::read_to_string(&path).unwrap(); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 1, "should have exactly one JSONL line"); + + let value: serde_json::Value = serde_json::from_str(lines[0]).unwrap(); + assert_eq!(value["quality_profile"], "DEGRADED"); + assert_eq!(value["buffer_depth"], 8); + + // Clean up. + let _ = std::fs::remove_file(&path); + } + + #[test] + fn metrics_writer_respects_interval() { + let dir = std::env::temp_dir(); + let path = dir.join("wzp_metrics_interval_test.jsonl"); + let path_str = path.to_str().unwrap(); + + let mut writer = MetricsWriter::new(path_str, 60).unwrap(); + let stats = make_test_stats(); + let snap = snapshot_from_stats(&stats, "GOOD"); + + // First write succeeds (last_write is set far in the past). + let first = writer.maybe_write(&snap).unwrap(); + assert!(first, "first write should succeed"); + + // Immediate second write should be skipped (60s interval). + let second = writer.maybe_write(&snap).unwrap(); + assert!(!second, "second write should be skipped — interval not elapsed"); + + // Clean up. + let _ = std::fs::remove_file(&path); + } +} diff --git a/crates/wzp-relay/Cargo.toml b/crates/wzp-relay/Cargo.toml index 6509c09..b415727 100644 --- a/crates/wzp-relay/Cargo.toml +++ b/crates/wzp-relay/Cargo.toml @@ -24,6 +24,8 @@ reqwest = { version = "0.12", features = ["json"] } serde_json = "1" rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } quinn = { workspace = true } +prometheus = "0.13" +axum = { version = "0.7", default-features = false, features = ["tokio", "http1"] } [[bin]] name = "wzp-relay" diff --git a/crates/wzp-relay/src/config.rs b/crates/wzp-relay/src/config.rs index 1e692f7..d7dc946 100644 --- a/crates/wzp-relay/src/config.rs +++ b/crates/wzp-relay/src/config.rs @@ -22,6 +22,9 @@ pub struct RelayConfig { /// featherChat auth validation URL (e.g., "https://chat.example.com/v1/auth/validate"). /// If set, clients must present a valid token before joining rooms. pub auth_url: Option, + /// Port for the Prometheus metrics HTTP endpoint (e.g., 9090). + /// If None, the metrics endpoint is disabled. + pub metrics_port: Option, } impl Default for RelayConfig { @@ -34,6 +37,7 @@ impl Default for RelayConfig { jitter_max_depth: 250, log_level: "info".to_string(), auth_url: None, + metrics_port: None, } } } diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index fdbfd0a..28f768f 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -10,6 +10,7 @@ pub mod auth; pub mod config; pub mod handshake; +pub mod metrics; pub mod pipeline; pub mod room; pub mod session_mgr; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index 9b65272..cc39daf 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -17,6 +17,7 @@ use tracing::{error, info}; use wzp_proto::MediaTransport; use wzp_relay::config::RelayConfig; +use wzp_relay::metrics::RelayMetrics; use wzp_relay::pipeline::{PipelineConfig, RelayPipeline}; use wzp_relay::room::{self, RoomManager}; use wzp_relay::session_mgr::SessionManager; @@ -45,14 +46,22 @@ fn parse_args() -> RelayConfig { args.get(i).expect("--auth-url requires a URL").to_string(), ); } + "--metrics-port" => { + i += 1; + config.metrics_port = Some( + args.get(i).expect("--metrics-port requires a port number") + .parse().expect("invalid --metrics-port number"), + ); + } "--help" | "-h" => { - eprintln!("Usage: wzp-relay [--listen ] [--remote ] [--auth-url ]"); + eprintln!("Usage: wzp-relay [--listen ] [--remote ] [--auth-url ] [--metrics-port ]"); eprintln!(); eprintln!("Options:"); - eprintln!(" --listen Listen address (default: 0.0.0.0:4433)"); - eprintln!(" --remote Remote relay for forwarding (disables room mode)"); - eprintln!(" --auth-url featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)"); - eprintln!(" When set, clients must send a bearer token as first signal message."); + eprintln!(" --listen Listen address (default: 0.0.0.0:4433)"); + eprintln!(" --remote Remote relay for forwarding (disables room mode)"); + eprintln!(" --auth-url featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)"); + eprintln!(" When set, clients must send a bearer token as first signal message."); + eprintln!(" --metrics-port Prometheus metrics HTTP port (e.g., 9090). Disabled if not set."); eprintln!(); eprintln!("Room mode (default):"); eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU)."); @@ -141,6 +150,13 @@ async fn main() -> anyhow::Result<()> { .install_default() .expect("failed to install rustls crypto provider"); + // Prometheus metrics + let metrics = Arc::new(RelayMetrics::new()); + if let Some(port) = config.metrics_port { + let m = metrics.clone(); + tokio::spawn(wzp_relay::metrics::serve_metrics(port, m)); + } + // Generate ephemeral relay identity for crypto handshake let relay_seed = wzp_crypto::Seed::generate(); let relay_fp = relay_seed.derive_identity().public_identity().fingerprint; @@ -186,6 +202,7 @@ async fn main() -> anyhow::Result<()> { let session_mgr = session_mgr.clone(); let auth_url = config.auth_url.clone(); let relay_seed_bytes = relay_seed.0; + let metrics = metrics.clone(); tokio::spawn(async move { let addr = connection.remote_address(); @@ -208,6 +225,7 @@ async fn main() -> anyhow::Result<()> { Ok(Some(wzp_proto::SignalMessage::AuthToken { token })) => { match wzp_relay::auth::validate_token(url, &token).await { Ok(client) => { + metrics.auth_attempts.with_label_values(&["ok"]).inc(); info!( %addr, fingerprint = %client.fingerprint, @@ -217,6 +235,7 @@ async fn main() -> anyhow::Result<()> { Some(client.fingerprint) } Err(e) => { + metrics.auth_attempts.with_label_values(&["fail"]).inc(); error!(%addr, "auth failed: {e}"); transport.close().await.ok(); return; @@ -243,12 +262,15 @@ async fn main() -> anyhow::Result<()> { }; // Crypto handshake: verify client identity + negotiate quality profile + let handshake_start = std::time::Instant::now(); let (_crypto_session, _chosen_profile) = match wzp_relay::handshake::accept_handshake( &*transport, &relay_seed_bytes, ).await { Ok(result) => { - info!(%addr, "crypto handshake complete"); + let elapsed = handshake_start.elapsed().as_secs_f64(); + metrics.handshake_duration.observe(elapsed); + info!(%addr, elapsed_ms = %(elapsed * 1000.0), "crypto handshake complete"); result } Err(e) => { @@ -302,13 +324,19 @@ async fn main() -> anyhow::Result<()> { } }; + metrics.active_sessions.inc(); + let participant_id = { let mut mgr = room_mgr.lock().await; match mgr.join(&room_name, addr, transport.clone(), authenticated_fp.as_deref()) { - Ok(id) => id, + Ok(id) => { + metrics.active_rooms.set(mgr.list().len() as i64); + id + } Err(e) => { error!(%addr, room = %room_name, "room join denied: {e}"); // Clean up the session we just created + metrics.active_sessions.dec(); let mut smgr = session_mgr.lock().await; smgr.remove_session(session_id); transport.close().await.ok(); @@ -322,9 +350,15 @@ async fn main() -> anyhow::Result<()> { room_name, participant_id, transport.clone(), + metrics.clone(), ).await; // Participant disconnected — clean up session + metrics.active_sessions.dec(); + { + let mgr = room_mgr.lock().await; + metrics.active_rooms.set(mgr.list().len() as i64); + } { let mut smgr = session_mgr.lock().await; smgr.remove_session(session_id); diff --git a/crates/wzp-relay/src/metrics.rs b/crates/wzp-relay/src/metrics.rs new file mode 100644 index 0000000..8740c8d --- /dev/null +++ b/crates/wzp-relay/src/metrics.rs @@ -0,0 +1,147 @@ +//! Prometheus metrics for the WZP relay daemon. + +use prometheus::{ + Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry, + TextEncoder, +}; +use std::sync::Arc; + +/// All relay-level Prometheus metrics. +#[derive(Clone)] +pub struct RelayMetrics { + pub active_sessions: IntGauge, + pub active_rooms: IntGauge, + pub packets_forwarded: IntCounter, + pub bytes_forwarded: IntCounter, + pub auth_attempts: IntCounterVec, + pub handshake_duration: Histogram, + registry: Registry, +} + +impl RelayMetrics { + /// Create and register all relay metrics with a new registry. + pub fn new() -> Self { + let registry = Registry::new(); + + let active_sessions = IntGauge::with_opts( + Opts::new("wzp_relay_active_sessions", "Current active sessions"), + ) + .expect("metric"); + let active_rooms = IntGauge::with_opts( + Opts::new("wzp_relay_active_rooms", "Current active rooms"), + ) + .expect("metric"); + let packets_forwarded = IntCounter::with_opts( + Opts::new("wzp_relay_packets_forwarded_total", "Total packets forwarded"), + ) + .expect("metric"); + let bytes_forwarded = IntCounter::with_opts( + Opts::new("wzp_relay_bytes_forwarded_total", "Total bytes forwarded"), + ) + .expect("metric"); + let auth_attempts = IntCounterVec::new( + Opts::new("wzp_relay_auth_attempts_total", "Auth validation attempts"), + &["result"], + ) + .expect("metric"); + let handshake_duration = Histogram::with_opts( + HistogramOpts::new( + "wzp_relay_handshake_duration_seconds", + "Crypto handshake time", + ) + .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]), + ) + .expect("metric"); + + registry.register(Box::new(active_sessions.clone())).expect("register"); + registry.register(Box::new(active_rooms.clone())).expect("register"); + registry.register(Box::new(packets_forwarded.clone())).expect("register"); + registry.register(Box::new(bytes_forwarded.clone())).expect("register"); + registry.register(Box::new(auth_attempts.clone())).expect("register"); + registry.register(Box::new(handshake_duration.clone())).expect("register"); + + Self { + active_sessions, + active_rooms, + packets_forwarded, + bytes_forwarded, + auth_attempts, + handshake_duration, + registry, + } + } + + /// Gather all metrics and encode them as Prometheus text format. + pub fn metrics_handler(&self) -> String { + let encoder = TextEncoder::new(); + let metric_families = self.registry.gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).expect("encode"); + String::from_utf8(buffer).expect("utf8") + } +} + +/// Start an HTTP server serving GET /metrics on the given port. +pub async fn serve_metrics(port: u16, metrics: Arc) { + use axum::{routing::get, Router}; + + let app = Router::new().route( + "/metrics", + get(move || { + let m = metrics.clone(); + async move { m.metrics_handler() } + }), + ); + + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("failed to bind metrics port"); + tracing::info!(%addr, "metrics endpoint serving"); + axum::serve(listener, app) + .await + .expect("metrics server error"); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn metrics_register() { + let m = RelayMetrics::new(); + // Touch the CounterVec labels so they appear in output + m.auth_attempts.with_label_values(&["ok"]); + m.auth_attempts.with_label_values(&["fail"]); + let output = m.metrics_handler(); + // Should contain all registered metric names (as HELP or TYPE lines) + assert!(output.contains("wzp_relay_active_sessions")); + assert!(output.contains("wzp_relay_active_rooms")); + assert!(output.contains("wzp_relay_packets_forwarded_total")); + assert!(output.contains("wzp_relay_bytes_forwarded_total")); + assert!(output.contains("wzp_relay_auth_attempts_total")); + assert!(output.contains("wzp_relay_handshake_duration_seconds")); + } + + #[test] + fn metrics_increment() { + let m = RelayMetrics::new(); + + m.active_sessions.set(5); + m.active_rooms.set(2); + m.packets_forwarded.inc_by(100); + m.bytes_forwarded.inc_by(48000); + m.auth_attempts.with_label_values(&["ok"]).inc(); + m.auth_attempts.with_label_values(&["fail"]).inc_by(3); + m.handshake_duration.observe(0.042); + + let output = m.metrics_handler(); + assert!(output.contains("wzp_relay_active_sessions 5")); + assert!(output.contains("wzp_relay_active_rooms 2")); + assert!(output.contains("wzp_relay_packets_forwarded_total 100")); + assert!(output.contains("wzp_relay_bytes_forwarded_total 48000")); + assert!(output.contains("wzp_relay_auth_attempts_total{result=\"ok\"} 1")); + assert!(output.contains("wzp_relay_auth_attempts_total{result=\"fail\"} 3")); + assert!(output.contains("wzp_relay_handshake_duration_seconds_count 1")); + } +} diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index adad331..2ee837a 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -12,6 +12,8 @@ use tracing::{error, info, warn}; use wzp_proto::MediaTransport; +use crate::metrics::RelayMetrics; + /// Unique participant ID within a room. pub type ParticipantId = u64; @@ -176,6 +178,7 @@ pub async fn run_participant( room_name: String, participant_id: ParticipantId, transport: Arc, + metrics: Arc, ) { let addr = transport.connection().remote_address(); let mut packets_forwarded = 0u64; @@ -200,6 +203,7 @@ pub async fn run_participant( }; // Forward to all others + let pkt_bytes = pkt.payload.len() as u64; for other in &others { // Best-effort: if one send fails, continue to others if let Err(e) = other.send_media(&pkt).await { @@ -208,6 +212,9 @@ pub async fn run_participant( } } + let fan_out = others.len() as u64; + metrics.packets_forwarded.inc_by(fan_out); + metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); packets_forwarded += 1; if packets_forwarded % 500 == 0 { let room_size = { diff --git a/crates/wzp-web/Cargo.toml b/crates/wzp-web/Cargo.toml index 7614b21..5d7b705 100644 --- a/crates/wzp-web/Cargo.toml +++ b/crates/wzp-web/Cargo.toml @@ -29,6 +29,7 @@ rcgen = "0.13" rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } rustls-pki-types = "1" tokio-rustls = "0.26" +prometheus = "0.13" [[bin]] name = "wzp-web" diff --git a/crates/wzp-web/src/main.rs b/crates/wzp-web/src/main.rs index 4143fb8..84eb633 100644 --- a/crates/wzp-web/src/main.rs +++ b/crates/wzp-web/src/main.rs @@ -25,6 +25,9 @@ use tracing::{error, info, warn}; use wzp_client::call::{CallConfig, CallDecoder, CallEncoder}; use wzp_proto::MediaTransport; +mod metrics; +use metrics::WebMetrics; + const FRAME_SAMPLES: usize = 960; #[derive(Clone)] @@ -32,6 +35,7 @@ struct AppState { relay_addr: SocketAddr, rooms: Arc>>, auth_url: Option, + metrics: WebMetrics, } /// A waiting client in a room. @@ -90,10 +94,12 @@ async fn main() -> anyhow::Result<()> { info!(url, "auth enabled — browsers must send token as first WS message"); } + let web_metrics = WebMetrics::new(); let state = AppState { relay_addr, rooms: Arc::new(Mutex::new(HashMap::new())), auth_url, + metrics: web_metrics, }; let static_dir = if std::path::Path::new("crates/wzp-web/static").exists() { @@ -106,6 +112,7 @@ async fn main() -> anyhow::Result<()> { let app = Router::new() .route("/ws/{room}", get(ws_handler)) + .route("/metrics", get(metrics::metrics_handler)) .fallback_service(ServeDir::new(static_dir)) .with_state(state); @@ -172,6 +179,8 @@ async fn ws_handler( async fn handle_ws(socket: WebSocket, room: String, state: AppState) { info!(room = %room, "client joined room"); + state.metrics.active_connections.inc(); + let (mut ws_sender, mut ws_receiver) = socket.split(); // Auth: if --auth-url is set, expect a JSON auth message from the browser first @@ -184,6 +193,8 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { let token = v.get("token").and_then(|t| t.as_str()).unwrap_or("").to_string(); if token.is_empty() { error!(room = %room, "empty auth token"); + state.metrics.auth_failures.inc(); + state.metrics.active_connections.dec(); return; } // Validate against featherChat @@ -194,6 +205,8 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { } Err(e) => { error!(room = %room, "browser auth failed: {e}"); + state.metrics.auth_failures.inc(); + state.metrics.active_connections.dec(); return; } } @@ -202,12 +215,16 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { } _ => { error!(room = %room, "expected auth JSON, got: {text}"); + state.metrics.auth_failures.inc(); + state.metrics.active_connections.dec(); return; } } } _ => { error!(room = %room, "no auth message from browser"); + state.metrics.auth_failures.inc(); + state.metrics.active_connections.dec(); return; } } @@ -257,14 +274,18 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { } // Crypto handshake with relay + let handshake_start = std::time::Instant::now(); let bridge_seed = wzp_crypto::Seed::generate(); match wzp_client::handshake::perform_handshake(&*transport, &bridge_seed.0).await { Ok(_session) => { - info!(room = %room, "crypto handshake with relay complete"); + let elapsed = handshake_start.elapsed().as_secs_f64(); + state.metrics.handshake_latency.observe(elapsed); + info!(room = %room, elapsed_ms = %(elapsed * 1000.0), "crypto handshake with relay complete"); } Err(e) => { error!(room = %room, "relay handshake failed: {e}"); transport.close().await.ok(); + state.metrics.active_connections.dec(); return; } } @@ -277,6 +298,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { let send_transport = transport.clone(); let send_encoder = encoder.clone(); let send_room = room.clone(); + let send_metrics = state.metrics.clone(); let send_task = tokio::spawn(async move { let mut frames_sent = 0u64; while let Some(Ok(msg)) = ws_receiver.next().await { @@ -302,6 +324,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { return; } } + send_metrics.frames_bridged.with_label_values(&["up"]).inc(); frames_sent += 1; if frames_sent % 500 == 0 { info!(room = %send_room, frames_sent, "browser → relay"); @@ -318,6 +341,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { let recv_transport = transport.clone(); let recv_decoder = decoder.clone(); let recv_room = room.clone(); + let recv_metrics = state.metrics.clone(); let recv_task = tokio::spawn(async move { let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; let mut frames_recv = 0u64; @@ -336,6 +360,7 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { error!("ws send: {e}"); return; } + recv_metrics.frames_bridged.with_label_values(&["down"]).inc(); frames_recv += 1; if frames_recv % 500 == 0 { info!(room = %recv_room, frames_recv, "relay → browser"); @@ -356,5 +381,6 @@ async fn handle_ws(socket: WebSocket, room: String, state: AppState) { } transport.close().await.ok(); + state.metrics.active_connections.dec(); info!(room = %room, "session ended"); } diff --git a/crates/wzp-web/src/metrics.rs b/crates/wzp-web/src/metrics.rs new file mode 100644 index 0000000..716f1d0 --- /dev/null +++ b/crates/wzp-web/src/metrics.rs @@ -0,0 +1,130 @@ +//! Prometheus metrics for the WZP web bridge. + +use prometheus::{ + Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry, + TextEncoder, +}; + +/// Holds all Prometheus metrics for the web bridge. +#[derive(Clone)] +pub struct WebMetrics { + pub active_connections: IntGauge, + pub frames_bridged: IntCounterVec, + pub auth_failures: IntCounter, + pub handshake_latency: Histogram, + registry: Registry, +} + +impl WebMetrics { + /// Create and register all web bridge metrics. + pub fn new() -> Self { + let registry = Registry::new(); + + let active_connections = IntGauge::with_opts( + Opts::new("wzp_web_active_connections", "Current WebSocket connections"), + ) + .expect("metric"); + registry + .register(Box::new(active_connections.clone())) + .expect("register"); + + let frames_bridged = IntCounterVec::new( + Opts::new("wzp_web_frames_bridged_total", "Audio frames bridged"), + &["direction"], + ) + .expect("metric"); + registry + .register(Box::new(frames_bridged.clone())) + .expect("register"); + + let auth_failures = IntCounter::with_opts( + Opts::new("wzp_web_auth_failures_total", "Browser auth failures"), + ) + .expect("metric"); + registry + .register(Box::new(auth_failures.clone())) + .expect("register"); + + let handshake_latency = Histogram::with_opts( + HistogramOpts::new( + "wzp_web_handshake_latency_seconds", + "Relay handshake time", + ) + .buckets(vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]), + ) + .expect("metric"); + registry + .register(Box::new(handshake_latency.clone())) + .expect("register"); + + Self { + active_connections, + frames_bridged, + auth_failures, + handshake_latency, + registry, + } + } + + /// Encode all metrics as Prometheus text exposition format. + pub fn gather(&self) -> String { + let encoder = TextEncoder::new(); + let metric_families = self.registry.gather(); + let mut buf = Vec::new(); + encoder.encode(&metric_families, &mut buf).unwrap(); + String::from_utf8(buf).unwrap() + } +} + +/// Axum handler that returns Prometheus text metrics. +pub async fn metrics_handler( + axum::extract::State(state): axum::extract::State, +) -> String { + state.metrics.gather() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn web_metrics_register() { + let m = WebMetrics::new(); + // Touch CounterVec labels so they appear in output + m.frames_bridged.with_label_values(&["up"]); + m.frames_bridged.with_label_values(&["down"]); + let output = m.gather(); + assert!( + output.contains("wzp_web_active_connections"), + "missing active_connections" + ); + assert!( + output.contains("wzp_web_frames_bridged_total"), + "missing frames_bridged" + ); + assert!( + output.contains("wzp_web_auth_failures_total"), + "missing auth_failures" + ); + assert!( + output.contains("wzp_web_handshake_latency_seconds"), + "missing handshake_latency" + ); + } + + #[test] + fn web_metrics_track_connections() { + let m = WebMetrics::new(); + assert_eq!(m.active_connections.get(), 0); + + m.active_connections.inc(); + m.active_connections.inc(); + assert_eq!(m.active_connections.get(), 2); + + m.active_connections.dec(); + assert_eq!(m.active_connections.get(), 1); + + let output = m.gather(); + assert!(output.contains("wzp_web_active_connections 1")); + } +}