diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml
index 0eea970..166014a 100644
--- a/android/app/src/main/AndroidManifest.xml
+++ b/android/app/src/main/AndroidManifest.xml
@@ -29,5 +29,15 @@
android:name="com.wzp.service.CallService"
android:foregroundServiceType="microphone"
android:exported="false" />
+
+
+
+
diff --git a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt
index ca4987a..7126f66 100644
--- a/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt
+++ b/android/app/src/main/java/com/wzp/audio/AudioPipeline.kt
@@ -13,7 +13,14 @@ import android.media.audiofx.NoiseSuppressor
import android.util.Log
import androidx.core.content.ContextCompat
import com.wzp.engine.WzpEngine
+import java.io.BufferedOutputStream
+import java.io.File
+import java.io.FileOutputStream
+import java.io.OutputStreamWriter
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
import kotlin.math.pow
+import kotlin.math.sqrt
/**
* Audio pipeline that captures mic audio and plays received audio using
@@ -45,9 +52,17 @@ class AudioPipeline(private val context: Context) {
/** Capture (mic) gain in dB. 0 = unity. */
@Volatile
var captureGainDb: Float = 0f
+ /** Whether to attach hardware AEC. Must be set before start(). */
+ var aecEnabled: Boolean = true
+ /** Enable debug recording of PCM + RMS histogram to cache dir. */
+ var debugRecording: Boolean = true
private var captureThread: Thread? = null
private var playoutThread: Thread? = null
+ private val debugDir: File by lazy {
+ File(context.cacheDir, "wzp_debug").also { it.mkdirs() }
+ }
+
fun start(engine: WzpEngine) {
if (running) return
running = true
@@ -91,6 +106,15 @@ class AudioPipeline(private val context: Context) {
}
}
+ private fun computeRms(pcm: ShortArray, count: Int): Int {
+ var sumSq = 0.0
+ for (i in 0 until count) {
+ val s = pcm[i].toDouble()
+ sumSq += s * s
+ }
+ return sqrt(sumSq / count).toInt()
+ }
+
private fun parkThread() {
try {
Thread.sleep(Long.MAX_VALUE)
@@ -129,53 +153,86 @@ class AudioPipeline(private val context: Context) {
return
}
- // Attach hardware AEC if available
+ // Attach hardware AEC if available and enabled in settings
var aec: AcousticEchoCanceler? = null
- if (AcousticEchoCanceler.isAvailable()) {
- try {
- aec = AcousticEchoCanceler.create(recorder.audioSessionId)
- aec?.enabled = true
- Log.i(TAG, "AEC enabled (session=${recorder.audioSessionId})")
- } catch (e: Exception) {
- Log.w(TAG, "AEC init failed: ${e.message}")
+ var ns: NoiseSuppressor? = null
+ if (aecEnabled) {
+ if (AcousticEchoCanceler.isAvailable()) {
+ try {
+ aec = AcousticEchoCanceler.create(recorder.audioSessionId)
+ aec?.enabled = true
+ Log.i(TAG, "AEC enabled (session=${recorder.audioSessionId})")
+ } catch (e: Exception) {
+ Log.w(TAG, "AEC init failed: ${e.message}")
+ }
+ } else {
+ Log.w(TAG, "AEC not available on this device")
+ }
+
+ // Attach hardware noise suppressor if available
+ if (NoiseSuppressor.isAvailable()) {
+ try {
+ ns = NoiseSuppressor.create(recorder.audioSessionId)
+ ns?.enabled = true
+ Log.i(TAG, "NoiseSuppressor enabled")
+ } catch (e: Exception) {
+ Log.w(TAG, "NoiseSuppressor init failed: ${e.message}")
+ }
}
} else {
- Log.w(TAG, "AEC not available on this device")
- }
-
- // Attach hardware noise suppressor if available
- var ns: NoiseSuppressor? = null
- if (NoiseSuppressor.isAvailable()) {
- try {
- ns = NoiseSuppressor.create(recorder.audioSessionId)
- ns?.enabled = true
- Log.i(TAG, "NoiseSuppressor enabled")
- } catch (e: Exception) {
- Log.w(TAG, "NoiseSuppressor init failed: ${e.message}")
- }
+ Log.i(TAG, "AEC disabled by user setting")
}
recorder.startRecording()
Log.i(TAG, "capture started: ${SAMPLE_RATE}Hz mono, buf=$bufSize, aec=${aec?.enabled}, ns=${ns?.enabled}")
val pcm = ShortArray(FRAME_SAMPLES)
+ // Debug: PCM file + RMS CSV
+ var pcmOut: BufferedOutputStream? = null
+ var rmsCsv: OutputStreamWriter? = null
+ val byteConv = ByteBuffer.allocate(FRAME_SAMPLES * 2).order(ByteOrder.LITTLE_ENDIAN)
+ var frameIdx = 0L
+ if (debugRecording) {
+ try {
+ pcmOut = BufferedOutputStream(FileOutputStream(File(debugDir, "capture.pcm")), 65536)
+ rmsCsv = OutputStreamWriter(FileOutputStream(File(debugDir, "capture_rms.csv")))
+ rmsCsv.write("frame,time_ms,rms\n")
+ } catch (e: Exception) {
+ Log.w(TAG, "debug recording init failed: ${e.message}")
+ }
+ }
try {
while (running) {
val read = recorder.read(pcm, 0, FRAME_SAMPLES)
if (read > 0) {
applyGain(pcm, read, captureGainDb)
engine.writeAudio(pcm)
+
+ // Debug: write raw PCM + RMS
+ if (pcmOut != null) {
+ byteConv.clear()
+ for (i in 0 until read) byteConv.putShort(pcm[i])
+ pcmOut.write(byteConv.array(), 0, read * 2)
+ }
+ if (rmsCsv != null) {
+ val rms = computeRms(pcm, read)
+ val timeMs = frameIdx * FRAME_SAMPLES * 1000L / SAMPLE_RATE
+ rmsCsv.write("$frameIdx,$timeMs,$rms\n")
+ }
+ frameIdx++
} else if (read < 0) {
Log.e(TAG, "AudioRecord.read error: $read")
break
}
}
} finally {
+ pcmOut?.close()
+ rmsCsv?.close()
recorder.stop()
aec?.release()
ns?.release()
recorder.release()
- Log.i(TAG, "capture stopped")
+ Log.i(TAG, "capture stopped (frames=$frameIdx)")
}
}
@@ -211,24 +268,57 @@ class AudioPipeline(private val context: Context) {
Log.i(TAG, "playout started: ${SAMPLE_RATE}Hz mono, buf=$bufSize")
val pcm = ShortArray(FRAME_SAMPLES)
- val silence = ShortArray(FRAME_SAMPLES) // pre-allocated silence
+ val silence = ShortArray(FRAME_SAMPLES)
+ // Debug: PCM file + RMS CSV for playout
+ var pcmOut: BufferedOutputStream? = null
+ var rmsCsv: OutputStreamWriter? = null
+ val byteConv = ByteBuffer.allocate(FRAME_SAMPLES * 2).order(ByteOrder.LITTLE_ENDIAN)
+ var frameIdx = 0L
+ if (debugRecording) {
+ try {
+ pcmOut = BufferedOutputStream(FileOutputStream(File(debugDir, "playout.pcm")), 65536)
+ rmsCsv = OutputStreamWriter(FileOutputStream(File(debugDir, "playout_rms.csv")))
+ rmsCsv.write("frame,time_ms,rms\n")
+ } catch (e: Exception) {
+ Log.w(TAG, "debug playout recording init failed: ${e.message}")
+ }
+ }
try {
while (running) {
val read = engine.readAudio(pcm)
if (read >= FRAME_SAMPLES) {
applyGain(pcm, read, playoutGainDb)
track.write(pcm, 0, read)
+
+ // Debug: write raw PCM + RMS
+ if (pcmOut != null) {
+ byteConv.clear()
+ for (i in 0 until read) byteConv.putShort(pcm[i])
+ pcmOut.write(byteConv.array(), 0, read * 2)
+ }
+ if (rmsCsv != null) {
+ val rms = computeRms(pcm, read)
+ val timeMs = frameIdx * FRAME_SAMPLES * 1000L / SAMPLE_RATE
+ rmsCsv.write("$frameIdx,$timeMs,$rms\n")
+ }
+ frameIdx++
} else {
- // Not enough decoded audio — write silence to keep stream alive
track.write(silence, 0, FRAME_SAMPLES)
- // Sleep briefly to avoid busy-spinning
+ // Log silence frames to RMS as 0
+ if (rmsCsv != null) {
+ val timeMs = frameIdx * FRAME_SAMPLES * 1000L / SAMPLE_RATE
+ rmsCsv.write("$frameIdx,$timeMs,0\n")
+ }
+ frameIdx++
Thread.sleep(5)
}
}
} finally {
+ pcmOut?.close()
+ rmsCsv?.close()
track.stop()
track.release()
- Log.i(TAG, "playout stopped")
+ Log.i(TAG, "playout stopped (frames=$frameIdx)")
}
}
}
diff --git a/android/app/src/main/java/com/wzp/data/SettingsRepository.kt b/android/app/src/main/java/com/wzp/data/SettingsRepository.kt
index 2d2162c..28c41e9 100644
--- a/android/app/src/main/java/com/wzp/data/SettingsRepository.kt
+++ b/android/app/src/main/java/com/wzp/data/SettingsRepository.kt
@@ -27,6 +27,7 @@ class SettingsRepository(context: Context) {
private const val KEY_CAPTURE_GAIN = "capture_gain_db"
private const val KEY_PREFER_IPV6 = "prefer_ipv6"
private const val KEY_IDENTITY_SEED = "identity_seed_hex"
+ private const val KEY_AEC_ENABLED = "aec_enabled"
}
// --- Servers ---
@@ -112,6 +113,11 @@ class SettingsRepository(context: Context) {
fun savePreferIPv6(prefer: Boolean) { prefs.edit().putBoolean(KEY_PREFER_IPV6, prefer).apply() }
fun loadPreferIPv6(): Boolean = prefs.getBoolean(KEY_PREFER_IPV6, false)
+ // --- AEC ---
+
+ fun saveAecEnabled(enabled: Boolean) { prefs.edit().putBoolean(KEY_AEC_ENABLED, enabled).apply() }
+ fun loadAecEnabled(): Boolean = prefs.getBoolean(KEY_AEC_ENABLED, true)
+
// --- Identity seed ---
/**
diff --git a/android/app/src/main/java/com/wzp/debug/DebugReporter.kt b/android/app/src/main/java/com/wzp/debug/DebugReporter.kt
new file mode 100644
index 0000000..38c32d7
--- /dev/null
+++ b/android/app/src/main/java/com/wzp/debug/DebugReporter.kt
@@ -0,0 +1,189 @@
+package com.wzp.debug
+
+import android.content.Context
+import android.util.Log
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
+import java.io.BufferedOutputStream
+import java.io.ByteArrayOutputStream
+import java.io.File
+import java.io.FileInputStream
+import java.io.FileOutputStream
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.Locale
+import java.util.zip.ZipEntry
+import java.util.zip.ZipOutputStream
+
+/**
+ * Collects call debug data (audio recordings, logs, histograms, stats)
+ * into a zip file for email sharing.
+ */
+class DebugReporter(private val context: Context) {
+
+ companion object {
+ private const val TAG = "DebugReporter"
+ private const val SAMPLE_RATE = 48000
+ }
+
+ /**
+ * Build a zip with all debug data.
+ * Returns the zip File on success, or null on failure.
+ */
+ suspend fun collectZip(
+ callDurationSecs: Double,
+ finalStatsJson: String,
+ aecEnabled: Boolean,
+ alias: String,
+ server: String,
+ room: String
+ ): File? = withContext(Dispatchers.IO) {
+ try {
+ val debugDir = File(context.cacheDir, "wzp_debug")
+ val timestamp = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.US).format(Date())
+ val zipFile = File(context.cacheDir, "wzp_debug_${timestamp}.zip")
+
+ ZipOutputStream(BufferedOutputStream(FileOutputStream(zipFile))).use { zos ->
+ // 1. Call metadata
+ val meta = buildString {
+ appendLine("=== WZ Phone Debug Report ===")
+ appendLine("Timestamp: $timestamp")
+ appendLine("Alias: $alias")
+ appendLine("Server: $server")
+ appendLine("Room: $room")
+ appendLine("Duration: ${"%.1f".format(callDurationSecs)}s")
+ appendLine("AEC: ${if (aecEnabled) "ON" else "OFF"}")
+ appendLine("Device: ${android.os.Build.MANUFACTURER} ${android.os.Build.MODEL}")
+ appendLine("Android: ${android.os.Build.VERSION.RELEASE} (API ${android.os.Build.VERSION.SDK_INT})")
+ appendLine()
+ appendLine("=== Final Stats ===")
+ appendLine(finalStatsJson)
+ }
+ addTextEntry(zos, "meta.txt", meta)
+
+ // 2. Logcat — WZP-related tags
+ val logcat = collectLogcat()
+ addTextEntry(zos, "logcat.txt", logcat)
+
+ // 3. Capture audio (mic) → WAV
+ val captureRaw = File(debugDir, "capture.pcm")
+ if (captureRaw.exists() && captureRaw.length() > 0) {
+ addWavEntry(zos, "capture.wav", captureRaw)
+ Log.i(TAG, "capture.pcm: ${captureRaw.length()} bytes -> WAV")
+ }
+
+ // 4. Playout audio (speaker) → WAV
+ val playoutRaw = File(debugDir, "playout.pcm")
+ if (playoutRaw.exists() && playoutRaw.length() > 0) {
+ addWavEntry(zos, "playout.wav", playoutRaw)
+ Log.i(TAG, "playout.pcm: ${playoutRaw.length()} bytes -> WAV")
+ }
+
+ // 5. RMS histogram CSV
+ val captureHist = File(debugDir, "capture_rms.csv")
+ if (captureHist.exists()) addFileEntry(zos, "capture_rms.csv", captureHist)
+ val playoutHist = File(debugDir, "playout_rms.csv")
+ if (playoutHist.exists()) addFileEntry(zos, "playout_rms.csv", playoutHist)
+ }
+
+ Log.i(TAG, "zip created: ${zipFile.length()} bytes (${zipFile.length() / 1024}KB)")
+
+ // Clean up raw debug files (keep zip)
+ debugDir.listFiles()?.forEach { it.delete() }
+
+ zipFile
+ } catch (e: Exception) {
+ Log.e(TAG, "debug report failed", e)
+ null
+ }
+ }
+
+ /** Clean up any leftover debug files from a previous session. */
+ fun prepareForCall() {
+ val debugDir = File(context.cacheDir, "wzp_debug")
+ if (debugDir.exists()) {
+ debugDir.listFiles()?.forEach { it.delete() }
+ }
+ debugDir.mkdirs()
+ // Also clean up old zip files
+ context.cacheDir.listFiles()?.filter { it.name.startsWith("wzp_debug_") }?.forEach { it.delete() }
+ }
+
+ private fun collectLogcat(): String {
+ return try {
+ val process = Runtime.getRuntime().exec(
+ arrayOf(
+ "logcat", "-d",
+ "-t", "5000",
+ "--format", "threadtime"
+ )
+ )
+ val output = process.inputStream.bufferedReader().readText()
+ process.waitFor()
+ output.lines()
+ .filter { line ->
+ line.contains("wzp", ignoreCase = true) ||
+ line.contains("WzpEngine") ||
+ line.contains("AudioPipeline") ||
+ line.contains("WzpCall") ||
+ line.contains("CallService") ||
+ line.contains("AudioTrack") ||
+ line.contains("AudioRecord") ||
+ line.contains("AcousticEchoCanceler") ||
+ line.contains("NoiseSuppressor") ||
+ line.contains("FATAL") ||
+ line.contains("ANR") ||
+ line.contains("AudioFlinger") ||
+ line.contains("DebugReporter") ||
+ line.contains("QUIC") ||
+ line.contains("quinn")
+ }
+ .joinToString("\n")
+ } catch (e: Exception) {
+ "Failed to collect logcat: ${e.message}"
+ }
+ }
+
+ private fun addWavEntry(zos: ZipOutputStream, name: String, pcmFile: File) {
+ val dataSize = pcmFile.length().toInt()
+ val byteRate = SAMPLE_RATE * 1 * 16 / 8
+ val blockAlign = 1 * 16 / 8
+
+ zos.putNextEntry(ZipEntry(name))
+
+ // Write WAV header (44 bytes)
+ val header = ByteBuffer.allocate(44).order(ByteOrder.LITTLE_ENDIAN)
+ header.put("RIFF".toByteArray())
+ header.putInt(36 + dataSize)
+ header.put("WAVE".toByteArray())
+ header.put("fmt ".toByteArray())
+ header.putInt(16)
+ header.putShort(1) // PCM
+ header.putShort(1) // mono
+ header.putInt(SAMPLE_RATE)
+ header.putInt(byteRate)
+ header.putShort(blockAlign.toShort())
+ header.putShort(16) // bits per sample
+ header.put("data".toByteArray())
+ header.putInt(dataSize)
+ zos.write(header.array())
+
+ // Stream PCM data directly (avoids loading entire file into memory)
+ FileInputStream(pcmFile).use { it.copyTo(zos) }
+ zos.closeEntry()
+ }
+
+ private fun addTextEntry(zos: ZipOutputStream, name: String, content: String) {
+ zos.putNextEntry(ZipEntry(name))
+ zos.write(content.toByteArray())
+ zos.closeEntry()
+ }
+
+ private fun addFileEntry(zos: ZipOutputStream, name: String, file: File) {
+ zos.putNextEntry(ZipEntry(name))
+ FileInputStream(file).use { it.copyTo(zos) }
+ zos.closeEntry()
+ }
+}
diff --git a/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt b/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt
index a2e46a9..f651ae2 100644
--- a/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt
+++ b/android/app/src/main/java/com/wzp/ui/call/CallActivity.kt
@@ -1,8 +1,10 @@
package com.wzp.ui.call
import android.Manifest
+import android.content.Intent
import android.content.pm.PackageManager
import android.os.Bundle
+import android.util.Log
import android.widget.Toast
import androidx.activity.ComponentActivity
import androidx.activity.compose.setContent
@@ -21,7 +23,12 @@ import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import androidx.compose.ui.platform.LocalContext
import androidx.core.content.ContextCompat
+import androidx.core.content.FileProvider
+import androidx.lifecycle.Lifecycle
+import androidx.lifecycle.lifecycleScope
+import androidx.lifecycle.repeatOnLifecycle
import com.wzp.ui.settings.SettingsScreen
+import kotlinx.coroutines.launch
/**
* Main activity hosting the in-call Compose UI.
@@ -31,6 +38,10 @@ import com.wzp.ui.settings.SettingsScreen
*/
class CallActivity : ComponentActivity() {
+ companion object {
+ private const val TAG = "CallActivity"
+ }
+
private val viewModel: CallViewModel by viewModels()
private val audioPermissionLauncher = registerForActivityResult(
@@ -69,6 +80,45 @@ class CallActivity : ComponentActivity() {
) {
audioPermissionLauncher.launch(Manifest.permission.RECORD_AUDIO)
}
+
+ // Watch for debug zip ready → launch email intent
+ lifecycleScope.launch {
+ repeatOnLifecycle(Lifecycle.State.STARTED) {
+ viewModel.debugZipReady.collect { zipFile ->
+ if (zipFile != null && zipFile.exists()) {
+ Log.i(TAG, "debug zip ready: ${zipFile.absolutePath} (${zipFile.length()} bytes)")
+ launchEmailIntent(zipFile)
+ viewModel.onDebugReportSent()
+ }
+ }
+ }
+ }
+ }
+
+ private fun launchEmailIntent(zipFile: java.io.File) {
+ try {
+ val authority = "${applicationContext.packageName}.fileprovider"
+ Log.i(TAG, "FileProvider authority: $authority, file: ${zipFile.absolutePath}")
+ val uri = FileProvider.getUriForFile(this, authority, zipFile)
+ Log.i(TAG, "FileProvider URI: $uri")
+
+ val intent = Intent(Intent.ACTION_SEND).apply {
+ type = "message/rfc822"
+ putExtra(Intent.EXTRA_EMAIL, arrayOf("manwefarm@gmail.com"))
+ putExtra(Intent.EXTRA_SUBJECT, "WZ Phone Debug Report - ${zipFile.name}")
+ putExtra(
+ Intent.EXTRA_TEXT,
+ "Debug report attached.\n\nContains: call recordings (WAV), RMS histograms (CSV), logcat, stats."
+ )
+ putExtra(Intent.EXTRA_STREAM, uri)
+ addFlags(Intent.FLAG_GRANT_READ_URI_PERMISSION)
+ }
+ startActivity(Intent.createChooser(intent, "Send debug report"))
+ Log.i(TAG, "email intent launched")
+ } catch (e: Exception) {
+ Log.e(TAG, "email intent failed", e)
+ Toast.makeText(this, "Failed to launch email: ${e.message}", Toast.LENGTH_LONG).show()
+ }
}
override fun onDestroy() {
diff --git a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt
index 7a3f53d..30bd7e4 100644
--- a/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt
+++ b/android/app/src/main/java/com/wzp/ui/call/CallViewModel.kt
@@ -7,6 +7,7 @@ import androidx.lifecycle.viewModelScope
import com.wzp.audio.AudioPipeline
import com.wzp.audio.AudioRouteManager
import com.wzp.data.SettingsRepository
+import com.wzp.debug.DebugReporter
import com.wzp.engine.CallStats
import com.wzp.service.CallService
import com.wzp.engine.WzpCallback
@@ -18,6 +19,7 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
+import java.io.File
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
@@ -33,6 +35,10 @@ class CallViewModel : ViewModel(), WzpCallback {
private var audioStarted = false
private var appContext: Context? = null
private var settings: SettingsRepository? = null
+ private var debugReporter: DebugReporter? = null
+ private var lastStatsJson: String = "{}"
+ private var lastCallDuration: Double = 0.0
+ private var lastCallServer: String = ""
private val _callState = MutableStateFlow(0)
val callState: StateFlow get() = _callState.asStateFlow()
@@ -76,6 +82,21 @@ class CallViewModel : ViewModel(), WzpCallback {
private val _seedHex = MutableStateFlow("")
val seedHex: StateFlow = _seedHex.asStateFlow()
+ private val _aecEnabled = MutableStateFlow(true)
+ val aecEnabled: StateFlow = _aecEnabled.asStateFlow()
+
+ /** True when a call just ended and debug report can be sent. */
+ private val _debugReportAvailable = MutableStateFlow(false)
+ val debugReportAvailable: StateFlow = _debugReportAvailable.asStateFlow()
+
+ /** Status: null=idle, "Preparing..."=in progress, "ready"=zip ready, "Error:..."=failed */
+ private val _debugReportStatus = MutableStateFlow(null)
+ val debugReportStatus: StateFlow = _debugReportStatus.asStateFlow()
+
+ /** The zip file ready to be emailed. Set by sendDebugReport, consumed by Activity. */
+ private val _debugZipReady = MutableStateFlow(null)
+ val debugZipReady: StateFlow = _debugZipReady.asStateFlow()
+
private var statsJob: Job? = null
companion object {
@@ -96,6 +117,9 @@ class CallViewModel : ViewModel(), WzpCallback {
if (audioRouteManager == null) {
audioRouteManager = AudioRouteManager(appCtx)
}
+ if (debugReporter == null) {
+ debugReporter = DebugReporter(appCtx)
+ }
if (settings == null) {
settings = SettingsRepository(appCtx)
loadSettings()
@@ -114,6 +138,7 @@ class CallViewModel : ViewModel(), WzpCallback {
_playoutGainDb.value = s.loadPlayoutGain()
_captureGainDb.value = s.loadCaptureGain()
_seedHex.value = s.getOrCreateSeedHex()
+ _aecEnabled.value = s.loadAecEnabled()
}
fun selectServer(index: Int) {
@@ -184,6 +209,11 @@ class CallViewModel : ViewModel(), WzpCallback {
settings?.saveSeedHex(hex)
}
+ fun setAecEnabled(enabled: Boolean) {
+ _aecEnabled.value = enabled
+ settings?.saveAecEnabled(enabled)
+ }
+
/**
* Resolve DNS hostname to IP address on the Kotlin/Android side,
* since Rust's DNS resolution may not work on Android.
@@ -222,6 +252,7 @@ class CallViewModel : ViewModel(), WzpCallback {
/** Tear down engine and audio. Pass stopService=true to also stop the foreground service. */
private fun teardown(stopService: Boolean = true) {
Log.i(TAG, "teardown: stopping audio, stopService=$stopService")
+ val hadCall = audioStarted
CallService.onStopFromNotification = null
stopAudio()
stopStatsPolling()
@@ -231,6 +262,9 @@ class CallViewModel : ViewModel(), WzpCallback {
engine = null
engineInitialized = false
_callState.value = 0
+ if (hadCall) {
+ _debugReportAvailable.value = true
+ }
if (stopService) {
try { appContext?.let { CallService.stop(it) } } catch (_: Exception) {}
}
@@ -241,6 +275,10 @@ class CallViewModel : ViewModel(), WzpCallback {
val serverEntry = _servers.value[_selectedServer.value]
val room = _roomName.value
Log.i(TAG, "startCall: server=${serverEntry.address} room=$room")
+ _debugReportAvailable.value = false
+ _debugReportStatus.value = null
+ lastCallServer = serverEntry.address
+ debugReporter?.prepareForCall()
try {
// Teardown previous call but don't stop the service (we're about to restart it)
teardown(stopService = false)
@@ -305,6 +343,40 @@ class CallViewModel : ViewModel(), WzpCallback {
fun clearError() { _errorMessage.value = null }
+ fun sendDebugReport() {
+ val reporter = debugReporter ?: return
+ _debugReportStatus.value = "Preparing debug report..."
+ viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) {
+ val zipFile = reporter.collectZip(
+ callDurationSecs = lastCallDuration,
+ finalStatsJson = lastStatsJson,
+ aecEnabled = _aecEnabled.value,
+ alias = _alias.value,
+ server = lastCallServer,
+ room = _roomName.value
+ )
+ if (zipFile != null) {
+ _debugZipReady.value = zipFile
+ _debugReportStatus.value = "ready"
+ } else {
+ _debugReportStatus.value = "Error: failed to create zip"
+ }
+ _debugReportAvailable.value = false
+ }
+ }
+
+ /** Called by Activity after email intent is launched. */
+ fun onDebugReportSent() {
+ _debugZipReady.value = null
+ _debugReportStatus.value = null
+ }
+
+ fun dismissDebugReport() {
+ _debugReportAvailable.value = false
+ _debugReportStatus.value = null
+ _debugZipReady.value = null
+ }
+
// WzpCallback
override fun onCallStateChanged(state: Int) { _callState.value = state }
override fun onQualityTierChanged(tier: Int) { _qualityTier.value = tier }
@@ -318,6 +390,7 @@ class CallViewModel : ViewModel(), WzpCallback {
audioPipeline = AudioPipeline(ctx).also {
it.playoutGainDb = _playoutGainDb.value
it.captureGainDb = _captureGainDb.value
+ it.aecEnabled = _aecEnabled.value
it.start(e)
}
audioRouteManager?.register()
@@ -342,7 +415,9 @@ class CallViewModel : ViewModel(), WzpCallback {
val json = engine?.getStats() ?: "{}"
if (json.isNotEmpty()) {
Log.d(TAG, "raw: $json")
+ lastStatsJson = json
val s = CallStats.fromJson(json)
+ lastCallDuration = s.durationSecs
_stats.value = s
if (s.state != 0) {
_callState.value = s.state
diff --git a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt
index 2d149ca..0bf6260 100644
--- a/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt
+++ b/android/app/src/main/java/com/wzp/ui/call/InCallScreen.kt
@@ -24,7 +24,6 @@ import androidx.compose.material3.ButtonDefaults
import androidx.compose.material3.FilledIconButton
import androidx.compose.material3.FilledTonalIconButton
import androidx.compose.material3.IconButtonDefaults
-import androidx.compose.material3.LinearProgressIndicator
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.OutlinedButton
import androidx.compose.material3.OutlinedTextField
@@ -69,6 +68,8 @@ fun InCallScreen(
val preferIPv6 by viewModel.preferIPv6.collectAsState()
val playoutGainDb by viewModel.playoutGainDb.collectAsState()
val captureGainDb by viewModel.captureGainDb.collectAsState()
+ val debugReportAvailable by viewModel.debugReportAvailable.collectAsState()
+ val debugReportStatus by viewModel.debugReportStatus.collectAsState()
var showAddServerDialog by remember { mutableStateOf(false) }
@@ -228,6 +229,17 @@ fun InCallScreen(
color = MaterialTheme.colorScheme.error
)
}
+
+ // Debug report card — shown after call ends
+ if (debugReportAvailable || debugReportStatus != null) {
+ Spacer(modifier = Modifier.height(24.dp))
+ DebugReportCard(
+ available = debugReportAvailable,
+ status = debugReportStatus,
+ onSend = { viewModel.sendDebugReport() },
+ onDismiss = { viewModel.dismissDebugReport() }
+ )
+ }
} else {
// In-call UI
Spacer(modifier = Modifier.height(16.dp))
@@ -442,15 +454,20 @@ private fun AudioLevelBar(audioLevel: Int) {
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Spacer(modifier = Modifier.height(4.dp))
- LinearProgressIndicator(
- progress = level,
+ Box(
modifier = Modifier
.fillMaxWidth(0.6f)
.height(6.dp)
- .clip(RoundedCornerShape(3.dp)),
- color = MaterialTheme.colorScheme.primary,
- trackColor = MaterialTheme.colorScheme.surfaceVariant,
- )
+ .clip(RoundedCornerShape(3.dp))
+ .background(MaterialTheme.colorScheme.surfaceVariant)
+ ) {
+ Box(
+ modifier = Modifier
+ .fillMaxWidth(level)
+ .height(6.dp)
+ .background(MaterialTheme.colorScheme.primary)
+ )
+ }
}
}
@@ -602,3 +619,70 @@ private fun StatItem(label: String, value: String) {
)
}
}
+
+@Composable
+private fun DebugReportCard(
+ available: Boolean,
+ status: String?,
+ onSend: () -> Unit,
+ onDismiss: () -> Unit
+) {
+ Surface(
+ modifier = Modifier.fillMaxWidth(),
+ color = MaterialTheme.colorScheme.surfaceVariant.copy(alpha = 0.7f),
+ shape = RoundedCornerShape(12.dp)
+ ) {
+ Column(
+ modifier = Modifier.padding(16.dp),
+ horizontalAlignment = Alignment.CenterHorizontally
+ ) {
+ Text(
+ text = "Debug Report",
+ style = MaterialTheme.typography.titleSmall.copy(fontWeight = FontWeight.Bold),
+ color = MaterialTheme.colorScheme.onSurface
+ )
+ Spacer(modifier = Modifier.height(4.dp))
+ Text(
+ text = "Email call recordings, logs & stats for analysis",
+ style = MaterialTheme.typography.bodySmall,
+ color = MaterialTheme.colorScheme.onSurfaceVariant,
+ textAlign = TextAlign.Center
+ )
+
+ Spacer(modifier = Modifier.height(12.dp))
+
+ when {
+ status != null && status.startsWith("Error") -> {
+ Text(
+ text = status,
+ style = MaterialTheme.typography.bodySmall,
+ color = MaterialTheme.colorScheme.error
+ )
+ Spacer(modifier = Modifier.height(8.dp))
+ Row(horizontalArrangement = Arrangement.spacedBy(8.dp)) {
+ OutlinedButton(onClick = onSend) { Text("Retry") }
+ TextButton(onClick = onDismiss) { Text("Dismiss") }
+ }
+ }
+ status != null && status != "ready" -> {
+ // Preparing zip...
+ Text(
+ text = status,
+ style = MaterialTheme.typography.bodySmall,
+ color = MaterialTheme.colorScheme.onSurfaceVariant
+ )
+ }
+ available -> {
+ Row(horizontalArrangement = Arrangement.spacedBy(8.dp)) {
+ Button(onClick = onSend) {
+ Text("Email Report")
+ }
+ TextButton(onClick = onDismiss) {
+ Text("Skip")
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt b/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt
index f990be4..6a083c2 100644
--- a/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt
+++ b/android/app/src/main/java/com/wzp/ui/settings/SettingsScreen.kt
@@ -69,6 +69,7 @@ fun SettingsScreen(
val currentPreferIPv6 by viewModel.preferIPv6.collectAsState()
val currentPlayoutGain by viewModel.playoutGainDb.collectAsState()
val currentCaptureGain by viewModel.captureGainDb.collectAsState()
+ val currentAecEnabled by viewModel.aecEnabled.collectAsState()
// Draft state — initialized from current values
var draftAlias by remember { mutableStateOf(currentAlias) }
@@ -79,6 +80,7 @@ fun SettingsScreen(
var draftPreferIPv6 by remember { mutableStateOf(currentPreferIPv6) }
var draftPlayoutGain by remember { mutableFloatStateOf(currentPlayoutGain) }
var draftCaptureGain by remember { mutableFloatStateOf(currentCaptureGain) }
+ var draftAecEnabled by remember { mutableStateOf(currentAecEnabled) }
// Track if anything changed
val hasChanges = draftAlias != currentAlias ||
@@ -88,7 +90,8 @@ fun SettingsScreen(
draftRoomName != currentRoomName ||
draftPreferIPv6 != currentPreferIPv6 ||
draftPlayoutGain != currentPlayoutGain ||
- draftCaptureGain != currentCaptureGain
+ draftCaptureGain != currentCaptureGain ||
+ draftAecEnabled != currentAecEnabled
var showAddServerDialog by remember { mutableStateOf(false) }
var showRestoreKeyDialog by remember { mutableStateOf(false) }
@@ -130,6 +133,7 @@ fun SettingsScreen(
viewModel.setPreferIPv6(draftPreferIPv6)
viewModel.setPlayoutGainDb(draftPlayoutGain)
viewModel.setCaptureGainDb(draftCaptureGain)
+ viewModel.setAecEnabled(draftAecEnabled)
Toast.makeText(context, "Settings saved", Toast.LENGTH_SHORT).show()
onBack()
},
@@ -204,6 +208,29 @@ fun SettingsScreen(
onGainChange = { draftCaptureGain = Math.round(it).toFloat() }
)
+ Spacer(modifier = Modifier.height(12.dp))
+
+ Row(
+ verticalAlignment = Alignment.CenterVertically,
+ modifier = Modifier.fillMaxWidth()
+ ) {
+ Column(modifier = Modifier.weight(1f)) {
+ Text(
+ text = "Echo Cancellation (AEC)",
+ style = MaterialTheme.typography.bodyMedium
+ )
+ Text(
+ text = "Disable if audio sounds distorted",
+ style = MaterialTheme.typography.bodySmall,
+ color = MaterialTheme.colorScheme.onSurfaceVariant
+ )
+ }
+ Switch(
+ checked = draftAecEnabled,
+ onCheckedChange = { draftAecEnabled = it }
+ )
+ }
+
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
diff --git a/android/app/src/main/res/xml/file_paths.xml b/android/app/src/main/res/xml/file_paths.xml
new file mode 100644
index 0000000..45fce9e
--- /dev/null
+++ b/android/app/src/main/res/xml/file_paths.xml
@@ -0,0 +1,4 @@
+
+
+
+
diff --git a/crates/wzp-android/src/engine.rs b/crates/wzp-android/src/engine.rs
index 6f25efb..ea20fb6 100644
--- a/crates/wzp-android/src/engine.rs
+++ b/crates/wzp-android/src/engine.rs
@@ -321,8 +321,18 @@ async fn run_call(
let mut block_id: u8 = 0;
// Send task: capture ring → Opus encode → FEC → MediaPackets
+ //
+ // IMPORTANT: send_media() uses quinn's send_datagram() which is
+ // synchronous and returns Err(Blocked) when the congestion window
+ // is full. We MUST NOT break on send errors — that would kill the
+ // entire call. Instead we drop the packet and keep going.
let send_task = async {
info!("send task started (Opus + RaptorQ FEC)");
+ let mut send_errors: u64 = 0;
+ let mut last_send_error_log = Instant::now();
+ let mut last_stats_log = Instant::now();
+ let mut frames_sent: u64 = 0;
+ let mut frames_dropped: u64 = 0;
loop {
if !state.running.load(Ordering::Relaxed) {
break;
@@ -380,11 +390,24 @@ async fn run_call(
quality_report: None,
};
- // Send source packet
+ // Send source packet — drop on error, never break
if let Err(e) = transport.send_media(&source_pkt).await {
- error!("send error: {e}");
- break;
+ send_errors += 1;
+ frames_dropped += 1;
+ // Log first few errors, then throttle to once per second
+ if send_errors <= 3 || last_send_error_log.elapsed().as_secs() >= 1 {
+ warn!(
+ seq = s,
+ send_errors,
+ frames_dropped,
+ "send_media error (dropping packet): {e}"
+ );
+ last_send_error_log = Instant::now();
+ }
+ // Don't feed to FEC either — the source is lost
+ continue;
}
+ frames_sent += 1;
// Feed encoded frame to FEC encoder
if let Err(e) = fec_enc.add_source_symbol(encoded) {
@@ -418,9 +441,11 @@ async fn run_call(
payload: Bytes::from(repair_data),
quality_report: None,
};
- if let Err(e) = transport.send_media(&repair_pkt).await {
- error!("send repair error: {e}");
- break;
+ // Drop repair packets on error — never break
+ if let Err(_e) = transport.send_media(&repair_pkt).await {
+ send_errors += 1;
+ frames_dropped += 1;
+ // Don't log every repair failure — source error log covers it
}
}
if repair_count > 0 && (block_id % 50 == 0 || block_id == 0) {
@@ -442,10 +467,21 @@ async fn run_call(
frame_in_block = 0;
}
- if s % 500 == 0 {
- info!(seq = s, block_id, frame_in_block, "sending");
+ // Periodic stats every 5 seconds
+ if last_stats_log.elapsed().as_secs() >= 5 {
+ info!(
+ seq = s,
+ block_id,
+ frames_sent,
+ frames_dropped,
+ send_errors,
+ ring_avail = state.capture_ring.available(),
+ "send stats"
+ );
+ last_stats_log = Instant::now();
}
}
+ info!(frames_sent, frames_dropped, send_errors, "send task ended");
};
// Pre-allocate decode buffer
@@ -455,6 +491,10 @@ async fn run_call(
let recv_task = async {
let mut frames_decoded: u64 = 0;
let mut fec_recovered: u64 = 0;
+ let mut recv_errors: u64 = 0;
+ let mut last_recv_instant = Instant::now();
+ let mut max_recv_gap_ms: u64 = 0;
+ let mut last_stats_log = Instant::now();
info!("recv task started (Opus + RaptorQ FEC)");
loop {
if !state.running.load(Ordering::Relaxed) {
@@ -462,6 +502,21 @@ async fn run_call(
}
match transport_recv.recv_media().await {
Ok(Some(pkt)) => {
+ // Track recv gaps — large gaps indicate network or relay issues
+ let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
+ last_recv_instant = Instant::now();
+ if recv_gap_ms > max_recv_gap_ms {
+ max_recv_gap_ms = recv_gap_ms;
+ }
+ if recv_gap_ms > 500 {
+ warn!(
+ recv_gap_ms,
+ seq = pkt.header.seq,
+ is_repair = pkt.header.is_repair,
+ "large recv gap — possible network stall"
+ );
+ }
+
let is_repair = pkt.header.is_repair;
let pkt_block = pkt.header.fec_block;
let pkt_symbol = pkt.header.fec_symbol;
@@ -478,7 +533,6 @@ async fn run_call(
if !is_repair {
match decoder.decode(&pkt.payload, &mut decode_buf) {
Ok(samples) => {
- // AGC on playout — normalizes received audio volume
playout_agc.process_frame(&mut decode_buf[..samples]);
state.playout_ring.write(&decode_buf[..samples]);
frames_decoded += 1;
@@ -493,13 +547,8 @@ async fn run_call(
}
}
- // Try FEC recovery for this block
- // (useful when source packets were lost but repair arrived)
+ // Try FEC recovery
if let Ok(Some(recovered_frames)) = fec_dec.try_decode(pkt_block) {
- // FEC recovered the block — any previously missing frames
- // are now available. In a full jitter buffer implementation,
- // we'd insert recovered frames at the right position.
- // For now, log recovery for telemetry.
fec_recovered += recovered_frames.len() as u64;
if fec_recovered % 50 == 1 {
info!(
@@ -516,24 +565,45 @@ async fn run_call(
fec_dec.expire_before(pkt_block.wrapping_sub(3));
}
- if frames_decoded == 1 || frames_decoded % 500 == 0 {
- info!(frames_decoded, fec_recovered, "recv stats");
- }
-
let mut stats = state.stats.lock().unwrap();
stats.frames_decoded = frames_decoded;
stats.fec_recovered = fec_recovered;
+ drop(stats);
+
+ // Periodic stats every 5 seconds
+ if last_stats_log.elapsed().as_secs() >= 5 {
+ info!(
+ frames_decoded,
+ fec_recovered,
+ recv_errors,
+ max_recv_gap_ms,
+ playout_avail = state.playout_ring.available(),
+ "recv stats"
+ );
+ max_recv_gap_ms = 0;
+ last_stats_log = Instant::now();
+ }
}
Ok(None) => {
- info!("relay disconnected");
+ info!(frames_decoded, fec_recovered, "relay disconnected (stream ended)");
break;
}
Err(e) => {
- error!("recv error: {e}");
- break;
+ recv_errors += 1;
+ // Transient errors: log and keep going
+ let msg = e.to_string();
+ if msg.contains("closed") || msg.contains("reset") {
+ error!(recv_errors, "recv fatal: {e}");
+ break;
+ }
+ // Non-fatal: log throttled
+ if recv_errors <= 3 || recv_errors % 50 == 0 {
+ warn!(recv_errors, "recv error (continuing): {e}");
+ }
}
}
}
+ info!(frames_decoded, fec_recovered, recv_errors, "recv task ended");
};
// Stats task — polls path quality + quinn RTT every 500ms
diff --git a/crates/wzp-relay/src/room.rs b/crates/wzp-relay/src/room.rs
index 616538f..70850ad 100644
--- a/crates/wzp-relay/src/room.rs
+++ b/crates/wzp-relay/src/room.rs
@@ -10,7 +10,7 @@ use std::time::Duration;
use bytes::Bytes;
use tokio::sync::Mutex;
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, trace, warn};
use wzp_proto::packet::TrunkFrame;
use wzp_proto::MediaTransport;
@@ -375,55 +375,121 @@ async fn run_participant_plain(
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
+ let mut last_recv_instant = std::time::Instant::now();
+ let mut max_recv_gap_ms = 0u64;
+ let mut max_forward_ms = 0u64;
+ let mut send_errors = 0u64;
+ let mut last_log_instant = std::time::Instant::now();
+
+ info!(
+ room = %room_name,
+ participant = participant_id,
+ %addr,
+ session = session_id,
+ "forwarding loop started (plain)"
+ );
loop {
+ let recv_start = std::time::Instant::now();
let pkt = match transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
- info!(%addr, participant = participant_id, "disconnected");
+ info!(%addr, participant = participant_id, forwarded = packets_forwarded, "disconnected (stream ended)");
break;
}
Err(e) => {
let msg = e.to_string();
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
- info!(%addr, participant = participant_id, "connection closed: {e}");
+ info!(%addr, participant = participant_id, forwarded = packets_forwarded, "connection closed: {e}");
} else {
- error!(%addr, participant = participant_id, "recv error: {e}");
+ error!(%addr, participant = participant_id, forwarded = packets_forwarded, "recv error: {e}");
}
break;
}
};
+ let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
+ last_recv_instant = std::time::Instant::now();
+ if recv_gap_ms > max_recv_gap_ms {
+ max_recv_gap_ms = recv_gap_ms;
+ }
+ // Log if recv gap is suspiciously large (>200ms = missed ~10 packets)
+ if recv_gap_ms > 200 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ recv_gap_ms,
+ seq = pkt.header.seq,
+ "large recv gap"
+ );
+ }
+
// Update per-session quality metrics if a quality report is present
if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report);
}
// Get current list of other participants
+ let lock_start = std::time::Instant::now();
let others = {
let mgr = room_mgr.lock().await;
mgr.others(&room_name, participant_id)
};
+ let lock_ms = lock_start.elapsed().as_millis() as u64;
+ if lock_ms > 10 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ lock_ms,
+ "slow room_mgr lock"
+ );
+ }
// Forward to all others
+ let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
match other {
ParticipantSender::Quic(t) => {
- let _ = t.send_media(&pkt).await;
+ if let Err(e) = t.send_media(&pkt).await {
+ send_errors += 1;
+ if send_errors <= 5 || send_errors % 100 == 0 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ peer = %t.connection().remote_address(),
+ total_send_errors = send_errors,
+ "send_media error: {e}"
+ );
+ }
+ }
}
ParticipantSender::WebSocket(_) => {
- // WS clients receive raw payload bytes
let _ = other.send_raw(&pkt.payload).await;
}
}
}
+ let fwd_ms = fwd_start.elapsed().as_millis() as u64;
+ if fwd_ms > max_forward_ms {
+ max_forward_ms = fwd_ms;
+ }
+ if fwd_ms > 50 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ fwd_ms,
+ fan_out = others.len(),
+ "slow forward"
+ );
+ }
let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
- if packets_forwarded % 500 == 0 {
+
+ // Periodic stats log every 5 seconds
+ if last_log_instant.elapsed() >= Duration::from_secs(5) {
let room_size = {
let mgr = room_mgr.lock().await;
mgr.room_size(&room_name)
@@ -433,8 +499,15 @@ async fn run_participant_plain(
participant = participant_id,
forwarded = packets_forwarded,
room_size,
+ fan_out,
+ max_recv_gap_ms,
+ max_forward_ms,
+ send_errors,
"participant stats"
);
+ max_recv_gap_ms = 0;
+ max_forward_ms = 0;
+ last_log_instant = std::time::Instant::now();
}
}
@@ -459,6 +532,19 @@ async fn run_participant_trunked(
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
+ let mut last_recv_instant = std::time::Instant::now();
+ let mut max_recv_gap_ms = 0u64;
+ let mut max_forward_ms = 0u64;
+ let mut send_errors = 0u64;
+ let mut last_log_instant = std::time::Instant::now();
+
+ info!(
+ room = %room_name,
+ participant = participant_id,
+ %addr,
+ session = session_id,
+ "forwarding loop started (trunked)"
+ );
// Per-peer TrunkedForwarders, keyed by the raw pointer of the peer
// transport (stable for the Arc's lifetime). We use the remote address
@@ -480,24 +566,50 @@ async fn run_participant_trunked(
let pkt = match result {
Ok(Some(pkt)) => pkt,
Ok(None) => {
- info!(%addr, participant = participant_id, "disconnected");
+ info!(%addr, participant = participant_id, forwarded = packets_forwarded, "disconnected (stream ended)");
break;
}
Err(e) => {
- error!(%addr, participant = participant_id, "recv error: {e}");
+ error!(%addr, participant = participant_id, forwarded = packets_forwarded, "recv error: {e}");
break;
}
};
+ let recv_gap_ms = last_recv_instant.elapsed().as_millis() as u64;
+ last_recv_instant = std::time::Instant::now();
+ if recv_gap_ms > max_recv_gap_ms {
+ max_recv_gap_ms = recv_gap_ms;
+ }
+ if recv_gap_ms > 200 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ recv_gap_ms,
+ seq = pkt.header.seq,
+ "large recv gap (trunked)"
+ );
+ }
+
if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report);
}
+ let lock_start = std::time::Instant::now();
let others = {
let mgr = room_mgr.lock().await;
mgr.others(&room_name, participant_id)
};
+ let lock_ms = lock_start.elapsed().as_millis() as u64;
+ if lock_ms > 10 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ lock_ms,
+ "slow room_mgr lock (trunked)"
+ );
+ }
+ let fwd_start = std::time::Instant::now();
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
match other {
@@ -507,21 +619,44 @@ async fn run_participant_trunked(
.entry(peer_addr)
.or_insert_with(|| TrunkedForwarder::new(t.clone(), sid_bytes));
if let Err(e) = fwd.send(&pkt).await {
- let _ = e;
+ send_errors += 1;
+ if send_errors <= 5 || send_errors % 100 == 0 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ peer = %peer_addr,
+ total_send_errors = send_errors,
+ "trunked send error: {e}"
+ );
+ }
}
}
ParticipantSender::WebSocket(_) => {
- // WS clients bypass trunking — send raw payload directly
let _ = other.send_raw(&pkt.payload).await;
}
}
}
+ let fwd_ms = fwd_start.elapsed().as_millis() as u64;
+ if fwd_ms > max_forward_ms {
+ max_forward_ms = fwd_ms;
+ }
+ if fwd_ms > 50 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ fwd_ms,
+ fan_out = others.len(),
+ "slow forward (trunked)"
+ );
+ }
let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
- if packets_forwarded % 500 == 0 {
+
+ // Periodic stats every 5 seconds
+ if last_log_instant.elapsed() >= Duration::from_secs(5) {
let room_size = {
let mgr = room_mgr.lock().await;
mgr.room_size(&room_name)
@@ -531,15 +666,30 @@ async fn run_participant_trunked(
participant = participant_id,
forwarded = packets_forwarded,
room_size,
+ fan_out,
+ max_recv_gap_ms,
+ max_forward_ms,
+ send_errors,
"participant stats (trunked)"
);
+ max_recv_gap_ms = 0;
+ max_forward_ms = 0;
+ last_log_instant = std::time::Instant::now();
}
}
_ = flush_interval.tick() => {
for fwd in forwarders.values_mut() {
if let Err(e) = fwd.flush().await {
- let _ = e;
+ send_errors += 1;
+ if send_errors <= 5 || send_errors % 100 == 0 {
+ warn!(
+ room = %room_name,
+ participant = participant_id,
+ total_send_errors = send_errors,
+ "trunk flush error: {e}"
+ );
+ }
}
}
}