diff --git a/crates/wzp-relay/src/config.rs b/crates/wzp-relay/src/config.rs index d7dc946..b13eea5 100644 --- a/crates/wzp-relay/src/config.rs +++ b/crates/wzp-relay/src/config.rs @@ -25,6 +25,10 @@ pub struct RelayConfig { /// Port for the Prometheus metrics HTTP endpoint (e.g., 9090). /// If None, the metrics endpoint is disabled. pub metrics_port: Option, + /// Peer relay addresses to probe for health monitoring. + /// Each target gets a persistent QUIC connection sending 1 Ping/s. + #[serde(default)] + pub probe_targets: Vec, } impl Default for RelayConfig { @@ -38,6 +42,7 @@ impl Default for RelayConfig { log_level: "info".to_string(), auth_url: None, metrics_port: None, + probe_targets: Vec::new(), } } } diff --git a/crates/wzp-relay/src/lib.rs b/crates/wzp-relay/src/lib.rs index 28f768f..ead05f1 100644 --- a/crates/wzp-relay/src/lib.rs +++ b/crates/wzp-relay/src/lib.rs @@ -12,6 +12,7 @@ pub mod config; pub mod handshake; pub mod metrics; pub mod pipeline; +pub mod probe; pub mod room; pub mod session_mgr; diff --git a/crates/wzp-relay/src/main.rs b/crates/wzp-relay/src/main.rs index cc39daf..6365469 100644 --- a/crates/wzp-relay/src/main.rs +++ b/crates/wzp-relay/src/main.rs @@ -53,8 +53,16 @@ fn parse_args() -> RelayConfig { .parse().expect("invalid --metrics-port number"), ); } + "--probe" => { + i += 1; + let addr: SocketAddr = args.get(i) + .expect("--probe requires an address") + .parse() + .expect("invalid --probe address"); + config.probe_targets.push(addr); + } "--help" | "-h" => { - eprintln!("Usage: wzp-relay [--listen ] [--remote ] [--auth-url ] [--metrics-port ]"); + eprintln!("Usage: wzp-relay [--listen ] [--remote ] [--auth-url ] [--metrics-port ] [--probe ]..."); eprintln!(); eprintln!("Options:"); eprintln!(" --listen Listen address (default: 0.0.0.0:4433)"); @@ -62,6 +70,7 @@ fn parse_args() -> RelayConfig { eprintln!(" --auth-url featherChat auth endpoint (e.g., https://chat.example.com/v1/auth/validate)"); eprintln!(" When set, clients must send a bearer token as first signal message."); eprintln!(" --metrics-port Prometheus metrics HTTP port (e.g., 9090). Disabled if not set."); + eprintln!(" --probe Peer relay to probe for health monitoring (repeatable)."); eprintln!(); eprintln!("Room mode (default):"); eprintln!(" Clients join rooms by name. Packets forwarded to all others (SFU)."); @@ -183,6 +192,14 @@ async fn main() -> anyhow::Result<()> { // Session manager — enforces max concurrent sessions let session_mgr = Arc::new(Mutex::new(SessionManager::new(config.max_sessions))); + // Spawn inter-relay health probes + for target in &config.probe_targets { + let probe_config = wzp_relay::probe::ProbeConfig::new(*target); + let runner = wzp_relay::probe::ProbeRunner::new(probe_config, metrics.registry()); + info!(target = %target, "spawning inter-relay health probe"); + tokio::spawn(async move { runner.run().await }); + } + if let Some(ref url) = config.auth_url { info!(url, "auth enabled — clients must present featherChat token"); } else { @@ -217,6 +234,37 @@ async fn main() -> anyhow::Result<()> { let transport = Arc::new(wzp_transport::QuinnTransport::new(connection)); + // Probe connections use SNI "_probe" to identify themselves. + // They skip auth + handshake and just do Ping->Pong. + if room_name == "_probe" { + info!(%addr, "probe connection detected, entering Ping/Pong responder"); + loop { + match transport.recv_signal().await { + Ok(Some(wzp_proto::SignalMessage::Ping { timestamp_ms })) => { + if let Err(e) = transport.send_signal( + &wzp_proto::SignalMessage::Pong { timestamp_ms }, + ).await { + error!(%addr, "probe pong send error: {e}"); + break; + } + } + Ok(Some(_)) => { + // Ignore non-Ping signals on probe connections + } + Ok(None) => { + info!(%addr, "probe connection closed"); + break; + } + Err(e) => { + error!(%addr, "probe recv error: {e}"); + break; + } + } + } + transport.close().await.ok(); + return; + } + // Auth check: if --auth-url is set, expect first signal message to be a token // Auth: if --auth-url is set, expect AuthToken as first signal let authenticated_fp: Option = if let Some(ref url) = auth_url { @@ -345,15 +393,21 @@ async fn main() -> anyhow::Result<()> { } }; + let session_id_str: String = session_id + .iter() + .map(|b| format!("{b:02x}")) + .collect(); room::run_participant( room_mgr.clone(), room_name, participant_id, transport.clone(), metrics.clone(), + &session_id_str, ).await; - // Participant disconnected — clean up session + // Participant disconnected — clean up per-session metrics + metrics.remove_session_metrics(&session_id_str); metrics.active_sessions.dec(); { let mgr = room_mgr.lock().await; diff --git a/crates/wzp-relay/src/metrics.rs b/crates/wzp-relay/src/metrics.rs index 8740c8d..a5674c9 100644 --- a/crates/wzp-relay/src/metrics.rs +++ b/crates/wzp-relay/src/metrics.rs @@ -1,9 +1,10 @@ //! Prometheus metrics for the WZP relay daemon. use prometheus::{ - Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry, - TextEncoder, + Encoder, GaugeVec, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, + Opts, Registry, TextEncoder, }; +use wzp_proto::packet::QualityReport; use std::sync::Arc; /// All relay-level Prometheus metrics. @@ -15,6 +16,12 @@ pub struct RelayMetrics { pub bytes_forwarded: IntCounter, pub auth_attempts: IntCounterVec, pub handshake_duration: Histogram, + // Per-session metrics + pub session_buffer_depth: IntGaugeVec, + pub session_loss_pct: GaugeVec, + pub session_rtt_ms: GaugeVec, + pub session_underruns: IntCounterVec, + pub session_overruns: IntCounterVec, registry: Registry, } @@ -53,12 +60,58 @@ impl RelayMetrics { ) .expect("metric"); + let session_buffer_depth = IntGaugeVec::new( + Opts::new( + "wzp_relay_session_jitter_buffer_depth", + "Buffer depth per session", + ), + &["session_id"], + ) + .expect("metric"); + let session_loss_pct = GaugeVec::new( + Opts::new( + "wzp_relay_session_loss_pct", + "Packet loss percentage per session", + ), + &["session_id"], + ) + .expect("metric"); + let session_rtt_ms = GaugeVec::new( + Opts::new( + "wzp_relay_session_rtt_ms", + "Round-trip time per session", + ), + &["session_id"], + ) + .expect("metric"); + let session_underruns = IntCounterVec::new( + Opts::new( + "wzp_relay_session_underruns_total", + "Jitter buffer underruns per session", + ), + &["session_id"], + ) + .expect("metric"); + let session_overruns = IntCounterVec::new( + Opts::new( + "wzp_relay_session_overruns_total", + "Jitter buffer overruns per session", + ), + &["session_id"], + ) + .expect("metric"); + registry.register(Box::new(active_sessions.clone())).expect("register"); registry.register(Box::new(active_rooms.clone())).expect("register"); registry.register(Box::new(packets_forwarded.clone())).expect("register"); registry.register(Box::new(bytes_forwarded.clone())).expect("register"); registry.register(Box::new(auth_attempts.clone())).expect("register"); registry.register(Box::new(handshake_duration.clone())).expect("register"); + registry.register(Box::new(session_buffer_depth.clone())).expect("register"); + registry.register(Box::new(session_loss_pct.clone())).expect("register"); + registry.register(Box::new(session_rtt_ms.clone())).expect("register"); + registry.register(Box::new(session_underruns.clone())).expect("register"); + registry.register(Box::new(session_overruns.clone())).expect("register"); Self { active_sessions, @@ -67,10 +120,77 @@ impl RelayMetrics { bytes_forwarded, auth_attempts, handshake_duration, + session_buffer_depth, + session_loss_pct, + session_rtt_ms, + session_underruns, + session_overruns, registry, } } + /// Update per-session quality metrics from a QualityReport. + pub fn update_session_quality(&self, session_id: &str, report: &QualityReport) { + self.session_loss_pct + .with_label_values(&[session_id]) + .set(report.loss_percent() as f64); + self.session_rtt_ms + .with_label_values(&[session_id]) + .set(report.rtt_ms() as f64); + } + + /// Update per-session buffer metrics. + pub fn update_session_buffer( + &self, + session_id: &str, + depth: usize, + underruns: u64, + overruns: u64, + ) { + self.session_buffer_depth + .with_label_values(&[session_id]) + .set(depth as i64); + // IntCounterVec doesn't have a `set` — we inc by the delta. + // Since these are cumulative from the jitter buffer, we use inc_by + // with the current totals. To avoid double-counting, callers should + // track previous values externally. For simplicity the relay reports + // the absolute value each tick; counters only go up so we take the + // max(0, new - current) approach. + let cur_underruns = self + .session_underruns + .with_label_values(&[session_id]) + .get(); + if underruns > cur_underruns as u64 { + self.session_underruns + .with_label_values(&[session_id]) + .inc_by(underruns - cur_underruns as u64); + } + let cur_overruns = self + .session_overruns + .with_label_values(&[session_id]) + .get(); + if overruns > cur_overruns as u64 { + self.session_overruns + .with_label_values(&[session_id]) + .inc_by(overruns - cur_overruns as u64); + } + } + + /// Remove all per-session label values for a disconnected session. + pub fn remove_session_metrics(&self, session_id: &str) { + let _ = self.session_buffer_depth.remove_label_values(&[session_id]); + let _ = self.session_loss_pct.remove_label_values(&[session_id]); + let _ = self.session_rtt_ms.remove_label_values(&[session_id]); + let _ = self.session_underruns.remove_label_values(&[session_id]); + let _ = self.session_overruns.remove_label_values(&[session_id]); + } + + /// Get a reference to the underlying Prometheus registry. + /// Probe metrics are registered on this same registry so they appear in /metrics output. + pub fn registry(&self) -> &Registry { + &self.registry + } + /// Gather all metrics and encode them as Prometheus text format. pub fn metrics_handler(&self) -> String { let encoder = TextEncoder::new(); @@ -123,6 +243,46 @@ mod tests { assert!(output.contains("wzp_relay_handshake_duration_seconds")); } + #[test] + fn session_quality_update() { + let m = RelayMetrics::new(); + let report = QualityReport { + loss_pct: 128, // ~50% + rtt_4ms: 25, // 100ms + jitter_ms: 10, + bitrate_cap_kbps: 200, + }; + m.update_session_quality("sess-abc", &report); + + let output = m.metrics_handler(); + assert!(output.contains("wzp_relay_session_loss_pct{session_id=\"sess-abc\"}")); + assert!(output.contains("wzp_relay_session_rtt_ms{session_id=\"sess-abc\"}")); + // Verify rtt value (25 * 4 = 100) + assert!(output.contains("wzp_relay_session_rtt_ms{session_id=\"sess-abc\"} 100")); + } + + #[test] + fn session_metrics_cleanup() { + let m = RelayMetrics::new(); + let report = QualityReport { + loss_pct: 50, + rtt_4ms: 10, + jitter_ms: 5, + bitrate_cap_kbps: 100, + }; + m.update_session_quality("sess-cleanup", &report); + m.update_session_buffer("sess-cleanup", 42, 3, 1); + + // Verify they appear + let output = m.metrics_handler(); + assert!(output.contains("sess-cleanup")); + + // Remove and verify they are gone + m.remove_session_metrics("sess-cleanup"); + let output = m.metrics_handler(); + assert!(!output.contains("sess-cleanup")); + } + #[test] fn metrics_increment() { let m = RelayMetrics::new(); diff --git a/crates/wzp-relay/src/probe.rs b/crates/wzp-relay/src/probe.rs new file mode 100644 index 0000000..b62747e --- /dev/null +++ b/crates/wzp-relay/src/probe.rs @@ -0,0 +1,420 @@ +//! Inter-relay health probe. +//! +//! A `ProbeRunner` maintains a persistent QUIC connection to a peer relay, +//! sends 1 Ping/s, and measures RTT, loss, and jitter. Results are exported +//! as Prometheus gauges with a `target` label. + +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use prometheus::{Gauge, IntGauge, Opts, Registry}; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +use wzp_proto::{MediaTransport, SignalMessage}; + +/// Configuration for a single probe target. +#[derive(Clone, Debug)] +pub struct ProbeConfig { + pub target: SocketAddr, + pub interval: Duration, +} + +impl ProbeConfig { + pub fn new(target: SocketAddr) -> Self { + Self { + target, + interval: Duration::from_secs(1), + } + } +} + +/// Prometheus metrics for one probe target. +pub struct ProbeMetrics { + pub rtt_ms: Gauge, + pub loss_pct: Gauge, + pub jitter_ms: Gauge, + pub up: IntGauge, +} + +impl ProbeMetrics { + /// Register probe metrics with the given `target` label value. + pub fn register(target: &str, registry: &Registry) -> Self { + let rtt_ms = Gauge::with_opts( + Opts::new("wzp_probe_rtt_ms", "RTT to peer relay in ms") + .const_label("target", target), + ) + .expect("probe metric"); + + let loss_pct = Gauge::with_opts( + Opts::new("wzp_probe_loss_pct", "Packet loss to peer relay in %") + .const_label("target", target), + ) + .expect("probe metric"); + + let jitter_ms = Gauge::with_opts( + Opts::new("wzp_probe_jitter_ms", "Jitter to peer relay in ms") + .const_label("target", target), + ) + .expect("probe metric"); + + let up = IntGauge::with_opts( + Opts::new("wzp_probe_up", "1 if peer relay is reachable, 0 if not") + .const_label("target", target), + ) + .expect("probe metric"); + + registry.register(Box::new(rtt_ms.clone())).expect("register"); + registry.register(Box::new(loss_pct.clone())).expect("register"); + registry.register(Box::new(jitter_ms.clone())).expect("register"); + registry.register(Box::new(up.clone())).expect("register"); + + Self { + rtt_ms, + loss_pct, + jitter_ms, + up, + } + } +} + +/// Sliding window for tracking probe results over the last N pings. +pub struct SlidingWindow { + /// Capacity (number of pings to track). + capacity: usize, + /// Timestamps of sent pings (ms since epoch) in order. + sent: VecDeque, + /// RTT values for received pongs (ms). None = no pong received yet. + rtts: VecDeque>, +} + +impl SlidingWindow { + pub fn new(capacity: usize) -> Self { + Self { + capacity, + sent: VecDeque::with_capacity(capacity), + rtts: VecDeque::with_capacity(capacity), + } + } + + /// Record a sent ping. + pub fn record_sent(&mut self, timestamp_ms: u64) { + if self.sent.len() >= self.capacity { + self.sent.pop_front(); + self.rtts.pop_front(); + } + self.sent.push_back(timestamp_ms); + self.rtts.push_back(None); + } + + /// Record a received pong. Returns the computed RTT in ms, or None if + /// the timestamp doesn't match any pending ping. + pub fn record_pong(&mut self, timestamp_ms: u64, now_ms: u64) -> Option { + // Find the sent ping with this timestamp + for (i, &sent_ts) in self.sent.iter().enumerate() { + if sent_ts == timestamp_ms { + let rtt = (now_ms as f64) - (sent_ts as f64); + self.rtts[i] = Some(rtt); + return Some(rtt); + } + } + None + } + + /// Compute loss percentage (0.0-100.0) from the current window. + /// A ping is considered lost if it has no matching pong. + pub fn loss_pct(&self) -> f64 { + if self.sent.is_empty() { + return 0.0; + } + let total = self.rtts.len() as f64; + let lost = self.rtts.iter().filter(|r| r.is_none()).count() as f64; + (lost / total) * 100.0 + } + + /// Compute jitter as the standard deviation of RTT values (ms). + /// Only considers pings that received a pong. + pub fn jitter_ms(&self) -> f64 { + let rtts: Vec = self.rtts.iter().filter_map(|r| *r).collect(); + if rtts.len() < 2 { + return 0.0; + } + let mean = rtts.iter().sum::() / rtts.len() as f64; + let variance = rtts.iter().map(|r| (r - mean).powi(2)).sum::() / rtts.len() as f64; + variance.sqrt() + } + + /// Return the most recent RTT value, if any. + pub fn latest_rtt(&self) -> Option { + self.rtts.iter().rev().find_map(|r| *r) + } +} + +/// Runs a health probe against a single peer relay. +pub struct ProbeRunner { + config: ProbeConfig, + metrics: ProbeMetrics, +} + +impl ProbeRunner { + /// Create a new probe runner, registering metrics with the given registry. + pub fn new(config: ProbeConfig, registry: &Registry) -> Self { + let target_str = config.target.to_string(); + let metrics = ProbeMetrics::register(&target_str, registry); + Self { config, metrics } + } + + /// Run the probe forever. This function never returns under normal operation. + /// It connects to the target relay, sends Ping every `interval`, and processes + /// Pong replies to compute RTT, loss, and jitter. + pub async fn run(&self) -> ! { + loop { + info!(target = %self.config.target, "probe connecting..."); + match self.run_session().await { + Ok(()) => { + // Session ended cleanly (shouldn't happen in practice) + warn!(target = %self.config.target, "probe session ended, reconnecting in 5s"); + } + Err(e) => { + error!(target = %self.config.target, "probe session error: {e}, reconnecting in 5s"); + } + } + self.metrics.up.set(0); + self.metrics.rtt_ms.set(0.0); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + /// Run one probe session (one QUIC connection). Returns when the connection drops. + async fn run_session(&self) -> anyhow::Result<()> { + // Create a client-only endpoint on an ephemeral port + let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let endpoint = wzp_transport::create_endpoint(bind_addr, None)?; + let client_cfg = wzp_transport::client_config(); + let conn = wzp_transport::connect( + &endpoint, + self.config.target, + "_probe", + client_cfg, + ) + .await?; + + let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); + self.metrics.up.set(1); + info!(target = %self.config.target, "probe connected"); + + let window = Arc::new(Mutex::new(SlidingWindow::new(60))); + + // Spawn recv task for pong messages + let recv_transport = transport.clone(); + let recv_window = window.clone(); + let rtt_gauge = self.metrics.rtt_ms.clone(); + let loss_gauge = self.metrics.loss_pct.clone(); + let jitter_gauge = self.metrics.jitter_ms.clone(); + let up_gauge = self.metrics.up.clone(); + + let recv_handle = tokio::spawn(async move { + loop { + match recv_transport.recv_signal().await { + Ok(Some(SignalMessage::Pong { timestamp_ms })) => { + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let mut w = recv_window.lock().await; + if let Some(rtt) = w.record_pong(timestamp_ms, now_ms) { + rtt_gauge.set(rtt); + } + loss_gauge.set(w.loss_pct()); + jitter_gauge.set(w.jitter_ms()); + } + Ok(Some(_)) => { + // Ignore non-Pong signals + } + Ok(None) => { + info!("probe recv: connection closed"); + up_gauge.set(0); + break; + } + Err(e) => { + error!("probe recv error: {e}"); + up_gauge.set(0); + break; + } + } + } + }); + + // Send ping loop + let mut interval = tokio::time::interval(self.config.interval); + loop { + interval.tick().await; + + if recv_handle.is_finished() { + // Recv task died — connection is lost + return Ok(()); + } + + let timestamp_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + { + let mut w = window.lock().await; + w.record_sent(timestamp_ms); + } + + if let Err(e) = transport + .send_signal(&SignalMessage::Ping { timestamp_ms }) + .await + { + error!(target = %self.config.target, "probe ping send error: {e}"); + recv_handle.abort(); + return Err(e.into()); + } + } + } +} + +/// Handle an incoming Ping signal by replying with a Pong carrying the same timestamp. +/// Returns true if the message was a Ping and was handled, false otherwise. +pub async fn handle_ping( + transport: &wzp_transport::QuinnTransport, + msg: &SignalMessage, +) -> bool { + if let SignalMessage::Ping { timestamp_ms } = msg { + if let Err(e) = transport + .send_signal(&SignalMessage::Pong { + timestamp_ms: *timestamp_ms, + }) + .await + { + warn!("failed to send Pong reply: {e}"); + } + true + } else { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use prometheus::Encoder; + + #[test] + fn probe_metrics_register() { + let registry = Registry::new(); + let _metrics = ProbeMetrics::register("127.0.0.1:4433", ®istry); + + let encoder = prometheus::TextEncoder::new(); + let families = registry.gather(); + let mut buf = Vec::new(); + encoder.encode(&families, &mut buf).unwrap(); + let output = String::from_utf8(buf).unwrap(); + + assert!(output.contains("wzp_probe_rtt_ms"), "missing wzp_probe_rtt_ms"); + assert!(output.contains("wzp_probe_loss_pct"), "missing wzp_probe_loss_pct"); + assert!(output.contains("wzp_probe_jitter_ms"), "missing wzp_probe_jitter_ms"); + assert!(output.contains("wzp_probe_up"), "missing wzp_probe_up"); + assert!( + output.contains("target=\"127.0.0.1:4433\""), + "missing target label" + ); + } + + #[test] + fn rtt_calculation() { + let mut window = SlidingWindow::new(60); + + // Send a ping at t=1000 + window.record_sent(1000); + // Receive pong at t=1050 => RTT = 50ms + let rtt = window.record_pong(1000, 1050); + assert_eq!(rtt, Some(50.0)); + + // Send at t=2000, receive at t=2030 => RTT = 30ms + window.record_sent(2000); + let rtt = window.record_pong(2000, 2030); + assert_eq!(rtt, Some(30.0)); + + assert_eq!(window.latest_rtt(), Some(30.0)); + + // Unknown timestamp returns None + let rtt = window.record_pong(9999, 10000); + assert!(rtt.is_none()); + } + + #[test] + fn loss_calculation() { + let mut window = SlidingWindow::new(10); + + // Send 10 pings + for i in 0..10 { + window.record_sent(i * 1000); + } + + // Receive pongs for 7 out of 10 (miss indices 2, 5, 8) + for i in 0..10u64 { + if i == 2 || i == 5 || i == 8 { + continue; // lost + } + window.record_pong(i * 1000, i * 1000 + 40); + } + + // 3 out of 10 lost = 30% + let loss = window.loss_pct(); + assert!((loss - 30.0).abs() < 0.01, "expected ~30%, got {loss}"); + } + + #[test] + fn jitter_calculation() { + let mut window = SlidingWindow::new(10); + + // Send 4 pings with known RTTs: 10, 20, 30, 40 + // Mean = 25, variance = ((15^2 + 5^2 + 5^2 + 15^2) / 4) = (225+25+25+225)/4 = 125 + // std dev = sqrt(125) ≈ 11.18 + let rtts = [10.0, 20.0, 30.0, 40.0]; + for (i, rtt) in rtts.iter().enumerate() { + let sent = (i as u64) * 1000; + window.record_sent(sent); + window.record_pong(sent, sent + *rtt as u64); + } + + let jitter = window.jitter_ms(); + assert!( + (jitter - 11.18).abs() < 0.1, + "expected jitter ~11.18ms, got {jitter}" + ); + } + + #[test] + fn sliding_window_eviction() { + let mut window = SlidingWindow::new(5); + + // Fill window + for i in 0..5 { + window.record_sent(i * 1000); + } + assert_eq!(window.sent.len(), 5); + + // Add one more — oldest should be evicted + window.record_sent(5000); + assert_eq!(window.sent.len(), 5); + assert_eq!(*window.sent.front().unwrap(), 1000); + + // All 5 are unanswered + assert!((window.loss_pct() - 100.0).abs() < 0.01); + } + + #[test] + fn empty_window_edge_cases() { + let window = SlidingWindow::new(60); + assert_eq!(window.loss_pct(), 0.0); + assert_eq!(window.jitter_ms(), 0.0); + assert!(window.latest_rtt().is_none()); + } +} diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 2ee837a..4ded10a 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -179,6 +179,7 @@ pub async fn run_participant( participant_id: ParticipantId, transport: Arc, metrics: Arc, + session_id: &str, ) { let addr = transport.connection().remote_address(); let mut packets_forwarded = 0u64; @@ -196,6 +197,11 @@ pub async fn run_participant( } }; + // Update per-session quality metrics if a quality report is present + if let Some(ref report) = pkt.quality_report { + metrics.update_session_quality(session_id, report); + } + // Get current list of other participants let others = { let mgr = room_mgr.lock().await;