T3.5: Tier E per-session token bucket

This commit is contained in:
Siavash Sameni
2026-05-12 06:45:56 +04:00
parent 8454835c18
commit f1b86e0fed
6 changed files with 262 additions and 5 deletions

View File

@@ -1,4 +1,4 @@
//! Relay conformance metering — Tier A/B/C/D enforcement.
//! Relay conformance metering — Tier A/B/C/D/E enforcement.
//!
//! Each participant gets a [`ConformanceMeter`] that tracks per-second
//! traffic against the declared codec's nominal bitrate ceiling.
@@ -23,6 +23,60 @@ pub enum Violation {
TimestampDrift,
/// Sustained payload size exceeds 2× the typical bound for the declared codec (Tier D).
PayloadSizeExceeded,
/// Per-session token-bucket rate cap exceeded (Tier E).
RateCapExceeded,
}
/// Simple token bucket for per-session rate capping (Tier E).
///
/// Tokens represent bytes. The bucket refills at `refill_per_sec` bytes per
/// second, up to `capacity`. A packet is allowed only if the bucket holds
/// enough tokens for its size.
pub struct TokenBucket {
capacity: u64,
tokens: f64,
refill_per_sec: u64,
last_refill: Instant,
}
impl TokenBucket {
/// Create a new bucket with the given byte capacity and refill rate.
pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
Self {
capacity,
tokens: capacity as f64,
refill_per_sec,
last_refill: Instant::now(),
}
}
/// Per-session audio cap: 256 kbps with 30 s @ 2× burst.
/// Capacity = 30 s × 64 KB/s = 1_920_000 bytes.
pub fn for_audio_session() -> Self {
let refill_per_sec = 256_000 / 8; // 32_000 bytes/sec
let capacity = refill_per_sec * 30 * 2; // 1_920_000 bytes
Self::new(capacity, refill_per_sec)
}
/// Attempt to consume `bytes` from the bucket.
///
/// Refills based on elapsed time since the last call, then deducts the
/// cost. Returns `Ok(())` if enough tokens were available, `Err(())`
/// otherwise.
pub fn try_consume(&mut self, bytes: u64, now: Instant) -> Result<(), ()> {
let elapsed = now.duration_since(self.last_refill);
self.last_refill = now;
self.tokens += elapsed.as_secs_f64() * self.refill_per_sec as f64;
if self.tokens > self.capacity as f64 {
self.tokens = self.capacity as f64;
}
if self.tokens >= bytes as f64 {
self.tokens -= bytes as f64;
Ok(())
} else {
Err(())
}
}
}
/// Per-participant traffic conformance meter.
@@ -34,6 +88,8 @@ pub struct ConformanceMeter {
drift_window: VecDeque<(u32, u32)>,
/// EWMA of payload size for Tier D sanity checks.
ewma_payload_size: f64,
/// Optional token bucket for Tier E per-session rate cap.
token_bucket: Option<TokenBucket>,
}
impl ConformanceMeter {
@@ -44,9 +100,17 @@ impl ConformanceMeter {
packets_in_window: 0,
drift_window: VecDeque::with_capacity(DRIFT_WINDOW_SIZE),
ewma_payload_size: 0.0,
token_bucket: None,
}
}
/// Create a meter with a Tier E token bucket for per-session rate capping.
pub fn with_token_bucket(bucket: TokenBucket) -> Self {
let mut meter = Self::new();
meter.token_bucket = Some(bucket);
meter
}
/// Inspect an incoming media packet and accumulate it against the
/// current 1-second window. Returns [`Err(Violation)`] when a limit
/// is crossed.
@@ -113,6 +177,14 @@ impl ConformanceMeter {
return Err(Violation::PayloadSizeExceeded);
}
// Tier E — per-session token-bucket rate cap.
if let Some(ref mut bucket) = self.token_bucket {
let packet_size = (MediaHeader::WIRE_SIZE + payload_len) as u64;
if bucket.try_consume(packet_size, now).is_err() {
return Err(Violation::RateCapExceeded);
}
}
Ok(())
}
}
@@ -388,4 +460,80 @@ mod tests {
);
}
}
// ------------------------------------------------------------------
// Tier E — token-bucket rate cap
// ------------------------------------------------------------------
#[test]
fn token_bucket_small_burst_ok() {
let mut bucket = TokenBucket::new(100_000, 32_000);
let now = Instant::now();
// 50 KB burst fits inside 100 KB capacity.
assert!(bucket.try_consume(50_000, now).is_ok());
}
#[test]
fn token_bucket_large_burst_fails() {
let mut bucket = TokenBucket::new(100_000, 32_000);
let now = Instant::now();
// 1 MB exceeds 100 KB capacity.
assert!(bucket.try_consume(1_000_000, now).is_err());
}
#[test]
fn token_bucket_refills_over_time() {
let mut bucket = TokenBucket::new(100_000, 32_000);
let t0 = Instant::now();
// Drain the bucket.
assert!(bucket.try_consume(100_000, t0).is_ok());
// Immediately try again — should fail.
assert!(bucket.try_consume(10_000, t0).is_err());
// Wait 1 second — bucket refills 32_000 bytes.
let t1 = t0 + Duration::from_secs(1);
assert!(bucket.try_consume(30_000, t1).is_ok());
// 40_000 is more than the 32_000 refilled.
assert!(bucket.try_consume(40_000, t1).is_err());
}
#[test]
fn token_bucket_sustained_rate_balanced() {
let mut bucket = TokenBucket::new(1_000_000, 32_000);
let t0 = Instant::now();
// Send 32 KB every second for 5 seconds — exactly at refill rate.
// The bucket should never empty because each second it refills
// exactly what was consumed.
for i in 0..5 {
let t = t0 + Duration::from_secs(i);
assert!(
bucket.try_consume(32_000, t).is_ok(),
"32 KB/s sustained should stay within bucket limit"
);
}
}
#[test]
fn conformance_tier_e_integration() {
// Use Opus64k (high bitrate ceiling + high payload bound) so Tiers
// A/B/D never fire on the small bursts used here. Only Tier E.
let mut meter = ConformanceMeter::with_token_bucket(TokenBucket::new(1_000, 500));
let header = make_header(CodecId::Opus64k);
let now = Instant::now();
// Two 500-byte (wire) packets = 1_000 bytes — exactly the bucket cap.
assert!(
meter
.observe(&header, 500 - MediaHeader::WIRE_SIZE, now)
.is_ok()
);
assert!(
meter
.observe(&header, 500 - MediaHeader::WIRE_SIZE, now)
.is_ok()
);
// Third packet exceeds the 1_000-byte cap.
let result = meter.observe(&header, 10, now);
assert_eq!(result, Err(Violation::RateCapExceeded));
}
}

View File

@@ -2027,6 +2027,7 @@ async fn main() -> anyhow::Result<()> {
debug_tap,
federation_tx,
federation_room_hash,
authenticated_fp.is_some(),
)
.await;

View File

@@ -406,6 +406,7 @@ impl RelayMetrics {
Violation::PacketRateExceeded => "B",
Violation::TimestampDrift => "C",
Violation::PayloadSizeExceeded => "D",
Violation::RateCapExceeded => "E",
};
let codec_id = format!("{:?}", header.codec_id);
let verdict = format!("{:?}", v);

View File

@@ -758,6 +758,7 @@ pub async fn run_participant(
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
federation_room_hash: Option<[u8; 8]>,
is_authenticated: bool,
) {
if trunking_enabled {
run_participant_trunked(
@@ -767,6 +768,7 @@ pub async fn run_participant(
transport,
metrics,
session_id,
is_authenticated,
)
.await;
} else {
@@ -780,6 +782,7 @@ pub async fn run_participant(
debug_tap,
federation_tx,
federation_room_hash,
is_authenticated,
)
.await;
}
@@ -796,6 +799,7 @@ async fn run_participant_plain(
debug_tap: Option<DebugTap>,
federation_tx: Option<tokio::sync::mpsc::Sender<FederationMediaOut>>,
federation_room_hash: Option<[u8; 8]>,
is_authenticated: bool,
) {
let addr = transport.connection().remote_address();
let mut packets_forwarded = 0u64;
@@ -804,7 +808,13 @@ async fn run_participant_plain(
let mut max_forward_ms = 0u64;
let mut send_errors = 0u64;
let mut last_log_instant = std::time::Instant::now();
let mut conformance = ConformanceMeter::new();
let mut conformance = if is_authenticated {
ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session())
} else {
// Anonymous participants get the same per-session audio cap.
// Monthly quota (1 GB vs 50 GB) is tracked separately.
ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session())
};
let mut tap_stats = if debug_tap.as_ref().map_or(false, |t| t.matches(&room_name)) {
Some(TapStats::new())
@@ -1029,6 +1039,7 @@ async fn run_participant_trunked(
transport: Arc<wzp_transport::QuinnTransport>,
metrics: Arc<RelayMetrics>,
session_id: &str,
_is_authenticated: bool,
) {
use std::collections::HashMap;
@@ -1039,7 +1050,8 @@ async fn run_participant_trunked(
let mut max_forward_ms = 0u64;
let mut send_errors = 0u64;
let mut last_log_instant = std::time::Instant::now();
let mut conformance = ConformanceMeter::new();
let mut conformance =
ConformanceMeter::with_token_bucket(crate::conformance::TokenBucket::for_audio_session());
info!(
room = %room_name,