fix: graceful Ctrl+C recording + relay echo mode

- --record now handles Ctrl+C: saves PCM file before exiting
- Relay without --remote runs in echo mode (loops packets back to sender)
  instead of sink mode, enabling single-relay audio testing
- recv task returns collected PCM via channel for clean file write

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-03-27 16:32:12 +04:00
parent df80ad5343
commit b147de5ae9
2 changed files with 77 additions and 53 deletions

View File

@@ -256,13 +256,14 @@ async fn run_file_mode(
// --- Recv task: decode and write to file --- // --- Recv task: decode and write to file ---
let recv_transport = transport.clone(); let recv_transport = transport.clone();
let record_path = record_file.clone();
let recv_handle = tokio::spawn(async move { let recv_handle = tokio::spawn(async move {
let record_path = match record_file { let record_path = match record_path {
Some(p) => p, Some(p) => p,
None => { None => {
// No recording, just wait // No recording, just wait for send to finish or Ctrl+C
tokio::signal::ctrl_c().await.ok(); tokio::signal::ctrl_c().await.ok();
return; return Vec::new();
} }
}; };
@@ -271,64 +272,80 @@ async fn run_file_mode(
let mut all_pcm: Vec<i16> = Vec::new(); let mut all_pcm: Vec<i16> = Vec::new();
let mut frames_received = 0u64; let mut frames_received = 0u64;
info!(file = %record_path, "recording received audio"); info!(file = %record_path, "recording received audio (Ctrl+C to stop and save)");
loop { loop {
match recv_transport.recv_media().await { tokio::select! {
Ok(Some(pkt)) => { result = recv_transport.recv_media() => {
decoder.ingest(pkt); match result {
while let Some(n) = decoder.decode_next(&mut pcm_buf) { Ok(Some(pkt)) => {
all_pcm.extend_from_slice(&pcm_buf[..n]); decoder.ingest(pkt);
frames_received += 1; while let Some(n) = decoder.decode_next(&mut pcm_buf) {
if frames_received % 250 == 0 { all_pcm.extend_from_slice(&pcm_buf[..n]);
info!( frames_received += 1;
frames = frames_received, if frames_received % 250 == 0 {
samples = all_pcm.len(), info!(
"recv progress" frames = frames_received,
); samples = all_pcm.len(),
"recv progress"
);
}
}
}
Ok(None) => {
info!("connection closed by remote");
break;
}
Err(e) => {
error!("recv error: {e}");
break;
} }
} }
} }
Ok(None) => { _ = tokio::signal::ctrl_c() => {
info!("connection closed by remote"); info!("Ctrl+C received, saving recording...");
break;
}
Err(e) => {
error!("recv error: {e}");
break; break;
} }
} }
} }
// Write raw PCM to file all_pcm
});
// Wait for send to finish (or ctrl+c in recv)
let _ = send_handle.await;
// If send finished but recv is still going, give it a moment then stop
let all_pcm = if record_file.is_some() {
// Wait a bit for remaining packets after sender finishes
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// The recv task will be aborted when we drop it, but first
// let's signal it by closing transport
transport.close().await?;
recv_handle.await.unwrap_or_default()
} else {
recv_handle.await.unwrap_or_default()
};
// Write recorded audio to file
if let Some(ref path) = record_file {
if !all_pcm.is_empty() { if !all_pcm.is_empty() {
let bytes: Vec<u8> = all_pcm let bytes: Vec<u8> = all_pcm.iter().flat_map(|s| s.to_le_bytes()).collect();
.iter() std::fs::write(path, &bytes)?;
.flat_map(|s| s.to_le_bytes()) let duration_secs = all_pcm.len() as f64 / 48_000.0;
.collect(); info!(
if let Err(e) = std::fs::write(&record_path, &bytes) { file = %path,
error!(file = %record_path, "write error: {e}"); samples = all_pcm.len(),
} else { duration = format!("{:.1}s", duration_secs),
let duration_secs = all_pcm.len() as f64 / 48_000.0; bytes = bytes.len(),
info!( "recording saved"
file = %record_path, );
frames = frames_received, info!("play with: ffplay -f s16le -ar 48000 -ac 1 {path}");
samples = all_pcm.len(),
duration_secs = format!("{:.1}", duration_secs),
bytes = bytes.len(),
"recording saved"
);
info!("play with: ffplay -f s16le -ar 48000 -ac 1 {record_path}");
}
} else { } else {
info!("no audio received, nothing to write"); info!("no audio received, nothing to write");
} }
}); }
// Wait for both tasks
let _ = tokio::join!(send_handle, recv_handle);
transport.close().await?;
Ok(()) Ok(())
} }

View File

@@ -290,16 +290,23 @@ async fn main() -> anyhow::Result<()> {
info!(%remote_addr, "session ended"); info!(%remote_addr, "session ended");
} }
None => { None => {
// No remote relay configured — just receive and log (sink mode) // No remote relay — echo mode: send packets back to the sender.
warn!("no remote relay configured, running in sink mode"); // This lets two clients on the same relay talk to each other,
// or a single client hear its own audio looped back.
info!("no remote relay configured, running in echo mode");
let mut echo_count = 0u64;
loop { loop {
match client_transport.recv_media().await { match client_transport.recv_media().await {
Ok(Some(packet)) => { Ok(Some(packet)) => {
tracing::trace!( // Echo the packet back to the sender
seq = packet.header.seq, if let Err(e) = client_transport.send_media(&packet).await {
block = packet.header.fec_block, error!("echo send error: {e}");
"received media packet (sink)" break;
); }
echo_count += 1;
if echo_count % 250 == 0 {
info!(echoed = echo_count, "echo mode stats");
}
} }
Ok(None) => { Ok(None) => {
info!(%remote_addr, "connection closed"); info!(%remote_addr, "connection closed");