4 Commits

Author SHA1 Message Date
Siavash Sameni
ba29d8354f fix: send alias via CallOffer handshake (match Android approach)
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m44s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 09:10:07 +04:00
Siavash Sameni
0908507a7a Merge remote-tracking branch 'origin/feat/android-voip-client' into feat/desktop-audio-rewrite 2026-04-06 09:04:55 +04:00
Siavash Sameni
860c90394d feat: rewrite desktop audio I/O with lock-free ring buffers
- Replace Mutex-based CPAL callbacks with atomic SPSC ring buffers
- Proper async send/recv loops (no block_on), 20ms playout tick
- Add signal task for RoomUpdate presence display
- Add --alias, --raw-room flags and key persistence (~/.wzp/identity)
- Add SetAlias signal variant and relay-side handling
- Graceful Ctrl+C shutdown with force-quit on second press

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 09:04:51 +04:00
Claude
0835c36d0f feat: settings page with persistence, client alias in handshake, fix null fingerprints
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m34s
- Add SettingsScreen with identity (alias, key backup/restore), audio defaults,
  server management, network prefs, and default room
- SettingsRepository persists all settings via SharedPreferences
- Auto-generate random display names on first launch (e.g. "Swift Wolf")
- Thread alias through CallOffer → relay handshake → RoomUpdate broadcast
- Derive caller fingerprint from identity key in relay handshake (fixes null
  fingerprints when --auth-url is not set)
- Persist identity seed for stable fingerprints across reconnects
- Add alias field to SignalMessage::CallOffer (serde default for backward compat)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 03:56:33 +00:00
18 changed files with 1239 additions and 226 deletions

View File

@@ -0,0 +1,135 @@
package com.wzp.data
import android.content.Context
import android.content.SharedPreferences
import com.wzp.ui.call.ServerEntry
import org.json.JSONArray
import org.json.JSONObject
import java.security.SecureRandom
/**
* Persists user settings via SharedPreferences.
*
* Stores: servers, default server index, room name, alias, gain values,
* IPv6 preference, and the identity seed (hex-encoded 32 bytes).
*/
class SettingsRepository(context: Context) {
private val prefs: SharedPreferences =
context.applicationContext.getSharedPreferences("wzp_settings", Context.MODE_PRIVATE)
companion object {
private const val KEY_SERVERS = "servers_json"
private const val KEY_SELECTED_SERVER = "selected_server"
private const val KEY_ROOM = "room_name"
private const val KEY_ALIAS = "alias"
private const val KEY_PLAYOUT_GAIN = "playout_gain_db"
private const val KEY_CAPTURE_GAIN = "capture_gain_db"
private const val KEY_PREFER_IPV6 = "prefer_ipv6"
private const val KEY_IDENTITY_SEED = "identity_seed_hex"
}
// --- Servers ---
fun saveServers(servers: List<ServerEntry>) {
val arr = JSONArray()
servers.forEach { entry ->
arr.put(JSONObject().apply {
put("address", entry.address)
put("label", entry.label)
})
}
prefs.edit().putString(KEY_SERVERS, arr.toString()).apply()
}
fun loadServers(): List<ServerEntry>? {
val json = prefs.getString(KEY_SERVERS, null) ?: return null
return try {
val arr = JSONArray(json)
(0 until arr.length()).map { i ->
val obj = arr.getJSONObject(i)
ServerEntry(obj.getString("address"), obj.getString("label"))
}
} catch (_: Exception) { null }
}
fun saveSelectedServer(index: Int) {
prefs.edit().putInt(KEY_SELECTED_SERVER, index).apply()
}
fun loadSelectedServer(): Int = prefs.getInt(KEY_SELECTED_SERVER, 0)
// --- Room ---
fun saveRoom(name: String) { prefs.edit().putString(KEY_ROOM, name).apply() }
fun loadRoom(): String = prefs.getString(KEY_ROOM, "android") ?: "android"
// --- Alias ---
fun saveAlias(alias: String) { prefs.edit().putString(KEY_ALIAS, alias).apply() }
/**
* Load alias, generating a random name on first launch.
*/
fun getOrCreateAlias(): String {
val existing = prefs.getString(KEY_ALIAS, null)
if (!existing.isNullOrEmpty()) return existing
val name = generateRandomName()
prefs.edit().putString(KEY_ALIAS, name).apply()
return name
}
private fun generateRandomName(): String {
val adjectives = listOf(
"Swift", "Silent", "Brave", "Calm", "Dark", "Fierce", "Ghost",
"Iron", "Lucky", "Noble", "Quick", "Sharp", "Storm", "Wild",
"Cold", "Bright", "Lone", "Red", "Grey", "Frosty", "Dusty",
"Rusty", "Neon", "Void", "Solar", "Lunar", "Cyber", "Pixel",
"Sonic", "Hyper", "Turbo", "Nano", "Mega", "Ultra", "Zinc"
)
val nouns = listOf(
"Wolf", "Hawk", "Fox", "Bear", "Lynx", "Crow", "Viper",
"Cobra", "Tiger", "Eagle", "Shark", "Raven", "Falcon", "Otter",
"Mantis", "Panda", "Jackal", "Badger", "Heron", "Bison",
"Condor", "Coyote", "Gecko", "Hornet", "Marten", "Osprey",
"Parrot", "Puma", "Raptor", "Stork", "Toucan", "Walrus"
)
val adj = adjectives.random()
val noun = nouns.random()
return "$adj $noun"
}
// --- Gain ---
fun savePlayoutGain(db: Float) { prefs.edit().putFloat(KEY_PLAYOUT_GAIN, db).apply() }
fun loadPlayoutGain(): Float = prefs.getFloat(KEY_PLAYOUT_GAIN, 0f)
fun saveCaptureGain(db: Float) { prefs.edit().putFloat(KEY_CAPTURE_GAIN, db).apply() }
fun loadCaptureGain(): Float = prefs.getFloat(KEY_CAPTURE_GAIN, 0f)
// --- IPv6 ---
fun savePreferIPv6(prefer: Boolean) { prefs.edit().putBoolean(KEY_PREFER_IPV6, prefer).apply() }
fun loadPreferIPv6(): Boolean = prefs.getBoolean(KEY_PREFER_IPV6, false)
// --- Identity seed ---
/**
* Get or generate the identity seed. On first call, generates a random
* 32-byte seed and persists it. Subsequent calls return the same seed.
*/
fun getOrCreateSeedHex(): String {
val existing = prefs.getString(KEY_IDENTITY_SEED, null)
if (!existing.isNullOrEmpty()) return existing
val seed = ByteArray(32).also { SecureRandom().nextBytes(it) }
val hex = seed.joinToString("") { "%02x".format(it) }
prefs.edit().putString(KEY_IDENTITY_SEED, hex).apply()
return hex
}
fun loadSeedHex(): String = prefs.getString(KEY_IDENTITY_SEED, "") ?: ""
fun saveSeedHex(hex: String) {
prefs.edit().putString(KEY_IDENTITY_SEED, hex).apply()
}
}

View File

@@ -35,11 +35,12 @@ class WzpEngine(private val callback: WzpCallback) {
* @param room room identifier (used as QUIC SNI) * @param room room identifier (used as QUIC SNI)
* @param seedHex 64-char hex-encoded 32-byte identity seed (empty = random) * @param seedHex 64-char hex-encoded 32-byte identity seed (empty = random)
* @param token authentication token (empty = no auth) * @param token authentication token (empty = no auth)
* @param alias display name sent to relay for room participant list
* @return 0 on success, negative error code on failure * @return 0 on success, negative error code on failure
*/ */
fun startCall(relayAddr: String, room: String, seedHex: String = "", token: String = ""): Int { fun startCall(relayAddr: String, room: String, seedHex: String = "", token: String = "", alias: String = ""): Int {
check(nativeHandle != 0L) { "Engine not initialized" } check(nativeHandle != 0L) { "Engine not initialized" }
val result = nativeStartCall(nativeHandle, relayAddr, room, seedHex, token) val result = nativeStartCall(nativeHandle, relayAddr, room, seedHex, token, alias)
if (result == 0) { if (result == 0) {
callback.onCallStateChanged(CallStateConstants.CONNECTING) callback.onCallStateChanged(CallStateConstants.CONNECTING)
} else { } else {
@@ -120,7 +121,7 @@ class WzpEngine(private val callback: WzpCallback) {
private external fun nativeInit(): Long private external fun nativeInit(): Long
private external fun nativeStartCall( private external fun nativeStartCall(
handle: Long, relay: String, room: String, seed: String, token: String handle: Long, relay: String, room: String, seed: String, token: String, alias: String
): Int ): Int
private external fun nativeStopCall(handle: Long) private external fun nativeStopCall(handle: Long)
private external fun nativeSetMute(handle: Long, muted: Boolean) private external fun nativeSetMute(handle: Long, muted: Boolean)

View File

@@ -15,8 +15,13 @@ import androidx.compose.material3.dynamicLightColorScheme
import androidx.compose.material3.lightColorScheme import androidx.compose.material3.lightColorScheme
import androidx.compose.foundation.isSystemInDarkTheme import androidx.compose.foundation.isSystemInDarkTheme
import androidx.compose.runtime.Composable import androidx.compose.runtime.Composable
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import androidx.compose.ui.platform.LocalContext import androidx.compose.ui.platform.LocalContext
import androidx.core.content.ContextCompat import androidx.core.content.ContextCompat
import com.wzp.ui.settings.SettingsScreen
/** /**
* Main activity hosting the in-call Compose UI. * Main activity hosting the in-call Compose UI.
@@ -43,14 +48,21 @@ class CallActivity : ComponentActivity() {
setContent { setContent {
WzpTheme { WzpTheme {
var showSettings by remember { mutableStateOf(false) }
if (showSettings) {
SettingsScreen(
viewModel = viewModel,
onBack = { showSettings = false }
)
} else {
InCallScreen( InCallScreen(
viewModel = viewModel, viewModel = viewModel,
onHangUp = { onHangUp = { viewModel.stopCall() },
viewModel.stopCall() onOpenSettings = { showSettings = true }
}
) )
} }
} }
}
if (ContextCompat.checkSelfPermission(this, Manifest.permission.RECORD_AUDIO) if (ContextCompat.checkSelfPermission(this, Manifest.permission.RECORD_AUDIO)
!= PackageManager.PERMISSION_GRANTED != PackageManager.PERMISSION_GRANTED

View File

@@ -6,6 +6,7 @@ import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope import androidx.lifecycle.viewModelScope
import com.wzp.audio.AudioPipeline import com.wzp.audio.AudioPipeline
import com.wzp.audio.AudioRouteManager import com.wzp.audio.AudioRouteManager
import com.wzp.data.SettingsRepository
import com.wzp.engine.CallStats import com.wzp.engine.CallStats
import com.wzp.service.CallService import com.wzp.service.CallService
import com.wzp.engine.WzpCallback import com.wzp.engine.WzpCallback
@@ -31,6 +32,7 @@ class CallViewModel : ViewModel(), WzpCallback {
private var audioRouteManager: AudioRouteManager? = null private var audioRouteManager: AudioRouteManager? = null
private var audioStarted = false private var audioStarted = false
private var appContext: Context? = null private var appContext: Context? = null
private var settings: SettingsRepository? = null
private val _callState = MutableStateFlow(0) private val _callState = MutableStateFlow(0)
val callState: StateFlow<Int> get() = _callState.asStateFlow() val callState: StateFlow<Int> get() = _callState.asStateFlow()
@@ -68,6 +70,12 @@ class CallViewModel : ViewModel(), WzpCallback {
private val _captureGainDb = MutableStateFlow(0f) private val _captureGainDb = MutableStateFlow(0f)
val captureGainDb: StateFlow<Float> = _captureGainDb.asStateFlow() val captureGainDb: StateFlow<Float> = _captureGainDb.asStateFlow()
private val _alias = MutableStateFlow("")
val alias: StateFlow<String> = _alias.asStateFlow()
private val _seedHex = MutableStateFlow("")
val seedHex: StateFlow<String> = _seedHex.asStateFlow()
private var statsJob: Job? = null private var statsJob: Job? = null
companion object { companion object {
@@ -88,20 +96,43 @@ class CallViewModel : ViewModel(), WzpCallback {
if (audioRouteManager == null) { if (audioRouteManager == null) {
audioRouteManager = AudioRouteManager(appCtx) audioRouteManager = AudioRouteManager(appCtx)
} }
if (settings == null) {
settings = SettingsRepository(appCtx)
loadSettings()
}
}
private fun loadSettings() {
val s = settings ?: return
s.loadServers()?.let { saved ->
if (saved.isNotEmpty()) _servers.value = saved
}
_selectedServer.value = s.loadSelectedServer().coerceIn(0, _servers.value.lastIndex)
_roomName.value = s.loadRoom()
_alias.value = s.getOrCreateAlias()
_preferIPv6.value = s.loadPreferIPv6()
_playoutGainDb.value = s.loadPlayoutGain()
_captureGainDb.value = s.loadCaptureGain()
_seedHex.value = s.getOrCreateSeedHex()
} }
fun selectServer(index: Int) { fun selectServer(index: Int) {
if (index in _servers.value.indices) { if (index in _servers.value.indices) {
_selectedServer.value = index _selectedServer.value = index
settings?.saveSelectedServer(index)
} }
} }
fun setPreferIPv6(prefer: Boolean) { _preferIPv6.value = prefer } fun setPreferIPv6(prefer: Boolean) {
_preferIPv6.value = prefer
settings?.savePreferIPv6(prefer)
}
fun addServer(hostPort: String, label: String) { fun addServer(hostPort: String, label: String) {
val current = _servers.value.toMutableList() val current = _servers.value.toMutableList()
current.add(ServerEntry(hostPort, label)) current.add(ServerEntry(hostPort, label))
_servers.value = current _servers.value = current
settings?.saveServers(current)
} }
fun removeServer(index: Int) { fun removeServer(index: Int) {
@@ -113,19 +144,36 @@ class CallViewModel : ViewModel(), WzpCallback {
if (_selectedServer.value >= current.size) { if (_selectedServer.value >= current.size) {
_selectedServer.value = 0 _selectedServer.value = 0
} }
settings?.saveServers(current)
settings?.saveSelectedServer(_selectedServer.value)
} }
} }
fun setRoomName(name: String) { _roomName.value = name } fun setRoomName(name: String) {
_roomName.value = name
settings?.saveRoom(name)
}
fun setPlayoutGainDb(db: Float) { fun setPlayoutGainDb(db: Float) {
_playoutGainDb.value = db _playoutGainDb.value = db
audioPipeline?.playoutGainDb = db audioPipeline?.playoutGainDb = db
settings?.savePlayoutGain(db)
} }
fun setCaptureGainDb(db: Float) { fun setCaptureGainDb(db: Float) {
_captureGainDb.value = db _captureGainDb.value = db
audioPipeline?.captureGainDb = db audioPipeline?.captureGainDb = db
settings?.saveCaptureGain(db)
}
fun setAlias(alias: String) {
_alias.value = alias
settings?.saveAlias(alias)
}
fun restoreSeed(hex: String) {
_seedHex.value = hex
settings?.saveSeedHex(hex)
} }
/** /**
@@ -203,8 +251,10 @@ class CallViewModel : ViewModel(), WzpCallback {
viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) { viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) {
try { try {
val relay = resolveToIp(serverEntry.address) val relay = resolveToIp(serverEntry.address)
Log.i(TAG, "startCall: resolved=$relay, calling engine.startCall") val seed = _seedHex.value
val result = engine?.startCall(relay, room) ?: -1 val name = _alias.value
Log.i(TAG, "startCall: resolved=$relay, alias=$name, calling engine.startCall")
val result = engine?.startCall(relay, room, seedHex = seed, alias = name) ?: -1
Log.i(TAG, "startCall: engine returned $result") Log.i(TAG, "startCall: engine returned $result")
// Only wire up notification callback after engine is running // Only wire up notification callback after engine is running
CallService.onStopFromNotification = { stopCall() } CallService.onStopFromNotification = { stopCall() }

View File

@@ -54,7 +54,8 @@ import kotlin.math.roundToInt
@Composable @Composable
fun InCallScreen( fun InCallScreen(
viewModel: CallViewModel, viewModel: CallViewModel,
onHangUp: () -> Unit onHangUp: () -> Unit,
onOpenSettings: () -> Unit = {}
) { ) {
val callState by viewModel.callState.collectAsState() val callState by viewModel.callState.collectAsState()
val isMuted by viewModel.isMuted.collectAsState() val isMuted by viewModel.isMuted.collectAsState()
@@ -82,7 +83,16 @@ fun InCallScreen(
.verticalScroll(rememberScrollState()), .verticalScroll(rememberScrollState()),
horizontalAlignment = Alignment.CenterHorizontally horizontalAlignment = Alignment.CenterHorizontally
) { ) {
Spacer(modifier = Modifier.height(48.dp)) // Settings button (top-right)
if (callState == 0) {
Row(modifier = Modifier.fillMaxWidth(), horizontalArrangement = Arrangement.End) {
TextButton(onClick = onOpenSettings) {
Text("Settings")
}
}
}
Spacer(modifier = Modifier.height(if (callState == 0) 16.dp else 48.dp))
Text( Text(
text = "WZ Phone", text = "WZ Phone",

View File

@@ -0,0 +1,437 @@
package com.wzp.ui.settings
import android.content.ClipData
import android.content.ClipboardManager
import android.content.Context
import android.widget.Toast
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.ExperimentalLayoutApi
import androidx.compose.foundation.layout.FlowRow
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.width
import androidx.compose.foundation.rememberScrollState
import androidx.compose.foundation.shape.RoundedCornerShape
import androidx.compose.foundation.verticalScroll
import androidx.compose.material3.AlertDialog
import androidx.compose.material3.Button
import androidx.compose.material3.ButtonDefaults
import androidx.compose.material3.FilledTonalButton
import androidx.compose.material3.FilledTonalIconButton
import androidx.compose.material3.Divider
import androidx.compose.material3.IconButtonDefaults
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.OutlinedButton
import androidx.compose.material3.OutlinedTextField
import androidx.compose.material3.Slider
import androidx.compose.material3.Surface
import androidx.compose.material3.Switch
import androidx.compose.material3.Text
import androidx.compose.material3.TextButton
import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.text.font.FontFamily
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.unit.dp
import com.wzp.ui.call.CallViewModel
@OptIn(ExperimentalLayoutApi::class)
@Composable
fun SettingsScreen(
viewModel: CallViewModel,
onBack: () -> Unit
) {
val context = LocalContext.current
val servers by viewModel.servers.collectAsState()
val selectedServer by viewModel.selectedServer.collectAsState()
val roomName by viewModel.roomName.collectAsState()
val preferIPv6 by viewModel.preferIPv6.collectAsState()
val playoutGainDb by viewModel.playoutGainDb.collectAsState()
val captureGainDb by viewModel.captureGainDb.collectAsState()
val alias by viewModel.alias.collectAsState()
val seedHex by viewModel.seedHex.collectAsState()
var showAddServerDialog by remember { mutableStateOf(false) }
var showRestoreKeyDialog by remember { mutableStateOf(false) }
Surface(
modifier = Modifier.fillMaxSize(),
color = MaterialTheme.colorScheme.background
) {
Column(
modifier = Modifier
.fillMaxSize()
.padding(24.dp)
.verticalScroll(rememberScrollState())
) {
// Header
Row(
modifier = Modifier.fillMaxWidth(),
verticalAlignment = Alignment.CenterVertically
) {
TextButton(onClick = onBack) {
Text("< Back")
}
Spacer(modifier = Modifier.weight(1f))
Text(
text = "Settings",
style = MaterialTheme.typography.headlineSmall.copy(
fontWeight = FontWeight.Bold
),
color = MaterialTheme.colorScheme.primary
)
Spacer(modifier = Modifier.weight(1f))
// Balance the back button
Spacer(modifier = Modifier.width(64.dp))
}
Spacer(modifier = Modifier.height(24.dp))
// --- Identity ---
SectionHeader("Identity")
OutlinedTextField(
value = alias,
onValueChange = { viewModel.setAlias(it) },
label = { Text("Display Name") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(16.dp))
// Fingerprint display
val fingerprint = if (seedHex.length >= 16) seedHex.take(16).uppercase() else "Not generated"
Text(
text = "Fingerprint",
style = MaterialTheme.typography.labelSmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Text(
text = fingerprint.chunked(4).joinToString(" "),
style = MaterialTheme.typography.bodyMedium.copy(
fontFamily = FontFamily.Monospace
),
color = MaterialTheme.colorScheme.onSurface
)
Spacer(modifier = Modifier.height(12.dp))
// Key backup/restore
Row(horizontalArrangement = Arrangement.spacedBy(8.dp)) {
FilledTonalButton(onClick = {
val clipboard = context.getSystemService(Context.CLIPBOARD_SERVICE) as ClipboardManager
clipboard.setPrimaryClip(ClipData.newPlainText("WZP Key", seedHex))
Toast.makeText(context, "Key copied to clipboard", Toast.LENGTH_SHORT).show()
}) {
Text("Copy Key")
}
OutlinedButton(onClick = { showRestoreKeyDialog = true }) {
Text("Restore Key")
}
}
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Audio ---
SectionHeader("Audio Defaults")
GainSlider(
label = "Voice Volume",
gainDb = playoutGainDb,
onGainChange = { viewModel.setPlayoutGainDb(it) }
)
Spacer(modifier = Modifier.height(4.dp))
GainSlider(
label = "Mic Gain",
gainDb = captureGainDb,
onGainChange = { viewModel.setCaptureGainDb(it) }
)
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Servers ---
SectionHeader("Servers")
FlowRow(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.Start,
verticalArrangement = Arrangement.spacedBy(4.dp)
) {
servers.forEachIndexed { idx, entry ->
val isSelected = selectedServer == idx
Row(verticalAlignment = Alignment.CenterVertically) {
FilledTonalIconButton(
onClick = { viewModel.selectServer(idx) },
modifier = Modifier
.padding(end = 2.dp)
.height(36.dp)
.width(140.dp),
shape = RoundedCornerShape(8.dp),
colors = if (isSelected) {
IconButtonDefaults.filledTonalIconButtonColors(
containerColor = MaterialTheme.colorScheme.primaryContainer,
contentColor = MaterialTheme.colorScheme.onPrimaryContainer
)
} else {
IconButtonDefaults.filledTonalIconButtonColors()
}
) {
Text(
text = entry.label,
style = MaterialTheme.typography.labelSmall,
maxLines = 1
)
}
// Show remove button for non-default servers
if (idx >= 2) {
TextButton(
onClick = { viewModel.removeServer(idx) },
modifier = Modifier.height(36.dp)
) {
Text("X", color = MaterialTheme.colorScheme.error)
}
}
}
}
}
Spacer(modifier = Modifier.height(8.dp))
OutlinedButton(
onClick = { showAddServerDialog = true },
shape = RoundedCornerShape(8.dp)
) {
Text("+ Add Server")
}
// Show selected server address
Spacer(modifier = Modifier.height(8.dp))
Text(
text = "Default: ${servers.getOrNull(selectedServer)?.address ?: "none"}",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Network ---
SectionHeader("Network")
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth()
) {
Text(
text = "Prefer IPv6",
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.weight(1f)
)
Switch(
checked = preferIPv6,
onCheckedChange = { viewModel.setPreferIPv6(it) }
)
}
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Room ---
SectionHeader("Room")
OutlinedTextField(
value = roomName,
onValueChange = { viewModel.setRoomName(it) },
label = { Text("Default Room") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(32.dp))
}
}
if (showAddServerDialog) {
AddServerDialog(
onDismiss = { showAddServerDialog = false },
onAdd = { host, port, label ->
viewModel.addServer("$host:$port", label)
showAddServerDialog = false
}
)
}
if (showRestoreKeyDialog) {
RestoreKeyDialog(
onDismiss = { showRestoreKeyDialog = false },
onRestore = { hex ->
viewModel.restoreSeed(hex)
showRestoreKeyDialog = false
Toast.makeText(context, "Key restored", Toast.LENGTH_SHORT).show()
}
)
}
}
@Composable
private fun SectionHeader(title: String) {
Text(
text = title,
style = MaterialTheme.typography.titleMedium.copy(fontWeight = FontWeight.Bold),
color = MaterialTheme.colorScheme.primary
)
Spacer(modifier = Modifier.height(8.dp))
}
@Composable
private fun GainSlider(label: String, gainDb: Float, onGainChange: (Float) -> Unit) {
Column(
modifier = Modifier.fillMaxWidth(),
horizontalAlignment = Alignment.CenterHorizontally
) {
val sign = if (gainDb >= 0) "+" else ""
Text(
text = "$label: ${sign}${"%.0f".format(gainDb)} dB",
style = MaterialTheme.typography.labelSmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Slider(
value = gainDb,
onValueChange = { onGainChange(Math.round(it).toFloat()) },
valueRange = -20f..20f,
steps = 0,
modifier = Modifier.fillMaxWidth()
)
}
}
@Composable
private fun AddServerDialog(
onDismiss: () -> Unit,
onAdd: (host: String, port: String, label: String) -> Unit
) {
var host by remember { mutableStateOf("") }
var port by remember { mutableStateOf("4433") }
var label by remember { mutableStateOf("") }
AlertDialog(
onDismissRequest = onDismiss,
title = { Text("Add Server") },
text = {
Column {
OutlinedTextField(
value = host,
onValueChange = { host = it },
label = { Text("Host (IP or domain)") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(8.dp))
OutlinedTextField(
value = port,
onValueChange = { port = it },
label = { Text("Port") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(8.dp))
OutlinedTextField(
value = label,
onValueChange = { label = it },
label = { Text("Label (optional)") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
}
},
confirmButton = {
TextButton(
onClick = {
if (host.isNotBlank()) {
val displayLabel = label.ifBlank { host }
onAdd(host.trim(), port.trim(), displayLabel)
}
}
) { Text("Add") }
},
dismissButton = {
TextButton(onClick = onDismiss) { Text("Cancel") }
}
)
}
@Composable
private fun RestoreKeyDialog(
onDismiss: () -> Unit,
onRestore: (hex: String) -> Unit
) {
var keyInput by remember { mutableStateOf("") }
var error by remember { mutableStateOf<String?>(null) }
AlertDialog(
onDismissRequest = onDismiss,
title = { Text("Restore Identity Key") },
text = {
Column {
Text(
text = "Paste your 64-character hex key below. This will replace your current identity.",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Spacer(modifier = Modifier.height(8.dp))
OutlinedTextField(
value = keyInput,
onValueChange = {
keyInput = it.trim().lowercase()
error = null
},
label = { Text("Identity Key (hex)") },
singleLine = true,
modifier = Modifier.fillMaxWidth(),
isError = error != null
)
error?.let {
Text(
text = it,
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.error
)
}
}
},
confirmButton = {
TextButton(
onClick = {
val cleaned = keyInput.replace("\\s".toRegex(), "")
if (cleaned.length != 64 || !cleaned.all { it in '0'..'9' || it in 'a'..'f' }) {
error = "Key must be exactly 64 hex characters"
} else {
onRestore(cleaned)
}
}
) { Text("Restore") }
},
dismissButton = {
TextButton(onClick = onDismiss) { Text("Cancel") }
}
)
}

View File

@@ -39,6 +39,7 @@ pub struct CallStartConfig {
pub room: String, pub room: String,
pub auth_token: Vec<u8>, pub auth_token: Vec<u8>,
pub identity_seed: [u8; 32], pub identity_seed: [u8; 32],
pub alias: Option<String>,
} }
impl Default for CallStartConfig { impl Default for CallStartConfig {
@@ -49,6 +50,7 @@ impl Default for CallStartConfig {
room: String::new(), room: String::new(),
auth_token: Vec::new(), auth_token: Vec::new(),
identity_seed: [0u8; 32], identity_seed: [0u8; 32],
alias: None,
} }
} }
} }
@@ -117,6 +119,7 @@ impl WzpEngine {
let room = config.room.clone(); let room = config.room.clone();
let identity_seed = config.identity_seed; let identity_seed = config.identity_seed;
let profile = config.profile; let profile = config.profile;
let alias = config.alias.clone();
let state = self.state.clone(); let state = self.state.clone();
self.state.running.store(true, Ordering::Release); self.state.running.store(true, Ordering::Release);
@@ -124,7 +127,7 @@ impl WzpEngine {
let state_clone = state.clone(); let state_clone = state.clone();
runtime.block_on(async move { runtime.block_on(async move {
if let Err(e) = run_call(relay_addr, &room, &identity_seed, profile, state_clone).await if let Err(e) = run_call(relay_addr, &room, &identity_seed, profile, alias.as_deref(), state_clone).await
{ {
error!("call failed: {e}"); error!("call failed: {e}");
} }
@@ -204,6 +207,7 @@ async fn run_call(
room: &str, room: &str,
identity_seed: &[u8; 32], identity_seed: &[u8; 32],
profile: QualityProfile, profile: QualityProfile,
alias: Option<&str>,
state: Arc<EngineState>, state: Arc<EngineState>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let _ = rustls::crypto::ring::default_provider().install_default(); let _ = rustls::crypto::ring::default_provider().install_default();
@@ -238,6 +242,7 @@ async fn run_call(
QualityProfile::DEGRADED, QualityProfile::DEGRADED,
QualityProfile::CATASTROPHIC, QualityProfile::CATASTROPHIC,
], ],
alias: alias.map(|s| s.to_string()),
}; };
transport.send_signal(&offer).await?; transport.send_signal(&offer).await?;
info!("CallOffer sent, waiting for CallAnswer..."); info!("CallOffer sent, waiting for CallAnswer...");

View File

@@ -54,12 +54,14 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartCall(
room_j: JString, room_j: JString,
seed_hex_j: JString, seed_hex_j: JString,
token_j: JString, token_j: JString,
alias_j: JString,
) -> jint { ) -> jint {
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default(); let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default();
let room: String = env.get_string(&room_j).map(|s| s.into()).unwrap_or_default(); let room: String = env.get_string(&room_j).map(|s| s.into()).unwrap_or_default();
let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default(); let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default();
let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default(); let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default();
let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default();
let h = unsafe { handle_ref(handle) }; let h = unsafe { handle_ref(handle) };
@@ -83,6 +85,7 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartCall(
room, room,
auth_token: if token.is_empty() { Vec::new() } else { token.into_bytes() }, auth_token: if token.is_empty() { Vec::new() } else { token.into_bytes() },
identity_seed, identity_seed,
alias: if alias.is_empty() { None } else { Some(alias) },
}; };
match h.engine.start_call(config) { match h.engine.start_call(config) {

View File

@@ -3,12 +3,10 @@
//! Both structs use 48 kHz, mono, i16 format to match the WarzonePhone codec //! Both structs use 48 kHz, mono, i16 format to match the WarzonePhone codec
//! pipeline. Frames are 960 samples (20 ms at 48 kHz). //! pipeline. Frames are 960 samples (20 ms at 48 kHz).
//! //!
//! The cpal `Stream` type is not `Send`, so each struct spawns a dedicated OS //! Audio callbacks are **lock-free**: they read/write directly to an `AudioRing`
//! thread that owns the stream. The public API exposes only `Send + Sync` //! (atomic SPSC ring buffer). No Mutex, no channel, no allocation on the hot path.
//! channel handles.
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
@@ -16,6 +14,8 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{SampleFormat, SampleRate, StreamConfig}; use cpal::{SampleFormat, SampleRate, StreamConfig};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::audio_ring::AudioRing;
/// Number of samples per 20 ms frame at 48 kHz mono. /// Number of samples per 20 ms frame at 48 kHz mono.
pub const FRAME_SAMPLES: usize = 960; pub const FRAME_SAMPLES: usize = 960;
@@ -23,22 +23,24 @@ pub const FRAME_SAMPLES: usize = 960;
// AudioCapture // AudioCapture
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Captures microphone input and yields 960-sample PCM frames. /// Captures microphone input via CPAL and writes PCM into a lock-free ring buffer.
/// ///
/// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`. /// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`.
pub struct AudioCapture { pub struct AudioCapture {
rx: mpsc::Receiver<Vec<i16>>, ring: Arc<AudioRing>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
} }
impl AudioCapture { impl AudioCapture {
/// Create and start capturing from the default input device at 48 kHz mono. /// Create and start capturing from the default input device at 48 kHz mono.
pub fn start() -> Result<Self, anyhow::Error> { pub fn start() -> Result<Self, anyhow::Error> {
let (tx, rx) = mpsc::sync_channel::<Vec<i16>>(64); let ring = Arc::new(AudioRing::new());
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let (init_tx, init_rx) = mpsc::sync_channel::<Result<(), String>>(1); let (init_tx, init_rx) = std::sync::mpsc::sync_channel::<Result<(), String>>(1);
let ring_cb = ring.clone();
let running_clone = running.clone();
std::thread::Builder::new() std::thread::Builder::new()
.name("wzp-audio-capture".into()) .name("wzp-audio-capture".into())
@@ -54,21 +56,17 @@ impl AudioCapture {
let config = StreamConfig { let config = StreamConfig {
channels: 1, channels: 1,
sample_rate: SampleRate(48_000), sample_rate: SampleRate(48_000),
buffer_size: cpal::BufferSize::Default, buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32),
}; };
let use_f32 = !supports_i16_input(&device)?; let use_f32 = !supports_i16_input(&device)?;
let buf = Arc::new(std::sync::Mutex::new(
Vec::<i16>::with_capacity(FRAME_SAMPLES),
));
let err_cb = |e: cpal::StreamError| { let err_cb = |e: cpal::StreamError| {
warn!("input stream error: {e}"); warn!("input stream error: {e}");
}; };
let stream = if use_f32 { let stream = if use_f32 {
let buf = buf.clone(); let ring = ring_cb.clone();
let tx = tx.clone();
let running = running_clone.clone(); let running = running_clone.clone();
device.build_input_stream( device.build_input_stream(
&config, &config,
@@ -76,21 +74,22 @@ impl AudioCapture {
if !running.load(Ordering::Relaxed) { if !running.load(Ordering::Relaxed) {
return; return;
} }
let mut lock = buf.lock().unwrap(); // Batch convert f32 → i16, then write entire slice to ring.
for &s in data { // Stack alloc for typical callback sizes (≤ 960 samples).
lock.push(f32_to_i16(s)); let mut tmp = [0i16; FRAME_SAMPLES];
if lock.len() == FRAME_SAMPLES { for chunk in data.chunks(FRAME_SAMPLES) {
let frame = lock.drain(..).collect(); let n = chunk.len();
let _ = tx.try_send(frame); for i in 0..n {
tmp[i] = f32_to_i16(chunk[i]);
} }
ring.write(&tmp[..n]);
} }
}, },
err_cb, err_cb,
None, None,
)? )?
} else { } else {
let buf = buf.clone(); let ring = ring_cb.clone();
let tx = tx.clone();
let running = running_clone.clone(); let running = running_clone.clone();
device.build_input_stream( device.build_input_stream(
&config, &config,
@@ -98,14 +97,7 @@ impl AudioCapture {
if !running.load(Ordering::Relaxed) { if !running.load(Ordering::Relaxed) {
return; return;
} }
let mut lock = buf.lock().unwrap(); ring.write(data);
for &s in data {
lock.push(s);
if lock.len() == FRAME_SAMPLES {
let frame = lock.drain(..).collect();
let _ = tx.try_send(frame);
}
}
}, },
err_cb, err_cb,
None, None,
@@ -114,7 +106,6 @@ impl AudioCapture {
stream.play().context("failed to start input stream")?; stream.play().context("failed to start input stream")?;
// Signal success to the caller before parking.
let _ = init_tx.send(Ok(())); let _ = init_tx.send(Ok(()));
// Keep stream alive until stopped. // Keep stream alive until stopped.
@@ -135,15 +126,12 @@ impl AudioCapture {
.map_err(|_| anyhow!("capture thread exited before signaling"))? .map_err(|_| anyhow!("capture thread exited before signaling"))?
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;
Ok(Self { rx, running }) Ok(Self { ring, running })
} }
/// Read the next frame of 960 PCM samples (blocking until available). /// Get a reference to the capture ring buffer for direct polling.
/// pub fn ring(&self) -> &Arc<AudioRing> {
/// Returns `None` when the stream has been stopped or the channel is &self.ring
/// disconnected.
pub fn read_frame(&self) -> Option<Vec<i16>> {
self.rx.recv().ok()
} }
/// Stop capturing. /// Stop capturing.
@@ -152,26 +140,34 @@ impl AudioCapture {
} }
} }
impl Drop for AudioCapture {
fn drop(&mut self) {
self.stop();
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// AudioPlayback // AudioPlayback
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Plays PCM frames through the default output device at 48 kHz mono. /// Plays PCM through the default output device, reading from a lock-free ring buffer.
/// ///
/// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`. /// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`.
pub struct AudioPlayback { pub struct AudioPlayback {
tx: mpsc::SyncSender<Vec<i16>>, ring: Arc<AudioRing>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
} }
impl AudioPlayback { impl AudioPlayback {
/// Create and start playback on the default output device at 48 kHz mono. /// Create and start playback on the default output device at 48 kHz mono.
pub fn start() -> Result<Self, anyhow::Error> { pub fn start() -> Result<Self, anyhow::Error> {
let (tx, rx) = mpsc::sync_channel::<Vec<i16>>(64); let ring = Arc::new(AudioRing::new());
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let (init_tx, init_rx) = mpsc::sync_channel::<Result<(), String>>(1); let (init_tx, init_rx) = std::sync::mpsc::sync_channel::<Result<(), String>>(1);
let ring_cb = ring.clone();
let running_clone = running.clone();
std::thread::Builder::new() std::thread::Builder::new()
.name("wzp-audio-playback".into()) .name("wzp-audio-playback".into())
@@ -187,67 +183,45 @@ impl AudioPlayback {
let config = StreamConfig { let config = StreamConfig {
channels: 1, channels: 1,
sample_rate: SampleRate(48_000), sample_rate: SampleRate(48_000),
buffer_size: cpal::BufferSize::Default, buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32),
}; };
let use_f32 = !supports_i16_output(&device)?; let use_f32 = !supports_i16_output(&device)?;
// Shared ring of samples the cpal callback drains from.
let ring = Arc::new(std::sync::Mutex::new(
std::collections::VecDeque::<i16>::with_capacity(FRAME_SAMPLES * 8),
));
// Background drainer: moves frames from the mpsc channel into the ring.
{
let ring = ring.clone();
let running = running_clone.clone();
std::thread::Builder::new()
.name("wzp-playback-drain".into())
.spawn(move || {
while running.load(Ordering::Relaxed) {
match rx.recv_timeout(std::time::Duration::from_millis(100)) {
Ok(frame) => {
let mut lock = ring.lock().unwrap();
lock.extend(frame);
while lock.len() > FRAME_SAMPLES * 16 {
lock.pop_front();
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
})?;
}
let err_cb = |e: cpal::StreamError| { let err_cb = |e: cpal::StreamError| {
warn!("output stream error: {e}"); warn!("output stream error: {e}");
}; };
let stream = if use_f32 { let stream = if use_f32 {
let ring = ring.clone(); let ring = ring_cb.clone();
device.build_output_stream( device.build_output_stream(
&config, &config,
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let mut lock = ring.lock().unwrap(); let mut tmp = [0i16; FRAME_SAMPLES];
for sample in data.iter_mut() { for chunk in data.chunks_mut(FRAME_SAMPLES) {
*sample = match lock.pop_front() { let n = chunk.len();
Some(s) => i16_to_f32(s), let read = ring.read(&mut tmp[..n]);
None => 0.0, for i in 0..read {
}; chunk[i] = i16_to_f32(tmp[i]);
}
// Fill remainder with silence if ring underran
for i in read..n {
chunk[i] = 0.0;
}
} }
}, },
err_cb, err_cb,
None, None,
)? )?
} else { } else {
let ring = ring.clone(); let ring = ring_cb.clone();
device.build_output_stream( device.build_output_stream(
&config, &config,
move |data: &mut [i16], _: &cpal::OutputCallbackInfo| { move |data: &mut [i16], _: &cpal::OutputCallbackInfo| {
let mut lock = ring.lock().unwrap(); let read = ring.read(data);
for sample in data.iter_mut() { // Fill remainder with silence if ring underran
*sample = lock.pop_front().unwrap_or(0); for sample in &mut data[read..] {
*sample = 0;
} }
}, },
err_cb, err_cb,
@@ -257,7 +231,6 @@ impl AudioPlayback {
stream.play().context("failed to start output stream")?; stream.play().context("failed to start output stream")?;
// Signal success to the caller before parking.
let _ = init_tx.send(Ok(())); let _ = init_tx.send(Ok(()));
// Keep stream alive until stopped. // Keep stream alive until stopped.
@@ -278,12 +251,12 @@ impl AudioPlayback {
.map_err(|_| anyhow!("playback thread exited before signaling"))? .map_err(|_| anyhow!("playback thread exited before signaling"))?
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;
Ok(Self { tx, running }) Ok(Self { ring, running })
} }
/// Write a frame of PCM samples for playback. /// Get a reference to the playout ring buffer for direct writing.
pub fn write_frame(&self, pcm: &[i16]) { pub fn ring(&self) -> &Arc<AudioRing> {
let _ = self.tx.try_send(pcm.to_vec()); &self.ring
} }
/// Stop playback. /// Stop playback.
@@ -292,11 +265,16 @@ impl AudioPlayback {
} }
} }
impl Drop for AudioPlayback {
fn drop(&mut self) {
self.stop();
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helpers // Helpers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Check if the input device supports i16 at 48 kHz mono.
fn supports_i16_input(device: &cpal::Device) -> Result<bool, anyhow::Error> { fn supports_i16_input(device: &cpal::Device) -> Result<bool, anyhow::Error> {
let supported = device let supported = device
.supported_input_configs() .supported_input_configs()
@@ -313,7 +291,6 @@ fn supports_i16_input(device: &cpal::Device) -> Result<bool, anyhow::Error> {
Ok(false) Ok(false)
} }
/// Check if the output device supports i16 at 48 kHz mono.
fn supports_i16_output(device: &cpal::Device) -> Result<bool, anyhow::Error> { fn supports_i16_output(device: &cpal::Device) -> Result<bool, anyhow::Error> {
let supported = device let supported = device
.supported_output_configs() .supported_output_configs()

View File

@@ -0,0 +1,89 @@
//! Lock-free SPSC ring buffer for audio PCM transfer between
//! CPAL audio callbacks and the Rust engine.
//!
//! Identical design to wzp-android's audio_ring: the producer writes and
//! advances a write cursor, the consumer reads and advances a read cursor.
//! Both cursors are atomic — no mutex, no blocking on the audio thread.
use std::sync::atomic::{AtomicUsize, Ordering};
/// Ring buffer capacity in i16 samples.
/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono.
const RING_CAPACITY: usize = 960 * 10;
/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples.
pub struct AudioRing {
buf: Box<[i16; RING_CAPACITY]>,
write_pos: AtomicUsize,
read_pos: AtomicUsize,
}
// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads.
// The atomics ensure visibility. The buffer itself is never accessed
// from the same index by both threads simultaneously because the
// producer only writes to positions between write_pos and read_pos,
// and the consumer only reads from positions between read_pos and write_pos.
unsafe impl Send for AudioRing {}
unsafe impl Sync for AudioRing {}
impl AudioRing {
pub fn new() -> Self {
Self {
buf: Box::new([0i16; RING_CAPACITY]),
write_pos: AtomicUsize::new(0),
read_pos: AtomicUsize::new(0),
}
}
/// Number of samples available to read.
pub fn available(&self) -> usize {
let w = self.write_pos.load(Ordering::Acquire);
let r = self.read_pos.load(Ordering::Acquire);
w.wrapping_sub(r)
}
/// Write samples into the ring. Returns number of samples written.
/// Drops oldest samples if the ring is full.
pub fn write(&self, samples: &[i16]) -> usize {
let w = self.write_pos.load(Ordering::Relaxed);
let count = samples.len().min(RING_CAPACITY);
for i in 0..count {
let idx = (w + i) % RING_CAPACITY;
unsafe {
let ptr = self.buf.as_ptr() as *mut i16;
*ptr.add(idx) = samples[i];
}
}
self.write_pos
.store(w.wrapping_add(count), Ordering::Release);
// If we overwrote unread data, advance read_pos
if self.available() > RING_CAPACITY {
let new_read = self
.write_pos
.load(Ordering::Relaxed)
.wrapping_sub(RING_CAPACITY);
self.read_pos.store(new_read, Ordering::Release);
}
count
}
/// Read samples from the ring into `out`. Returns number of samples read.
pub fn read(&self, out: &mut [i16]) -> usize {
let avail = self.available();
let count = out.len().min(avail);
let r = self.read_pos.load(Ordering::Relaxed);
for i in 0..count {
let idx = (r + i) % RING_CAPACITY;
out[i] = unsafe { *self.buf.as_ptr().add(idx) };
}
self.read_pos
.store(r.wrapping_add(count), Ordering::Release);
count
}
}

View File

@@ -45,12 +45,22 @@ struct CliArgs {
seed_hex: Option<String>, seed_hex: Option<String>,
mnemonic: Option<String>, mnemonic: Option<String>,
room: Option<String>, room: Option<String>,
raw_room: bool,
alias: Option<String>,
token: Option<String>, token: Option<String>,
_metrics_file: Option<String>, _metrics_file: Option<String>,
} }
/// Default identity file path: ~/.wzp/identity
fn default_identity_path() -> std::path::PathBuf {
let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
std::path::PathBuf::from(home).join(".wzp").join("identity")
}
impl CliArgs { impl CliArgs {
/// Resolve the identity seed from --seed, --mnemonic, or generate a new one. /// Resolve the identity seed from --seed, --mnemonic, or persistent file.
///
/// Priority: --seed > --mnemonic > ~/.wzp/identity > generate + save.
pub fn resolve_seed(&self) -> wzp_crypto::Seed { pub fn resolve_seed(&self) -> wzp_crypto::Seed {
if let Some(ref hex_str) = self.seed_hex { if let Some(ref hex_str) = self.seed_hex {
let seed = wzp_crypto::Seed::from_hex(hex_str).expect("invalid --seed hex"); let seed = wzp_crypto::Seed::from_hex(hex_str).expect("invalid --seed hex");
@@ -65,10 +75,30 @@ impl CliArgs {
info!(fingerprint = %fp, "identity from --mnemonic"); info!(fingerprint = %fp, "identity from --mnemonic");
seed seed
} else { } else {
let path = default_identity_path();
// Try loading existing identity
if path.exists() {
if let Ok(hex_str) = std::fs::read_to_string(&path) {
let hex_str = hex_str.trim();
if let Ok(seed) = wzp_crypto::Seed::from_hex(hex_str) {
let id = seed.derive_identity();
let fp = id.public_identity().fingerprint;
info!(fingerprint = %fp, path = %path.display(), "loaded persistent identity");
return seed;
}
}
}
// Generate new and save
let seed = wzp_crypto::Seed::generate(); let seed = wzp_crypto::Seed::generate();
let id = seed.derive_identity(); let id = seed.derive_identity();
let fp = id.public_identity().fingerprint; let fp = id.public_identity().fingerprint;
info!(fingerprint = %fp, "generated ephemeral identity"); if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
// Encode seed as hex manually (avoid dep on `hex` crate in binary)
let hex_str: String = seed.0.iter().map(|b| format!("{b:02x}")).collect();
std::fs::write(&path, hex_str).ok();
info!(fingerprint = %fp, path = %path.display(), "generated and saved new identity");
seed seed
} }
} }
@@ -86,6 +116,8 @@ fn parse_args() -> CliArgs {
let mut seed_hex = None; let mut seed_hex = None;
let mut mnemonic = None; let mut mnemonic = None;
let mut room = None; let mut room = None;
let mut raw_room = false;
let mut alias = None;
let mut token = None; let mut token = None;
let mut metrics_file = None; let mut metrics_file = None;
let mut relay_str = None; let mut relay_str = None;
@@ -130,6 +162,11 @@ fn parse_args() -> CliArgs {
i += 1; i += 1;
room = Some(args.get(i).expect("--room requires a name").to_string()); room = Some(args.get(i).expect("--room requires a name").to_string());
} }
"--raw-room" => raw_room = true,
"--alias" => {
i += 1;
alias = Some(args.get(i).expect("--alias requires a name").to_string());
}
"--token" => { "--token" => {
i += 1; i += 1;
token = Some(args.get(i).expect("--token requires a value").to_string()); token = Some(args.get(i).expect("--token requires a value").to_string());
@@ -183,10 +220,13 @@ fn parse_args() -> CliArgs {
eprintln!(" --seed <hex> Identity seed (64 hex chars, featherChat compatible)"); eprintln!(" --seed <hex> Identity seed (64 hex chars, featherChat compatible)");
eprintln!(" --mnemonic <words...> Identity seed as BIP39 mnemonic (24 words)"); eprintln!(" --mnemonic <words...> Identity seed as BIP39 mnemonic (24 words)");
eprintln!(" --room <name> Room name (hashed for privacy before sending)"); eprintln!(" --room <name> Room name (hashed for privacy before sending)");
eprintln!(" --raw-room Send room name as-is (no hash, for Android compat)");
eprintln!(" --alias <name> Display name shown to other participants");
eprintln!(" --token <token> featherChat bearer token for relay auth"); eprintln!(" --token <token> featherChat bearer token for relay auth");
eprintln!(" --metrics-file <path> Write JSONL telemetry to file (1 line/sec)"); eprintln!(" --metrics-file <path> Write JSONL telemetry to file (1 line/sec)");
eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)"); eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)");
eprintln!(); eprintln!();
eprintln!("Identity is auto-saved to ~/.wzp/identity on first run.");
eprintln!("Default relay: 127.0.0.1:4433"); eprintln!("Default relay: 127.0.0.1:4433");
std::process::exit(0); std::process::exit(0);
} }
@@ -219,6 +259,8 @@ fn parse_args() -> CliArgs {
seed_hex, seed_hex,
mnemonic, mnemonic,
room, room,
raw_room,
alias,
token, token,
_metrics_file: metrics_file, _metrics_file: metrics_file,
} }
@@ -250,8 +292,14 @@ async fn main() -> anyhow::Result<()> {
"WarzonePhone client" "WarzonePhone client"
); );
// Hash room name for SNI privacy (or "default" if none specified) // Compute SNI from room name.
// --raw-room sends the name as-is (for Android compat — Android doesn't hash).
// Default behaviour hashes for privacy.
let sni = match &cli.room { let sni = match &cli.room {
Some(name) if cli.raw_room => {
info!(room = %name, "using raw room name as SNI (no hash)");
name.clone()
}
Some(name) => { Some(name) => {
let hashed = wzp_crypto::hash_room_name(name); let hashed = wzp_crypto::hash_room_name(name);
info!(room = %name, hashed = %hashed, "room name hashed for SNI"); info!(room = %name, hashed = %hashed, "room name hashed for SNI");
@@ -287,6 +335,7 @@ async fn main() -> anyhow::Result<()> {
let _crypto_session = wzp_client::handshake::perform_handshake( let _crypto_session = wzp_client::handshake::perform_handshake(
&*transport, &*transport,
&seed.0, &seed.0,
cli.alias.as_deref(),
).await?; ).await?;
info!("crypto handshake complete"); info!("crypto handshake complete");
@@ -548,78 +597,225 @@ async fn run_file_mode(
} }
/// Live mode: capture from mic, encode, send; receive, decode, play. /// Live mode: capture from mic, encode, send; receive, decode, play.
///
/// Architecture (mirrors wzp-android/engine.rs):
/// CPAL capture callback → AudioRing → send task (5ms poll) → QUIC
/// QUIC → recv task → jitter buffer → decode tick (20ms) → AudioRing → CPAL playback callback
///
/// All lock-free: CPAL callbacks use atomic ring buffers, no Mutex on the audio path.
#[cfg(feature = "audio")] #[cfg(feature = "audio")]
async fn run_live(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::Result<()> { async fn run_live(
transport: Arc<wzp_transport::QuinnTransport>,
) -> anyhow::Result<()> {
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool, Ordering};
use wzp_client::audio_io::{AudioCapture, AudioPlayback}; use wzp_client::audio_io::{AudioCapture, AudioPlayback};
use wzp_client::call::JitterTelemetry;
let capture = AudioCapture::start()?; let capture = AudioCapture::start()?;
let playback = AudioPlayback::start()?; let playback = AudioPlayback::start()?;
info!("Audio I/O started — press Ctrl+C to stop"); info!("audio I/O started (lock-free ring buffers) — press Ctrl+C to stop");
let capture_ring = capture.ring().clone();
let playout_ring = playback.ring().clone();
let running = StdArc::new(AtomicBool::new(true));
// --- Signal handler: set running=false on first Ctrl+C, force-quit on second ---
let signal_running = running.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
eprintln!(); // newline after ^C
info!("Ctrl+C received, shutting down...");
signal_running.store(false, Ordering::SeqCst);
tokio::signal::ctrl_c().await.ok();
eprintln!("\nForce quit");
std::process::exit(1);
});
let send_transport = transport.clone();
let rt_handle = tokio::runtime::Handle::current();
let send_handle = std::thread::Builder::new()
.name("wzp-send-loop".into())
.spawn(move || {
let config = CallConfig::default(); let config = CallConfig::default();
// --- Send task: poll capture ring → encode → send via async ---
let send_transport = transport.clone();
let send_running = running.clone();
let send_task = async move {
let mut encoder = CallEncoder::new(&config); let mut encoder = CallEncoder::new(&config);
let mut capture_buf = vec![0i16; FRAME_SAMPLES];
let mut frames_sent: u64 = 0;
loop { loop {
let frame = match capture.read_frame() { if !send_running.load(Ordering::Relaxed) {
Some(f) => f, break;
None => break, }
};
let packets = match encoder.encode_frame(&frame) { let avail = capture_ring.available();
if avail < FRAME_SAMPLES {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
continue;
}
let read = capture_ring.read(&mut capture_buf);
if read < FRAME_SAMPLES {
continue;
}
let packets = match encoder.encode_frame(&capture_buf) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
error!("encode error: {e}"); error!("encode error: {e}");
continue; continue;
} }
}; };
for pkt in &packets { for pkt in &packets {
if let Err(e) = rt_handle.block_on(send_transport.send_media(pkt)) { if let Err(e) = send_transport.send_media(pkt).await {
error!("send error: {e}"); error!("send error: {e}");
return; return;
} }
} }
}
})?;
frames_sent += 1;
if frames_sent == 1 || frames_sent % 500 == 0 {
info!(frames_sent, "send progress");
}
}
};
// --- Recv task: receive packets → ingest into jitter buffer ---
// Uses timeout so it can check the running flag and exit on Ctrl+C.
let recv_transport = transport.clone(); let recv_transport = transport.clone();
let recv_handle = tokio::spawn(async move { let recv_running = running.clone();
let config = CallConfig::default(); let config = CallConfig::default();
let mut decoder = CallDecoder::new(&config); let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config)));
let mut pcm_buf = vec![0i16; FRAME_SAMPLES]; let decoder_recv = decoder.clone();
let recv_task = async move {
let mut packets_received: u64 = 0;
loop { loop {
match recv_transport.recv_media().await { if !recv_running.load(Ordering::Relaxed) {
Ok(Some(pkt)) => { break;
let is_repair = pkt.header.is_repair; }
decoder.ingest(pkt); // Timeout so we can check running flag periodically
// Only decode for source packets (1 source = 1 audio frame). let result = tokio::time::timeout(
// Repair packets feed the FEC decoder but don't produce audio. std::time::Duration::from_millis(100),
if !is_repair { recv_transport.recv_media(),
if let Some(_n) = decoder.decode_next(&mut pcm_buf) { )
playback.write_frame(&pcm_buf); .await;
match result {
Ok(Ok(Some(pkt))) => {
let mut dec = decoder_recv.lock().await;
dec.ingest(pkt);
packets_received += 1;
if packets_received == 1 || packets_received % 500 == 0 {
info!(packets_received, depth = dec.stats().current_depth, "recv progress");
} }
} }
} Ok(Ok(None)) => {
Ok(None) => {
info!("connection closed"); info!("connection closed");
break; break;
} }
Err(e) => { Ok(Err(e)) => {
error!("recv error: {e}"); error!("recv error: {e}");
break; break;
} }
Err(_) => {} // timeout — loop and check running flag
} }
} }
}); };
tokio::signal::ctrl_c().await?; // --- Playout tick: decode from jitter buffer at steady 20ms intervals ---
info!("Shutting down..."); let playout_running = running.clone();
let decoder_playout = decoder.clone();
let playout_task = async move {
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
let mut interval = tokio::time::interval(std::time::Duration::from_millis(20));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut telemetry = JitterTelemetry::new(5);
loop {
interval.tick().await;
if !playout_running.load(Ordering::Relaxed) {
break;
}
recv_handle.abort(); let mut dec = decoder_playout.lock().await;
drop(send_handle);
transport.close().await?; // Drain ready frames from jitter buffer into playout ring.
info!("done"); let mut decoded_this_tick = 0;
while let Some(n) = dec.decode_next(&mut pcm_buf) {
playout_ring.write(&pcm_buf[..n]);
decoded_this_tick += 1;
if decoded_this_tick >= 2 {
break; // Don't drain too aggressively in one tick
}
}
telemetry.maybe_log(dec.stats());
}
};
// --- Signal task: listen for RoomUpdate and display presence ---
let signal_transport = transport.clone();
let signal_running = running.clone();
let signal_task = async move {
loop {
if !signal_running.load(Ordering::Relaxed) {
break;
}
let result = tokio::time::timeout(
std::time::Duration::from_millis(200),
signal_transport.recv_signal(),
)
.await;
match result {
Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate { count, participants }))) => {
info!(count, "room update");
for p in &participants {
let name = p
.alias
.as_deref()
.unwrap_or("(no alias)");
let fp = if p.fingerprint.is_empty() {
"(no fingerprint)"
} else {
&p.fingerprint
};
info!(" participant: {name} [{fp}]");
}
}
Ok(Ok(Some(msg))) => {
info!("signal: {:?}", std::mem::discriminant(&msg));
}
Ok(Ok(None)) => {
info!("signal stream closed");
break;
}
Ok(Err(e)) => {
error!("signal recv error: {e}");
break;
}
Err(_) => {} // timeout — loop and check running flag
}
}
};
// --- Run all tasks, exit when any finishes (or running flag cleared by Ctrl+C) ---
tokio::select! {
_ = send_task => info!("send task ended"),
_ = recv_task => info!("recv task ended"),
_ = playout_task => info!("playout task ended"),
_ = signal_task => info!("signal task ended"),
}
running.store(false, Ordering::SeqCst);
capture.stop();
playback.stop();
// Give transport 2s to close gracefully, then bail
match tokio::time::timeout(std::time::Duration::from_secs(2), transport.close()).await {
Ok(Ok(())) => info!("done"),
Ok(Err(e)) => info!("close error (non-fatal): {e}"),
Err(_) => info!("close timed out, exiting anyway"),
}
Ok(()) Ok(())
} }

View File

@@ -110,6 +110,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse
SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse
SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse
SignalMessage::SetAlias { .. } => CallSignalType::Offer, // reuse
} }
} }

View File

@@ -17,6 +17,7 @@ use wzp_proto::{MediaTransport, QualityProfile, SignalMessage};
pub async fn perform_handshake( pub async fn perform_handshake(
transport: &dyn MediaTransport, transport: &dyn MediaTransport,
seed: &[u8; 32], seed: &[u8; 32],
alias: Option<&str>,
) -> Result<Box<dyn CryptoSession>, anyhow::Error> { ) -> Result<Box<dyn CryptoSession>, anyhow::Error> {
// 1. Create key exchange from identity seed // 1. Create key exchange from identity seed
let mut kx = WarzoneKeyExchange::from_identity_seed(seed); let mut kx = WarzoneKeyExchange::from_identity_seed(seed);
@@ -41,6 +42,7 @@ pub async fn perform_handshake(
QualityProfile::DEGRADED, QualityProfile::DEGRADED,
QualityProfile::CATASTROPHIC, QualityProfile::CATASTROPHIC,
], ],
alias: alias.map(|s| s.to_string()),
}; };
transport.send_signal(&offer).await?; transport.send_signal(&offer).await?;

View File

@@ -8,6 +8,8 @@
#[cfg(feature = "audio")] #[cfg(feature = "audio")]
pub mod audio_io; pub mod audio_io;
#[cfg(feature = "audio")]
pub mod audio_ring;
pub mod bench; pub mod bench;
pub mod call; pub mod call;
pub mod drift_test; pub mod drift_test;

View File

@@ -548,6 +548,9 @@ pub enum SignalMessage {
signature: Vec<u8>, signature: Vec<u8>,
/// Supported quality profiles. /// Supported quality profiles.
supported_profiles: Vec<crate::QualityProfile>, supported_profiles: Vec<crate::QualityProfile>,
/// Optional display name set by the caller.
#[serde(default)]
alias: Option<String>,
}, },
/// Call acceptance (analogous to Warzone's WireMessage::CallAnswer). /// Call acceptance (analogous to Warzone's WireMessage::CallAnswer).
@@ -653,6 +656,11 @@ pub enum SignalMessage {
/// List of participants currently in the room. /// List of participants currently in the room.
participants: Vec<RoomParticipant>, participants: Vec<RoomParticipant>,
}, },
/// Set or update the client's display name.
/// Sent by client after joining; relay updates the participant entry and
/// re-broadcasts a RoomUpdate to all participants.
SetAlias { alias: String },
} }
/// A participant entry in a RoomUpdate message. /// A participant entry in a RoomUpdate message.

View File

@@ -15,25 +15,27 @@ use wzp_proto::{MediaTransport, QualityProfile, SignalMessage};
/// 5. Derive shared ChaCha20-Poly1305 session /// 5. Derive shared ChaCha20-Poly1305 session
/// 6. Send `CallAnswer` back /// 6. Send `CallAnswer` back
/// ///
/// Returns the derived `CryptoSession` and the chosen `QualityProfile`. /// Returns the derived `CryptoSession`, the chosen `QualityProfile`, the caller's fingerprint,
/// and the caller's alias (if provided in CallOffer).
pub async fn accept_handshake( pub async fn accept_handshake(
transport: &dyn MediaTransport, transport: &dyn MediaTransport,
seed: &[u8; 32], seed: &[u8; 32],
) -> Result<(Box<dyn CryptoSession>, QualityProfile), anyhow::Error> { ) -> Result<(Box<dyn CryptoSession>, QualityProfile, String, Option<String>), anyhow::Error> {
// 1. Receive CallOffer // 1. Receive CallOffer
let offer = transport let offer = transport
.recv_signal() .recv_signal()
.await? .await?
.ok_or_else(|| anyhow::anyhow!("connection closed before receiving CallOffer"))?; .ok_or_else(|| anyhow::anyhow!("connection closed before receiving CallOffer"))?;
let (caller_identity_pub, caller_ephemeral_pub, caller_signature, supported_profiles) = let (caller_identity_pub, caller_ephemeral_pub, caller_signature, supported_profiles, caller_alias) =
match offer { match offer {
SignalMessage::CallOffer { SignalMessage::CallOffer {
identity_pub, identity_pub,
ephemeral_pub, ephemeral_pub,
signature, signature,
supported_profiles, supported_profiles,
} => (identity_pub, ephemeral_pub, signature, supported_profiles), alias,
} => (identity_pub, ephemeral_pub, signature, supported_profiles, alias),
other => { other => {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"expected CallOffer, got {:?}", "expected CallOffer, got {:?}",
@@ -76,7 +78,13 @@ pub async fn accept_handshake(
}; };
transport.send_signal(&answer).await?; transport.send_signal(&answer).await?;
Ok((session, chosen_profile)) // Derive caller fingerprint from their identity public key (first 8 bytes as hex)
let caller_fp = caller_identity_pub[..8]
.iter()
.map(|b| format!("{b:02x}"))
.collect::<String>();
Ok((session, chosen_profile, caller_fp, caller_alias))
} }
/// Select the best quality profile from those the caller supports. /// Select the best quality profile from those the caller supports.

View File

@@ -431,7 +431,7 @@ async fn main() -> anyhow::Result<()> {
// Crypto handshake: verify client identity + negotiate quality profile // Crypto handshake: verify client identity + negotiate quality profile
let handshake_start = std::time::Instant::now(); let handshake_start = std::time::Instant::now();
let (_crypto_session, _chosen_profile) = match wzp_relay::handshake::accept_handshake( let (_crypto_session, _chosen_profile, caller_fp, caller_alias) = match wzp_relay::handshake::accept_handshake(
&*transport, &*transport,
&relay_seed_bytes, &relay_seed_bytes,
).await { ).await {
@@ -448,10 +448,13 @@ async fn main() -> anyhow::Result<()> {
} }
}; };
// Use the caller's identity fingerprint from the handshake
let participant_fp = authenticated_fp.clone().unwrap_or(caller_fp);
// Register in presence registry // Register in presence registry
if let Some(ref fp) = authenticated_fp { {
let mut reg = presence.lock().await; let mut reg = presence.lock().await;
reg.register_local(fp, None, Some(room_name.clone())); reg.register_local(&participant_fp, None, Some(room_name.clone()));
} }
info!(%addr, room = %room_name, "client joining"); info!(%addr, room = %room_name, "client joining");
@@ -506,8 +509,8 @@ async fn main() -> anyhow::Result<()> {
&room_name, &room_name,
addr, addr,
room::ParticipantSender::Quic(transport.clone()), room::ParticipantSender::Quic(transport.clone()),
authenticated_fp.as_deref(), Some(&participant_fp),
None, // alias — TODO: accept from client caller_alias.as_deref(),
) { ) {
Ok((id, update, senders)) => { Ok((id, update, senders)) => {
metrics.active_rooms.set(mgr.list().len() as i64); metrics.active_rooms.set(mgr.list().len() as i64);

View File

@@ -141,6 +141,17 @@ impl Room {
self.participants.iter().map(|p| p.sender.clone()).collect() self.participants.iter().map(|p| p.sender.clone()).collect()
} }
/// Update a participant's alias. Returns true if the participant was found.
fn set_alias(&mut self, id: ParticipantId, alias: String) -> bool {
if let Some(p) = self.participants.iter_mut().find(|p| p.id == id) {
info!(participant = id, %alias, "alias updated");
p.alias = Some(alias);
true
} else {
false
}
}
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.participants.is_empty() self.participants.is_empty()
} }
@@ -255,6 +266,26 @@ impl RoomManager {
} }
} }
/// Update a participant's alias and return a RoomUpdate + senders for broadcasting.
pub fn set_alias(
&mut self,
room_name: &str,
participant_id: ParticipantId,
alias: String,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
if let Some(room) = self.rooms.get_mut(room_name) {
if room.set_alias(participant_id, alias) {
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
return Some((update, senders));
}
}
None
}
/// Get senders for all OTHER participants in a room. /// Get senders for all OTHER participants in a room.
pub fn others( pub fn others(
&self, &self,
@@ -374,10 +405,17 @@ async fn run_participant_plain(
session_id: &str, session_id: &str,
) { ) {
let addr = transport.connection().remote_address(); let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
// Media forwarding task
let media_room_mgr = room_mgr.clone();
let media_room_name = room_name.clone();
let media_transport = transport.clone();
let media_metrics = metrics.clone();
let media_session_id = session_id.to_string();
let media_task = async move {
let mut packets_forwarded = 0u64;
loop { loop {
let pkt = match transport.recv_media().await { let pkt = match media_transport.recv_media().await {
Ok(Some(pkt)) => pkt, Ok(Some(pkt)) => pkt,
Ok(None) => { Ok(None) => {
info!(%addr, participant = participant_id, "disconnected"); info!(%addr, participant = participant_id, "disconnected");
@@ -394,18 +432,15 @@ async fn run_participant_plain(
} }
}; };
// Update per-session quality metrics if a quality report is present
if let Some(ref report) = pkt.quality_report { if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report); media_metrics.update_session_quality(&media_session_id, report);
} }
// Get current list of other participants
let others = { let others = {
let mgr = room_mgr.lock().await; let mgr = media_room_mgr.lock().await;
mgr.others(&room_name, participant_id) mgr.others(&media_room_name, participant_id)
}; };
// Forward to all others
let pkt_bytes = pkt.payload.len() as u64; let pkt_bytes = pkt.payload.len() as u64;
for other in &others { for other in &others {
match other { match other {
@@ -413,23 +448,22 @@ async fn run_participant_plain(
let _ = t.send_media(&pkt).await; let _ = t.send_media(&pkt).await;
} }
ParticipantSender::WebSocket(_) => { ParticipantSender::WebSocket(_) => {
// WS clients receive raw payload bytes
let _ = other.send_raw(&pkt.payload).await; let _ = other.send_raw(&pkt.payload).await;
} }
} }
} }
let fan_out = others.len() as u64; let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out); media_metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out); media_metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1; packets_forwarded += 1;
if packets_forwarded % 500 == 0 { if packets_forwarded % 500 == 0 {
let room_size = { let room_size = {
let mgr = room_mgr.lock().await; let mgr = media_room_mgr.lock().await;
mgr.room_size(&room_name) mgr.room_size(&media_room_name)
}; };
info!( info!(
room = %room_name, room = %media_room_name,
participant = participant_id, participant = participant_id,
forwarded = packets_forwarded, forwarded = packets_forwarded,
room_size, room_size,
@@ -437,6 +471,46 @@ async fn run_participant_plain(
); );
} }
} }
};
// Signal handling task — processes SetAlias and other in-call signals
let signal_room_mgr = room_mgr.clone();
let signal_room_name = room_name.clone();
let signal_transport = transport.clone();
let signal_task = async move {
loop {
match signal_transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::SetAlias { alias })) => {
info!(%addr, participant = participant_id, %alias, "SetAlias received");
let mut mgr = signal_room_mgr.lock().await;
if let Some((update, senders)) =
mgr.set_alias(&signal_room_name, participant_id, alias)
{
drop(mgr);
broadcast_signal(&senders, &update).await;
}
}
Ok(Some(wzp_proto::SignalMessage::Hangup { .. })) => {
info!(%addr, participant = participant_id, "hangup received");
break;
}
Ok(Some(msg)) => {
info!(%addr, participant = participant_id, "signal: {:?}", std::mem::discriminant(&msg));
}
Ok(None) => break,
Err(e) => {
warn!(%addr, participant = participant_id, "signal recv error: {e}");
break;
}
}
}
};
// Run both in parallel — exit when either finishes (disconnection)
tokio::select! {
_ = media_task => {}
_ = signal_task => {}
}
// Clean up — leave room and broadcast update to remaining participants // Clean up — leave room and broadcast update to remaining participants
let mut mgr = room_mgr.lock().await; let mut mgr = room_mgr.lock().await;