fix(signal): add call_id to Hangup — prevents stale hangup killing new calls
Root cause: Hangup had no call_id field. The relay forwarded hangups to ALL active calls for a user. When user A hung up call 1 and user B immediately placed call 2, the relay's processing of A's hangup would also kill call 2 (race window ~1-2s). Fix: add optional call_id to Hangup (backwards-compatible via serde skip_serializing_if). When present, the relay only ends the named call. Old clients send call_id=None and get the legacy broadcast behavior. Also: clear pending_path_report in Hangup recv handler and internal_deregister to prevent stale oneshot channels from blocking subsequent call setups. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -355,7 +355,7 @@ impl WzpEngine {
|
|||||||
// Store call setup info for Kotlin to pick up
|
// Store call setup info for Kotlin to pick up
|
||||||
stats.incoming_call_id = Some(format!("{relay_addr}|{room}"));
|
stats.incoming_call_id = Some(format!("{relay_addr}|{room}"));
|
||||||
}
|
}
|
||||||
Ok(Some(SignalMessage::Hangup { reason })) => {
|
Ok(Some(SignalMessage::Hangup { reason, .. })) => {
|
||||||
info!(reason = ?reason, "signal: call ended by remote");
|
info!(reason = ?reason, "signal: call ended by remote");
|
||||||
let mut stats = signal_state.stats.lock().unwrap();
|
let mut stats = signal_state.stats.lock().unwrap();
|
||||||
stats.state = crate::stats::CallState::Closed;
|
stats.state = crate::stats::CallState::Closed;
|
||||||
|
|||||||
@@ -424,6 +424,7 @@ async fn run_silence(transport: Arc<wzp_transport::QuinnTransport>) -> anyhow::R
|
|||||||
info!(total_source, total_repair, total_bytes, "done — closing");
|
info!(total_source, total_repair, total_bytes, "done — closing");
|
||||||
let hangup = wzp_proto::SignalMessage::Hangup {
|
let hangup = wzp_proto::SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
};
|
};
|
||||||
transport.send_signal(&hangup).await.ok();
|
transport.send_signal(&hangup).await.ok();
|
||||||
transport.close().await?;
|
transport.close().await?;
|
||||||
@@ -575,6 +576,7 @@ async fn run_file_mode(
|
|||||||
// Send Hangup signal so the relay knows we're done
|
// Send Hangup signal so the relay knows we're done
|
||||||
let hangup = wzp_proto::SignalMessage::Hangup {
|
let hangup = wzp_proto::SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
};
|
};
|
||||||
transport.send_signal(&hangup).await.ok();
|
transport.send_signal(&hangup).await.ok();
|
||||||
|
|
||||||
@@ -865,6 +867,7 @@ async fn run_signal_mode(
|
|||||||
info!("hanging up...");
|
info!("hanging up...");
|
||||||
let _ = signal_transport.send_signal(&SignalMessage::Hangup {
|
let _ = signal_transport.send_signal(&SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
}).await;
|
}).await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -881,7 +884,7 @@ async fn run_signal_mode(
|
|||||||
Err(e) => error!("media connect failed: {e}"),
|
Err(e) => error!("media connect failed: {e}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SignalMessage::Hangup { reason } => {
|
SignalMessage::Hangup { reason, .. } => {
|
||||||
info!(reason = ?reason, "call ended by remote");
|
info!(reason = ?reason, "call ended by remote");
|
||||||
}
|
}
|
||||||
SignalMessage::Pong { .. } => {}
|
SignalMessage::Pong { .. } => {}
|
||||||
|
|||||||
@@ -170,6 +170,7 @@ mod tests {
|
|||||||
|
|
||||||
let hangup = SignalMessage::Hangup {
|
let hangup = SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
};
|
};
|
||||||
assert!(matches!(signal_to_call_type(&hangup), CallSignalType::Hangup));
|
assert!(matches!(signal_to_call_type(&hangup), CallSignalType::Hangup));
|
||||||
|
|
||||||
|
|||||||
@@ -199,6 +199,7 @@ fn wzp_answer_round_trips_through_fc_callsignal() {
|
|||||||
fn wzp_hangup_round_trips_through_fc_callsignal() {
|
fn wzp_hangup_round_trips_through_fc_callsignal() {
|
||||||
let hangup = wzp_proto::SignalMessage::Hangup {
|
let hangup = wzp_proto::SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let payload = wzp_client::featherchat::encode_call_payload(&hangup, None, None);
|
let payload = wzp_client::featherchat::encode_call_payload(&hangup, None, None);
|
||||||
@@ -302,6 +303,7 @@ fn all_signal_types_map_correctly() {
|
|||||||
(
|
(
|
||||||
wzp_proto::SignalMessage::Hangup {
|
wzp_proto::SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
},
|
},
|
||||||
"Hangup",
|
"Hangup",
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -608,8 +608,14 @@ pub enum SignalMessage {
|
|||||||
Ping { timestamp_ms: u64 },
|
Ping { timestamp_ms: u64 },
|
||||||
Pong { timestamp_ms: u64 },
|
Pong { timestamp_ms: u64 },
|
||||||
|
|
||||||
/// End the call.
|
/// End the call. `call_id` is optional for backwards compatibility
|
||||||
Hangup { reason: HangupReason },
|
/// with older clients that send Hangup without it — the relay falls
|
||||||
|
/// back to ending ALL active calls for the sender in that case.
|
||||||
|
Hangup {
|
||||||
|
reason: HangupReason,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
call_id: Option<String>,
|
||||||
|
},
|
||||||
|
|
||||||
/// featherChat bearer token for relay authentication.
|
/// featherChat bearer token for relay authentication.
|
||||||
/// Sent as the first signal message when --auth-url is configured.
|
/// Sent as the first signal message when --auth-url is configured.
|
||||||
@@ -1138,7 +1144,7 @@ mod tests {
|
|||||||
callee_local_addrs: Vec::new(),
|
callee_local_addrs: Vec::new(),
|
||||||
},
|
},
|
||||||
SignalMessage::CallRinging { call_id: "c1".into() },
|
SignalMessage::CallRinging { call_id: "c1".into() },
|
||||||
SignalMessage::Hangup { reason: HangupReason::Normal },
|
SignalMessage::Hangup { reason: HangupReason::Normal, call_id: None },
|
||||||
];
|
];
|
||||||
for inner in cases {
|
for inner in cases {
|
||||||
let inner_disc = std::mem::discriminant(&inner);
|
let inner_disc = std::mem::discriminant(&inner);
|
||||||
@@ -1296,7 +1302,7 @@ mod tests {
|
|||||||
let cases = vec![
|
let cases = vec![
|
||||||
SignalMessage::Ping { timestamp_ms: 12345 },
|
SignalMessage::Ping { timestamp_ms: 12345 },
|
||||||
SignalMessage::Hold,
|
SignalMessage::Hold,
|
||||||
SignalMessage::Hangup { reason: HangupReason::Normal },
|
SignalMessage::Hangup { reason: HangupReason::Normal, call_id: None },
|
||||||
SignalMessage::CallRinging { call_id: "abcd".into() },
|
SignalMessage::CallRinging { call_id: "abcd".into() },
|
||||||
];
|
];
|
||||||
for m in cases {
|
for m in cases {
|
||||||
|
|||||||
@@ -611,6 +611,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
&caller_fp,
|
&caller_fp,
|
||||||
&SignalMessage::Hangup {
|
&SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@@ -1071,6 +1072,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
info!(%addr, target = %target_fp, "call target not online (no federation route)");
|
info!(%addr, target = %target_fp, "call target not online (no federation route)");
|
||||||
let _ = transport.send_signal(&SignalMessage::Hangup {
|
let _ = transport.send_signal(&SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
}).await;
|
}).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -1186,6 +1188,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
if let Some(ref fm) = federation_mgr {
|
if let Some(ref fm) = federation_mgr {
|
||||||
let hangup = SignalMessage::Hangup {
|
let hangup = SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: Some(call_id.clone()),
|
||||||
};
|
};
|
||||||
let forward = SignalMessage::FederatedSignalForward {
|
let forward = SignalMessage::FederatedSignalForward {
|
||||||
inner: Box::new(hangup),
|
inner: Box::new(hangup),
|
||||||
@@ -1199,6 +1202,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let hub = signal_hub.lock().await;
|
let hub = signal_hub.lock().await;
|
||||||
let _ = hub.send_to(&peer_fp, &SignalMessage::Hangup {
|
let _ = hub.send_to(&peer_fp, &SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: Some(call_id.clone()),
|
||||||
}).await;
|
}).await;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -1303,25 +1307,40 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SignalMessage::Hangup { .. } => {
|
SignalMessage::Hangup { ref call_id, .. } => {
|
||||||
// Forward hangup to all active calls for this user
|
// If the client sent a call_id, only end
|
||||||
|
// that specific call. Otherwise (old clients)
|
||||||
|
// fall back to ending ALL active calls for
|
||||||
|
// this user — which can race with new calls.
|
||||||
let calls = {
|
let calls = {
|
||||||
let reg = call_registry.lock().await;
|
let reg = call_registry.lock().await;
|
||||||
reg.calls_for_fingerprint(&client_fp)
|
if let Some(cid) = call_id {
|
||||||
.iter()
|
// Targeted hangup: only the named call
|
||||||
.map(|c| (c.call_id.clone(), if c.caller_fingerprint == client_fp {
|
reg.get(cid)
|
||||||
c.callee_fingerprint.clone()
|
.map(|c| vec![(c.call_id.clone(), if c.caller_fingerprint == client_fp {
|
||||||
} else {
|
c.callee_fingerprint.clone()
|
||||||
c.caller_fingerprint.clone()
|
} else {
|
||||||
}))
|
c.caller_fingerprint.clone()
|
||||||
.collect::<Vec<_>>()
|
})])
|
||||||
|
.unwrap_or_default()
|
||||||
|
} else {
|
||||||
|
// Legacy: end all calls for this user
|
||||||
|
reg.calls_for_fingerprint(&client_fp)
|
||||||
|
.iter()
|
||||||
|
.map(|c| (c.call_id.clone(), if c.caller_fingerprint == client_fp {
|
||||||
|
c.callee_fingerprint.clone()
|
||||||
|
} else {
|
||||||
|
c.caller_fingerprint.clone()
|
||||||
|
}))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
}
|
||||||
};
|
};
|
||||||
for (call_id, peer_fp) in &calls {
|
for (cid, peer_fp) in &calls {
|
||||||
let hub = signal_hub.lock().await;
|
let hub = signal_hub.lock().await;
|
||||||
let _ = hub.send_to(peer_fp, &msg).await;
|
let _ = hub.send_to(peer_fp, &msg).await;
|
||||||
drop(hub);
|
drop(hub);
|
||||||
let mut reg = call_registry.lock().await;
|
let mut reg = call_registry.lock().await;
|
||||||
reg.end_call(call_id);
|
reg.end_call(cid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1440,6 +1459,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let hub = signal_hub.lock().await;
|
let hub = signal_hub.lock().await;
|
||||||
let _ = hub.send_to(peer_fp, &SignalMessage::Hangup {
|
let _ = hub.send_to(peer_fp, &SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: Some(call_id.clone()),
|
||||||
}).await;
|
}).await;
|
||||||
drop(hub);
|
drop(hub);
|
||||||
let mut reg = call_registry.lock().await;
|
let mut reg = call_registry.lock().await;
|
||||||
|
|||||||
@@ -971,6 +971,7 @@ async fn internal_deregister(
|
|||||||
sig.incoming_caller_fp = None;
|
sig.incoming_caller_fp = None;
|
||||||
sig.incoming_caller_alias = None;
|
sig.incoming_caller_alias = None;
|
||||||
sig.pending_reflect = None;
|
sig.pending_reflect = None;
|
||||||
|
sig.pending_path_report = None;
|
||||||
sig.own_reflex_addr = None;
|
sig.own_reflex_addr = None;
|
||||||
if !keep_desired {
|
if !keep_desired {
|
||||||
sig.desired_relay_addr = None;
|
sig.desired_relay_addr = None;
|
||||||
@@ -1151,10 +1152,14 @@ fn do_register_signal(
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(Some(SignalMessage::Hangup { reason })) => {
|
Ok(Some(SignalMessage::Hangup { reason, .. })) => {
|
||||||
tracing::info!(?reason, "signal: Hangup");
|
tracing::info!(?reason, "signal: Hangup");
|
||||||
emit_call_debug(&app_clone, "recv:Hangup", serde_json::json!({ "reason": format!("{:?}", reason) }));
|
emit_call_debug(&app_clone, "recv:Hangup", serde_json::json!({ "reason": format!("{:?}", reason) }));
|
||||||
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None; sig.ipv6_endpoint = None;
|
let mut sig = signal_state.lock().await;
|
||||||
|
sig.signal_status = "registered".into();
|
||||||
|
sig.incoming_call_id = None;
|
||||||
|
sig.ipv6_endpoint = None;
|
||||||
|
sig.pending_path_report = None;
|
||||||
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
|
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
|
||||||
}
|
}
|
||||||
Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => {
|
Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => {
|
||||||
@@ -1853,6 +1858,7 @@ async fn hangup_call(
|
|||||||
match transport
|
match transport
|
||||||
.send_signal(&SignalMessage::Hangup {
|
.send_signal(&SignalMessage::Hangup {
|
||||||
reason: wzp_proto::HangupReason::Normal,
|
reason: wzp_proto::HangupReason::Normal,
|
||||||
|
call_id: None,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user