T3.3: SignalMessage version field

This commit is contained in:
Siavash Sameni
2026-05-12 06:08:31 +04:00
parent 1b4f7b0772
commit e73f8a7150
30 changed files with 531 additions and 69 deletions

View File

@@ -15,7 +15,7 @@ use sha2::{Digest, Sha256};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
use wzp_transport::QuinnTransport;
use crate::config::{PeerConfig, TrustedConfig};
@@ -520,7 +520,11 @@ async fn run_room_event_dispatcher(
if fm.is_global_room(&room) {
let participants = fm.room_mgr.local_participant_list(&room);
info!(room = %room, count = participants.len(), "global room now active, announcing to peers");
let msg = SignalMessage::GlobalRoomActive { room, participants };
let msg = SignalMessage::GlobalRoomActive {
version: default_signal_version(),
room,
participants,
};
let transports: Vec<Arc<QuinnTransport>> = {
let links = fm.peer_links.lock().await;
links.values().map(|l| l.transport.clone()).collect()
@@ -533,7 +537,10 @@ async fn run_room_event_dispatcher(
Ok(RoomEvent::LocalLeave { room }) => {
if fm.is_global_room(&room) {
info!(room = %room, "global room now inactive, announcing to peers");
let msg = SignalMessage::GlobalRoomInactive { room };
let msg = SignalMessage::GlobalRoomInactive {
version: default_signal_version(),
room,
};
let transports: Vec<Arc<QuinnTransport>> = {
let links = fm.peer_links.lock().await;
links.values().map(|l| l.transport.clone()).collect()
@@ -609,6 +616,7 @@ async fn run_stale_presence_sweeper(fm: Arc<FederationManager>) {
let mut seen = HashSet::new();
all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
let update = SignalMessage::RoomUpdate {
version: default_signal_version(),
count: all_participants.len() as u32,
participants: all_participants,
};
@@ -659,6 +667,7 @@ async fn connect_to_peer(
// Send hello with our TLS fingerprint
let hello = SignalMessage::FederationHello {
version: default_signal_version(),
tls_fingerprint: fm.local_tls_fp.clone(),
};
transport
@@ -710,6 +719,7 @@ async fn run_federation_link(
let participants = fm.room_mgr.local_participant_list(room_name);
info!(peer = %peer_label, room = %room_name, participants = participants.len(), "announcing local global room to new peer");
msgs.push(SignalMessage::GlobalRoomActive {
version: default_signal_version(),
room: room_name.clone(),
participants,
});
@@ -724,6 +734,7 @@ async fn run_federation_link(
if fm.is_global_room(room) {
info!(peer = %peer_label, room = %room, via = %link.label, "propagating remote room to new peer");
msgs.push(SignalMessage::GlobalRoomActive {
version: default_signal_version(),
room: room.clone(),
participants: participants.clone(),
});
@@ -837,7 +848,9 @@ async fn handle_signal(
}
match msg {
SignalMessage::GlobalRoomActive { room, participants } => {
SignalMessage::GlobalRoomActive {
room, participants, ..
} => {
if fm.is_global_room(&room) {
info!(peer = %peer_label, room = %room, remote_participants = participants.len(), "peer has global room active");
let mut links = fm.peer_links.lock().await;
@@ -882,6 +895,7 @@ async fn handle_signal(
let _ = link
.transport
.send_signal(&SignalMessage::GlobalRoomActive {
version: default_signal_version(),
room: room.clone(),
participants: tagged_for_propagation.clone(),
})
@@ -923,6 +937,7 @@ async fn handle_signal(
let mut seen = HashSet::new();
all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
let update = SignalMessage::RoomUpdate {
version: default_signal_version(),
count: all_participants.len() as u32,
participants: all_participants,
};
@@ -933,7 +948,7 @@ async fn handle_signal(
}
}
}
SignalMessage::GlobalRoomInactive { room } => {
SignalMessage::GlobalRoomInactive { room, .. } => {
info!(peer = %peer_label, room = %room, "peer global room now inactive");
let mut links = fm.peer_links.lock().await;
if let Some(link) = links.get_mut(peer_fp) {
@@ -999,6 +1014,7 @@ async fn handle_signal(
}
}
let msg = SignalMessage::GlobalRoomActive {
version: default_signal_version(),
room: room.clone(),
participants: updated_participants,
};
@@ -1007,7 +1023,10 @@ async fn handle_signal(
}
} else {
// No participants left anywhere — propagate inactive
let msg = SignalMessage::GlobalRoomInactive { room: room.clone() };
let msg = SignalMessage::GlobalRoomInactive {
version: default_signal_version(),
room: room.clone(),
};
for transport in &peer_sends {
let _ = transport.send_signal(&msg).await;
}
@@ -1025,6 +1044,7 @@ async fn handle_signal(
let mut seen = HashSet::new();
all_participants.retain(|p| seen.insert(p.fingerprint.clone()));
let update = SignalMessage::RoomUpdate {
version: default_signal_version(),
count: all_participants.len() as u32,
participants: all_participants,
};
@@ -1050,6 +1070,7 @@ async fn handle_signal(
SignalMessage::FederatedSignalForward {
inner,
origin_relay_fp,
..
} => {
if origin_relay_fp == fm.local_tls_fp {
tracing::debug!(

View File

@@ -4,7 +4,7 @@
//! recv `CallOffer` → verify → generate ephemeral → derive session → send `CallAnswer`.
use wzp_crypto::{CryptoSession, KeyExchange, WarzoneKeyExchange};
use wzp_proto::{MediaTransport, QualityProfile, SignalMessage};
use wzp_proto::{MediaTransport, QualityProfile, SignalMessage, default_signal_version};
/// Accept the relay (callee) side of the cryptographic handshake.
///
@@ -51,6 +51,7 @@ pub async fn accept_handshake(
alias,
protocol_version,
supported_versions: _,
..
} => (
identity_pub,
ephemeral_pub,
@@ -70,6 +71,7 @@ pub async fn accept_handshake(
// 1a. Protocol version check — we only speak v2.
if protocol_version != 2 {
let mismatch = SignalMessage::Hangup {
version: default_signal_version(),
reason: wzp_proto::HangupReason::ProtocolVersionMismatch {
server_supported: vec![2],
},
@@ -108,6 +110,7 @@ pub async fn accept_handshake(
// 6. Send CallAnswer
let answer = SignalMessage::CallAnswer {
version: default_signal_version(),
identity_pub,
ephemeral_pub,
signature,

View File

@@ -16,7 +16,7 @@ use clap::Parser;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
use wzp_relay::config::RelayConfig;
use wzp_relay::metrics::RelayMetrics;
use wzp_relay::pipeline::{PipelineConfig, RelayPipeline};
@@ -640,6 +640,7 @@ async fn main() -> anyhow::Result<()> {
.send_to(
&caller_fp,
&SignalMessage::Hangup {
version: default_signal_version(),
reason: wzp_proto::HangupReason::Normal,
call_id: None,
},
@@ -685,6 +686,7 @@ async fn main() -> anyhow::Result<()> {
// Emit the LOCAL CallSetup to our local caller.
let setup = SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: room_name.clone(),
relay_addr: advertised_addr_d.clone(),
@@ -703,7 +705,7 @@ async fn main() -> anyhow::Result<()> {
);
}
SignalMessage::CallRinging { ref call_id } => {
SignalMessage::CallRinging { ref call_id, .. } => {
// Forward to local caller for "ringing..." UX.
let caller_fp = {
let reg = call_registry_d.lock().await;
@@ -866,9 +868,12 @@ async fn main() -> anyhow::Result<()> {
info!(%addr, "probe connection detected, entering Ping/Pong + presence responder");
loop {
match transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::Ping { timestamp_ms })) => {
Ok(Some(wzp_proto::SignalMessage::Ping { timestamp_ms, .. })) => {
if let Err(e) = transport
.send_signal(&wzp_proto::SignalMessage::Pong { timestamp_ms })
.send_signal(&wzp_proto::SignalMessage::Pong {
version: default_signal_version(),
timestamp_ms,
})
.await
{
error!(%addr, "probe pong send error: {e}");
@@ -878,6 +883,7 @@ async fn main() -> anyhow::Result<()> {
Ok(Some(wzp_proto::SignalMessage::PresenceUpdate {
fingerprints,
relay_addr,
..
})) => {
// A peer relay is telling us which fingerprints it has
let peer_addr: std::net::SocketAddr =
@@ -894,6 +900,7 @@ async fn main() -> anyhow::Result<()> {
reg.local_fingerprints().into_iter().collect()
};
let reply = wzp_proto::SignalMessage::PresenceUpdate {
version: default_signal_version(),
fingerprints: local_fps,
relay_addr: addr.to_string(),
};
@@ -902,7 +909,9 @@ async fn main() -> anyhow::Result<()> {
break;
}
}
Ok(Some(wzp_proto::SignalMessage::RouteQuery { fingerprint, ttl })) => {
Ok(Some(wzp_proto::SignalMessage::RouteQuery {
fingerprint, ttl, ..
})) => {
// Look up the fingerprint in our local registry
let reg = presence.lock().await;
let route = route_resolver.resolve(&reg, &fingerprint);
@@ -930,6 +939,7 @@ async fn main() -> anyhow::Result<()> {
};
let reply = wzp_proto::SignalMessage::RouteResponse {
version: default_signal_version(),
fingerprint,
found,
relay_chain,
@@ -968,6 +978,7 @@ async fn main() -> anyhow::Result<()> {
{
Ok(Ok(Some(wzp_proto::SignalMessage::FederationHello {
tls_fingerprint,
..
}))) => tls_fingerprint,
_ => {
warn!(%addr, "federation: no hello received, closing");
@@ -1004,7 +1015,7 @@ async fn main() -> anyhow::Result<()> {
// Optional auth
let auth_fp: Option<String> = if let Some(ref url) = auth_url {
match transport.recv_signal().await {
Ok(Some(SignalMessage::AuthToken { token })) => {
Ok(Some(SignalMessage::AuthToken { token, .. })) => {
match wzp_relay::auth::validate_token(url, &token).await {
Ok(client) => Some(client.fingerprint),
Err(e) => {
@@ -1033,6 +1044,7 @@ async fn main() -> anyhow::Result<()> {
identity_pub,
signature: _,
alias,
..
}))) => {
// Compute fingerprint: SHA-256(Ed25519 pub key)[:16], same as Fingerprint type
let fp = {
@@ -1067,6 +1079,7 @@ async fn main() -> anyhow::Result<()> {
// Send ack
let _ = transport
.send_signal(&SignalMessage::RegisterPresenceAck {
version: default_signal_version(),
success: true,
error: None,
relay_build: Some(BUILD_GIT_HASH.to_string()),
@@ -1126,6 +1139,7 @@ async fn main() -> anyhow::Result<()> {
// federation has a matching entry.
let forwarded = if let Some(ref fm) = federation_mgr {
let forward = SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
@@ -1149,6 +1163,7 @@ async fn main() -> anyhow::Result<()> {
info!(%addr, target = %target_fp, "call target not online (no federation route)");
let _ = transport
.send_signal(&SignalMessage::Hangup {
version: default_signal_version(),
reason: wzp_proto::HangupReason::Normal,
call_id: None,
})
@@ -1193,6 +1208,7 @@ async fn main() -> anyhow::Result<()> {
// federated delivery is in flight.
let _ = transport
.send_signal(&SignalMessage::CallRinging {
version: default_signal_version(),
call_id: call_id.clone(),
})
.await;
@@ -1236,6 +1252,7 @@ async fn main() -> anyhow::Result<()> {
drop(hub);
let _ = transport
.send_signal(&SignalMessage::CallRinging {
version: default_signal_version(),
call_id: call_id.clone(),
})
.await;
@@ -1293,11 +1310,13 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref origin_fp) = peer_relay_fp {
if let Some(ref fm) = federation_mgr {
let hangup = SignalMessage::Hangup {
version: default_signal_version(),
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
};
let forward =
SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(hangup),
origin_relay_fp: tls_fp.clone(),
};
@@ -1314,6 +1333,7 @@ async fn main() -> anyhow::Result<()> {
.send_to(
&peer_fp,
&SignalMessage::Hangup {
version: default_signal_version(),
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
},
@@ -1390,6 +1410,7 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref fm) = federation_mgr {
let forward =
SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
@@ -1407,6 +1428,7 @@ async fn main() -> anyhow::Result<()> {
}
let setup_for_callee = SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: room.clone(),
relay_addr: relay_addr_for_setup,
@@ -1429,6 +1451,7 @@ async fn main() -> anyhow::Result<()> {
// cross-wired candidates (Phase 5.5 ICE
// + Phase 8 port-mapped addrs).
let setup_for_caller = SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: room.clone(),
relay_addr: relay_addr_for_setup.clone(),
@@ -1437,6 +1460,7 @@ async fn main() -> anyhow::Result<()> {
peer_mapped_addr: callee_mapped,
};
let setup_for_callee = SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: room.clone(),
relay_addr: relay_addr_for_setup,
@@ -1524,6 +1548,7 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref fm) = federation_mgr {
let forward =
SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
@@ -1568,6 +1593,7 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref fm) = federation_mgr {
let forward =
SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
@@ -1615,6 +1641,7 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref fm) = federation_mgr {
let forward =
SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(msg.clone()),
origin_relay_fp: tls_fp.clone(),
};
@@ -1629,9 +1656,12 @@ async fn main() -> anyhow::Result<()> {
}
}
SignalMessage::Ping { timestamp_ms } => {
SignalMessage::Ping { timestamp_ms, .. } => {
let _ = transport
.send_signal(&SignalMessage::Pong { timestamp_ms })
.send_signal(&SignalMessage::Pong {
version: default_signal_version(),
timestamp_ms,
})
.await;
}
@@ -1651,6 +1681,7 @@ async fn main() -> anyhow::Result<()> {
let observed_addr = addr.to_string();
if let Err(e) = transport
.send_signal(&SignalMessage::ReflectResponse {
version: default_signal_version(),
observed_addr: observed_addr.clone(),
})
.await
@@ -1710,6 +1741,7 @@ async fn main() -> anyhow::Result<()> {
.send_to(
peer_fp,
&SignalMessage::Hangup {
version: default_signal_version(),
reason: wzp_proto::HangupReason::Normal,
call_id: Some(call_id.clone()),
},
@@ -1741,7 +1773,7 @@ async fn main() -> anyhow::Result<()> {
let authenticated_fp: Option<String> = if let Some(ref url) = auth_url {
info!(%addr, "waiting for auth token...");
match transport.recv_signal().await {
Ok(Some(wzp_proto::SignalMessage::AuthToken { token })) => {
Ok(Some(wzp_proto::SignalMessage::AuthToken { token, .. })) => {
match wzp_relay::auth::validate_token(url, &token).await {
Ok(client) => {
metrics.auth_attempts.with_label_values(&["ok"]).inc();
@@ -1913,6 +1945,7 @@ async fn main() -> anyhow::Result<()> {
if let SignalMessage::RoomUpdate {
count: _,
participants: mut local_parts,
..
} = update
{
let remote = fm.get_remote_participants(&room_name).await;
@@ -1921,6 +1954,7 @@ async fn main() -> anyhow::Result<()> {
let mut seen = std::collections::HashSet::new();
local_parts.retain(|p| seen.insert(p.fingerprint.clone()));
SignalMessage::RoomUpdate {
version: default_signal_version(),
count: local_parts.len() as u32,
participants: local_parts,
}

View File

@@ -13,7 +13,7 @@ use prometheus::{Gauge, IntGauge, Opts, Registry};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
/// Configuration for a single probe target.
#[derive(Clone, Debug)]
@@ -229,7 +229,7 @@ impl ProbeRunner {
let recv_handle = tokio::spawn(async move {
loop {
match recv_transport.recv_signal().await {
Ok(Some(SignalMessage::Pong { timestamp_ms })) => {
Ok(Some(SignalMessage::Pong { timestamp_ms, .. })) => {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
@@ -244,6 +244,7 @@ impl ProbeRunner {
Ok(Some(SignalMessage::PresenceUpdate {
fingerprints,
relay_addr,
..
})) => {
if let Some(ref reg) = recv_presence {
// Parse the relay_addr; fall back to the connection target
@@ -293,7 +294,10 @@ impl ProbeRunner {
}
if let Err(e) = transport
.send_signal(&SignalMessage::Ping { timestamp_ms })
.send_signal(&SignalMessage::Ping {
version: default_signal_version(),
timestamp_ms,
})
.await
{
error!(target = %self.config.target, "probe ping send error: {e}");
@@ -310,6 +314,7 @@ impl ProbeRunner {
r.local_fingerprints().into_iter().collect()
};
let msg = SignalMessage::PresenceUpdate {
version: default_signal_version(),
fingerprints: fps,
relay_addr: self.config.target.to_string(),
};
@@ -426,9 +431,10 @@ pub fn mesh_summary(registry: &Registry) -> String {
/// Handle an incoming Ping signal by replying with a Pong carrying the same timestamp.
/// Returns true if the message was a Ping and was handled, false otherwise.
pub async fn handle_ping(transport: &wzp_transport::QuinnTransport, msg: &SignalMessage) -> bool {
if let SignalMessage::Ping { timestamp_ms } = msg {
if let SignalMessage::Ping { timestamp_ms, .. } = msg {
if let Err(e) = transport
.send_signal(&SignalMessage::Pong {
version: default_signal_version(),
timestamp_ms: *timestamp_ms,
})
.await

View File

@@ -333,10 +333,11 @@ mod tests {
#[test]
fn session_forward_signal_roundtrip() {
use wzp_proto::SignalMessage;
use wzp_proto::{SignalMessage, default_signal_version};
// SessionForward roundtrip
let msg = SignalMessage::SessionForward {
version: default_signal_version(),
session_id: "abcd1234".to_string(),
target_fingerprint: "deadbeef".to_string(),
source_relay: "10.0.0.1:4433".to_string(),
@@ -348,6 +349,7 @@ mod tests {
session_id,
target_fingerprint,
source_relay,
..
} => {
assert_eq!(session_id, "abcd1234");
assert_eq!(target_fingerprint, "deadbeef");
@@ -358,6 +360,7 @@ mod tests {
// SessionForwardAck roundtrip
let ack = SignalMessage::SessionForwardAck {
version: default_signal_version(),
session_id: "abcd1234".to_string(),
room_name: "relay-room-42".to_string(),
};
@@ -367,6 +370,7 @@ mod tests {
SignalMessage::SessionForwardAck {
session_id,
room_name,
..
} => {
assert_eq!(session_id, "abcd1234");
assert_eq!(room_name, "relay-room-42");

View File

@@ -13,10 +13,10 @@ use bytes::Bytes;
use dashmap::DashMap;
use tracing::{error, info, warn};
use wzp_proto::MediaTransport;
use wzp_proto::packet::TrunkFrame;
use wzp_proto::quality::{AdaptiveQualityController, Tier};
use wzp_proto::traits::QualityController;
use wzp_proto::{MediaTransport, default_signal_version};
use crate::conformance::ConformanceMeter;
use crate::metrics::RelayMetrics;
@@ -64,6 +64,7 @@ impl DebugTap {
wzp_proto::SignalMessage::RoomUpdate {
count,
participants,
..
} => {
let names: Vec<&str> = participants
.iter()
@@ -81,6 +82,7 @@ impl DebugTap {
wzp_proto::SignalMessage::QualityDirective {
recommended_profile,
reason,
..
} => {
info!(
target: "debug_tap",
@@ -493,6 +495,7 @@ impl RoomManager {
);
room.qualities.insert(id, ParticipantQuality::new());
let update = wzp_proto::SignalMessage::RoomUpdate {
version: default_signal_version(),
count: room.len() as u32,
participants: room.participant_list(),
};
@@ -570,6 +573,7 @@ impl RoomManager {
return None;
}
let update = wzp_proto::SignalMessage::RoomUpdate {
version: default_signal_version(),
count: room.len() as u32,
participants: room.participant_list(),
};
@@ -654,6 +658,7 @@ impl RoomManager {
);
let directive = wzp_proto::SignalMessage::QualityDirective {
version: default_signal_version(),
recommended_profile: profile,
reason: Some(format!("weakest link: {weakest:?}")),
};

View File

@@ -201,9 +201,10 @@ mod tests {
#[test]
fn route_query_signal_roundtrip() {
use wzp_proto::SignalMessage;
use wzp_proto::{SignalMessage, default_signal_version};
let query = SignalMessage::RouteQuery {
version: default_signal_version(),
fingerprint: "aabbccdd".to_string(),
ttl: 3,
};
@@ -211,11 +212,12 @@ mod tests {
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(
decoded,
SignalMessage::RouteQuery { ref fingerprint, ttl }
SignalMessage::RouteQuery { ref fingerprint, ttl, ..}
if fingerprint == "aabbccdd" && ttl == 3
));
let response = SignalMessage::RouteResponse {
version: default_signal_version(),
fingerprint: "aabbccdd".to_string(),
found: true,
relay_chain: vec!["10.0.0.1:4433".to_string(), "10.0.0.2:4433".to_string()],
@@ -224,7 +226,7 @@ mod tests {
let decoded: SignalMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(
decoded,
SignalMessage::RouteResponse { ref fingerprint, found, ref relay_chain }
SignalMessage::RouteResponse { ref fingerprint, found, ref relay_chain, ..}
if fingerprint == "aabbccdd" && found && relay_chain.len() == 2
));
}

View File

@@ -8,7 +8,7 @@ use std::sync::Arc;
use std::time::Instant;
use tracing::info;
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
use wzp_transport::QuinnTransport;
/// A client connected via `_signal` for direct calling.
@@ -101,7 +101,10 @@ impl SignalHub {
alias: c.alias.clone(),
})
.collect();
SignalMessage::PresenceList { users }
SignalMessage::PresenceList {
version: default_signal_version(),
users,
}
}
/// Broadcast a message to ALL connected signal clients.

View File

@@ -24,7 +24,7 @@
//! Bob's CallSetup carries Alice's reflex addr — cross-wired
//! through two relays + a federation link.
use wzp_proto::{CallAcceptMode, SignalMessage};
use wzp_proto::{CallAcceptMode, SignalMessage, default_signal_version};
use wzp_relay::call_registry::CallRegistry;
// ────────────────────────────────────────────────────────────────
@@ -42,6 +42,7 @@ const RELAY_B_ADDR: &str = "203.0.113.10:4433";
/// Helper that Alice's place_call sends.
fn alice_offer(call_id: &str) -> SignalMessage {
SignalMessage::DirectCallOffer {
version: default_signal_version(),
caller_fingerprint: "alice".into(),
caller_alias: None,
target_fingerprint: "bob".into(),
@@ -84,6 +85,7 @@ fn relay_a_handle_offer(reg_a: &mut CallRegistry, offer: &SignalMessage) -> Sign
// Build the federation envelope the main loop would
// broadcast.
SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(offer.clone()),
origin_relay_fp: RELAY_A_TLS_FP.into(),
}
@@ -97,6 +99,7 @@ fn relay_b_handle_forwarded_offer(reg_b: &mut CallRegistry, forward: &SignalMess
SignalMessage::FederatedSignalForward {
inner,
origin_relay_fp,
..
} => (inner.as_ref().clone(), origin_relay_fp.clone()),
_ => panic!("not a forward"),
};
@@ -123,6 +126,7 @@ fn relay_b_handle_forwarded_offer(reg_b: &mut CallRegistry, forward: &SignalMess
/// Bob's answer — AcceptTrusted with his reflex addr.
fn bob_answer(call_id: &str) -> SignalMessage {
SignalMessage::DirectCallAnswer {
version: default_signal_version(),
call_id: call_id.into(),
accept_mode: CallAcceptMode::AcceptTrusted,
identity_pub: None,
@@ -166,12 +170,14 @@ fn relay_b_handle_local_answer(
// Forward the answer back over federation.
let forward = SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(answer.clone()),
origin_relay_fp: RELAY_B_TLS_FP.into(),
};
// Local CallSetup for Bob — peer_direct_addr = Alice's addr.
let setup_for_bob = SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: format!("call-{call_id}"),
relay_addr: RELAY_B_ADDR.into(),
@@ -194,6 +200,7 @@ fn relay_a_handle_forwarded_answer(
SignalMessage::FederatedSignalForward {
inner,
origin_relay_fp,
..
} => (inner.as_ref().clone(), origin_relay_fp.clone()),
_ => panic!("not a forward"),
};
@@ -215,6 +222,7 @@ fn relay_a_handle_forwarded_answer(
// Alice's CallSetup — peer_direct_addr = Bob's addr.
SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: format!("call-{call_id}"),
relay_addr: RELAY_A_ADDR.into(),
@@ -312,6 +320,7 @@ fn cross_relay_loop_prevention_drops_self_sourced_forward() {
// A FederatedSignalForward that circles back to the origin
// relay should be dropped before it hits the call registry.
let forward = SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(alice_offer("c-loop")),
origin_relay_fp: RELAY_B_TLS_FP.into(),
};

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
use wzp_relay::config::{PeerConfig, TrustedConfig};
use wzp_relay::event_log::EventLogger;
use wzp_relay::federation::{FederationManager, room_hash};
@@ -348,7 +348,9 @@ async fn broadcast_signal_sends_to_all_peers() {
.expect("some message");
match hello {
SignalMessage::FederationHello { tls_fingerprint } => {
SignalMessage::FederationHello {
tls_fingerprint, ..
} => {
assert_eq!(tls_fingerprint, "test-relay-fp-abc123");
}
other => panic!(
@@ -367,6 +369,7 @@ async fn broadcast_signal_sends_to_all_peers() {
// Now call broadcast_signal on the FM
let test_msg = SignalMessage::FederatedSignalForward {
version: default_signal_version(),
inner: Box::new(SignalMessage::Reflect),
origin_relay_fp: "other-relay-fp".into(),
};

View File

@@ -9,7 +9,7 @@ use std::sync::Arc;
use wzp_client::perform_handshake;
use wzp_crypto::{KeyExchange, WarzoneKeyExchange};
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
use wzp_relay::handshake::accept_handshake;
use wzp_transport::{QuinnTransport, client_config, create_endpoint, server_config};
@@ -129,6 +129,7 @@ async fn handshake_rejects_v1_protocol_version() {
let signature = kx.sign(&sign_data);
let v1_offer = SignalMessage::CallOffer {
version: 1,
identity_pub,
ephemeral_pub,
signature,
@@ -255,7 +256,7 @@ async fn auth_then_handshake() {
.expect("should receive a message");
let token = match auth_msg {
SignalMessage::AuthToken { token } => token,
SignalMessage::AuthToken { token, .. } => token,
other => panic!(
"expected AuthToken, got {:?}",
std::mem::discriminant(&other)
@@ -273,6 +274,7 @@ async fn auth_then_handshake() {
// Caller side: send AuthToken first, then perform_handshake.
let auth = SignalMessage::AuthToken {
version: default_signal_version(),
token: "bearer-test-token-12345".to_string(),
};
client_transport
@@ -344,6 +346,7 @@ async fn handshake_rejects_bad_signature() {
}
let bad_offer = SignalMessage::CallOffer {
version: default_signal_version(),
identity_pub,
ephemeral_pub,
signature,

View File

@@ -20,7 +20,7 @@
//! to reason about, no real network, and what we actually want to
//! test is the cross-wiring logic, not the whole signal stack.
use wzp_proto::{CallAcceptMode, SignalMessage};
use wzp_proto::{CallAcceptMode, SignalMessage, default_signal_version};
use wzp_relay::call_registry::CallRegistry;
/// Helper: simulate the relay's handling of a DirectCallOffer. In
@@ -77,6 +77,7 @@ fn handle_answer_and_build_setups(
};
let setup_for_caller = SignalMessage::CallSetup {
version: default_signal_version(),
call_id: call_id.clone(),
room: room.clone(),
relay_addr: "203.0.113.5:4433".into(),
@@ -85,6 +86,7 @@ fn handle_answer_and_build_setups(
peer_mapped_addr: None,
};
let setup_for_callee = SignalMessage::CallSetup {
version: default_signal_version(),
call_id,
room,
relay_addr: "203.0.113.5:4433".into(),
@@ -97,6 +99,7 @@ fn handle_answer_and_build_setups(
fn mk_offer(call_id: &str, caller_reflexive_addr: Option<&str>) -> SignalMessage {
SignalMessage::DirectCallOffer {
version: default_signal_version(),
caller_fingerprint: "alice".into(),
caller_alias: None,
target_fingerprint: "bob".into(),
@@ -118,6 +121,7 @@ fn mk_answer(
callee_reflexive_addr: Option<&str>,
) -> SignalMessage {
SignalMessage::DirectCallAnswer {
version: default_signal_version(),
call_id: call_id.into(),
accept_mode: mode,
identity_pub: None,

View File

@@ -63,6 +63,7 @@ async fn spawn_mock_relay() -> (SocketAddr, tokio::task::JoinHandle<()>) {
Ok(Some(SignalMessage::RegisterPresence { .. })) => {
let _ = t
.send_signal(&SignalMessage::RegisterPresenceAck {
version: 1,
success: true,
error: None,
relay_build: None,
@@ -74,6 +75,7 @@ async fn spawn_mock_relay() -> (SocketAddr, tokio::task::JoinHandle<()>) {
Ok(Some(SignalMessage::Reflect)) => {
let _ = t
.send_signal(&SignalMessage::ReflectResponse {
version: 1,
observed_addr: observed_addr.to_string(),
})
.await;

View File

@@ -30,7 +30,7 @@ use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use wzp_proto::{MediaTransport, SignalMessage};
use wzp_proto::{MediaTransport, SignalMessage, default_signal_version};
use wzp_transport::{QuinnTransport, client_config, create_endpoint, server_config};
/// Spawn a minimal mock relay that loops over `recv_signal`,
@@ -49,6 +49,7 @@ async fn spawn_mock_relay_with_reflect(
match server_transport.recv_signal().await {
Ok(Some(SignalMessage::Reflect)) => {
let resp = SignalMessage::ReflectResponse {
version: default_signal_version(),
observed_addr: observed.to_string(),
};
// If the send fails the client has gone; just exit.
@@ -164,7 +165,7 @@ async fn reflect_happy_path() {
.expect("some message");
let observed_addr = match resp {
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
SignalMessage::ReflectResponse { observed_addr, .. } => observed_addr,
other => panic!(
"expected ReflectResponse, got {:?}",
std::mem::discriminant(&other)
@@ -251,7 +252,7 @@ async fn reflect_two_clients_distinct_ports() {
.expect("ok")
.expect("some");
match resp {
SignalMessage::ReflectResponse { observed_addr } => observed_addr,
SignalMessage::ReflectResponse { observed_addr, .. } => observed_addr,
_ => panic!("wrong variant"),
}
};