4 Commits

Author SHA1 Message Date
Siavash Sameni
ba29d8354f fix: send alias via CallOffer handshake (match Android approach)
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m44s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 09:10:07 +04:00
Siavash Sameni
0908507a7a Merge remote-tracking branch 'origin/feat/android-voip-client' into feat/desktop-audio-rewrite 2026-04-06 09:04:55 +04:00
Siavash Sameni
860c90394d feat: rewrite desktop audio I/O with lock-free ring buffers
- Replace Mutex-based CPAL callbacks with atomic SPSC ring buffers
- Proper async send/recv loops (no block_on), 20ms playout tick
- Add signal task for RoomUpdate presence display
- Add --alias, --raw-room flags and key persistence (~/.wzp/identity)
- Add SetAlias signal variant and relay-side handling
- Graceful Ctrl+C shutdown with force-quit on second press

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 09:04:51 +04:00
Claude
0835c36d0f feat: settings page with persistence, client alias in handshake, fix null fingerprints
Some checks failed
Build Release Binaries / build-amd64 (push) Failing after 3m34s
- Add SettingsScreen with identity (alias, key backup/restore), audio defaults,
  server management, network prefs, and default room
- SettingsRepository persists all settings via SharedPreferences
- Auto-generate random display names on first launch (e.g. "Swift Wolf")
- Thread alias through CallOffer → relay handshake → RoomUpdate broadcast
- Derive caller fingerprint from identity key in relay handshake (fixes null
  fingerprints when --auth-url is not set)
- Persist identity seed for stable fingerprints across reconnects
- Add alias field to SignalMessage::CallOffer (serde default for backward compat)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 03:56:33 +00:00
18 changed files with 1239 additions and 226 deletions

View File

@@ -0,0 +1,135 @@
package com.wzp.data
import android.content.Context
import android.content.SharedPreferences
import com.wzp.ui.call.ServerEntry
import org.json.JSONArray
import org.json.JSONObject
import java.security.SecureRandom
/**
* Persists user settings via SharedPreferences.
*
* Stores: servers, default server index, room name, alias, gain values,
* IPv6 preference, and the identity seed (hex-encoded 32 bytes).
*/
class SettingsRepository(context: Context) {
private val prefs: SharedPreferences =
context.applicationContext.getSharedPreferences("wzp_settings", Context.MODE_PRIVATE)
companion object {
private const val KEY_SERVERS = "servers_json"
private const val KEY_SELECTED_SERVER = "selected_server"
private const val KEY_ROOM = "room_name"
private const val KEY_ALIAS = "alias"
private const val KEY_PLAYOUT_GAIN = "playout_gain_db"
private const val KEY_CAPTURE_GAIN = "capture_gain_db"
private const val KEY_PREFER_IPV6 = "prefer_ipv6"
private const val KEY_IDENTITY_SEED = "identity_seed_hex"
}
// --- Servers ---
fun saveServers(servers: List<ServerEntry>) {
val arr = JSONArray()
servers.forEach { entry ->
arr.put(JSONObject().apply {
put("address", entry.address)
put("label", entry.label)
})
}
prefs.edit().putString(KEY_SERVERS, arr.toString()).apply()
}
fun loadServers(): List<ServerEntry>? {
val json = prefs.getString(KEY_SERVERS, null) ?: return null
return try {
val arr = JSONArray(json)
(0 until arr.length()).map { i ->
val obj = arr.getJSONObject(i)
ServerEntry(obj.getString("address"), obj.getString("label"))
}
} catch (_: Exception) { null }
}
fun saveSelectedServer(index: Int) {
prefs.edit().putInt(KEY_SELECTED_SERVER, index).apply()
}
fun loadSelectedServer(): Int = prefs.getInt(KEY_SELECTED_SERVER, 0)
// --- Room ---
fun saveRoom(name: String) { prefs.edit().putString(KEY_ROOM, name).apply() }
fun loadRoom(): String = prefs.getString(KEY_ROOM, "android") ?: "android"
// --- Alias ---
fun saveAlias(alias: String) { prefs.edit().putString(KEY_ALIAS, alias).apply() }
/**
* Load alias, generating a random name on first launch.
*/
fun getOrCreateAlias(): String {
val existing = prefs.getString(KEY_ALIAS, null)
if (!existing.isNullOrEmpty()) return existing
val name = generateRandomName()
prefs.edit().putString(KEY_ALIAS, name).apply()
return name
}
private fun generateRandomName(): String {
val adjectives = listOf(
"Swift", "Silent", "Brave", "Calm", "Dark", "Fierce", "Ghost",
"Iron", "Lucky", "Noble", "Quick", "Sharp", "Storm", "Wild",
"Cold", "Bright", "Lone", "Red", "Grey", "Frosty", "Dusty",
"Rusty", "Neon", "Void", "Solar", "Lunar", "Cyber", "Pixel",
"Sonic", "Hyper", "Turbo", "Nano", "Mega", "Ultra", "Zinc"
)
val nouns = listOf(
"Wolf", "Hawk", "Fox", "Bear", "Lynx", "Crow", "Viper",
"Cobra", "Tiger", "Eagle", "Shark", "Raven", "Falcon", "Otter",
"Mantis", "Panda", "Jackal", "Badger", "Heron", "Bison",
"Condor", "Coyote", "Gecko", "Hornet", "Marten", "Osprey",
"Parrot", "Puma", "Raptor", "Stork", "Toucan", "Walrus"
)
val adj = adjectives.random()
val noun = nouns.random()
return "$adj $noun"
}
// --- Gain ---
fun savePlayoutGain(db: Float) { prefs.edit().putFloat(KEY_PLAYOUT_GAIN, db).apply() }
fun loadPlayoutGain(): Float = prefs.getFloat(KEY_PLAYOUT_GAIN, 0f)
fun saveCaptureGain(db: Float) { prefs.edit().putFloat(KEY_CAPTURE_GAIN, db).apply() }
fun loadCaptureGain(): Float = prefs.getFloat(KEY_CAPTURE_GAIN, 0f)
// --- IPv6 ---
fun savePreferIPv6(prefer: Boolean) { prefs.edit().putBoolean(KEY_PREFER_IPV6, prefer).apply() }
fun loadPreferIPv6(): Boolean = prefs.getBoolean(KEY_PREFER_IPV6, false)
// --- Identity seed ---
/**
* Get or generate the identity seed. On first call, generates a random
* 32-byte seed and persists it. Subsequent calls return the same seed.
*/
fun getOrCreateSeedHex(): String {
val existing = prefs.getString(KEY_IDENTITY_SEED, null)
if (!existing.isNullOrEmpty()) return existing
val seed = ByteArray(32).also { SecureRandom().nextBytes(it) }
val hex = seed.joinToString("") { "%02x".format(it) }
prefs.edit().putString(KEY_IDENTITY_SEED, hex).apply()
return hex
}
fun loadSeedHex(): String = prefs.getString(KEY_IDENTITY_SEED, "") ?: ""
fun saveSeedHex(hex: String) {
prefs.edit().putString(KEY_IDENTITY_SEED, hex).apply()
}
}

View File

@@ -35,11 +35,12 @@ class WzpEngine(private val callback: WzpCallback) {
* @param room room identifier (used as QUIC SNI)
* @param seedHex 64-char hex-encoded 32-byte identity seed (empty = random)
* @param token authentication token (empty = no auth)
* @param alias display name sent to relay for room participant list
* @return 0 on success, negative error code on failure
*/
fun startCall(relayAddr: String, room: String, seedHex: String = "", token: String = ""): Int {
fun startCall(relayAddr: String, room: String, seedHex: String = "", token: String = "", alias: String = ""): Int {
check(nativeHandle != 0L) { "Engine not initialized" }
val result = nativeStartCall(nativeHandle, relayAddr, room, seedHex, token)
val result = nativeStartCall(nativeHandle, relayAddr, room, seedHex, token, alias)
if (result == 0) {
callback.onCallStateChanged(CallStateConstants.CONNECTING)
} else {
@@ -120,7 +121,7 @@ class WzpEngine(private val callback: WzpCallback) {
private external fun nativeInit(): Long
private external fun nativeStartCall(
handle: Long, relay: String, room: String, seed: String, token: String
handle: Long, relay: String, room: String, seed: String, token: String, alias: String
): Int
private external fun nativeStopCall(handle: Long)
private external fun nativeSetMute(handle: Long, muted: Boolean)

View File

@@ -15,8 +15,13 @@ import androidx.compose.material3.dynamicLightColorScheme
import androidx.compose.material3.lightColorScheme
import androidx.compose.foundation.isSystemInDarkTheme
import androidx.compose.runtime.Composable
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import androidx.compose.ui.platform.LocalContext
import androidx.core.content.ContextCompat
import com.wzp.ui.settings.SettingsScreen
/**
* Main activity hosting the in-call Compose UI.
@@ -43,12 +48,19 @@ class CallActivity : ComponentActivity() {
setContent {
WzpTheme {
InCallScreen(
viewModel = viewModel,
onHangUp = {
viewModel.stopCall()
}
)
var showSettings by remember { mutableStateOf(false) }
if (showSettings) {
SettingsScreen(
viewModel = viewModel,
onBack = { showSettings = false }
)
} else {
InCallScreen(
viewModel = viewModel,
onHangUp = { viewModel.stopCall() },
onOpenSettings = { showSettings = true }
)
}
}
}

View File

@@ -6,6 +6,7 @@ import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.wzp.audio.AudioPipeline
import com.wzp.audio.AudioRouteManager
import com.wzp.data.SettingsRepository
import com.wzp.engine.CallStats
import com.wzp.service.CallService
import com.wzp.engine.WzpCallback
@@ -31,6 +32,7 @@ class CallViewModel : ViewModel(), WzpCallback {
private var audioRouteManager: AudioRouteManager? = null
private var audioStarted = false
private var appContext: Context? = null
private var settings: SettingsRepository? = null
private val _callState = MutableStateFlow(0)
val callState: StateFlow<Int> get() = _callState.asStateFlow()
@@ -68,6 +70,12 @@ class CallViewModel : ViewModel(), WzpCallback {
private val _captureGainDb = MutableStateFlow(0f)
val captureGainDb: StateFlow<Float> = _captureGainDb.asStateFlow()
private val _alias = MutableStateFlow("")
val alias: StateFlow<String> = _alias.asStateFlow()
private val _seedHex = MutableStateFlow("")
val seedHex: StateFlow<String> = _seedHex.asStateFlow()
private var statsJob: Job? = null
companion object {
@@ -88,20 +96,43 @@ class CallViewModel : ViewModel(), WzpCallback {
if (audioRouteManager == null) {
audioRouteManager = AudioRouteManager(appCtx)
}
if (settings == null) {
settings = SettingsRepository(appCtx)
loadSettings()
}
}
private fun loadSettings() {
val s = settings ?: return
s.loadServers()?.let { saved ->
if (saved.isNotEmpty()) _servers.value = saved
}
_selectedServer.value = s.loadSelectedServer().coerceIn(0, _servers.value.lastIndex)
_roomName.value = s.loadRoom()
_alias.value = s.getOrCreateAlias()
_preferIPv6.value = s.loadPreferIPv6()
_playoutGainDb.value = s.loadPlayoutGain()
_captureGainDb.value = s.loadCaptureGain()
_seedHex.value = s.getOrCreateSeedHex()
}
fun selectServer(index: Int) {
if (index in _servers.value.indices) {
_selectedServer.value = index
settings?.saveSelectedServer(index)
}
}
fun setPreferIPv6(prefer: Boolean) { _preferIPv6.value = prefer }
fun setPreferIPv6(prefer: Boolean) {
_preferIPv6.value = prefer
settings?.savePreferIPv6(prefer)
}
fun addServer(hostPort: String, label: String) {
val current = _servers.value.toMutableList()
current.add(ServerEntry(hostPort, label))
_servers.value = current
settings?.saveServers(current)
}
fun removeServer(index: Int) {
@@ -113,19 +144,36 @@ class CallViewModel : ViewModel(), WzpCallback {
if (_selectedServer.value >= current.size) {
_selectedServer.value = 0
}
settings?.saveServers(current)
settings?.saveSelectedServer(_selectedServer.value)
}
}
fun setRoomName(name: String) { _roomName.value = name }
fun setRoomName(name: String) {
_roomName.value = name
settings?.saveRoom(name)
}
fun setPlayoutGainDb(db: Float) {
_playoutGainDb.value = db
audioPipeline?.playoutGainDb = db
settings?.savePlayoutGain(db)
}
fun setCaptureGainDb(db: Float) {
_captureGainDb.value = db
audioPipeline?.captureGainDb = db
settings?.saveCaptureGain(db)
}
fun setAlias(alias: String) {
_alias.value = alias
settings?.saveAlias(alias)
}
fun restoreSeed(hex: String) {
_seedHex.value = hex
settings?.saveSeedHex(hex)
}
/**
@@ -203,8 +251,10 @@ class CallViewModel : ViewModel(), WzpCallback {
viewModelScope.launch(kotlinx.coroutines.Dispatchers.IO) {
try {
val relay = resolveToIp(serverEntry.address)
Log.i(TAG, "startCall: resolved=$relay, calling engine.startCall")
val result = engine?.startCall(relay, room) ?: -1
val seed = _seedHex.value
val name = _alias.value
Log.i(TAG, "startCall: resolved=$relay, alias=$name, calling engine.startCall")
val result = engine?.startCall(relay, room, seedHex = seed, alias = name) ?: -1
Log.i(TAG, "startCall: engine returned $result")
// Only wire up notification callback after engine is running
CallService.onStopFromNotification = { stopCall() }

View File

@@ -54,7 +54,8 @@ import kotlin.math.roundToInt
@Composable
fun InCallScreen(
viewModel: CallViewModel,
onHangUp: () -> Unit
onHangUp: () -> Unit,
onOpenSettings: () -> Unit = {}
) {
val callState by viewModel.callState.collectAsState()
val isMuted by viewModel.isMuted.collectAsState()
@@ -82,7 +83,16 @@ fun InCallScreen(
.verticalScroll(rememberScrollState()),
horizontalAlignment = Alignment.CenterHorizontally
) {
Spacer(modifier = Modifier.height(48.dp))
// Settings button (top-right)
if (callState == 0) {
Row(modifier = Modifier.fillMaxWidth(), horizontalArrangement = Arrangement.End) {
TextButton(onClick = onOpenSettings) {
Text("Settings")
}
}
}
Spacer(modifier = Modifier.height(if (callState == 0) 16.dp else 48.dp))
Text(
text = "WZ Phone",

View File

@@ -0,0 +1,437 @@
package com.wzp.ui.settings
import android.content.ClipData
import android.content.ClipboardManager
import android.content.Context
import android.widget.Toast
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.ExperimentalLayoutApi
import androidx.compose.foundation.layout.FlowRow
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.width
import androidx.compose.foundation.rememberScrollState
import androidx.compose.foundation.shape.RoundedCornerShape
import androidx.compose.foundation.verticalScroll
import androidx.compose.material3.AlertDialog
import androidx.compose.material3.Button
import androidx.compose.material3.ButtonDefaults
import androidx.compose.material3.FilledTonalButton
import androidx.compose.material3.FilledTonalIconButton
import androidx.compose.material3.Divider
import androidx.compose.material3.IconButtonDefaults
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.OutlinedButton
import androidx.compose.material3.OutlinedTextField
import androidx.compose.material3.Slider
import androidx.compose.material3.Surface
import androidx.compose.material3.Switch
import androidx.compose.material3.Text
import androidx.compose.material3.TextButton
import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.text.font.FontFamily
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.unit.dp
import com.wzp.ui.call.CallViewModel
@OptIn(ExperimentalLayoutApi::class)
@Composable
fun SettingsScreen(
viewModel: CallViewModel,
onBack: () -> Unit
) {
val context = LocalContext.current
val servers by viewModel.servers.collectAsState()
val selectedServer by viewModel.selectedServer.collectAsState()
val roomName by viewModel.roomName.collectAsState()
val preferIPv6 by viewModel.preferIPv6.collectAsState()
val playoutGainDb by viewModel.playoutGainDb.collectAsState()
val captureGainDb by viewModel.captureGainDb.collectAsState()
val alias by viewModel.alias.collectAsState()
val seedHex by viewModel.seedHex.collectAsState()
var showAddServerDialog by remember { mutableStateOf(false) }
var showRestoreKeyDialog by remember { mutableStateOf(false) }
Surface(
modifier = Modifier.fillMaxSize(),
color = MaterialTheme.colorScheme.background
) {
Column(
modifier = Modifier
.fillMaxSize()
.padding(24.dp)
.verticalScroll(rememberScrollState())
) {
// Header
Row(
modifier = Modifier.fillMaxWidth(),
verticalAlignment = Alignment.CenterVertically
) {
TextButton(onClick = onBack) {
Text("< Back")
}
Spacer(modifier = Modifier.weight(1f))
Text(
text = "Settings",
style = MaterialTheme.typography.headlineSmall.copy(
fontWeight = FontWeight.Bold
),
color = MaterialTheme.colorScheme.primary
)
Spacer(modifier = Modifier.weight(1f))
// Balance the back button
Spacer(modifier = Modifier.width(64.dp))
}
Spacer(modifier = Modifier.height(24.dp))
// --- Identity ---
SectionHeader("Identity")
OutlinedTextField(
value = alias,
onValueChange = { viewModel.setAlias(it) },
label = { Text("Display Name") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(16.dp))
// Fingerprint display
val fingerprint = if (seedHex.length >= 16) seedHex.take(16).uppercase() else "Not generated"
Text(
text = "Fingerprint",
style = MaterialTheme.typography.labelSmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Text(
text = fingerprint.chunked(4).joinToString(" "),
style = MaterialTheme.typography.bodyMedium.copy(
fontFamily = FontFamily.Monospace
),
color = MaterialTheme.colorScheme.onSurface
)
Spacer(modifier = Modifier.height(12.dp))
// Key backup/restore
Row(horizontalArrangement = Arrangement.spacedBy(8.dp)) {
FilledTonalButton(onClick = {
val clipboard = context.getSystemService(Context.CLIPBOARD_SERVICE) as ClipboardManager
clipboard.setPrimaryClip(ClipData.newPlainText("WZP Key", seedHex))
Toast.makeText(context, "Key copied to clipboard", Toast.LENGTH_SHORT).show()
}) {
Text("Copy Key")
}
OutlinedButton(onClick = { showRestoreKeyDialog = true }) {
Text("Restore Key")
}
}
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Audio ---
SectionHeader("Audio Defaults")
GainSlider(
label = "Voice Volume",
gainDb = playoutGainDb,
onGainChange = { viewModel.setPlayoutGainDb(it) }
)
Spacer(modifier = Modifier.height(4.dp))
GainSlider(
label = "Mic Gain",
gainDb = captureGainDb,
onGainChange = { viewModel.setCaptureGainDb(it) }
)
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Servers ---
SectionHeader("Servers")
FlowRow(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.Start,
verticalArrangement = Arrangement.spacedBy(4.dp)
) {
servers.forEachIndexed { idx, entry ->
val isSelected = selectedServer == idx
Row(verticalAlignment = Alignment.CenterVertically) {
FilledTonalIconButton(
onClick = { viewModel.selectServer(idx) },
modifier = Modifier
.padding(end = 2.dp)
.height(36.dp)
.width(140.dp),
shape = RoundedCornerShape(8.dp),
colors = if (isSelected) {
IconButtonDefaults.filledTonalIconButtonColors(
containerColor = MaterialTheme.colorScheme.primaryContainer,
contentColor = MaterialTheme.colorScheme.onPrimaryContainer
)
} else {
IconButtonDefaults.filledTonalIconButtonColors()
}
) {
Text(
text = entry.label,
style = MaterialTheme.typography.labelSmall,
maxLines = 1
)
}
// Show remove button for non-default servers
if (idx >= 2) {
TextButton(
onClick = { viewModel.removeServer(idx) },
modifier = Modifier.height(36.dp)
) {
Text("X", color = MaterialTheme.colorScheme.error)
}
}
}
}
}
Spacer(modifier = Modifier.height(8.dp))
OutlinedButton(
onClick = { showAddServerDialog = true },
shape = RoundedCornerShape(8.dp)
) {
Text("+ Add Server")
}
// Show selected server address
Spacer(modifier = Modifier.height(8.dp))
Text(
text = "Default: ${servers.getOrNull(selectedServer)?.address ?: "none"}",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Network ---
SectionHeader("Network")
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth()
) {
Text(
text = "Prefer IPv6",
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.weight(1f)
)
Switch(
checked = preferIPv6,
onCheckedChange = { viewModel.setPreferIPv6(it) }
)
}
Spacer(modifier = Modifier.height(24.dp))
Divider()
Spacer(modifier = Modifier.height(16.dp))
// --- Room ---
SectionHeader("Room")
OutlinedTextField(
value = roomName,
onValueChange = { viewModel.setRoomName(it) },
label = { Text("Default Room") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(32.dp))
}
}
if (showAddServerDialog) {
AddServerDialog(
onDismiss = { showAddServerDialog = false },
onAdd = { host, port, label ->
viewModel.addServer("$host:$port", label)
showAddServerDialog = false
}
)
}
if (showRestoreKeyDialog) {
RestoreKeyDialog(
onDismiss = { showRestoreKeyDialog = false },
onRestore = { hex ->
viewModel.restoreSeed(hex)
showRestoreKeyDialog = false
Toast.makeText(context, "Key restored", Toast.LENGTH_SHORT).show()
}
)
}
}
@Composable
private fun SectionHeader(title: String) {
Text(
text = title,
style = MaterialTheme.typography.titleMedium.copy(fontWeight = FontWeight.Bold),
color = MaterialTheme.colorScheme.primary
)
Spacer(modifier = Modifier.height(8.dp))
}
@Composable
private fun GainSlider(label: String, gainDb: Float, onGainChange: (Float) -> Unit) {
Column(
modifier = Modifier.fillMaxWidth(),
horizontalAlignment = Alignment.CenterHorizontally
) {
val sign = if (gainDb >= 0) "+" else ""
Text(
text = "$label: ${sign}${"%.0f".format(gainDb)} dB",
style = MaterialTheme.typography.labelSmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Slider(
value = gainDb,
onValueChange = { onGainChange(Math.round(it).toFloat()) },
valueRange = -20f..20f,
steps = 0,
modifier = Modifier.fillMaxWidth()
)
}
}
@Composable
private fun AddServerDialog(
onDismiss: () -> Unit,
onAdd: (host: String, port: String, label: String) -> Unit
) {
var host by remember { mutableStateOf("") }
var port by remember { mutableStateOf("4433") }
var label by remember { mutableStateOf("") }
AlertDialog(
onDismissRequest = onDismiss,
title = { Text("Add Server") },
text = {
Column {
OutlinedTextField(
value = host,
onValueChange = { host = it },
label = { Text("Host (IP or domain)") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(8.dp))
OutlinedTextField(
value = port,
onValueChange = { port = it },
label = { Text("Port") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
Spacer(modifier = Modifier.height(8.dp))
OutlinedTextField(
value = label,
onValueChange = { label = it },
label = { Text("Label (optional)") },
singleLine = true,
modifier = Modifier.fillMaxWidth()
)
}
},
confirmButton = {
TextButton(
onClick = {
if (host.isNotBlank()) {
val displayLabel = label.ifBlank { host }
onAdd(host.trim(), port.trim(), displayLabel)
}
}
) { Text("Add") }
},
dismissButton = {
TextButton(onClick = onDismiss) { Text("Cancel") }
}
)
}
@Composable
private fun RestoreKeyDialog(
onDismiss: () -> Unit,
onRestore: (hex: String) -> Unit
) {
var keyInput by remember { mutableStateOf("") }
var error by remember { mutableStateOf<String?>(null) }
AlertDialog(
onDismissRequest = onDismiss,
title = { Text("Restore Identity Key") },
text = {
Column {
Text(
text = "Paste your 64-character hex key below. This will replace your current identity.",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant
)
Spacer(modifier = Modifier.height(8.dp))
OutlinedTextField(
value = keyInput,
onValueChange = {
keyInput = it.trim().lowercase()
error = null
},
label = { Text("Identity Key (hex)") },
singleLine = true,
modifier = Modifier.fillMaxWidth(),
isError = error != null
)
error?.let {
Text(
text = it,
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.error
)
}
}
},
confirmButton = {
TextButton(
onClick = {
val cleaned = keyInput.replace("\\s".toRegex(), "")
if (cleaned.length != 64 || !cleaned.all { it in '0'..'9' || it in 'a'..'f' }) {
error = "Key must be exactly 64 hex characters"
} else {
onRestore(cleaned)
}
}
) { Text("Restore") }
},
dismissButton = {
TextButton(onClick = onDismiss) { Text("Cancel") }
}
)
}

View File

@@ -39,6 +39,7 @@ pub struct CallStartConfig {
pub room: String,
pub auth_token: Vec<u8>,
pub identity_seed: [u8; 32],
pub alias: Option<String>,
}
impl Default for CallStartConfig {
@@ -49,6 +50,7 @@ impl Default for CallStartConfig {
room: String::new(),
auth_token: Vec::new(),
identity_seed: [0u8; 32],
alias: None,
}
}
}
@@ -117,6 +119,7 @@ impl WzpEngine {
let room = config.room.clone();
let identity_seed = config.identity_seed;
let profile = config.profile;
let alias = config.alias.clone();
let state = self.state.clone();
self.state.running.store(true, Ordering::Release);
@@ -124,7 +127,7 @@ impl WzpEngine {
let state_clone = state.clone();
runtime.block_on(async move {
if let Err(e) = run_call(relay_addr, &room, &identity_seed, profile, state_clone).await
if let Err(e) = run_call(relay_addr, &room, &identity_seed, profile, alias.as_deref(), state_clone).await
{
error!("call failed: {e}");
}
@@ -204,6 +207,7 @@ async fn run_call(
room: &str,
identity_seed: &[u8; 32],
profile: QualityProfile,
alias: Option<&str>,
state: Arc<EngineState>,
) -> Result<(), anyhow::Error> {
let _ = rustls::crypto::ring::default_provider().install_default();
@@ -238,6 +242,7 @@ async fn run_call(
QualityProfile::DEGRADED,
QualityProfile::CATASTROPHIC,
],
alias: alias.map(|s| s.to_string()),
};
transport.send_signal(&offer).await?;
info!("CallOffer sent, waiting for CallAnswer...");

View File

@@ -54,12 +54,14 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartCall(
room_j: JString,
seed_hex_j: JString,
token_j: JString,
alias_j: JString,
) -> jint {
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let relay_addr: String = env.get_string(&relay_addr_j).map(|s| s.into()).unwrap_or_default();
let room: String = env.get_string(&room_j).map(|s| s.into()).unwrap_or_default();
let seed_hex: String = env.get_string(&seed_hex_j).map(|s| s.into()).unwrap_or_default();
let token: String = env.get_string(&token_j).map(|s| s.into()).unwrap_or_default();
let alias: String = env.get_string(&alias_j).map(|s| s.into()).unwrap_or_default();
let h = unsafe { handle_ref(handle) };
@@ -83,6 +85,7 @@ pub unsafe extern "system" fn Java_com_wzp_engine_WzpEngine_nativeStartCall(
room,
auth_token: if token.is_empty() { Vec::new() } else { token.into_bytes() },
identity_seed,
alias: if alias.is_empty() { None } else { Some(alias) },
};
match h.engine.start_call(config) {

View File

@@ -3,12 +3,10 @@
//! Both structs use 48 kHz, mono, i16 format to match the WarzonePhone codec
//! pipeline. Frames are 960 samples (20 ms at 48 kHz).
//!
//! The cpal `Stream` type is not `Send`, so each struct spawns a dedicated OS
//! thread that owns the stream. The public API exposes only `Send + Sync`
//! channel handles.
//! Audio callbacks are **lock-free**: they read/write directly to an `AudioRing`
//! (atomic SPSC ring buffer). No Mutex, no channel, no allocation on the hot path.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use anyhow::{anyhow, Context};
@@ -16,6 +14,8 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{SampleFormat, SampleRate, StreamConfig};
use tracing::{info, warn};
use crate::audio_ring::AudioRing;
/// Number of samples per 20 ms frame at 48 kHz mono.
pub const FRAME_SAMPLES: usize = 960;
@@ -23,22 +23,24 @@ pub const FRAME_SAMPLES: usize = 960;
// AudioCapture
// ---------------------------------------------------------------------------
/// Captures microphone input and yields 960-sample PCM frames.
/// Captures microphone input via CPAL and writes PCM into a lock-free ring buffer.
///
/// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`.
pub struct AudioCapture {
rx: mpsc::Receiver<Vec<i16>>,
ring: Arc<AudioRing>,
running: Arc<AtomicBool>,
}
impl AudioCapture {
/// Create and start capturing from the default input device at 48 kHz mono.
pub fn start() -> Result<Self, anyhow::Error> {
let (tx, rx) = mpsc::sync_channel::<Vec<i16>>(64);
let ring = Arc::new(AudioRing::new());
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let (init_tx, init_rx) = mpsc::sync_channel::<Result<(), String>>(1);
let (init_tx, init_rx) = std::sync::mpsc::sync_channel::<Result<(), String>>(1);
let ring_cb = ring.clone();
let running_clone = running.clone();
std::thread::Builder::new()
.name("wzp-audio-capture".into())
@@ -54,21 +56,17 @@ impl AudioCapture {
let config = StreamConfig {
channels: 1,
sample_rate: SampleRate(48_000),
buffer_size: cpal::BufferSize::Default,
buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32),
};
let use_f32 = !supports_i16_input(&device)?;
let buf = Arc::new(std::sync::Mutex::new(
Vec::<i16>::with_capacity(FRAME_SAMPLES),
));
let err_cb = |e: cpal::StreamError| {
warn!("input stream error: {e}");
};
let stream = if use_f32 {
let buf = buf.clone();
let tx = tx.clone();
let ring = ring_cb.clone();
let running = running_clone.clone();
device.build_input_stream(
&config,
@@ -76,21 +74,22 @@ impl AudioCapture {
if !running.load(Ordering::Relaxed) {
return;
}
let mut lock = buf.lock().unwrap();
for &s in data {
lock.push(f32_to_i16(s));
if lock.len() == FRAME_SAMPLES {
let frame = lock.drain(..).collect();
let _ = tx.try_send(frame);
// Batch convert f32 → i16, then write entire slice to ring.
// Stack alloc for typical callback sizes (≤ 960 samples).
let mut tmp = [0i16; FRAME_SAMPLES];
for chunk in data.chunks(FRAME_SAMPLES) {
let n = chunk.len();
for i in 0..n {
tmp[i] = f32_to_i16(chunk[i]);
}
ring.write(&tmp[..n]);
}
},
err_cb,
None,
)?
} else {
let buf = buf.clone();
let tx = tx.clone();
let ring = ring_cb.clone();
let running = running_clone.clone();
device.build_input_stream(
&config,
@@ -98,14 +97,7 @@ impl AudioCapture {
if !running.load(Ordering::Relaxed) {
return;
}
let mut lock = buf.lock().unwrap();
for &s in data {
lock.push(s);
if lock.len() == FRAME_SAMPLES {
let frame = lock.drain(..).collect();
let _ = tx.try_send(frame);
}
}
ring.write(data);
},
err_cb,
None,
@@ -114,7 +106,6 @@ impl AudioCapture {
stream.play().context("failed to start input stream")?;
// Signal success to the caller before parking.
let _ = init_tx.send(Ok(()));
// Keep stream alive until stopped.
@@ -135,15 +126,12 @@ impl AudioCapture {
.map_err(|_| anyhow!("capture thread exited before signaling"))?
.map_err(|e| anyhow!("{e}"))?;
Ok(Self { rx, running })
Ok(Self { ring, running })
}
/// Read the next frame of 960 PCM samples (blocking until available).
///
/// Returns `None` when the stream has been stopped or the channel is
/// disconnected.
pub fn read_frame(&self) -> Option<Vec<i16>> {
self.rx.recv().ok()
/// Get a reference to the capture ring buffer for direct polling.
pub fn ring(&self) -> &Arc<AudioRing> {
&self.ring
}
/// Stop capturing.
@@ -152,26 +140,34 @@ impl AudioCapture {
}
}
impl Drop for AudioCapture {
fn drop(&mut self) {
self.stop();
}
}
// ---------------------------------------------------------------------------
// AudioPlayback
// ---------------------------------------------------------------------------
/// Plays PCM frames through the default output device at 48 kHz mono.
/// Plays PCM through the default output device, reading from a lock-free ring buffer.
///
/// The cpal stream lives on a dedicated OS thread; this handle is `Send + Sync`.
pub struct AudioPlayback {
tx: mpsc::SyncSender<Vec<i16>>,
ring: Arc<AudioRing>,
running: Arc<AtomicBool>,
}
impl AudioPlayback {
/// Create and start playback on the default output device at 48 kHz mono.
pub fn start() -> Result<Self, anyhow::Error> {
let (tx, rx) = mpsc::sync_channel::<Vec<i16>>(64);
let ring = Arc::new(AudioRing::new());
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let (init_tx, init_rx) = mpsc::sync_channel::<Result<(), String>>(1);
let (init_tx, init_rx) = std::sync::mpsc::sync_channel::<Result<(), String>>(1);
let ring_cb = ring.clone();
let running_clone = running.clone();
std::thread::Builder::new()
.name("wzp-audio-playback".into())
@@ -187,67 +183,45 @@ impl AudioPlayback {
let config = StreamConfig {
channels: 1,
sample_rate: SampleRate(48_000),
buffer_size: cpal::BufferSize::Default,
buffer_size: cpal::BufferSize::Fixed(FRAME_SAMPLES as u32),
};
let use_f32 = !supports_i16_output(&device)?;
// Shared ring of samples the cpal callback drains from.
let ring = Arc::new(std::sync::Mutex::new(
std::collections::VecDeque::<i16>::with_capacity(FRAME_SAMPLES * 8),
));
// Background drainer: moves frames from the mpsc channel into the ring.
{
let ring = ring.clone();
let running = running_clone.clone();
std::thread::Builder::new()
.name("wzp-playback-drain".into())
.spawn(move || {
while running.load(Ordering::Relaxed) {
match rx.recv_timeout(std::time::Duration::from_millis(100)) {
Ok(frame) => {
let mut lock = ring.lock().unwrap();
lock.extend(frame);
while lock.len() > FRAME_SAMPLES * 16 {
lock.pop_front();
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
})?;
}
let err_cb = |e: cpal::StreamError| {
warn!("output stream error: {e}");
};
let stream = if use_f32 {
let ring = ring.clone();
let ring = ring_cb.clone();
device.build_output_stream(
&config,
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let mut lock = ring.lock().unwrap();
for sample in data.iter_mut() {
*sample = match lock.pop_front() {
Some(s) => i16_to_f32(s),
None => 0.0,
};
let mut tmp = [0i16; FRAME_SAMPLES];
for chunk in data.chunks_mut(FRAME_SAMPLES) {
let n = chunk.len();
let read = ring.read(&mut tmp[..n]);
for i in 0..read {
chunk[i] = i16_to_f32(tmp[i]);
}
// Fill remainder with silence if ring underran
for i in read..n {
chunk[i] = 0.0;
}
}
},
err_cb,
None,
)?
} else {
let ring = ring.clone();
let ring = ring_cb.clone();
device.build_output_stream(
&config,
move |data: &mut [i16], _: &cpal::OutputCallbackInfo| {
let mut lock = ring.lock().unwrap();
for sample in data.iter_mut() {
*sample = lock.pop_front().unwrap_or(0);
let read = ring.read(data);
// Fill remainder with silence if ring underran
for sample in &mut data[read..] {
*sample = 0;
}
},
err_cb,
@@ -257,7 +231,6 @@ impl AudioPlayback {
stream.play().context("failed to start output stream")?;
// Signal success to the caller before parking.
let _ = init_tx.send(Ok(()));
// Keep stream alive until stopped.
@@ -278,12 +251,12 @@ impl AudioPlayback {
.map_err(|_| anyhow!("playback thread exited before signaling"))?
.map_err(|e| anyhow!("{e}"))?;
Ok(Self { tx, running })
Ok(Self { ring, running })
}
/// Write a frame of PCM samples for playback.
pub fn write_frame(&self, pcm: &[i16]) {
let _ = self.tx.try_send(pcm.to_vec());
/// Get a reference to the playout ring buffer for direct writing.
pub fn ring(&self) -> &Arc<AudioRing> {
&self.ring
}
/// Stop playback.
@@ -292,11 +265,16 @@ impl AudioPlayback {
}
}
impl Drop for AudioPlayback {
fn drop(&mut self) {
self.stop();
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/// Check if the input device supports i16 at 48 kHz mono.
fn supports_i16_input(device: &cpal::Device) -> Result<bool, anyhow::Error> {
let supported = device
.supported_input_configs()
@@ -313,7 +291,6 @@ fn supports_i16_input(device: &cpal::Device) -> Result<bool, anyhow::Error> {
Ok(false)
}
/// Check if the output device supports i16 at 48 kHz mono.
fn supports_i16_output(device: &cpal::Device) -> Result<bool, anyhow::Error> {
let supported = device
.supported_output_configs()

View File

@@ -0,0 +1,89 @@
//! Lock-free SPSC ring buffer for audio PCM transfer between
//! CPAL audio callbacks and the Rust engine.
//!
//! Identical design to wzp-android's audio_ring: the producer writes and
//! advances a write cursor, the consumer reads and advances a read cursor.
//! Both cursors are atomic — no mutex, no blocking on the audio thread.
use std::sync::atomic::{AtomicUsize, Ordering};
/// Ring buffer capacity in i16 samples.
/// 960 samples * 10 frames = ~200ms of audio at 48kHz mono.
const RING_CAPACITY: usize = 960 * 10;
/// Lock-free single-producer single-consumer ring buffer for i16 PCM samples.
pub struct AudioRing {
buf: Box<[i16; RING_CAPACITY]>,
write_pos: AtomicUsize,
read_pos: AtomicUsize,
}
// SAFETY: AudioRing is designed for SPSC — one thread writes, one reads.
// The atomics ensure visibility. The buffer itself is never accessed
// from the same index by both threads simultaneously because the
// producer only writes to positions between write_pos and read_pos,
// and the consumer only reads from positions between read_pos and write_pos.
unsafe impl Send for AudioRing {}
unsafe impl Sync for AudioRing {}
impl AudioRing {
pub fn new() -> Self {
Self {
buf: Box::new([0i16; RING_CAPACITY]),
write_pos: AtomicUsize::new(0),
read_pos: AtomicUsize::new(0),
}
}
/// Number of samples available to read.
pub fn available(&self) -> usize {
let w = self.write_pos.load(Ordering::Acquire);
let r = self.read_pos.load(Ordering::Acquire);
w.wrapping_sub(r)
}
/// Write samples into the ring. Returns number of samples written.
/// Drops oldest samples if the ring is full.
pub fn write(&self, samples: &[i16]) -> usize {
let w = self.write_pos.load(Ordering::Relaxed);
let count = samples.len().min(RING_CAPACITY);
for i in 0..count {
let idx = (w + i) % RING_CAPACITY;
unsafe {
let ptr = self.buf.as_ptr() as *mut i16;
*ptr.add(idx) = samples[i];
}
}
self.write_pos
.store(w.wrapping_add(count), Ordering::Release);
// If we overwrote unread data, advance read_pos
if self.available() > RING_CAPACITY {
let new_read = self
.write_pos
.load(Ordering::Relaxed)
.wrapping_sub(RING_CAPACITY);
self.read_pos.store(new_read, Ordering::Release);
}
count
}
/// Read samples from the ring into `out`. Returns number of samples read.
pub fn read(&self, out: &mut [i16]) -> usize {
let avail = self.available();
let count = out.len().min(avail);
let r = self.read_pos.load(Ordering::Relaxed);
for i in 0..count {
let idx = (r + i) % RING_CAPACITY;
out[i] = unsafe { *self.buf.as_ptr().add(idx) };
}
self.read_pos
.store(r.wrapping_add(count), Ordering::Release);
count
}
}

View File

@@ -45,12 +45,22 @@ struct CliArgs {
seed_hex: Option<String>,
mnemonic: Option<String>,
room: Option<String>,
raw_room: bool,
alias: Option<String>,
token: Option<String>,
_metrics_file: Option<String>,
}
/// Default identity file path: ~/.wzp/identity
fn default_identity_path() -> std::path::PathBuf {
let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
std::path::PathBuf::from(home).join(".wzp").join("identity")
}
impl CliArgs {
/// Resolve the identity seed from --seed, --mnemonic, or generate a new one.
/// Resolve the identity seed from --seed, --mnemonic, or persistent file.
///
/// Priority: --seed > --mnemonic > ~/.wzp/identity > generate + save.
pub fn resolve_seed(&self) -> wzp_crypto::Seed {
if let Some(ref hex_str) = self.seed_hex {
let seed = wzp_crypto::Seed::from_hex(hex_str).expect("invalid --seed hex");
@@ -65,10 +75,30 @@ impl CliArgs {
info!(fingerprint = %fp, "identity from --mnemonic");
seed
} else {
let path = default_identity_path();
// Try loading existing identity
if path.exists() {
if let Ok(hex_str) = std::fs::read_to_string(&path) {
let hex_str = hex_str.trim();
if let Ok(seed) = wzp_crypto::Seed::from_hex(hex_str) {
let id = seed.derive_identity();
let fp = id.public_identity().fingerprint;
info!(fingerprint = %fp, path = %path.display(), "loaded persistent identity");
return seed;
}
}
}
// Generate new and save
let seed = wzp_crypto::Seed::generate();
let id = seed.derive_identity();
let fp = id.public_identity().fingerprint;
info!(fingerprint = %fp, "generated ephemeral identity");
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
// Encode seed as hex manually (avoid dep on `hex` crate in binary)
let hex_str: String = seed.0.iter().map(|b| format!("{b:02x}")).collect();
std::fs::write(&path, hex_str).ok();
info!(fingerprint = %fp, path = %path.display(), "generated and saved new identity");
seed
}
}
@@ -86,6 +116,8 @@ fn parse_args() -> CliArgs {
let mut seed_hex = None;
let mut mnemonic = None;
let mut room = None;
let mut raw_room = false;
let mut alias = None;
let mut token = None;
let mut metrics_file = None;
let mut relay_str = None;
@@ -130,6 +162,11 @@ fn parse_args() -> CliArgs {
i += 1;
room = Some(args.get(i).expect("--room requires a name").to_string());
}
"--raw-room" => raw_room = true,
"--alias" => {
i += 1;
alias = Some(args.get(i).expect("--alias requires a name").to_string());
}
"--token" => {
i += 1;
token = Some(args.get(i).expect("--token requires a value").to_string());
@@ -183,10 +220,13 @@ fn parse_args() -> CliArgs {
eprintln!(" --seed <hex> Identity seed (64 hex chars, featherChat compatible)");
eprintln!(" --mnemonic <words...> Identity seed as BIP39 mnemonic (24 words)");
eprintln!(" --room <name> Room name (hashed for privacy before sending)");
eprintln!(" --raw-room Send room name as-is (no hash, for Android compat)");
eprintln!(" --alias <name> Display name shown to other participants");
eprintln!(" --token <token> featherChat bearer token for relay auth");
eprintln!(" --metrics-file <path> Write JSONL telemetry to file (1 line/sec)");
eprintln!(" (48kHz mono s16le, play with ffplay -f s16le -ar 48000 -ch_layout mono file.raw)");
eprintln!();
eprintln!("Identity is auto-saved to ~/.wzp/identity on first run.");
eprintln!("Default relay: 127.0.0.1:4433");
std::process::exit(0);
}
@@ -219,6 +259,8 @@ fn parse_args() -> CliArgs {
seed_hex,
mnemonic,
room,
raw_room,
alias,
token,
_metrics_file: metrics_file,
}
@@ -250,8 +292,14 @@ async fn main() -> anyhow::Result<()> {
"WarzonePhone client"
);
// Hash room name for SNI privacy (or "default" if none specified)
// Compute SNI from room name.
// --raw-room sends the name as-is (for Android compat — Android doesn't hash).
// Default behaviour hashes for privacy.
let sni = match &cli.room {
Some(name) if cli.raw_room => {
info!(room = %name, "using raw room name as SNI (no hash)");
name.clone()
}
Some(name) => {
let hashed = wzp_crypto::hash_room_name(name);
info!(room = %name, hashed = %hashed, "room name hashed for SNI");
@@ -287,6 +335,7 @@ async fn main() -> anyhow::Result<()> {
let _crypto_session = wzp_client::handshake::perform_handshake(
&*transport,
&seed.0,
cli.alias.as_deref(),
).await?;
info!("crypto handshake complete");
@@ -548,78 +597,225 @@ async fn run_file_mode(
}
/// Live mode: capture from mic, encode, send; receive, decode, play.
///
/// Architecture (mirrors wzp-android/engine.rs):
/// CPAL capture callback → AudioRing → send task (5ms poll) → QUIC
/// QUIC → recv task → jitter buffer → decode tick (20ms) → AudioRing → CPAL playback callback
///
/// All lock-free: CPAL callbacks use atomic ring buffers, no Mutex on the audio path.
#[cfg(feature = "audio")]
async fn run_live(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::Result<()> {
async fn run_live(
transport: Arc<wzp_transport::QuinnTransport>,
) -> anyhow::Result<()> {
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool, Ordering};
use wzp_client::audio_io::{AudioCapture, AudioPlayback};
use wzp_client::call::JitterTelemetry;
let capture = AudioCapture::start()?;
let playback = AudioPlayback::start()?;
info!("Audio I/O started — press Ctrl+C to stop");
info!("audio I/O started (lock-free ring buffers) — press Ctrl+C to stop");
let capture_ring = capture.ring().clone();
let playout_ring = playback.ring().clone();
let running = StdArc::new(AtomicBool::new(true));
// --- Signal handler: set running=false on first Ctrl+C, force-quit on second ---
let signal_running = running.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
eprintln!(); // newline after ^C
info!("Ctrl+C received, shutting down...");
signal_running.store(false, Ordering::SeqCst);
tokio::signal::ctrl_c().await.ok();
eprintln!("\nForce quit");
std::process::exit(1);
});
let config = CallConfig::default();
// --- Send task: poll capture ring → encode → send via async ---
let send_transport = transport.clone();
let rt_handle = tokio::runtime::Handle::current();
let send_handle = std::thread::Builder::new()
.name("wzp-send-loop".into())
.spawn(move || {
let config = CallConfig::default();
let mut encoder = CallEncoder::new(&config);
loop {
let frame = match capture.read_frame() {
Some(f) => f,
None => break,
};
let packets = match encoder.encode_frame(&frame) {
Ok(p) => p,
Err(e) => {
error!("encode error: {e}");
continue;
}
};
for pkt in &packets {
if let Err(e) = rt_handle.block_on(send_transport.send_media(pkt)) {
error!("send error: {e}");
return;
}
let send_running = running.clone();
let send_task = async move {
let mut encoder = CallEncoder::new(&config);
let mut capture_buf = vec![0i16; FRAME_SAMPLES];
let mut frames_sent: u64 = 0;
loop {
if !send_running.load(Ordering::Relaxed) {
break;
}
let avail = capture_ring.available();
if avail < FRAME_SAMPLES {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
continue;
}
let read = capture_ring.read(&mut capture_buf);
if read < FRAME_SAMPLES {
continue;
}
let packets = match encoder.encode_frame(&capture_buf) {
Ok(p) => p,
Err(e) => {
error!("encode error: {e}");
continue;
}
};
for pkt in &packets {
if let Err(e) = send_transport.send_media(pkt).await {
error!("send error: {e}");
return;
}
}
})?;
frames_sent += 1;
if frames_sent == 1 || frames_sent % 500 == 0 {
info!(frames_sent, "send progress");
}
}
};
// --- Recv task: receive packets → ingest into jitter buffer ---
// Uses timeout so it can check the running flag and exit on Ctrl+C.
let recv_transport = transport.clone();
let recv_handle = tokio::spawn(async move {
let config = CallConfig::default();
let mut decoder = CallDecoder::new(&config);
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
let recv_running = running.clone();
let config = CallConfig::default();
let decoder = StdArc::new(tokio::sync::Mutex::new(CallDecoder::new(&config)));
let decoder_recv = decoder.clone();
let recv_task = async move {
let mut packets_received: u64 = 0;
loop {
match recv_transport.recv_media().await {
Ok(Some(pkt)) => {
let is_repair = pkt.header.is_repair;
decoder.ingest(pkt);
// Only decode for source packets (1 source = 1 audio frame).
// Repair packets feed the FEC decoder but don't produce audio.
if !is_repair {
if let Some(_n) = decoder.decode_next(&mut pcm_buf) {
playback.write_frame(&pcm_buf);
}
if !recv_running.load(Ordering::Relaxed) {
break;
}
// Timeout so we can check running flag periodically
let result = tokio::time::timeout(
std::time::Duration::from_millis(100),
recv_transport.recv_media(),
)
.await;
match result {
Ok(Ok(Some(pkt))) => {
let mut dec = decoder_recv.lock().await;
dec.ingest(pkt);
packets_received += 1;
if packets_received == 1 || packets_received % 500 == 0 {
info!(packets_received, depth = dec.stats().current_depth, "recv progress");
}
}
Ok(None) => {
Ok(Ok(None)) => {
info!("connection closed");
break;
}
Err(e) => {
Ok(Err(e)) => {
error!("recv error: {e}");
break;
}
Err(_) => {} // timeout — loop and check running flag
}
}
});
};
tokio::signal::ctrl_c().await?;
info!("Shutting down...");
// --- Playout tick: decode from jitter buffer at steady 20ms intervals ---
let playout_running = running.clone();
let decoder_playout = decoder.clone();
let playout_task = async move {
let mut pcm_buf = vec![0i16; FRAME_SAMPLES];
let mut interval = tokio::time::interval(std::time::Duration::from_millis(20));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut telemetry = JitterTelemetry::new(5);
loop {
interval.tick().await;
if !playout_running.load(Ordering::Relaxed) {
break;
}
recv_handle.abort();
drop(send_handle);
transport.close().await?;
info!("done");
let mut dec = decoder_playout.lock().await;
// Drain ready frames from jitter buffer into playout ring.
let mut decoded_this_tick = 0;
while let Some(n) = dec.decode_next(&mut pcm_buf) {
playout_ring.write(&pcm_buf[..n]);
decoded_this_tick += 1;
if decoded_this_tick >= 2 {
break; // Don't drain too aggressively in one tick
}
}
telemetry.maybe_log(dec.stats());
}
};
// --- Signal task: listen for RoomUpdate and display presence ---
let signal_transport = transport.clone();
let signal_running = running.clone();
let signal_task = async move {
loop {
if !signal_running.load(Ordering::Relaxed) {
break;
}
let result = tokio::time::timeout(
std::time::Duration::from_millis(200),
signal_transport.recv_signal(),
)
.await;
match result {
Ok(Ok(Some(wzp_proto::SignalMessage::RoomUpdate { count, participants }))) => {
info!(count, "room update");
for p in &participants {
let name = p
.alias
.as_deref()
.unwrap_or("(no alias)");
let fp = if p.fingerprint.is_empty() {
"(no fingerprint)"
} else {
&p.fingerprint
};
info!(" participant: {name} [{fp}]");
}
}
Ok(Ok(Some(msg))) => {
info!("signal: {:?}", std::mem::discriminant(&msg));
}
Ok(Ok(None)) => {
info!("signal stream closed");
break;
}
Ok(Err(e)) => {
error!("signal recv error: {e}");
break;
}
Err(_) => {} // timeout — loop and check running flag
}
}
};
// --- Run all tasks, exit when any finishes (or running flag cleared by Ctrl+C) ---
tokio::select! {
_ = send_task => info!("send task ended"),
_ = recv_task => info!("recv task ended"),
_ = playout_task => info!("playout task ended"),
_ = signal_task => info!("signal task ended"),
}
running.store(false, Ordering::SeqCst);
capture.stop();
playback.stop();
// Give transport 2s to close gracefully, then bail
match tokio::time::timeout(std::time::Duration::from_secs(2), transport.close()).await {
Ok(Ok(())) => info!("done"),
Ok(Err(e)) => info!("close error (non-fatal): {e}"),
Err(_) => info!("close timed out, exiting anyway"),
}
Ok(())
}

View File

@@ -110,6 +110,7 @@ pub fn signal_to_call_type(signal: &SignalMessage) -> CallSignalType {
SignalMessage::SessionForward { .. } => CallSignalType::Offer, // reuse
SignalMessage::SessionForwardAck { .. } => CallSignalType::Offer, // reuse
SignalMessage::RoomUpdate { .. } => CallSignalType::Offer, // reuse
SignalMessage::SetAlias { .. } => CallSignalType::Offer, // reuse
}
}

View File

@@ -17,6 +17,7 @@ use wzp_proto::{MediaTransport, QualityProfile, SignalMessage};
pub async fn perform_handshake(
transport: &dyn MediaTransport,
seed: &[u8; 32],
alias: Option<&str>,
) -> Result<Box<dyn CryptoSession>, anyhow::Error> {
// 1. Create key exchange from identity seed
let mut kx = WarzoneKeyExchange::from_identity_seed(seed);
@@ -41,6 +42,7 @@ pub async fn perform_handshake(
QualityProfile::DEGRADED,
QualityProfile::CATASTROPHIC,
],
alias: alias.map(|s| s.to_string()),
};
transport.send_signal(&offer).await?;

View File

@@ -8,6 +8,8 @@
#[cfg(feature = "audio")]
pub mod audio_io;
#[cfg(feature = "audio")]
pub mod audio_ring;
pub mod bench;
pub mod call;
pub mod drift_test;

View File

@@ -548,6 +548,9 @@ pub enum SignalMessage {
signature: Vec<u8>,
/// Supported quality profiles.
supported_profiles: Vec<crate::QualityProfile>,
/// Optional display name set by the caller.
#[serde(default)]
alias: Option<String>,
},
/// Call acceptance (analogous to Warzone's WireMessage::CallAnswer).
@@ -653,6 +656,11 @@ pub enum SignalMessage {
/// List of participants currently in the room.
participants: Vec<RoomParticipant>,
},
/// Set or update the client's display name.
/// Sent by client after joining; relay updates the participant entry and
/// re-broadcasts a RoomUpdate to all participants.
SetAlias { alias: String },
}
/// A participant entry in a RoomUpdate message.

View File

@@ -15,25 +15,27 @@ use wzp_proto::{MediaTransport, QualityProfile, SignalMessage};
/// 5. Derive shared ChaCha20-Poly1305 session
/// 6. Send `CallAnswer` back
///
/// Returns the derived `CryptoSession` and the chosen `QualityProfile`.
/// Returns the derived `CryptoSession`, the chosen `QualityProfile`, the caller's fingerprint,
/// and the caller's alias (if provided in CallOffer).
pub async fn accept_handshake(
transport: &dyn MediaTransport,
seed: &[u8; 32],
) -> Result<(Box<dyn CryptoSession>, QualityProfile), anyhow::Error> {
) -> Result<(Box<dyn CryptoSession>, QualityProfile, String, Option<String>), anyhow::Error> {
// 1. Receive CallOffer
let offer = transport
.recv_signal()
.await?
.ok_or_else(|| anyhow::anyhow!("connection closed before receiving CallOffer"))?;
let (caller_identity_pub, caller_ephemeral_pub, caller_signature, supported_profiles) =
let (caller_identity_pub, caller_ephemeral_pub, caller_signature, supported_profiles, caller_alias) =
match offer {
SignalMessage::CallOffer {
identity_pub,
ephemeral_pub,
signature,
supported_profiles,
} => (identity_pub, ephemeral_pub, signature, supported_profiles),
alias,
} => (identity_pub, ephemeral_pub, signature, supported_profiles, alias),
other => {
return Err(anyhow::anyhow!(
"expected CallOffer, got {:?}",
@@ -76,7 +78,13 @@ pub async fn accept_handshake(
};
transport.send_signal(&answer).await?;
Ok((session, chosen_profile))
// Derive caller fingerprint from their identity public key (first 8 bytes as hex)
let caller_fp = caller_identity_pub[..8]
.iter()
.map(|b| format!("{b:02x}"))
.collect::<String>();
Ok((session, chosen_profile, caller_fp, caller_alias))
}
/// Select the best quality profile from those the caller supports.

View File

@@ -431,7 +431,7 @@ async fn main() -> anyhow::Result<()> {
// Crypto handshake: verify client identity + negotiate quality profile
let handshake_start = std::time::Instant::now();
let (_crypto_session, _chosen_profile) = match wzp_relay::handshake::accept_handshake(
let (_crypto_session, _chosen_profile, caller_fp, caller_alias) = match wzp_relay::handshake::accept_handshake(
&*transport,
&relay_seed_bytes,
).await {
@@ -448,10 +448,13 @@ async fn main() -> anyhow::Result<()> {
}
};
// Use the caller's identity fingerprint from the handshake
let participant_fp = authenticated_fp.clone().unwrap_or(caller_fp);
// Register in presence registry
if let Some(ref fp) = authenticated_fp {
{
let mut reg = presence.lock().await;
reg.register_local(fp, None, Some(room_name.clone()));
reg.register_local(&participant_fp, None, Some(room_name.clone()));
}
info!(%addr, room = %room_name, "client joining");
@@ -506,8 +509,8 @@ async fn main() -> anyhow::Result<()> {
&room_name,
addr,
room::ParticipantSender::Quic(transport.clone()),
authenticated_fp.as_deref(),
None, // alias — TODO: accept from client
Some(&participant_fp),
caller_alias.as_deref(),
) {
Ok((id, update, senders)) => {
metrics.active_rooms.set(mgr.list().len() as i64);

View File

@@ -141,6 +141,17 @@ impl Room {
self.participants.iter().map(|p| p.sender.clone()).collect()
}
/// Update a participant's alias. Returns true if the participant was found.
fn set_alias(&mut self, id: ParticipantId, alias: String) -> bool {
if let Some(p) = self.participants.iter_mut().find(|p| p.id == id) {
info!(participant = id, %alias, "alias updated");
p.alias = Some(alias);
true
} else {
false
}
}
fn is_empty(&self) -> bool {
self.participants.is_empty()
}
@@ -255,6 +266,26 @@ impl RoomManager {
}
}
/// Update a participant's alias and return a RoomUpdate + senders for broadcasting.
pub fn set_alias(
&mut self,
room_name: &str,
participant_id: ParticipantId,
alias: String,
) -> Option<(wzp_proto::SignalMessage, Vec<ParticipantSender>)> {
if let Some(room) = self.rooms.get_mut(room_name) {
if room.set_alias(participant_id, alias) {
let update = wzp_proto::SignalMessage::RoomUpdate {
count: room.len() as u32,
participants: room.participant_list(),
};
let senders = room.all_senders();
return Some((update, senders));
}
}
None
}
/// Get senders for all OTHER participants in a room.
pub fn others(
&self,
@@ -374,68 +405,111 @@ async fn run_participant_plain(
session_id: &str,
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
loop {
let pkt = match transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
info!(%addr, participant = participant_id, "disconnected");
break;
}
Err(e) => {
let msg = e.to_string();
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
info!(%addr, participant = participant_id, "connection closed: {e}");
} else {
error!(%addr, participant = participant_id, "recv error: {e}");
// Media forwarding task
let media_room_mgr = room_mgr.clone();
let media_room_name = room_name.clone();
let media_transport = transport.clone();
let media_metrics = metrics.clone();
let media_session_id = session_id.to_string();
let media_task = async move {
let mut packets_forwarded = 0u64;
loop {
let pkt = match media_transport.recv_media().await {
Ok(Some(pkt)) => pkt,
Ok(None) => {
info!(%addr, participant = participant_id, "disconnected");
break;
}
break;
}
};
// Update per-session quality metrics if a quality report is present
if let Some(ref report) = pkt.quality_report {
metrics.update_session_quality(session_id, report);
}
// Get current list of other participants
let others = {
let mgr = room_mgr.lock().await;
mgr.others(&room_name, participant_id)
};
// Forward to all others
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
match other {
ParticipantSender::Quic(t) => {
let _ = t.send_media(&pkt).await;
Err(e) => {
let msg = e.to_string();
if msg.contains("timed out") || msg.contains("reset") || msg.contains("closed") {
info!(%addr, participant = participant_id, "connection closed: {e}");
} else {
error!(%addr, participant = participant_id, "recv error: {e}");
}
break;
}
ParticipantSender::WebSocket(_) => {
// WS clients receive raw payload bytes
let _ = other.send_raw(&pkt.payload).await;
}
}
}
let fan_out = others.len() as u64;
metrics.packets_forwarded.inc_by(fan_out);
metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
if packets_forwarded % 500 == 0 {
let room_size = {
let mgr = room_mgr.lock().await;
mgr.room_size(&room_name)
};
info!(
room = %room_name,
participant = participant_id,
forwarded = packets_forwarded,
room_size,
"participant stats"
);
if let Some(ref report) = pkt.quality_report {
media_metrics.update_session_quality(&media_session_id, report);
}
let others = {
let mgr = media_room_mgr.lock().await;
mgr.others(&media_room_name, participant_id)
};
let pkt_bytes = pkt.payload.len() as u64;
for other in &others {
match other {
ParticipantSender::Quic(t) => {
let _ = t.send_media(&pkt).await;
}
ParticipantSender::WebSocket(_) => {
let _ = other.send_raw(&pkt.payload).await;
}
}
}
let fan_out = others.len() as u64;
media_metrics.packets_forwarded.inc_by(fan_out);
media_metrics.bytes_forwarded.inc_by(pkt_bytes * fan_out);
packets_forwarded += 1;
if packets_forwarded % 500 == 0 {
let room_size = {
let mgr = media_room_mgr.lock().await;
mgr.room_size(&media_room_name)
};
info!(
room = %media_room_name,
participant = participant_id,
forwarded = packets_forwarded,
room_size,
"participant stats"
);
}
}
};
// Signal handling task — processes SetAlias and other in-call signals
let signal_room_mgr = room_mgr.clone();
let signal_room_name = room_name.clone();
let signal_transport = transport.clone();
let signal_task = async move {
loop {
match signal_transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::SetAlias { alias })) => {
info!(%addr, participant = participant_id, %alias, "SetAlias received");
let mut mgr = signal_room_mgr.lock().await;
if let Some((update, senders)) =
mgr.set_alias(&signal_room_name, participant_id, alias)
{
drop(mgr);
broadcast_signal(&senders, &update).await;
}
}
Ok(Some(wzp_proto::SignalMessage::Hangup { .. })) => {
info!(%addr, participant = participant_id, "hangup received");
break;
}
Ok(Some(msg)) => {
info!(%addr, participant = participant_id, "signal: {:?}", std::mem::discriminant(&msg));
}
Ok(None) => break,
Err(e) => {
warn!(%addr, participant = participant_id, "signal recv error: {e}");
break;
}
}
}
};
// Run both in parallel — exit when either finishes (disconnection)
tokio::select! {
_ = media_task => {}
_ = signal_task => {}
}
// Clean up — leave room and broadcast update to remaining participants