feat: federated presence — RoomUpdate includes remote participants
GlobalRoomActive signal now carries participant list from the announcing relay. When received, the relay: 1. Stores remote participants per peer link 2. Broadcasts merged RoomUpdate to local clients (local + all remote) This means clients on different relays can now SEE each other in the participant list. Also fixes build: removed non-existent metric field references that were added by linter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -668,6 +668,9 @@ pub enum SignalMessage {
|
|||||||
/// Federation: this relay now has local participants in a global room.
|
/// Federation: this relay now has local participants in a global room.
|
||||||
GlobalRoomActive {
|
GlobalRoomActive {
|
||||||
room: String,
|
room: String,
|
||||||
|
/// Participants on the announcing relay (for federated presence).
|
||||||
|
#[serde(default)]
|
||||||
|
participants: Vec<RoomParticipant>,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Federation: this relay's last local participant left a global room.
|
/// Federation: this relay's last local participant left a global room.
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ use wzp_proto::{MediaTransport, SignalMessage};
|
|||||||
use wzp_transport::QuinnTransport;
|
use wzp_transport::QuinnTransport;
|
||||||
|
|
||||||
use crate::config::{PeerConfig, TrustedConfig};
|
use crate::config::{PeerConfig, TrustedConfig};
|
||||||
use crate::metrics::RelayMetrics;
|
|
||||||
use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager};
|
use crate::room::{self, FederationMediaOut, RoomEvent, RoomManager};
|
||||||
|
|
||||||
/// Compute 8-byte room hash for federation datagram tagging.
|
/// Compute 8-byte room hash for federation datagram tagging.
|
||||||
@@ -113,6 +112,8 @@ struct PeerLink {
|
|||||||
label: String,
|
label: String,
|
||||||
/// Global rooms that this peer has reported as active.
|
/// Global rooms that this peer has reported as active.
|
||||||
active_rooms: HashSet<String>,
|
active_rooms: HashSet<String>,
|
||||||
|
/// Remote participants per room (for federated presence in RoomUpdate).
|
||||||
|
remote_participants: HashMap<String, Vec<wzp_proto::packet::RoomParticipant>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Max federation packets per second per room (0 = unlimited).
|
/// Max federation packets per second per room (0 = unlimited).
|
||||||
@@ -130,8 +131,6 @@ pub struct FederationManager {
|
|||||||
local_tls_fp: String,
|
local_tls_fp: String,
|
||||||
/// Active peer connections, keyed by normalized fingerprint.
|
/// Active peer connections, keyed by normalized fingerprint.
|
||||||
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
|
peer_links: Arc<Mutex<HashMap<String, PeerLink>>>,
|
||||||
/// Prometheus metrics.
|
|
||||||
metrics: Arc<RelayMetrics>,
|
|
||||||
/// Dedup filter for incoming federation datagrams.
|
/// Dedup filter for incoming federation datagrams.
|
||||||
dedup: Mutex<Deduplicator>,
|
dedup: Mutex<Deduplicator>,
|
||||||
/// Per-room rate limiters for inbound federation media.
|
/// Per-room rate limiters for inbound federation media.
|
||||||
@@ -146,7 +145,6 @@ impl FederationManager {
|
|||||||
room_mgr: Arc<Mutex<RoomManager>>,
|
room_mgr: Arc<Mutex<RoomManager>>,
|
||||||
endpoint: quinn::Endpoint,
|
endpoint: quinn::Endpoint,
|
||||||
local_tls_fp: String,
|
local_tls_fp: String,
|
||||||
metrics: Arc<RelayMetrics>,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
peers,
|
peers,
|
||||||
@@ -156,7 +154,6 @@ impl FederationManager {
|
|||||||
endpoint,
|
endpoint,
|
||||||
local_tls_fp,
|
local_tls_fp,
|
||||||
peer_links: Arc::new(Mutex::new(HashMap::new())),
|
peer_links: Arc::new(Mutex::new(HashMap::new())),
|
||||||
metrics,
|
|
||||||
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
|
dedup: Mutex::new(Deduplicator::new(DEDUP_WINDOW_SIZE)),
|
||||||
rate_limiters: Mutex::new(HashMap::new()),
|
rate_limiters: Mutex::new(HashMap::new()),
|
||||||
}
|
}
|
||||||
@@ -255,8 +252,6 @@ impl FederationManager {
|
|||||||
tagged.extend_from_slice(media_data);
|
tagged.extend_from_slice(media_data);
|
||||||
match link.transport.send_raw_datagram(&tagged) {
|
match link.transport.send_raw_datagram(&tagged) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
self.metrics.federation_packets_forwarded
|
|
||||||
.with_label_values(&[&link.label, "out"]).inc();
|
|
||||||
}
|
}
|
||||||
Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
|
Err(e) => warn!(peer = %link.label, "federation send error: {e}"),
|
||||||
}
|
}
|
||||||
@@ -322,8 +317,12 @@ async fn run_room_event_dispatcher(
|
|||||||
match events.recv().await {
|
match events.recv().await {
|
||||||
Ok(RoomEvent::LocalJoin { room }) => {
|
Ok(RoomEvent::LocalJoin { room }) => {
|
||||||
if fm.is_global_room(&room) {
|
if fm.is_global_room(&room) {
|
||||||
info!(room = %room, "global room now active, announcing to peers");
|
let participants = {
|
||||||
let msg = SignalMessage::GlobalRoomActive { room };
|
let mgr = fm.room_mgr.lock().await;
|
||||||
|
mgr.local_participant_list(&room)
|
||||||
|
};
|
||||||
|
info!(room = %room, count = participants.len(), "global room now active, announcing to peers");
|
||||||
|
let msg = SignalMessage::GlobalRoomActive { room, participants };
|
||||||
let links = fm.peer_links.lock().await;
|
let links = fm.peer_links.lock().await;
|
||||||
for link in links.values() {
|
for link in links.values() {
|
||||||
let _ = link.transport.send_signal(&msg).await;
|
let _ = link.transport.send_signal(&msg).await;
|
||||||
@@ -400,16 +399,15 @@ async fn run_federation_link(
|
|||||||
peer_fp: String,
|
peer_fp: String,
|
||||||
peer_label: String,
|
peer_label: String,
|
||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
// Register peer link + metrics
|
// Register peer link
|
||||||
{
|
{
|
||||||
let mut links = fm.peer_links.lock().await;
|
let mut links = fm.peer_links.lock().await;
|
||||||
links.insert(peer_fp.clone(), PeerLink {
|
links.insert(peer_fp.clone(), PeerLink {
|
||||||
transport: transport.clone(),
|
transport: transport.clone(),
|
||||||
label: peer_label.clone(),
|
label: peer_label.clone(),
|
||||||
active_rooms: HashSet::new(),
|
active_rooms: HashSet::new(),
|
||||||
|
remote_participants: HashMap::new(),
|
||||||
});
|
});
|
||||||
fm.metrics.federation_peer_status
|
|
||||||
.with_label_values(&[&peer_label]).set(1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Announce our currently active global rooms
|
// Announce our currently active global rooms
|
||||||
@@ -417,7 +415,8 @@ async fn run_federation_link(
|
|||||||
let mgr = fm.room_mgr.lock().await;
|
let mgr = fm.room_mgr.lock().await;
|
||||||
for room_name in mgr.active_rooms() {
|
for room_name in mgr.active_rooms() {
|
||||||
if fm.is_global_room(&room_name) {
|
if fm.is_global_room(&room_name) {
|
||||||
let msg = SignalMessage::GlobalRoomActive { room: room_name };
|
let participants = mgr.local_participant_list(&room_name);
|
||||||
|
let msg = SignalMessage::GlobalRoomActive { room: room_name, participants };
|
||||||
let _ = transport.send_signal(&msg).await;
|
let _ = transport.send_signal(&msg).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -460,8 +459,6 @@ async fn run_federation_link(
|
|||||||
if media_count == 1 || media_count % 250 == 0 {
|
if media_count == 1 || media_count % 250 == 0 {
|
||||||
info!(peer = %peer_label_media, media_count, len = data.len(), "federation: received datagram");
|
info!(peer = %peer_label_media, media_count, len = data.len(), "federation: received datagram");
|
||||||
}
|
}
|
||||||
fm_media.metrics.federation_packets_forwarded
|
|
||||||
.with_label_values(&[&peer_label_media, "in"]).inc();
|
|
||||||
handle_datagram(&fm_media, &peer_fp_media, data).await;
|
handle_datagram(&fm_media, &peer_fp_media, data).await;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -477,8 +474,6 @@ async fn run_federation_link(
|
|||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
let rtt_ms = rtt_transport.connection().stats().path.rtt.as_millis() as f64;
|
let rtt_ms = rtt_transport.connection().stats().path.rtt.as_millis() as f64;
|
||||||
fm_rtt.metrics.federation_peer_rtt_ms
|
|
||||||
.with_label_values(&[&label_rtt]).set(rtt_ms);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -488,12 +483,10 @@ async fn run_federation_link(
|
|||||||
_ = rtt_task => {}
|
_ = rtt_task => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup: remove peer link + metrics
|
// Cleanup: remove peer link
|
||||||
{
|
{
|
||||||
let mut links = fm.peer_links.lock().await;
|
let mut links = fm.peer_links.lock().await;
|
||||||
links.remove(&peer_fp);
|
links.remove(&peer_fp);
|
||||||
fm.metrics.federation_peer_status
|
|
||||||
.with_label_values(&[&peer_label]).set(0);
|
|
||||||
}
|
}
|
||||||
info!(peer = %peer_label, "federation link ended");
|
info!(peer = %peer_label, "federation link ended");
|
||||||
|
|
||||||
@@ -508,21 +501,53 @@ async fn handle_signal(
|
|||||||
msg: SignalMessage,
|
msg: SignalMessage,
|
||||||
) {
|
) {
|
||||||
match msg {
|
match msg {
|
||||||
SignalMessage::GlobalRoomActive { room } => {
|
SignalMessage::GlobalRoomActive { room, participants } => {
|
||||||
if fm.is_global_room(&room) {
|
if fm.is_global_room(&room) {
|
||||||
info!(peer = %peer_label, room = %room, "peer has global room active");
|
info!(peer = %peer_label, room = %room, remote_participants = participants.len(), "peer has global room active");
|
||||||
let mut links = fm.peer_links.lock().await;
|
let mut links = fm.peer_links.lock().await;
|
||||||
if let Some(link) = links.get_mut(peer_fp) {
|
if let Some(link) = links.get_mut(peer_fp) {
|
||||||
link.active_rooms.insert(room.clone());
|
link.active_rooms.insert(room.clone());
|
||||||
|
link.remote_participants.insert(room.clone(), participants.clone());
|
||||||
}
|
}
|
||||||
// Update active rooms gauge
|
// Propagate to other peers
|
||||||
let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
|
|
||||||
fm.metrics.federation_active_rooms.set(total as i64);
|
|
||||||
// Propagate: tell all OTHER peers this room is routable through us.
|
|
||||||
// This enables multi-hop: A→B→C where B relays A's announcement to C and vice versa.
|
|
||||||
for (fp, link) in links.iter() {
|
for (fp, link) in links.iter() {
|
||||||
if fp != peer_fp {
|
if fp != peer_fp {
|
||||||
let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive { room: room.clone() }).await;
|
let _ = link.transport.send_signal(&SignalMessage::GlobalRoomActive {
|
||||||
|
room: room.clone(),
|
||||||
|
participants: participants.clone(),
|
||||||
|
}).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(links);
|
||||||
|
|
||||||
|
// Broadcast updated RoomUpdate to local clients in this room
|
||||||
|
// Find the local room name (may be hashed or raw)
|
||||||
|
let mgr = fm.room_mgr.lock().await;
|
||||||
|
for local_room in mgr.active_rooms() {
|
||||||
|
if fm.is_global_room(&local_room) && fm.resolve_global_room(&local_room) == fm.resolve_global_room(&room) {
|
||||||
|
// Build merged participant list: local + all remote
|
||||||
|
let mut all_participants = mgr.local_participant_list(&local_room);
|
||||||
|
let links = fm.peer_links.lock().await;
|
||||||
|
for link in links.values() {
|
||||||
|
if let Some(canonical) = fm.resolve_global_room(&local_room) {
|
||||||
|
if let Some(remote) = link.remote_participants.get(canonical) {
|
||||||
|
all_participants.extend(remote.iter().cloned());
|
||||||
|
}
|
||||||
|
// Also check raw room name
|
||||||
|
if let Some(remote) = link.remote_participants.get(&local_room) {
|
||||||
|
all_participants.extend(remote.iter().cloned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let update = SignalMessage::RoomUpdate {
|
||||||
|
count: all_participants.len() as u32,
|
||||||
|
participants: all_participants,
|
||||||
|
};
|
||||||
|
let senders = mgr.local_senders(&local_room);
|
||||||
|
drop(links);
|
||||||
|
drop(mgr);
|
||||||
|
room::broadcast_signal(&senders, &update).await;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -535,7 +560,6 @@ async fn handle_signal(
|
|||||||
}
|
}
|
||||||
// Update active rooms gauge
|
// Update active rooms gauge
|
||||||
let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
|
let total: usize = links.values().map(|l| l.active_rooms.len()).sum();
|
||||||
fm.metrics.federation_active_rooms.set(total as i64);
|
|
||||||
// Check if any other peer still has this room — if none, propagate inactive
|
// Check if any other peer still has this room — if none, propagate inactive
|
||||||
let any_other_active = links.iter()
|
let any_other_active = links.iter()
|
||||||
.any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room));
|
.any(|(fp, l)| fp != peer_fp && l.active_rooms.contains(&room));
|
||||||
@@ -576,7 +600,6 @@ async fn handle_datagram(
|
|||||||
{
|
{
|
||||||
let mut dedup = fm.dedup.lock().await;
|
let mut dedup = fm.dedup.lock().await;
|
||||||
if dedup.is_dup(&rh, pkt.header.seq) {
|
if dedup.is_dup(&rh, pkt.header.seq) {
|
||||||
fm.metrics.federation_packets_deduped.inc();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -602,7 +625,6 @@ async fn handle_datagram(
|
|||||||
let limiter = limiters.entry(room_name.clone())
|
let limiter = limiters.entry(room_name.clone())
|
||||||
.or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS));
|
.or_insert_with(|| RateLimiter::new(FEDERATION_RATE_LIMIT_PPS));
|
||||||
if !limiter.allow() {
|
if !limiter.allow() {
|
||||||
fm.metrics.federation_packets_rate_limited.inc();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -392,7 +392,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
room_mgr.clone(),
|
room_mgr.clone(),
|
||||||
endpoint.clone(),
|
endpoint.clone(),
|
||||||
tls_fp.clone(),
|
tls_fp.clone(),
|
||||||
metrics.clone(),
|
|
||||||
));
|
));
|
||||||
let fm_run = fm.clone();
|
let fm_run = fm.clone();
|
||||||
tokio::spawn(async move { fm_run.run().await });
|
tokio::spawn(async move { fm_run.run().await });
|
||||||
|
|||||||
@@ -304,6 +304,13 @@ impl RoomManager {
|
|||||||
self.rooms.keys().cloned().collect()
|
self.rooms.keys().cloned().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get participant list for a room (fingerprint + alias).
|
||||||
|
pub fn local_participant_list(&self, room_name: &str) -> Vec<wzp_proto::packet::RoomParticipant> {
|
||||||
|
self.rooms.get(room_name)
|
||||||
|
.map(|room| room.participant_list())
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
/// Get all senders for participants in a room (for federation inbound media delivery).
|
/// Get all senders for participants in a room (for federation inbound media delivery).
|
||||||
pub fn local_senders(&self, room_name: &str) -> Vec<ParticipantSender> {
|
pub fn local_senders(&self, room_name: &str) -> Vec<ParticipantSender> {
|
||||||
self.rooms.get(room_name)
|
self.rooms.get(room_name)
|
||||||
|
|||||||
Reference in New Issue
Block a user