From 20922455bd6e556874e44fc6dfd1c7cfb5ecaf76 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 6 Apr 2026 07:38:56 +0000 Subject: [PATCH] fix: send task crash on QUIC congestion + AEC toggle + debug reporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: send_media() returns Err(Blocked) when QUIC congestion window is full. The send task treated ANY send error as fatal (break), killing the entire call. Now send errors drop the packet and continue. Also hardened recv task to survive transient errors and added health logging (recv gap tracking, periodic stats) to both send and recv. Relay: added comprehensive debug logging — recv gaps, lock contention, forward latency, send errors — all per-participant with 5s stats. Other changes: - AEC toggle in Settings (persisted, applied on next call) - Debug report: records call audio (WAV), RMS histogram (CSV), logcat, stats. Emailed as zip via Android share intent after call ends. - Replaced LinearProgressIndicator with Box (compose version compat) - FileProvider for sharing debug zip attachments Co-Authored-By: Claude Opus 4.6 (1M context) --- android/app/src/main/AndroidManifest.xml | 10 + .../main/java/com/wzp/audio/AudioPipeline.kt | 142 ++++++++++--- .../java/com/wzp/data/SettingsRepository.kt | 6 + .../main/java/com/wzp/debug/DebugReporter.kt | 189 ++++++++++++++++++ .../main/java/com/wzp/ui/call/CallActivity.kt | 50 +++++ .../java/com/wzp/ui/call/CallViewModel.kt | 75 +++++++ .../main/java/com/wzp/ui/call/InCallScreen.kt | 98 ++++++++- .../com/wzp/ui/settings/SettingsScreen.kt | 29 ++- android/app/src/main/res/xml/file_paths.xml | 4 + crates/wzp-android/src/engine.rs | 114 +++++++++-- crates/wzp-relay/src/room.rs | 176 ++++++++++++++-- 11 files changed, 824 insertions(+), 69 deletions(-) create mode 100644 android/app/src/main/java/com/wzp/debug/DebugReporter.kt create mode 100644 android/app/src/main/res/xml/file_paths.xml diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml index 0eea970..166014a 100644 --- a/android/app/src/main/AndroidManifest.xml +++ b/android/app/src/main/AndroidManifest.xml @@ -29,5 +29,15 @@ android:name="com.wzp.service.CallService" android:foregroundServiceType="microphone" android:exported="false" /> + + + + diff --git a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt index ca4987a..7126f66 100644 --- a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt +++ b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt @@ -13,7 +13,14 @@ import android.media.audiofx.NoiseSuppressor import android.util.Log import androidx.core.content.ContextCompat import com.wzp.engine.WzpEngine +import java.io.BufferedOutputStream +import java.io.File +import java.io.FileOutputStream +import java.io.OutputStreamWriter +import java.nio.ByteBuffer +import java.nio.ByteOrder import kotlin.math.pow +import kotlin.math.sqrt /** * Audio pipeline that captures mic audio and plays received audio using @@ -45,9 +52,17 @@ class AudioPipeline(private val context: Context) { /** Capture (mic) gain in dB. 0 = unity. */ @Volatile var captureGainDb: Float = 0f + /** Whether to attach hardware AEC. Must be set before start(). */ + var aecEnabled: Boolean = true + /** Enable debug recording of PCM + RMS histogram to cache dir. */ + var debugRecording: Boolean = true private var captureThread: Thread? = null private var playoutThread: Thread? = null + private val debugDir: File by lazy { + File(context.cacheDir, "wzp_debug").also { it.mkdirs() } + } + fun start(engine: WzpEngine) { if (running) return running = true @@ -91,6 +106,15 @@ class AudioPipeline(private val context: Context) { } } + private fun computeRms(pcm: ShortArray, count: Int): Int { + var sumSq = 0.0 + for (i in 0 until count) { + val s = pcm[i].toDouble() + sumSq += s * s + } + return sqrt(sumSq / count).toInt() + } + private fun parkThread() { try { Thread.sleep(Long.MAX_VALUE) @@ -129,53 +153,86 @@ class AudioPipeline(private val context: Context) { return } - // Attach hardware AEC if available + // Attach hardware AEC if available and enabled in settings var aec: AcousticEchoCanceler? = null - if (AcousticEchoCanceler.isAvailable()) { - try { - aec = AcousticEchoCanceler.create(recorder.audioSessionId) - aec?.enabled = true - Log.i(TAG, "AEC enabled (session=${recorder.audioSessionId})") - } catch (e: Exception) { - Log.w(TAG, "AEC init failed: ${e.message}") + var ns: NoiseSuppressor? = null + if (aecEnabled) { + if (AcousticEchoCanceler.isAvailable()) { + try { + aec = AcousticEchoCanceler.create(recorder.audioSessionId) + aec?.enabled = true + Log.i(TAG, "AEC enabled (session=${recorder.audioSessionId})") + } catch (e: Exception) { + Log.w(TAG, "AEC init failed: ${e.message}") + } + } else { + Log.w(TAG, "AEC not available on this device") + } + + // Attach hardware noise suppressor if available + if (NoiseSuppressor.isAvailable()) { + try { + ns = NoiseSuppressor.create(recorder.audioSessionId) + ns?.enabled = true + Log.i(TAG, "NoiseSuppressor enabled") + } catch (e: Exception) { + Log.w(TAG, "NoiseSuppressor init failed: ${e.message}") + } } } else { - Log.w(TAG, "AEC not available on this device") - } - - // Attach hardware noise suppressor if available - var ns: NoiseSuppressor? = null - if (NoiseSuppressor.isAvailable()) { - try { - ns = NoiseSuppressor.create(recorder.audioSessionId) - ns?.enabled = true - Log.i(TAG, "NoiseSuppressor enabled") - } catch (e: Exception) { - Log.w(TAG, "NoiseSuppressor init failed: ${e.message}") - } + Log.i(TAG, "AEC disabled by user setting") } recorder.startRecording() Log.i(TAG, "capture started: ${SAMPLE_RATE}Hz mono, buf=$bufSize, aec=${aec?.enabled}, ns=${ns?.enabled}") val pcm = ShortArray(FRAME_SAMPLES) + // Debug: PCM file + RMS CSV + var pcmOut: BufferedOutputStream? = null + var rmsCsv: OutputStreamWriter? = null + val byteConv = ByteBuffer.allocate(FRAME_SAMPLES * 2).order(ByteOrder.LITTLE_ENDIAN) + var frameIdx = 0L + if (debugRecording) { + try { + pcmOut = BufferedOutputStream(FileOutputStream(File(debugDir, "capture.pcm")), 65536) + rmsCsv = OutputStreamWriter(FileOutputStream(File(debugDir, "capture_rms.csv"))) + rmsCsv.write("frame,time_ms,rms\n") + } catch (e: Exception) { + Log.w(TAG, "debug recording init failed: ${e.message}") + } + } try { while (running) { val read = recorder.read(pcm, 0, FRAME_SAMPLES) if (read > 0) { applyGain(pcm, read, captureGainDb) engine.writeAudio(pcm) + + // Debug: write raw PCM + RMS + if (pcmOut != null) { + byteConv.clear() + for (i in 0 until read) byteConv.putShort(pcm[i]) + pcmOut.write(byteConv.array(), 0, read * 2) + } + if (rmsCsv != null) { + val rms = computeRms(pcm, read) + val timeMs = frameIdx * FRAME_SAMPLES * 1000L / SAMPLE_RATE + rmsCsv.write("$frameIdx,$timeMs,$rms\n") + } + frameIdx++ } else if (read < 0) { Log.e(TAG, "AudioRecord.read error: $read") break } } } finally { + pcmOut?.close() + rmsCsv?.close() recorder.stop() aec?.release() ns?.release() recorder.release() - Log.i(TAG, "capture stopped") + Log.i(TAG, "capture stopped (frames=$frameIdx)") } } @@ -211,24 +268,57 @@ class AudioPipeline(private val context: Context) { Log.i(TAG, "playout started: ${SAMPLE_RATE}Hz mono, buf=$bufSize") val pcm = ShortArray(FRAME_SAMPLES) - val silence = ShortArray(FRAME_SAMPLES) // pre-allocated silence + val silence = ShortArray(FRAME_SAMPLES) + // Debug: PCM file + RMS CSV for playout + var pcmOut: BufferedOutputStream? = null + var rmsCsv: OutputStreamWriter? = null + val byteConv = ByteBuffer.allocate(FRAME_SAMPLES * 2).order(ByteOrder.LITTLE_ENDIAN) + var frameIdx = 0L + if (debugRecording) { + try { + pcmOut = BufferedOutputStream(FileOutputStream(File(debugDir, "playout.pcm")), 65536) + rmsCsv = OutputStreamWriter(FileOutputStream(File(debugDir, "playout_rms.csv"))) + rmsCsv.write("frame,time_ms,rms\n") + } catch (e: Exception) { + Log.w(TAG, "debug playout recording init failed: ${e.message}") + } + } try { while (running) { val read = engine.readAudio(pcm) if (read >= FRAME_SAMPLES) { applyGain(pcm, read, playoutGainDb) track.write(pcm, 0, read) + + // Debug: write raw PCM + RMS + if (pcmOut != null) { + byteConv.clear() + for (i in 0 until read) byteConv.putShort(pcm[i]) + pcmOut.write(byteConv.array(), 0, read * 2) + } + if (rmsCsv != null) { + val rms = computeRms(pcm, read) + val timeMs = frameIdx * FRAME_SAMPLES * 1000L / SAMPLE_RATE + rmsCsv.write("$frameIdx,$timeMs,$rms\n") + } + frameIdx++ } else { - // Not enough decoded audio — write silence to keep stream alive track.write(silence, 0, FRAME_SAMPLES) - // Sleep briefly to avoid busy-spinning + // Log silence frames to RMS as 0 + if (rmsCsv != null) { + val timeMs = frameIdx * FRAME_SAMPLES * 1000L / SAMPLE_RATE + rmsCsv.write("$frameIdx,$timeMs,0\n") + } + frameIdx++ Thread.sleep(5) } } } finally { + pcmOut?.close() + rmsCsv?.close() track.stop() track.release() - Log.i(TAG, "playout stopped") + Log.i(TAG, "playout stopped (frames=$frameIdx)") } } } diff --git a/android/app/src/main/java/com/wzp/data/SettingsRepository.kt b/android/app/src/main/java/com/wzp/data/SettingsRepository.kt index 2d2162c..28c41e9 100644 --- a/android/app/src/main/java/com/wzp/data/SettingsRepository.kt +++ b/android/app/src/main/java/com/wzp/data/SettingsRepository.kt @@ -27,6 +27,7 @@ class SettingsRepository(context: Context) { private const val KEY_CAPTURE_GAIN = "capture_gain_db" private const val KEY_PREFER_IPV6 = "prefer_ipv6" private const val KEY_IDENTITY_SEED = "identity_seed_hex" + private const val KEY_AEC_ENABLED = "aec_enabled" } // --- Servers --- @@ -112,6 +113,11 @@ class SettingsRepository(context: Context) { fun savePreferIPv6(prefer: Boolean) { prefs.edit().putBoolean(KEY_PREFER_IPV6, prefer).apply() } fun loadPreferIPv6(): Boolean = prefs.getBoolean(KEY_PREFER_IPV6, false) + // --- AEC --- + + fun saveAecEnabled(enabled: Boolean) { prefs.edit().putBoolean(KEY_AEC_ENABLED, enabled).apply() } + fun loadAecEnabled(): Boolean = prefs.getBoolean(KEY_AEC_ENABLED, true) + // --- Identity seed --- /** diff --git a/android/app/src/main/java/com/wzp/debug/DebugReporter.kt b/android/app/src/main/java/com/wzp/debug/DebugReporter.kt new file mode 100644 index 0000000..38c32d7 --- /dev/null +++ b/android/app/src/main/java/com/wzp/debug/DebugReporter.kt @@ -0,0 +1,189 @@ +package com.wzp.debug + +import android.content.Context +import android.util.Log +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import java.io.BufferedOutputStream +import java.io.ByteArrayOutputStream +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.text.SimpleDateFormat +import java.util.Date +import java.util.Locale +import java.util.zip.ZipEntry +import java.util.zip.ZipOutputStream + +/** + * Collects call debug data (audio recordings, logs, histograms, stats) + * into a zip file for email sharing. + */ +class DebugReporter(private val context: Context) { + + companion object { + private const val TAG = "DebugReporter" + private const val SAMPLE_RATE = 48000 + } + + /** + * Build a zip with all debug data. + * Returns the zip File on success, or null on failure. + */ + suspend fun collectZip( + callDurationSecs: Double, + finalStatsJson: String, + aecEnabled: Boolean, + alias: String, + server: String, + room: String + ): File? = withContext(Dispatchers.IO) { + try { + val debugDir = File(context.cacheDir, "wzp_debug") + val timestamp = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.US).format(Date()) + val zipFile = File(context.cacheDir, "wzp_debug_${timestamp}.zip") + + ZipOutputStream(BufferedOutputStream(FileOutputStream(zipFile))).use { zos -> + // 1. Call metadata + val meta = buildString { + appendLine("=== WZ Phone Debug Report ===") + appendLine("Timestamp: $timestamp") + appendLine("Alias: $alias") + appendLine("Server: $server") + appendLine("Room: $room") + appendLine("Duration: ${"%.1f".format(callDurationSecs)}s") + appendLine("AEC: ${if (aecEnabled) "ON" else "OFF"}") + appendLine("Device: ${android.os.Build.MANUFACTURER} ${android.os.Build.MODEL}") + appendLine("Android: ${android.os.Build.VERSION.RELEASE} (API ${android.os.Build.VERSION.SDK_INT})") + appendLine() + appendLine("=== Final Stats ===") + appendLine(finalStatsJson) + } + addTextEntry(zos, "meta.txt", meta) + + // 2. Logcat — WZP-related tags + val logcat = collectLogcat() + addTextEntry(zos, "logcat.txt", logcat) + + // 3. Capture audio (mic) → WAV + val captureRaw = File(debugDir, "capture.pcm") + if (captureRaw.exists() && captureRaw.length() > 0) { + addWavEntry(zos, "capture.wav", captureRaw) + Log.i(TAG, "capture.pcm: ${captureRaw.length()} bytes -> WAV") + } + + // 4. Playout audio (speaker) → WAV + val playoutRaw = File(debugDir, "playout.pcm") + if (playoutRaw.exists() && playoutRaw.length() > 0) { + addWavEntry(zos, "playout.wav", playoutRaw) + Log.i(TAG, "playout.pcm: ${playoutRaw.length()} bytes -> WAV") + } + + // 5. RMS histogram CSV + val captureHist = File(debugDir, "capture_rms.csv") + if (captureHist.exists()) addFileEntry(zos, "capture_rms.csv", captureHist) + val playoutHist = File(debugDir, "playout_rms.csv") + if (playoutHist.exists()) addFileEntry(zos, "playout_rms.csv", playoutHist) + } + + Log.i(TAG, "zip created: ${zipFile.length()} bytes (${zipFile.length() / 1024}KB)") + + // Clean up raw debug files (keep zip) + debugDir.listFiles()?.forEach { it.delete() } + + zipFile + } catch (e: Exception) { + Log.e(TAG, "debug report failed", e) + null + } + } + + /** Clean up any leftover debug files from a previous session. */ + fun prepareForCall() { + val debugDir = File(context.cacheDir, "wzp_debug") + if (debugDir.exists()) { + debugDir.listFiles()?.forEach { it.delete() } + } + debugDir.mkdirs() + // Also clean up old zip files + context.cacheDir.listFiles()?.filter { it.name.startsWith("wzp_debug_") }?.forEach { it.delete() } + } + + private fun collectLogcat(): String { + return try { + val process = Runtime.getRuntime().exec( + arrayOf( + "logcat", "-d", + "-t", "5000", + "--format", "threadtime" + ) + ) + val output = process.inputStream.bufferedReader().readText() + process.waitFor() + output.lines() + .filter { line -> + line.contains("wzp", ignoreCase = true) || + line.contains("WzpEngine") || + line.contains("AudioPipeline") || + line.contains("WzpCall") || + line.contains("CallService") || + line.contains("AudioTrack") || + line.contains("AudioRecord") || + line.contains("AcousticEchoCanceler") || + line.contains("NoiseSuppressor") || + line.contains("FATAL") || + line.contains("ANR") || + line.contains("AudioFlinger") || + line.contains("DebugReporter") || + line.contains("QUIC") || + line.contains("quinn") + } + .joinToString("\n") + } catch (e: Exception) { + "Failed to collect logcat: ${e.message}" + } + } + + private fun addWavEntry(zos: ZipOutputStream, name: String, pcmFile: File) { + val dataSize = pcmFile.length().toInt() + val byteRate = SAMPLE_RATE * 1 * 16 / 8 + val blockAlign = 1 * 16 / 8 + + zos.putNextEntry(ZipEntry(name)) + + // Write WAV header (44 bytes) + val header = ByteBuffer.allocate(44).order(ByteOrder.LITTLE_ENDIAN) + header.put("RIFF".toByteArray()) + header.putInt(36 + dataSize) + header.put("WAVE".toByteArray()) + header.put("fmt ".toByteArray()) + header.putInt(16) + header.putShort(1) // PCM + header.putShort(1) // mono + header.putInt(SAMPLE_RATE) + header.putInt(byteRate) + header.putShort(blockAlign.toShort()) + header.putShort(16) // bits per sample + header.put("data".toByteArray()) + header.putInt(dataSize) + zos.write(header.array()) + + // Stream PCM data directly (avoids loading entire file into memory) + FileInputStream(pcmFile).use { it.copyTo(zos) } + zos.closeEntry() + } + + private fun addTextEntry(zos: ZipOutputStream, name: String, content: String) { + zos.putNextEntry(ZipEntry(name)) + zos.write(content.toByteArray()) + zos.closeEntry() + } + + private fun addFileEntry(zos: ZipOutputStream, name: String, file: File) { + zos.putNextEntry(ZipEntry(name)) + FileInputStream(file).use { it.copyTo(zos) } + zos.closeEntry() + } +} diff --git a/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt b/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt index a2e46a9..f651ae2 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt @@ -1,8 +1,10 @@ package com.wzp.ui.call import android.Manifest +import android.content.Intent import android.content.pm.PackageManager import android.os.Bundle +import android.util.Log import android.widget.Toast import androidx.activity.ComponentActivity import androidx.activity.compose.setContent @@ -21,7 +23,12 @@ import androidx.compose.runtime.remember import androidx.compose.runtime.setValue import androidx.compose.ui.platform.LocalContext import androidx.core.content.ContextCompat +import androidx.core.content.FileProvider +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.lifecycleScope +import androidx.lifecycle.repeatOnLifecycle import com.wzp.ui.settings.SettingsScreen +import kotlinx.coroutines.launch /** * Main activity hosting the in-call Compose UI. @@ -31,6 +38,10 @@ import com.wzp.ui.settings.SettingsScreen */ class CallActivity : ComponentActivity() { + companion object { + private const val TAG = "CallActivity" + } + private val viewModel: CallViewModel by viewModels() private val audioPermissionLauncher = registerForActivityResult( @@ -69,6 +80,45 @@ class CallActivity : ComponentActivity() { ) { audioPermissionLauncher.launch(Manifest.permission.RECORD_AUDIO) } + + // Watch for debug zip ready → launch email intent + lifecycleScope.launch { + repeatOnLifecycle(Lifecycle.State.STARTED) { + viewModel.debugZipReady.collect { zipFile -> + if (zipFile != null && zipFile.exists()) { + Log.i(TAG, "debug zip ready: ${zipFile.absolutePath} (${zipFile.length()} bytes)") + launchEmailIntent(zipFile) + viewModel.onDebugReportSent() + } + } + } + } + } + + private fun launchEmailIntent(zipFile: java.io.File) { + try { + val authority = "${applicationContext.packageName}.fileprovider" + Log.i(TAG, "FileProvider authority: $authority, file: ${zipFile.absolutePath}") + val uri = FileProvider.getUriForFile(this, authority, zipFile) + Log.i(TAG, "FileProvider URI: $uri") + + val intent = Intent(Intent.ACTION_SEND).apply { + type = "message/rfc822" + putExtra(Intent.EXTRA_EMAIL, arrayOf("manwefarm@gmail.com")) + putExtra(Intent.EXTRA_SUBJECT, "WZ Phone Debug Report - ${zipFile.name}") + putExtra( + Intent.EXTRA_TEXT, + "Debug report attached.\n\nContains: call recordings (WAV), RMS histograms (CSV), logcat, stats." + ) + putExtra(Intent.EXTRA_STREAM, uri) + addFlags(Intent.FLAG_GRANT_READ_URI_PERMISSION) + } + startActivity(Intent.createChooser(intent, "Send debug report")) + Log.i(TAG, "email intent launched") + } catch (e: Exception) { + Log.e(TAG, "email intent failed", e) + Toast.makeText(this, "Failed to launch email: ${e.message}", Toast.LENGTH_LONG).show() + } } override fun onDestroy() { diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt index 7a3f53d..30bd7e4 100644 --- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt +++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt @@ -7,6 +7,7 @@ import androidx.lifecycle.viewModelScope import com.wzp.audio.AudioPipeline import com.wzp.audio.AudioRouteManager import com.wzp.data.SettingsRepository +import com.wzp.debug.DebugReporter import com.wzp.engine.CallStats import com.wzp.service.CallService import com.wzp.engine.WzpCallback @@ -18,6 +19,7 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import java.io.File import java.net.Inet4Address import java.net.Inet6Address import java.net.InetAddress @@ -33,6 +35,10 @@ class CallViewModel : ViewModel(), WzpCallback { private var audioStarted = false private var appContext: Context? = null private var settings: SettingsRepository? = null + private var debugReporter: DebugReporter? = null + private var lastStatsJson: String = "{}" + private var lastCallDuration: Double = 0.0 + private var lastCallServer: String = "" private val _callState = MutableStateFlow(0) val callState: StateFlow get() = _callState.asStateFlow() @@ -76,6 +82,21 @@ class CallViewModel : ViewModel(), WzpCallback { private val _seedHex = MutableStateFlow("") val seedHex: StateFlow = _seedHex.asStateFlow() + private val _aecEnabled = MutableStateFlow(true) + val aecEnabled: StateFlow = _aecEnabled.asStateFlow() + + /** True when a call just ended and debug report can be sent. */ + private val _debugReportAvailable = MutableStateFlow(false) + val debugReportAvailable: StateFlow = _debugReportAvailable.asStateFlow() + + /** Status: null=idle, "Preparing..."=in progress, "ready"=zip ready, "Error:..."=failed */ + private val _debugReportStatus = MutableStateFlow(null) + val debugReportStatus: StateFlow = _debugReportStatus.asStateFlow() + + /** The zip file ready to be emailed. Set by sendDebugReport, consumed by Activity. */ + private val _debugZipReady = MutableStateFlow(null) + val debugZipReady: StateFlow = _debugZipReady.asStateFlow() + private var statsJob: Job? = null companion object { @@ -96,6 +117,9 @@ class CallViewModel : ViewModel(), WzpCallback { if (audioRouteManager == null) { audioRouteManager = AudioRouteManager(appCtx) } + if (debugReporter == null) { + debugReporter = DebugReporter(appCtx) + } if (settings == null) { settings = SettingsRepository(appCtx) loadSettings() @@ -114,6 +138,7 @@ class CallViewModel : ViewModel(), WzpCallback { _playoutGainDb.value = s.loadPlayoutGain() _captureGainDb.value = s.loadCaptureGain() _seedHex.value = s.getOrCreateSeedHex() + _aecEnabled.value = s.loadAecEnabled() } fun selectServer(index: Int) { @@ -184,6 +209,11 @@ class CallViewModel : ViewModel(), WzpCallback { settings?.saveSeedHex(hex) } + fun setAecEnabled(enabled: Boolean) { + _aecEnabled.value = enabled + settings?.saveAecEnabled(enabled) + } + /** * Resolve DNS hostname to IP address on the Kotlin/Android side, * since Rust's DNS resolution may not work on Android. @@ -222,6 +252,7 @@ class CallViewModel : ViewModel(), WzpCallback { /** Tear down engine and audio. Pass stopService=true to also stop the foreground service. */ private fun teardown(stopService: Boolean = true) { Log.i(TAG, "teardown: stopping audio, stopService=$stopService") + val hadCall = audioStarted CallService.onStopFromNotification = null stopAudio() stopStatsPolling() @@ -231,6 +262,9 @@ class CallViewModel : ViewModel(), WzpCallback { engine = null engineInitialized = false _callState.value = 0 + if (hadCall) { + _debugReportAvailable.value = true + } if (stopService) { try { appContext?.let { CallService.stop(it) } } catch (_: Exception) {} } @@ -241,6 +275,10 @@ class CallViewModel : ViewModel(), WzpCallback { val serverEntry = _servers.value[_selectedServer.value] val room = _roomName.value Log.i(TAG, "startCall: server=${serverEntry.address} room=$room") + _debugReportAvailable.value = false + _debugReportStatus.value = null + lastCallServer = serverEntry.address + debugReporter?.prepareForCall() try { // Teardown previous call but don't stop the service (we're about to restart it) teardown(stopService = false) @@ -305,6 +343,40 @@ class CallViewModel : ViewModel(), WzpCallback { fun clearError() { _errorMessage.value = null } + fun sendDebugReport() { + val reporter = debugReporter ?: return + _debugReportStatus.value = "Preparing debug report..." + viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) { + val zipFile = reporter.collectZip( + callDurationSecs = lastCallDuration, + finalStatsJson = lastStatsJson, + aecEnabled = _aecEnabled.value, + alias = _alias.value, + server = lastCallServer, + room = _roomName.value + ) + if (zipFile != null) { + _debugZipReady.value = zipFile + _debugReportStatus.value = "ready" + } else { + _debugReportStatus.value = "Error: failed to create zip" + } + _debugReportAvailable.value = false + } + } + + /** Called by Activity after email intent is launched. */ + fun onDebugReportSent() { + _debugZipReady.value = null + _debugReportStatus.value = null + } + + fun dismissDebugReport() { + _debugReportAvailable.value = false + _debugReportStatus.value = null + _debugZipReady.value = null + } + // WzpCallback override fun onCallStateChanged(state: Int) { _callState.value = state } override fun onQualityTierChanged(tier: Int) { _qualityTier.value = tier } @@ -318,6 +390,7 @@ class CallViewModel : ViewModel(), WzpCallback { audioPipeline = AudioPipeline(ctx).also { it.playoutGainDb = _playoutGainDb.value it.captureGainDb = _captureGainDb.value + it.aecEnabled = _aecEnabled.value it.start(e) } audioRouteManager?.register() @@ -342,7 +415,9 @@ class CallViewModel : ViewModel(), WzpCallback { val json = engine?.getStats() ?: "{}" if (json.isNotEmpty()) { Log.d(TAG, "raw: $json") + lastStatsJson = json val s = CallStats.fromJson(json) + lastCallDuration = s.durationSecs _stats.value = s if (s.state != 0) { _callState.value = s.state diff --git a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt index 2d149ca..0bf6260 100644 --- a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt +++ b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt @@ -24,7 +24,6 @@ import androidx.compose.material3.ButtonDefaults import androidx.compose.material3.FilledIconButton import androidx.compose.material3.FilledTonalIconButton import androidx.compose.material3.IconButtonDefaults -import androidx.compose.material3.LinearProgressIndicator import androidx.compose.material3.MaterialTheme import androidx.compose.material3.OutlinedButton import androidx.compose.material3.OutlinedTextField @@ -69,6 +68,8 @@ fun InCallScreen( val preferIPv6 by viewModel.preferIPv6.collectAsState() val playoutGainDb by viewModel.playoutGainDb.collectAsState() val captureGainDb by viewModel.captureGainDb.collectAsState() + val debugReportAvailable by viewModel.debugReportAvailable.collectAsState() + val debugReportStatus by viewModel.debugReportStatus.collectAsState() var showAddServerDialog by remember { mutableStateOf(false) } @@ -228,6 +229,17 @@ fun InCallScreen( color = MaterialTheme.colorScheme.error ) } + + // Debug report card — shown after call ends + if (debugReportAvailable || debugReportStatus != null) { + Spacer(modifier = Modifier.height(24.dp)) + DebugReportCard( + available = debugReportAvailable, + status = debugReportStatus, + onSend = { viewModel.sendDebugReport() }, + onDismiss = { viewModel.dismissDebugReport() } + ) + } } else { // In-call UI Spacer(modifier = Modifier.height(16.dp)) @@ -442,15 +454,20 @@ private fun AudioLevelBar(audioLevel: Int) { color = MaterialTheme.colorScheme.onSurfaceVariant ) Spacer(modifier = Modifier.height(4.dp)) - LinearProgressIndicator( - progress = level, + Box( modifier = Modifier .fillMaxWidth(0.6f) .height(6.dp) - .clip(RoundedCornerShape(3.dp)), - color = MaterialTheme.colorScheme.primary, - trackColor = MaterialTheme.colorScheme.surfaceVariant, - ) + .clip(RoundedCornerShape(3.dp)) + .background(MaterialTheme.colorScheme.surfaceVariant) + ) { + Box( + modifier = Modifier + .fillMaxWidth(level) + .height(6.dp) + .background(MaterialTheme.colorScheme.primary) + ) + } } } @@ -602,3 +619,70 @@ private fun StatItem(label: String, value: String) { ) } } + +@Composable +private fun DebugReportCard( + available: Boolean, + status: String?, + onSend: () -> Unit, + onDismiss: () -> Unit +) { + Surface( + modifier = Modifier.fillMaxWidth(), + color = MaterialTheme.colorScheme.surfaceVariant.copy(alpha = 0.7f), + shape = RoundedCornerShape(12.dp) + ) { + Column( + modifier = Modifier.padding(16.dp), + horizontalAlignment = Alignment.CenterHorizontally + ) { + Text( + text = "Debug Report", + style = MaterialTheme.typography.titleSmall.copy(fontWeight = FontWeight.Bold), + color = MaterialTheme.colorScheme.onSurface + ) + Spacer(modifier = Modifier.height(4.dp)) + Text( + text = "Email call recordings, logs & stats for analysis", + style = MaterialTheme.typography.bodySmall, + color = MaterialTheme.colorScheme.onSurfaceVariant, + textAlign = TextAlign.Center + ) + + Spacer(modifier = Modifier.height(12.dp)) + + when { + status != null && status.startsWith("Error") -> { + Text( + text = status, + style = MaterialTheme.typography.bodySmall, + color = MaterialTheme.colorScheme.error + ) + Spacer(modifier = Modifier.height(8.dp)) + Row(horizontalArrangement = Arrangement.spacedBy(8.dp)) { + OutlinedButton(onClick = onSend) { Text("Retry") } + TextButton(onClick = onDismiss) { Text("Dismiss") } + } + } + status != null && status != "ready" -> { + // Preparing zip... + Text( + text = status, + style = MaterialTheme.typography.bodySmall, + color = MaterialTheme.colorScheme.onSurfaceVariant + ) + } + available -> { + Row(horizontalArrangement = Arrangement.spacedBy(8.dp)) { + Button(onClick = onSend) { + Text("Email Report") + } + TextButton(onClick = onDismiss) { + Text("Skip") + } + } + } + } + } + } +} diff --git a/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt b/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt index f990be4..6a083c2 100644 --- a/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt +++ b/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt @@ -69,6 +69,7 @@ fun SettingsScreen( val currentPreferIPv6 by viewModel.preferIPv6.collectAsState() val currentPlayoutGain by viewModel.playoutGainDb.collectAsState() val currentCaptureGain by viewModel.captureGainDb.collectAsState() + val currentAecEnabled by viewModel.aecEnabled.collectAsState() // Draft state — initialized from current values var draftAlias by remember { mutableStateOf(currentAlias) } @@ -79,6 +80,7 @@ fun SettingsScreen( var draftPreferIPv6 by remember { mutableStateOf(currentPreferIPv6) } var draftPlayoutGain by remember { mutableFloatStateOf(currentPlayoutGain) } var draftCaptureGain by remember { mutableFloatStateOf(currentCaptureGain) } + var draftAecEnabled by remember { mutableStateOf(currentAecEnabled) } // Track if anything changed val hasChanges = draftAlias != currentAlias || @@ -88,7 +90,8 @@ fun SettingsScreen( draftRoomName != currentRoomName || draftPreferIPv6 != currentPreferIPv6 || draftPlayoutGain != currentPlayoutGain || - draftCaptureGain != currentCaptureGain + draftCaptureGain != currentCaptureGain || + draftAecEnabled != currentAecEnabled var showAddServerDialog by remember { mutableStateOf(false) } var showRestoreKeyDialog by remember { mutableStateOf(false) } @@ -130,6 +133,7 @@ fun SettingsScreen( viewModel.setPreferIPv6(draftPreferIPv6) viewModel.setPlayoutGainDb(draftPlayoutGain) viewModel.setCaptureGainDb(draftCaptureGain) + viewModel.setAecEnabled(draftAecEnabled) Toast.makeText(context, "Settings saved", Toast.LENGTH_SHORT).show() onBack() }, @@ -204,6 +208,29 @@ fun SettingsScreen( onGainChange = { draftCaptureGain = Math.round(it).toFloat() } ) + Spacer(modifier = Modifier.height(12.dp)) + + Row( + verticalAlignment = Alignment.CenterVertically, + modifier = Modifier.fillMaxWidth() + ) { + Column(modifier = Modifier.weight(1f)) { + Text( + text = "Echo Cancellation (AEC)", + style = MaterialTheme.typography.bodyMedium + ) + Text( + text = "Disable if audio sounds distorted", + style = MaterialTheme.typography.bodySmall, + color = MaterialTheme.colorScheme.onSurfaceVariant + ) + } + Switch( + checked = draftAecEnabled, + onCheckedChange = { draftAecEnabled = it } + ) + } + Spacer(modifier = Modifier.height(24.dp)) Divider() Spacer(modifier = Modifier.height(16.dp)) diff --git a/android/app/src/main/res/xml/file_paths.xml b/android/app/src/main/res/xml/file_paths.xml new file mode 100644 index 0000000..45fce9e --- /dev/null +++ b/android/app/src/main/res/xml/file_paths.xml @@ -0,0 +1,4 @@ + + + + diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs index 6f25efb..ea20fb6 100644 --- a/crates/wzp-android/src/engine.rs +++ b/crates/wzp-android/src/engine.rs @@ -321,8 +321,18 @@ async fn run_call( let mut block_id: u8 = 0; // Send task: capture ring → Opus encode → FEC → MediaPackets + // + // IMPORTANT: send_media() uses quinn's send_datagram() which is + // synchronous and returns Err(Blocked) when the congestion window + // is full. We MUST NOT break on send errors — that would kill the + // entire call. Instead we drop the packet and keep going. let send_task = async { info!("send task started (Opus + RaptorQ FEC)"); + let mut send_errors: u64 = 0; + let mut last_send_error_log = Instant::now(); + let mut last_stats_log = Instant::now(); + let mut frames_sent: u64 = 0; + let mut frames_dropped: u64 = 0; loop { if !state.running.load(Ordering::Relaxed) { break; @@ -380,11 +390,24 @@ async fn run_call( quality_report: None, }; - // Send source packet + // Send source packet — drop on error, never break if let Err(e) = transport.send_media(&source_pkt).await { - error!("send error: {e}"); - break; + send_errors += 1; + frames_dropped += 1; + // Log first few errors, then throttle to once per second + if send_errors <= 3 || last_send_error_log.elapsed().as_secs() >= 1 { + warn!( + seq = s, + send_errors, + frames_dropped, + "send_media error (dropping packet): {e}" + ); + last_send_error_log = Instant::now(); + } + // Don't feed to FEC either — the source is lost + continue; } + frames_sent += 1; // Feed encoded frame to FEC encoder if let Err(e) = fec_enc.add_source_symbol(encoded) { @@ -418,9 +441,11 @@ async fn run_call( payload: Bytes::from(repair_data), quality_report: None, }; - if let Err(e) = transport.send_media(&repair_pkt).await { - error!("send repair error: {e}"); - break; + // Drop repair packets on error — never break + if let Err(_e) = transport.send_media(&repair_pkt).await { + send_errors += 1; + frames_dropped += 1; + // Don't log every repair failure — source error log covers it } } if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) { @@ -442,10 +467,21 @@ async fn run_call( frame_in_block = 0; } - if s % 500 == 0 { - info!(seq = s, block_id, frame_in_block, "sending"); + // Periodic stats every 5 seconds + if last_stats_log.elapsed().as_secs() >= 5 { + info!( + seq = s, + block_id, + frames_sent, + frames_dropped, + send_errors, + ring_avail = state.capture_ring.available(), + "send stats" + ); + last_stats_log = Instant::now(); } } + info!(frames_sent, frames_dropped, send_errors, "send task ended"); }; // Pre-allocate decode buffer @@ -455,6 +491,10 @@ async fn run_call( let recv_task = async { let mut frames_decoded: u64 = 0; let mut fec_recovered: u64 = 0; + let mut recv_errors: u64 = 0; + let mut last_recv_instant = Instant::now(); + let mut max_recv_gap_ms: u64 = 0; + let mut last_stats_log = Instant::now(); info!("recv task started (Opus + RaptorQ FEC)"); loop { if !state.running.load(Ordering::Relaxed) { @@ -462,6 +502,21 @@ async fn run_call( } match transport_recv.recv_media().await { Ok(Some(pkt)) => { + // Track recv gaps — large gaps indicate network or relay issues + let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; + last_recv_instant = Instant::now(); + if recv_gap_ms > max_recv_gap_ms { + max_recv_gap_ms = recv_gap_ms; + } + if recv_gap_ms > 500 { + warn!( + recv_gap_ms, + seq = pkt.header.seq, + is_repair = pkt.header.is_repair, + "large recv gap — possible network stall" + ); + } + let is_repair = pkt.header.is_repair; let pkt_block = pkt.header.fec_block; let pkt_symbol = pkt.header.fec_symbol; @@ -478,7 +533,6 @@ async fn run_call( if !is_repair { match decoder.decode(&pkt.payload, &mut decode_buf) { Ok(samples) => { - // AGC on playout — normalizes received audio volume playout_agc.process_frame(&mut decode_buf[..samples]); state.playout_ring.write(&decode_buf[..samples]); frames_decoded += 1; @@ -493,13 +547,8 @@ async fn run_call( } } - // Try FEC recovery for this block - // (useful when source packets were lost but repair arrived) + // Try FEC recovery if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) { - // FEC recovered the block — any previously missing frames - // are now available. In a full jitter buffer implementation, - // we'd insert recovered frames at the right position. - // For now, log recovery for telemetry. fec_recovered += recovered_frames.len() as u64; if fec_recovered % 50 == 1 { info!( @@ -516,24 +565,45 @@ async fn run_call( fec_dec.expire_before(pkt_block.wrapping_sub(3)); } - if frames_decoded == 1 || frames_decoded % 500 == 0 { - info!(frames_decoded, fec_recovered, "recv stats"); - } - let mut stats = state.stats.lock().unwrap(); stats.frames_decoded = frames_decoded; stats.fec_recovered = fec_recovered; + drop(stats); + + // Periodic stats every 5 seconds + if last_stats_log.elapsed().as_secs() >= 5 { + info!( + frames_decoded, + fec_recovered, + recv_errors, + max_recv_gap_ms, + playout_avail = state.playout_ring.available(), + "recv stats" + ); + max_recv_gap_ms = 0; + last_stats_log = Instant::now(); + } } Ok(None) => { - info!("relay disconnected"); + info!(frames_decoded, fec_recovered, "relay disconnected (stream ended)"); break; } Err(e) => { - error!("recv error: {e}"); - break; + recv_errors += 1; + // Transient errors: log and keep going + let msg = e.to_string(); + if msg.contains("closed") || msg.contains("reset") { + error!(recv_errors, "recv fatal: {e}"); + break; + } + // Non-fatal: log throttled + if recv_errors <= 3 || recv_errors % 50 == 0 { + warn!(recv_errors, "recv error (continuing): {e}"); + } } } } + info!(frames_decoded, fec_recovered, recv_errors, "recv task ended"); }; // Stats task — polls path quality + quinn RTT every 500ms diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs index 616538f..70850ad 100644 --- a/crates/wzp-relay/src/room.rs +++ b/crates/wzp-relay/src/room.rs @@ -10,7 +10,7 @@ use std::time::Duration; use bytes::Bytes; use tokio::sync::Mutex; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use wzp_proto::packet::TrunkFrame; use wzp_proto::MediaTransport; @@ -375,55 +375,121 @@ async fn run_participant_plain( ) { let addr = transport.connection().remote_address(); let mut packets_forwarded = 0u64; + let mut last_recv_instant = std::time::Instant::now(); + let mut max_recv_gap_ms = 0u64; + let mut max_forward_ms = 0u64; + let mut send_errors = 0u64; + let mut last_log_instant = std::time::Instant::now(); + + info!( + room = %room_name, + participant = participant_id, + %addr, + session = session_id, + "forwarding loop started (plain)" + ); loop { + let recv_start = std::time::Instant::now(); let pkt = match transport.recv_media().await { Ok(Some(pkt)) => pkt, Ok(None) => { - info!(%addr, participant = participant_id, "disconnected"); + info!(%addr, participant = participant_id, forwarded = packets_forwarded, "disconnected (stream ended)"); break; } Err(e) => { let msg = e.to_string(); if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") { - info!(%addr, participant = participant_id, "connection closed: {e}"); + info!(%addr, participant = participant_id, forwarded = packets_forwarded, "connection closed: {e}"); } else { - error!(%addr, participant = participant_id, "recv error: {e}"); + error!(%addr, participant = participant_id, forwarded = packets_forwarded, "recv error: {e}"); } break; } }; + let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; + last_recv_instant = std::time::Instant::now(); + if recv_gap_ms > max_recv_gap_ms { + max_recv_gap_ms = recv_gap_ms; + } + // Log if recv gap is suspiciously large (>200ms = missed ~10 packets) + if recv_gap_ms > 200 { + warn!( + room = %room_name, + participant = participant_id, + recv_gap_ms, + seq = pkt.header.seq, + "large recv gap" + ); + } + // Update per-session quality metrics if a quality report is present if let Some(ref report) = pkt.quality_report { metrics.update_session_quality(session_id, report); } // Get current list of other participants + let lock_start = std::time::Instant::now(); let others = { let mgr = room_mgr.lock().await; mgr.others(&room_name, participant_id) }; + let lock_ms = lock_start.elapsed().as_millis() as u64; + if lock_ms > 10 { + warn!( + room = %room_name, + participant = participant_id, + lock_ms, + "slow room_mgr lock" + ); + } // Forward to all others + let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; for other in &others { match other { ParticipantSender::Quic(t) => { - let _ = t.send_media(&pkt).await; + if let Err(e) = t.send_media(&pkt).await { + send_errors += 1; + if send_errors <= 5 || send_errors % 100 == 0 { + warn!( + room = %room_name, + participant = participant_id, + peer = %t.connection().remote_address(), + total_send_errors = send_errors, + "send_media error: {e}" + ); + } + } } ParticipantSender::WebSocket(_) => { - // WS clients receive raw payload bytes let _ = other.send_raw(&pkt.payload).await; } } } + let fwd_ms = fwd_start.elapsed().as_millis() as u64; + if fwd_ms > max_forward_ms { + max_forward_ms = fwd_ms; + } + if fwd_ms > 50 { + warn!( + room = %room_name, + participant = participant_id, + fwd_ms, + fan_out = others.len(), + "slow forward" + ); + } let fan_out = others.len() as u64; metrics.packets_forwarded.inc_by(fan_out); metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); packets_forwarded += 1; - if packets_forwarded % 500 == 0 { + + // Periodic stats log every 5 seconds + if last_log_instant.elapsed() >= Duration::from_secs(5) { let room_size = { let mgr = room_mgr.lock().await; mgr.room_size(&room_name) @@ -433,8 +499,15 @@ async fn run_participant_plain( participant = participant_id, forwarded = packets_forwarded, room_size, + fan_out, + max_recv_gap_ms, + max_forward_ms, + send_errors, "participant stats" ); + max_recv_gap_ms = 0; + max_forward_ms = 0; + last_log_instant = std::time::Instant::now(); } } @@ -459,6 +532,19 @@ async fn run_participant_trunked( let addr = transport.connection().remote_address(); let mut packets_forwarded = 0u64; + let mut last_recv_instant = std::time::Instant::now(); + let mut max_recv_gap_ms = 0u64; + let mut max_forward_ms = 0u64; + let mut send_errors = 0u64; + let mut last_log_instant = std::time::Instant::now(); + + info!( + room = %room_name, + participant = participant_id, + %addr, + session = session_id, + "forwarding loop started (trunked)" + ); // Per-peer TrunkedForwarders, keyed by the raw pointer of the peer // transport (stable for the Arc's lifetime). We use the remote address @@ -480,24 +566,50 @@ async fn run_participant_trunked( let pkt = match result { Ok(Some(pkt)) => pkt, Ok(None) => { - info!(%addr, participant = participant_id, "disconnected"); + info!(%addr, participant = participant_id, forwarded = packets_forwarded, "disconnected (stream ended)"); break; } Err(e) => { - error!(%addr, participant = participant_id, "recv error: {e}"); + error!(%addr, participant = participant_id, forwarded = packets_forwarded, "recv error: {e}"); break; } }; + let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64; + last_recv_instant = std::time::Instant::now(); + if recv_gap_ms > max_recv_gap_ms { + max_recv_gap_ms = recv_gap_ms; + } + if recv_gap_ms > 200 { + warn!( + room = %room_name, + participant = participant_id, + recv_gap_ms, + seq = pkt.header.seq, + "large recv gap (trunked)" + ); + } + if let Some(ref report) = pkt.quality_report { metrics.update_session_quality(session_id, report); } + let lock_start = std::time::Instant::now(); let others = { let mgr = room_mgr.lock().await; mgr.others(&room_name, participant_id) }; + let lock_ms = lock_start.elapsed().as_millis() as u64; + if lock_ms > 10 { + warn!( + room = %room_name, + participant = participant_id, + lock_ms, + "slow room_mgr lock (trunked)" + ); + } + let fwd_start = std::time::Instant::now(); let pkt_bytes = pkt.payload.len() as u64; for other in &others { match other { @@ -507,21 +619,44 @@ async fn run_participant_trunked( .entry(peer_addr) .or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes)); if let Err(e) = fwd.send(&pkt).await { - let _ = e; + send_errors += 1; + if send_errors <= 5 || send_errors % 100 == 0 { + warn!( + room = %room_name, + participant = participant_id, + peer = %peer_addr, + total_send_errors = send_errors, + "trunked send error: {e}" + ); + } } } ParticipantSender::WebSocket(_) => { - // WS clients bypass trunking — send raw payload directly let _ = other.send_raw(&pkt.payload).await; } } } + let fwd_ms = fwd_start.elapsed().as_millis() as u64; + if fwd_ms > max_forward_ms { + max_forward_ms = fwd_ms; + } + if fwd_ms > 50 { + warn!( + room = %room_name, + participant = participant_id, + fwd_ms, + fan_out = others.len(), + "slow forward (trunked)" + ); + } let fan_out = others.len() as u64; metrics.packets_forwarded.inc_by(fan_out); metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); packets_forwarded += 1; - if packets_forwarded % 500 == 0 { + + // Periodic stats every 5 seconds + if last_log_instant.elapsed() >= Duration::from_secs(5) { let room_size = { let mgr = room_mgr.lock().await; mgr.room_size(&room_name) @@ -531,15 +666,30 @@ async fn run_participant_trunked( participant = participant_id, forwarded = packets_forwarded, room_size, + fan_out, + max_recv_gap_ms, + max_forward_ms, + send_errors, "participant stats (trunked)" ); + max_recv_gap_ms = 0; + max_forward_ms = 0; + last_log_instant = std::time::Instant::now(); } } _ = flush_interval.tick() => { for fwd in forwarders.values_mut() { if let Err(e) = fwd.flush().await { - let _ = e; + send_errors += 1; + if send_errors <= 5 || send_errors % 100 == 0 { + warn!( + room = %room_name, + participant = participant_id, + total_send_errors = send_errors, + "trunk flush error: {e}" + ); + } } } }