diff --git a/crates/wzp-client/src/analyzer.rs b/crates/wzp-client/src/analyzer.rs index 19748de..7757520 100644 --- a/crates/wzp-client/src/analyzer.rs +++ b/crates/wzp-client/src/analyzer.rs @@ -25,12 +25,12 @@ use wzp_proto::{CodecId, MediaPacket, MediaTransport}; #[derive(Parser)] #[command(name = "wzp-analyzer", version)] struct Args { - /// Relay address (host:port) - relay: String, + /// Relay address (host:port) — required for live mode, ignored with --replay + relay: Option, - /// Room name to observe + /// Room name to observe — required for live mode, ignored with --replay #[arg(short, long)] - room: String, + room: Option, /// Auth token for relay #[arg(long)] @@ -51,6 +51,23 @@ struct Args { /// Disable TUI (print stats to stdout instead) #[arg(long)] no_tui: bool, + + /// Replay a captured .wzp file (offline analysis) + #[arg(long)] + replay: Option, + + /// Generate HTML report (from live session or replay) + #[arg(long)] + html: Option, + + /// Session key hex for decrypting payloads (enables audio decode) + // TODO(#17): Audio decode requires session key + nonce context. + // In SFU mode, payloads are E2E encrypted. Decoding requires + // either: (a) session key from both endpoints, or (b) running + // the analyzer as a trusted participant with its own key exchange. + // For now, header-only analysis provides loss%, jitter, codec stats. + #[arg(long)] + key: Option, } // --------------------------------------------------------------------------- @@ -221,6 +238,278 @@ impl CaptureWriter { } } +// --------------------------------------------------------------------------- +// Capture reader (for replay mode) +// --------------------------------------------------------------------------- + +struct CaptureReader { + reader: std::io::BufReader, + header: serde_json::Value, +} + +impl CaptureReader { + fn open(path: &str) -> anyhow::Result { + use std::io::Read; + let file = std::fs::File::open(path)?; + let mut reader = std::io::BufReader::new(file); + + // Read magic + let mut magic = [0u8; 4]; + reader.read_exact(&mut magic)?; + anyhow::ensure!(&magic == b"WZP\x01", "not a WZP capture file"); + + // Read header + let mut len_buf = [0u8; 4]; + reader.read_exact(&mut len_buf)?; + let header_len = u32::from_le_bytes(len_buf) as usize; + let mut header_bytes = vec![0u8; header_len]; + reader.read_exact(&mut header_bytes)?; + let header: serde_json::Value = serde_json::from_slice(&header_bytes)?; + + Ok(Self { reader, header }) + } + + fn next_packet(&mut self) -> anyhow::Result> { + use std::io::Read; + // Read timestamp + let mut ts_buf = [0u8; 8]; + match self.reader.read_exact(&mut ts_buf) { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => return Err(e.into()), + } + let timestamp_us = u64::from_le_bytes(ts_buf); + + // Read packet + let mut len_buf = [0u8; 4]; + self.reader.read_exact(&mut len_buf)?; + let pkt_len = u32::from_le_bytes(len_buf) as usize; + let mut pkt_bytes = vec![0u8; pkt_len]; + self.reader.read_exact(&mut pkt_bytes)?; + + let pkt = MediaPacket::from_bytes(bytes::Bytes::from(pkt_bytes)) + .ok_or_else(|| anyhow::anyhow!("malformed packet in capture"))?; + + Ok(Some((timestamp_us, pkt))) + } +} + +// --------------------------------------------------------------------------- +// Timeline entry (for HTML report generation) +// --------------------------------------------------------------------------- + +struct TimelineEntry { + timestamp_us: u64, + stream_id: usize, + #[allow(dead_code)] + codec: CodecId, + #[allow(dead_code)] + seq: u16, + #[allow(dead_code)] + payload_len: usize, + loss_pct: f64, + jitter_ms: f64, +} + +// --------------------------------------------------------------------------- +// Replay mode (#15) +// --------------------------------------------------------------------------- + +async fn run_replay(path: &str, args: &Args) -> anyhow::Result<()> { + let mut reader = CaptureReader::open(path)?; + eprintln!( + "Replaying: {} (room: {})", + path, + reader + .header + .get("room") + .and_then(|v| v.as_str()) + .unwrap_or("?") + ); + + let mut participants: Vec = Vec::new(); + let mut total_packets: u64 = 0; + let start = Instant::now(); + let mut timeline: Vec = Vec::new(); + + while let Some((ts_us, pkt)) = reader.next_packet()? { + let now = Instant::now(); + let idx = find_or_create_participant(&mut participants, pkt.header.seq, pkt.header.codec_id); + participants[idx].ingest(&pkt, now); + total_packets += 1; + + // Record for HTML timeline + timeline.push(TimelineEntry { + timestamp_us: ts_us, + stream_id: idx, + codec: pkt.header.codec_id, + seq: pkt.header.seq, + payload_len: pkt.payload.len(), + loss_pct: participants[idx].loss_percent(), + jitter_ms: participants[idx].jitter_ms, + }); + } + + print_summary(&participants, total_packets, start.elapsed()); + + // Generate HTML if requested + if let Some(html_path) = &args.html { + generate_html_report(html_path, &participants, &timeline, total_packets, &reader.header)?; + eprintln!("HTML report: {}", html_path); + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// HTML report generation (#16) +// --------------------------------------------------------------------------- + +fn generate_html_report( + path: &str, + participants: &[ParticipantStats], + timeline: &[TimelineEntry], + total_packets: u64, + capture_header: &serde_json::Value, +) -> anyhow::Result<()> { + use std::io::Write as _; + let mut f = std::fs::File::create(path)?; + + let room = capture_header + .get("room") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let start_time = capture_header + .get("start_time") + .and_then(|v| v.as_str()) + .unwrap_or("?"); + + // Build per-stream loss/jitter timeline data for Chart.js + // Sample every 1 second (group timeline entries by second) + let max_ts = timeline.last().map(|e| e.timestamp_us).unwrap_or(0); + let duration_secs = (max_ts / 1_000_000) + 1; + + let mut loss_data: std::collections::HashMap> = + std::collections::HashMap::new(); + let mut jitter_data: std::collections::HashMap> = + std::collections::HashMap::new(); + + for stream_id in 0..participants.len() { + loss_data.insert(stream_id, vec![0.0; duration_secs as usize]); + jitter_data.insert(stream_id, vec![0.0; duration_secs as usize]); + } + + for entry in timeline { + let sec = (entry.timestamp_us / 1_000_000) as usize; + if sec < duration_secs as usize { + if let Some(losses) = loss_data.get_mut(&entry.stream_id) { + losses[sec] = entry.loss_pct; + } + if let Some(jitters) = jitter_data.get_mut(&entry.stream_id) { + jitters[sec] = entry.jitter_ms; + } + } + } + + let colors = [ + "#e74c3c", "#3498db", "#2ecc71", "#f39c12", "#9b59b6", "#1abc9c", + ]; + + // Build dataset JSON for charts + let mut loss_datasets = String::new(); + let mut jitter_datasets = String::new(); + for (i, p) in participants.iter().enumerate() { + let name = p.display_name(); + let color = colors[i % colors.len()]; + let loss_vals = loss_data + .get(&i) + .map(|v| format!("{:?}", v)) + .unwrap_or_default(); + let jitter_vals = jitter_data + .get(&i) + .map(|v| format!("{:?}", v)) + .unwrap_or_default(); + + loss_datasets.push_str(&format!( + "{{ label: '{}', data: {}, borderColor: '{}', fill: false }},\n", + name, loss_vals, color + )); + jitter_datasets.push_str(&format!( + "{{ label: '{}', data: {}, borderColor: '{}', fill: false }},\n", + name, jitter_vals, color + )); + } + + let labels: Vec = (0..duration_secs).map(|s| format!("{}s", s)).collect(); + let labels_json = format!("{:?}", labels); + + // Summary table rows + let mut summary_rows = String::new(); + for p in participants { + summary_rows.push_str(&format!( + "{}{:?}{}{:.1}%{:.0}ms{}\n", + p.display_name(), + p.codec, + p.packets, + p.loss_percent(), + p.jitter_ms, + p.codec_switches + )); + } + + write!( + f, + r#" + + +WZP Call Report — {room} + + + +

WZP Call Quality Report

+

Room: {room} | Start: {start_time} | Packets: {total_packets} | Duration: {duration_secs}s

+ +

Participant Summary

+ + +{summary_rows} +
NameCodecPacketsLossJitterCodec Switches
+ +

Packet Loss Over Time

+
+ +

Jitter Over Time

+
+ + +"# + )?; + + Ok(()) +} + // --------------------------------------------------------------------------- // No-TUI mode (print stats to stdout periodically) // --------------------------------------------------------------------------- @@ -488,10 +777,33 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); // Only init tracing subscriber in no-tui mode (it would corrupt the TUI otherwise) - if args.no_tui { + if args.no_tui || args.replay.is_some() { tracing_subscriber::fmt().init(); } + if let Some(ref key) = args.key { + eprintln!( + "Note: --key provided ({} chars) but audio decode is not yet implemented.", + key.len() + ); + eprintln!(" Header-only analysis (loss%, jitter, codec stats) will proceed."); + } + + // Replay mode: offline analysis of a .wzp capture file + if let Some(ref replay_path) = args.replay { + return run_replay(replay_path, &args).await; + } + + // Live mode requires relay and room + let relay = args + .relay + .as_deref() + .ok_or_else(|| anyhow::anyhow!("relay address required for live mode (use --replay for offline)"))?; + let room = args + .room + .as_deref() + .ok_or_else(|| anyhow::anyhow!("--room required for live mode (use --replay for offline)"))?; + // TLS crypto provider let _ = rustls::crypto::ring::default_provider().install_default(); @@ -510,7 +822,7 @@ async fn main() -> anyhow::Result<()> { }; // Connect to relay - let relay_addr: std::net::SocketAddr = args.relay.parse()?; + let relay_addr: std::net::SocketAddr = relay.parse()?; let bind_addr: std::net::SocketAddr = if relay_addr.is_ipv6() { "[::]:0".parse()? } else { @@ -518,7 +830,7 @@ async fn main() -> anyhow::Result<()> { }; let endpoint = wzp_transport::create_endpoint(bind_addr, None)?; let client_config = wzp_transport::client_config(); - let conn = wzp_transport::connect(&endpoint, relay_addr, &args.room, client_config).await?; + let conn = wzp_transport::connect(&endpoint, relay_addr, room, client_config).await?; let transport = Arc::new(wzp_transport::QuinnTransport::new(conn)); // Crypto handshake @@ -537,7 +849,7 @@ async fn main() -> anyhow::Result<()> { let mut capture_writer = args .capture .as_ref() - .map(|path| CaptureWriter::new(path, &args.room, &args.relay)) + .map(|path| CaptureWriter::new(path, room, relay)) .transpose()?; // Duration timeout