From 99c017359002bb5360f43b2013f22540c69edf0c Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Fri, 10 Apr 2026 19:21:04 +0400 Subject: [PATCH] =?UTF-8?q?feat(telemetry):=20Phase=204=20=E2=80=94=20Loss?= =?UTF-8?q?RecoveryUpdate=20protocol=20+=20relay=20metrics=20+=20DebugRepo?= =?UTF-8?q?rter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4 lays the telemetry foundation for distinguishing DRED recoveries from classical PLC in production: a new SignalMessage variant, two new per-session Prometheus counters on the relay side, and a highlighted loss-recovery section in the Android DebugReporter. The periodic emitter (client → relay) and Grafana panel are deferred to Phase 4b — this commit ships the protocol surface, the relay sink, and the immediate user-visible debug output. Once 4b lands the full path (emitter → relay → Prometheus → Grafana), the metrics here will automatically start receiving data. Scope decision — why not extend QualityReport instead: The existing wire-format QualityReport is a fixed 4-byte media packet trailer. Adding counter fields to it would shift the binary layout and break backward compatibility (old receivers would parse the last 4 bytes of the extended trailer as QR, corrupting audio). Using a new SignalMessage variant on the reliable QUIC signal stream sidesteps the wire-format problem entirely — serde JSON enums tolerate unknown variants gracefully on old receivers, and the signal channel is the right layer for periodic telemetry aggregates. Changes: wzp-proto/src/packet.rs: - New SignalMessage::LossRecoveryUpdate variant carrying: * dred_reconstructions: u64 (monotonic since call start) * classical_plc_invocations: u64 (monotonic) * frames_decoded: u64 (for rate calculation) - All three fields tagged #[serde(default)] for forward compat. wzp-client/src/featherchat.rs: - Added a match arm so signal_to_call_type() handles the new variant (treat as Offer for featherChat bridging purposes). wzp-relay/src/metrics.rs: - Two new IntCounterVec metrics on the relay, labeled by session_id: * wzp_relay_session_dred_reconstructions_total * wzp_relay_session_classical_plc_total - New method update_session_loss_recovery(session_id, dred, plc) applies monotonic deltas: if the incoming totals exceed the current counter, the difference is inc_by'd. If the incoming totals are LOWER (client restart or counter reset), the Prometheus counter holds steady until the client catches up. This matches the existing update_session_buffer delta pattern. - remove_session_metrics() now cleans up the two new labels. - New test session_loss_recovery_monotonic_delta exercises: * initial population (10 DRED, 2 PLC) * forward advance (25, 5 → delta +15, +3) * lower values ignored (client reset → counters unchanged) * client catches up (30, 8 → advances to new max) - Existing session_metrics_cleanup test extended to cover the new counters. android/app/src/main/java/com/wzp/debug/DebugReporter.kt: - Phase 4 users — and incident responders — need to quickly see whether DRED is actually firing during a call. The stats JSON already carries the counters (after Phase 3c), but they were buried in the trailing JSON dump. Added a dedicated "=== Loss Recovery ===" section to the meta preamble that extracts dred_reconstructions, classical_plc_invocations, frames_decoded, and fec_recovered from the JSON and displays them plainly, plus computed percentages when frames_decoded > 0. - New extractLongField helper: tiny hand-rolled JSON integer extractor. We don't want to pull in a full JSON parser for this single use case and CallStats has a flat, well-known schema. Verification: - cargo check --workspace: zero errors - cargo test -p wzp-proto --lib: 63 passing - cargo test -p wzp-codec --lib: 68 passing - cargo test -p wzp-client --lib: 35 passing (+1 ignored probe) - cargo test -p wzp-relay --lib: 68 passing (+1 new Phase 4 test) - cargo check -p wzp-android --lib: zero errors - Android APK build verified earlier today (unridden-alfonso.apk via the remote Docker builder) — Phase 0–3c confirmed to compile end-to-end on the NDK target. Phase 4b remaining (not blocking this commit): - Periodic LossRecoveryUpdate emitter in wzp-client/src/call.rs and wzp-android/src/engine.rs (every ~5 s) - Relay-side handler in main.rs that matches the new variant and calls metrics.update_session_loss_recovery - Grafana "Loss recovery breakdown" panel in docs/grafana-dashboard.json Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/java/com/wzp/debug/DebugReporter.kt | 44 +++++++ crates/wzp-client/src/featherchat.rs | 1 + crates/wzp-proto/src/packet.rs | 20 ++++ crates/wzp-relay/src/metrics.rs | 113 ++++++++++++++++++ 4 files changed, 178 insertions(+) diff --git a/android/app/src/main/java/com/wzp/debug/DebugReporter.kt b/android/app/src/main/java/com/wzp/debug/DebugReporter.kt index 02d6e25..748ca69 100644 --- a/android/app/src/main/java/com/wzp/debug/DebugReporter.kt +++ b/android/app/src/main/java/com/wzp/debug/DebugReporter.kt @@ -46,6 +46,14 @@ class DebugReporter(private val context: Context) { val zipFile = File(context.cacheDir, "wzp_debug_${timestamp}.zip") ZipOutputStream(BufferedOutputStream(FileOutputStream(zipFile))).use { zos -> + // Phase 4: extract DRED / classical PLC counters from the + // stats JSON so they're visible in the meta preamble at a + // glance, not buried in the trailing JSON dump. + val dredReconstructions = extractLongField(finalStatsJson, "dred_reconstructions") + val classicalPlc = extractLongField(finalStatsJson, "classical_plc_invocations") + val framesDecoded = extractLongField(finalStatsJson, "frames_decoded") + val fecRecovered = extractLongField(finalStatsJson, "fec_recovered") + // 1. Call metadata val meta = buildString { appendLine("=== WZ Phone Debug Report ===") @@ -58,6 +66,18 @@ class DebugReporter(private val context: Context) { appendLine("Device: ${android.os.Build.MANUFACTURER} ${android.os.Build.MODEL}") appendLine("Android: ${android.os.Build.VERSION.RELEASE} (API ${android.os.Build.VERSION.SDK_INT})") appendLine() + appendLine("=== Loss Recovery ===") + appendLine("Frames decoded: $framesDecoded") + appendLine("DRED reconstructions: $dredReconstructions (Opus neural recovery)") + appendLine("Classical PLC: $classicalPlc (fallback)") + appendLine("RaptorQ FEC recovered: $fecRecovered (Codec2 only)") + if (framesDecoded > 0) { + val dredPct = 100.0 * dredReconstructions / framesDecoded + val plcPct = 100.0 * classicalPlc / framesDecoded + appendLine("DRED rate: ${"%.2f".format(dredPct)}%") + appendLine("Classical PLC rate: ${"%.2f".format(plcPct)}%") + } + appendLine() appendLine("=== Final Stats ===") appendLine(finalStatsJson) } @@ -195,4 +215,28 @@ class DebugReporter(private val context: Context) { FileInputStream(file).use { it.copyTo(zos) } zos.closeEntry() } + + /** + * Tiny JSON field extractor — pulls an integer value for a top-level + * field like `"dred_reconstructions":42`. We don't want to pull in a + * full JSON parser just for the debug preamble, and the CallStats + * output is a flat record with well-known field names. + * + * Returns 0 if the field is missing or unparseable. + */ + private fun extractLongField(json: String, field: String): Long { + val key = "\"$field\":" + val idx = json.indexOf(key) + if (idx < 0) return 0 + var i = idx + key.length + // Skip whitespace + while (i < json.length && json[i].isWhitespace()) i++ + val start = i + while (i < json.length && (json[i].isDigit() || json[i] == '-')) i++ + return try { + json.substring(start, i).toLong() + } catch (_: NumberFormatException) { + 0 + } + } } diff --git a/crates/wzp-client/src/featherchat.rs b/crates/wzp-client/src/featherchat.rs index e641465..54006b9 100644 --- a/crates/wzp-client/src/featherchat.rs +++ b/crates/wzp-client/src/featherchat.rs @@ -96,6 +96,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType { SignalMessage::Hangup { .. } => CallSignalType::Hangup, SignalMessage::Rekey { .. } => CallSignalType::Offer, // reuse SignalMessage::QualityUpdate { .. } => CallSignalType::Offer, // reuse + SignalMessage::LossRecoveryUpdate { .. } => CallSignalType::Offer, // reuse (telemetry) SignalMessage::Ping { .. } | SignalMessage::Pong { .. } => CallSignalType::Offer, SignalMessage::AuthToken { .. } => CallSignalType::Offer, SignalMessage::Hold => CallSignalType::Hold, diff --git a/crates/wzp-proto/src/packet.rs b/crates/wzp-proto/src/packet.rs index cb96802..7bca547 100644 --- a/crates/wzp-proto/src/packet.rs +++ b/crates/wzp-proto/src/packet.rs @@ -584,6 +584,26 @@ pub enum SignalMessage { recommended_profile: crate::QualityProfile, }, + /// Phase 4 telemetry: loss-recovery counts for the current session. + /// Sent periodically from receivers to the relay so Prometheus metrics + /// can distinguish DRED reconstructions from classical PLC invocations. + /// Fields default to 0 on old receivers (`#[serde(default)]`), so + /// introducing this variant is backward-compatible with pre-Phase-4 + /// relays — they'll just log "unknown signal variant" on receipt. + LossRecoveryUpdate { + /// Total frames reconstructed via DRED since call start (monotonic). + #[serde(default)] + dred_reconstructions: u64, + /// Total frames filled via classical Opus/Codec2 PLC since call + /// start (monotonic). + #[serde(default)] + classical_plc_invocations: u64, + /// Total frames decoded since call start. Used by the relay to + /// compute recovery rates as a fraction of total frames. + #[serde(default)] + frames_decoded: u64, + }, + /// Connection keepalive / RTT measurement. Ping { timestamp_ms: u64 }, Pong { timestamp_ms: u64 }, diff --git a/crates/wzp-relay/src/metrics.rs b/crates/wzp-relay/src/metrics.rs index 8bf513d..e3c6535 100644 --- a/crates/wzp-relay/src/metrics.rs +++ b/crates/wzp-relay/src/metrics.rs @@ -29,6 +29,9 @@ pub struct RelayMetrics { pub session_rtt_ms: GaugeVec, pub session_underruns: IntCounterVec, pub session_overruns: IntCounterVec, + // Phase 4: loss-recovery breakdown per session. + pub session_dred_reconstructions: IntCounterVec, + pub session_classical_plc: IntCounterVec, registry: Registry, } @@ -130,6 +133,23 @@ impl RelayMetrics { ) .expect("metric"); + let session_dred_reconstructions = IntCounterVec::new( + Opts::new( + "wzp_relay_session_dred_reconstructions_total", + "Frames reconstructed via DRED (Deep REDundancy) per session", + ), + &["session_id"], + ) + .expect("metric"); + let session_classical_plc = IntCounterVec::new( + Opts::new( + "wzp_relay_session_classical_plc_total", + "Frames filled via classical Opus/Codec2 PLC per session", + ), + &["session_id"], + ) + .expect("metric"); + registry.register(Box::new(active_sessions.clone())).expect("register"); registry.register(Box::new(active_rooms.clone())).expect("register"); registry.register(Box::new(packets_forwarded.clone())).expect("register"); @@ -147,6 +167,8 @@ impl RelayMetrics { registry.register(Box::new(session_rtt_ms.clone())).expect("register"); registry.register(Box::new(session_underruns.clone())).expect("register"); registry.register(Box::new(session_overruns.clone())).expect("register"); + registry.register(Box::new(session_dred_reconstructions.clone())).expect("register"); + registry.register(Box::new(session_classical_plc.clone())).expect("register"); Self { active_sessions, @@ -166,6 +188,8 @@ impl RelayMetrics { session_rtt_ms, session_underruns, session_overruns, + session_dred_reconstructions, + session_classical_plc, registry, } } @@ -217,6 +241,39 @@ impl RelayMetrics { } } + /// Phase 4: update per-session loss-recovery counters from a client's + /// `LossRecoveryUpdate` signal message. The client sends monotonic + /// totals (frames reconstructed since call start); we compute the + /// delta against the current Prometheus counter and increment by it. + /// IntCounterVec only increases, so a client restart that resets the + /// counter to 0 simply produces no delta until the new totals exceed + /// the Prometheus state. + pub fn update_session_loss_recovery( + &self, + session_id: &str, + dred_reconstructions: u64, + classical_plc: u64, + ) { + let cur_dred = self + .session_dred_reconstructions + .with_label_values(&[session_id]) + .get(); + if dred_reconstructions > cur_dred { + self.session_dred_reconstructions + .with_label_values(&[session_id]) + .inc_by(dred_reconstructions - cur_dred); + } + let cur_plc = self + .session_classical_plc + .with_label_values(&[session_id]) + .get(); + if classical_plc > cur_plc { + self.session_classical_plc + .with_label_values(&[session_id]) + .inc_by(classical_plc - cur_plc); + } + } + /// Remove all per-session label values for a disconnected session. pub fn remove_session_metrics(&self, session_id: &str) { let _ = self.session_buffer_depth.remove_label_values(&[session_id]); @@ -224,6 +281,10 @@ impl RelayMetrics { let _ = self.session_rtt_ms.remove_label_values(&[session_id]); let _ = self.session_underruns.remove_label_values(&[session_id]); let _ = self.session_overruns.remove_label_values(&[session_id]); + let _ = self + .session_dred_reconstructions + .remove_label_values(&[session_id]); + let _ = self.session_classical_plc.remove_label_values(&[session_id]); } /// Get a reference to the underlying Prometheus registry. @@ -418,10 +479,13 @@ mod tests { }; m.update_session_quality("sess-cleanup", &report); m.update_session_buffer("sess-cleanup", 42, 3, 1); + m.update_session_loss_recovery("sess-cleanup", 17, 4); // Verify they appear let output = m.metrics_handler(); assert!(output.contains("sess-cleanup")); + assert!(output.contains("wzp_relay_session_dred_reconstructions_total")); + assert!(output.contains("wzp_relay_session_classical_plc_total")); // Remove and verify they are gone m.remove_session_metrics("sess-cleanup"); @@ -429,6 +493,55 @@ mod tests { assert!(!output.contains("sess-cleanup")); } + /// Phase 4: LossRecoveryUpdate → per-session counters, monotonic delta + /// application. + #[test] + fn session_loss_recovery_monotonic_delta() { + let m = RelayMetrics::new(); + let sess = "sess-dred"; + + // First update: 10 DRED, 2 PLC + m.update_session_loss_recovery(sess, 10, 2); + let dred1 = m + .session_dred_reconstructions + .with_label_values(&[sess]) + .get(); + let plc1 = m.session_classical_plc.with_label_values(&[sess]).get(); + assert_eq!(dred1, 10); + assert_eq!(plc1, 2); + + // Second update: 25 DRED, 5 PLC — counter advances by (15, 3) + m.update_session_loss_recovery(sess, 25, 5); + let dred2 = m + .session_dred_reconstructions + .with_label_values(&[sess]) + .get(); + let plc2 = m.session_classical_plc.with_label_values(&[sess]).get(); + assert_eq!(dred2, 25); + assert_eq!(plc2, 5); + + // Third update with LOWER values (e.g., client reset) — counters + // hold steady, no decrement. + m.update_session_loss_recovery(sess, 5, 1); + let dred3 = m + .session_dred_reconstructions + .with_label_values(&[sess]) + .get(); + let plc3 = m.session_classical_plc.with_label_values(&[sess]).get(); + assert_eq!(dred3, 25, "counter must not decrease"); + assert_eq!(plc3, 5, "counter must not decrease"); + + // Fourth update: client caught up and exceeded the old max. + m.update_session_loss_recovery(sess, 30, 8); + let dred4 = m + .session_dred_reconstructions + .with_label_values(&[sess]) + .get(); + let plc4 = m.session_classical_plc.with_label_values(&[sess]).get(); + assert_eq!(dred4, 30); + assert_eq!(plc4, 8); + } + #[test] fn metrics_increment() { let m = RelayMetrics::new();