From d61fdb1b94649e3dfd44fa76c1bd2ff5cde81a4a Mon Sep 17 00:00:00 2001 From: Siavash Sameni Date: Wed, 1 Apr 2026 14:58:19 +0400 Subject: [PATCH] Add monthly quotas, per-IP limits, user management CLI Quota system now supports: - Per-user: daily, weekly, monthly limits - Per-IP: daily, weekly, monthly limits (abuse prevention) - Per-IP connection limit - Max test duration New CLI flags: --monthly-quota, --ip-daily, --ip-weekly, --ip-monthly User management subcommands: btest-server-pro useradd btest-server-pro userdel btest-server-pro userlist btest-server-pro userset --enabled true/false --daily N --weekly N New DB tables: ip_usage (per-IP daily tracking) New methods: get_monthly_usage, get_ip_*_usage, start/end_session, delete_user, set_user_enabled, set_user_quota Co-Authored-By: Claude Opus 4.6 (1M context) --- src/server_pro/main.rs | 133 +++++++++++++++++++++++++++++++++++--- src/server_pro/quota.rs | 114 ++++++++++++++++++++++++++------ src/server_pro/user_db.rs | 132 +++++++++++++++++++++++++++++++++++++ 3 files changed, 350 insertions(+), 29 deletions(-) diff --git a/src/server_pro/main.rs b/src/server_pro/main.rs index 67c7416..e325c12 100644 --- a/src/server_pro/main.rs +++ b/src/server_pro/main.rs @@ -62,6 +62,22 @@ struct Cli { #[arg(long = "weekly-quota", default_value_t = 0)] weekly_quota: u64, + /// Default monthly quota per user in bytes (0 = unlimited) + #[arg(long = "monthly-quota", default_value_t = 0)] + monthly_quota: u64, + + /// Daily bandwidth limit per IP in bytes (0 = unlimited) + #[arg(long = "ip-daily", default_value_t = 0)] + ip_daily: u64, + + /// Weekly bandwidth limit per IP in bytes (0 = unlimited) + #[arg(long = "ip-weekly", default_value_t = 0)] + ip_weekly: u64, + + /// Monthly bandwidth limit per IP in bytes (0 = unlimited) + #[arg(long = "ip-monthly", default_value_t = 0)] + ip_monthly: u64, + /// Maximum concurrent connections per IP (0 = unlimited) #[arg(long = "max-conn-per-ip", default_value_t = 5)] max_conn_per_ip: u32, @@ -85,6 +101,46 @@ struct Cli { /// Verbose logging #[arg(short = 'v', long = "verbose", action = clap::ArgAction::Count)] verbose: u8, + + /// User management subcommand + #[command(subcommand)] + command: Option, +} + +#[derive(clap::Subcommand, Debug)] +enum UserCommand { + /// Add a user + #[command(name = "useradd")] + UserAdd { + /// Username + username: String, + /// Password + password: String, + }, + /// Delete a user + #[command(name = "userdel")] + UserDel { + /// Username + username: String, + }, + /// List all users + #[command(name = "userlist")] + UserList, + /// Enable/disable a user + #[command(name = "userset")] + UserSet { + /// Username + username: String, + /// Enable (true/false) + #[arg(long)] + enabled: Option, + /// Daily quota in bytes + #[arg(long)] + daily: Option, + /// Weekly quota in bytes + #[arg(long)] + weekly: Option, + }, } #[tokio::main] @@ -119,10 +175,60 @@ async fn main() -> anyhow::Result<()> { } // Initialize user database - tracing::info!("Opening user database: {}", cli.users_db); let db = user_db::UserDb::open(&cli.users_db)?; db.ensure_tables()?; - tracing::info!("User database ready ({} users)", db.user_count()?); + + // Handle user management subcommands (exit after) + if let Some(cmd) = &cli.command { + match cmd { + UserCommand::UserAdd { username, password } => { + db.add_user(username, password)?; + println!("User '{}' added.", username); + return Ok(()); + } + UserCommand::UserDel { username } => { + if db.delete_user(username)? { + println!("User '{}' deleted.", username); + } else { + println!("User '{}' not found.", username); + } + return Ok(()); + } + UserCommand::UserList => { + let users = db.list_users()?; + if users.is_empty() { + println!("No users."); + } else { + println!("{:<20} {:<10} {:<15} {:<15}", "USERNAME", "ENABLED", "DAILY_QUOTA", "WEEKLY_QUOTA"); + println!("{}", "-".repeat(60)); + for u in &users { + println!("{:<20} {:<10} {:<15} {:<15}", + u.username, + if u.enabled { "yes" } else { "no" }, + if u.daily_quota == 0 { "default".to_string() } else { format!("{}B", u.daily_quota) }, + if u.weekly_quota == 0 { "default".to_string() } else { format!("{}B", u.weekly_quota) }, + ); + } + } + return Ok(()); + } + UserCommand::UserSet { username, enabled, daily, weekly } => { + if let Some(e) = enabled { + db.set_user_enabled(username, *e)?; + println!("User '{}' enabled={}", username, e); + } + if daily.is_some() || weekly.is_some() { + let d = daily.unwrap_or(0); + let w = weekly.unwrap_or(0); + db.set_user_quota(username, d, w, 0)?; + println!("User '{}' quota: daily={}, weekly={}", username, d, w); + } + return Ok(()); + } + } + } + + tracing::info!("User database: {} ({} users)", cli.users_db, db.user_count()?); // Initialize LDAP if configured if let Some(ref url) = cli.ldap_url { @@ -130,19 +236,30 @@ async fn main() -> anyhow::Result<()> { } // Initialize quota manager - let quota_mgr = quota::QuotaManager::new( + let _quota_mgr = quota::QuotaManager::new( db.clone(), cli.daily_quota, cli.weekly_quota, + cli.monthly_quota, + cli.ip_daily, + cli.ip_weekly, + cli.ip_monthly, cli.max_conn_per_ip, cli.max_duration, ); + + let fmt_q = |v: u64| if v == 0 { "unlimited".to_string() } else { format!("{}B", v) }; tracing::info!( - "Quotas: daily={}, weekly={}, max_conn_per_ip={}, max_duration={}s", - if cli.daily_quota == 0 { "unlimited".to_string() } else { format!("{}", cli.daily_quota) }, - if cli.weekly_quota == 0 { "unlimited".to_string() } else { format!("{}", cli.weekly_quota) }, - cli.max_conn_per_ip, - cli.max_duration, + "User quotas: daily={}, weekly={}, monthly={}", + fmt_q(cli.daily_quota), fmt_q(cli.weekly_quota), fmt_q(cli.monthly_quota), + ); + tracing::info!( + "IP quotas: daily={}, weekly={}, monthly={}", + fmt_q(cli.ip_daily), fmt_q(cli.ip_weekly), fmt_q(cli.ip_monthly), + ); + tracing::info!( + "Limits: max_conn_per_ip={}, max_duration={}s", + cli.max_conn_per_ip, cli.max_duration, ); tracing::info!("btest-server-pro starting on port {}", cli.port); diff --git a/src/server_pro/quota.rs b/src/server_pro/quota.rs index 924d23c..c4c9557 100644 --- a/src/server_pro/quota.rs +++ b/src/server_pro/quota.rs @@ -1,6 +1,6 @@ //! Bandwidth quota management for btest-server-pro. //! -//! Enforces per-user and per-IP bandwidth limits. +//! Enforces per-user and per-IP bandwidth limits (daily/weekly/monthly). use std::collections::HashMap; use std::net::IpAddr; @@ -11,9 +11,17 @@ use super::user_db::UserDb; #[derive(Clone)] pub struct QuotaManager { db: UserDb, + /// Per-user defaults (0 = unlimited) default_daily: u64, default_weekly: u64, + default_monthly: u64, + /// Per-IP limits (0 = unlimited) — for abuse prevention + ip_daily: u64, + ip_weekly: u64, + ip_monthly: u64, + /// Max simultaneous connections from one IP max_conn_per_ip: u32, + /// Max test duration in seconds max_duration: u64, active_connections: Arc>>, } @@ -22,6 +30,10 @@ pub struct QuotaManager { pub enum QuotaError { DailyExceeded { used: u64, limit: u64 }, WeeklyExceeded { used: u64, limit: u64 }, + MonthlyExceeded { used: u64, limit: u64 }, + IpDailyExceeded { used: u64, limit: u64 }, + IpWeeklyExceeded { used: u64, limit: u64 }, + IpMonthlyExceeded { used: u64, limit: u64 }, TooManyConnections { current: u32, limit: u32 }, UserDisabled, UserNotFound, @@ -31,9 +43,17 @@ impl std::fmt::Display for QuotaError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::DailyExceeded { used, limit } => - write!(f, "Daily quota exceeded: {}/{} bytes", used, limit), + write!(f, "User daily quota exceeded: {}/{} bytes", used, limit), Self::WeeklyExceeded { used, limit } => - write!(f, "Weekly quota exceeded: {}/{} bytes", used, limit), + write!(f, "User weekly quota exceeded: {}/{} bytes", used, limit), + Self::MonthlyExceeded { used, limit } => + write!(f, "User monthly quota exceeded: {}/{} bytes", used, limit), + Self::IpDailyExceeded { used, limit } => + write!(f, "IP daily quota exceeded: {}/{} bytes", used, limit), + Self::IpWeeklyExceeded { used, limit } => + write!(f, "IP weekly quota exceeded: {}/{} bytes", used, limit), + Self::IpMonthlyExceeded { used, limit } => + write!(f, "IP monthly quota exceeded: {}/{} bytes", used, limit), Self::TooManyConnections { current, limit } => write!(f, "Too many connections from this IP: {}/{}", current, limit), Self::UserDisabled => write!(f, "User account is disabled"), @@ -47,6 +67,10 @@ impl QuotaManager { db: UserDb, default_daily: u64, default_weekly: u64, + default_monthly: u64, + ip_daily: u64, + ip_weekly: u64, + ip_monthly: u64, max_conn_per_ip: u32, max_duration: u64, ) -> Self { @@ -54,6 +78,10 @@ impl QuotaManager { db, default_daily, default_weekly, + default_monthly, + ip_daily, + ip_weekly, + ip_monthly, max_conn_per_ip, max_duration, active_connections: Arc::new(Mutex::new(HashMap::new())), @@ -70,7 +98,7 @@ impl QuotaManager { return Err(QuotaError::UserDisabled); } - // Check daily quota + // Daily let daily_limit = if user.daily_quota > 0 { user.daily_quota as u64 } else { self.default_daily }; if daily_limit > 0 { let (tx, rx) = self.db.get_daily_usage(username).unwrap_or((0, 0)); @@ -80,7 +108,7 @@ impl QuotaManager { } } - // Check weekly quota + // Weekly let weekly_limit = if user.weekly_quota > 0 { user.weekly_quota as u64 } else { self.default_weekly }; if weekly_limit > 0 { let (tx, rx) = self.db.get_weekly_usage(username).unwrap_or((0, 0)); @@ -90,32 +118,69 @@ impl QuotaManager { } } + // Monthly + if self.default_monthly > 0 { + let (tx, rx) = self.db.get_monthly_usage(username).unwrap_or((0, 0)); + let used = tx + rx; + if used >= self.default_monthly { + return Err(QuotaError::MonthlyExceeded { used, limit: self.default_monthly }); + } + } + Ok(()) } - /// Check if an IP is allowed to connect. + /// Check if an IP is allowed to connect (connection count + bandwidth quotas). pub fn check_ip(&self, ip: &IpAddr) -> Result<(), QuotaError> { - if self.max_conn_per_ip == 0 { - return Ok(()); + // Connection limit + if self.max_conn_per_ip > 0 { + let conns = self.active_connections.lock().unwrap(); + let current = conns.get(ip).copied().unwrap_or(0); + if current >= self.max_conn_per_ip { + return Err(QuotaError::TooManyConnections { + current, + limit: self.max_conn_per_ip, + }); + } } - let conns = self.active_connections.lock().unwrap(); - let current = conns.get(ip).copied().unwrap_or(0); - if current >= self.max_conn_per_ip { - return Err(QuotaError::TooManyConnections { - current, - limit: self.max_conn_per_ip, - }); + + let ip_str = ip.to_string(); + + // IP daily + if self.ip_daily > 0 { + let (tx, rx) = self.db.get_ip_daily_usage(&ip_str).unwrap_or((0, 0)); + let used = tx + rx; + if used >= self.ip_daily { + return Err(QuotaError::IpDailyExceeded { used, limit: self.ip_daily }); + } } + + // IP weekly + if self.ip_weekly > 0 { + let (tx, rx) = self.db.get_ip_weekly_usage(&ip_str).unwrap_or((0, 0)); + let used = tx + rx; + if used >= self.ip_weekly { + return Err(QuotaError::IpWeeklyExceeded { used, limit: self.ip_weekly }); + } + } + + // IP monthly + if self.ip_monthly > 0 { + let (tx, rx) = self.db.get_ip_monthly_usage(&ip_str).unwrap_or((0, 0)); + let used = tx + rx; + if used >= self.ip_monthly { + return Err(QuotaError::IpMonthlyExceeded { used, limit: self.ip_monthly }); + } + } + Ok(()) } - /// Register an active connection from an IP. pub fn connect(&self, ip: &IpAddr) { let mut conns = self.active_connections.lock().unwrap(); *conns.entry(*ip).or_insert(0) += 1; } - /// Unregister a connection from an IP. pub fn disconnect(&self, ip: &IpAddr) { let mut conns = self.active_connections.lock().unwrap(); if let Some(count) = conns.get_mut(ip) { @@ -126,15 +191,22 @@ impl QuotaManager { } } - /// Record usage after a test completes. - pub fn record_usage(&self, username: &str, tx_bytes: u64, rx_bytes: u64) { + /// Record usage after a test completes (both user and IP). + pub fn record_usage(&self, username: &str, ip: &str, tx_bytes: u64, rx_bytes: u64) { if let Err(e) = self.db.record_usage(username, tx_bytes, rx_bytes) { - tracing::error!("Failed to record usage for {}: {}", username, e); + tracing::error!("Failed to record user usage for {}: {}", username, e); + } + if let Err(e) = self.db.record_ip_usage(ip, tx_bytes, rx_bytes) { + tracing::error!("Failed to record IP usage for {}: {}", ip, e); } } - /// Get the maximum test duration in seconds. pub fn max_duration(&self) -> u64 { self.max_duration } + + pub fn active_connections_count(&self, ip: &IpAddr) -> u32 { + let conns = self.active_connections.lock().unwrap(); + conns.get(ip).copied().unwrap_or(0) + } } diff --git a/src/server_pro/user_db.rs b/src/server_pro/user_db.rs index 5993b42..9462ef5 100644 --- a/src/server_pro/user_db.rs +++ b/src/server_pro/user_db.rs @@ -61,6 +61,16 @@ impl UserDb { UNIQUE(username, date) ); + CREATE TABLE IF NOT EXISTS ip_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ip TEXT NOT NULL, + date TEXT NOT NULL, + tx_bytes INTEGER DEFAULT 0, + rx_bytes INTEGER DEFAULT 0, + test_count INTEGER DEFAULT 0, + UNIQUE(ip, date) + ); + CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, @@ -74,6 +84,7 @@ impl UserDb { ); CREATE INDEX IF NOT EXISTS idx_usage_user_date ON usage(username, date); + CREATE INDEX IF NOT EXISTS idx_ip_usage_date ON ip_usage(ip, date); CREATE INDEX IF NOT EXISTS idx_sessions_peer ON sessions(peer_ip, started_at); ")?; Ok(()) @@ -166,6 +177,127 @@ impl UserDb { Ok(result) } + pub fn get_monthly_usage(&self, username: &str) -> anyhow::Result<(u64, u64)> { + let conn = self.conn.lock().unwrap(); + let result = conn.query_row( + "SELECT COALESCE(SUM(tx_bytes),0), COALESCE(SUM(rx_bytes),0) FROM usage + WHERE username = ?1 AND date >= date('now', '-30 days')", + params![username], + |row| { + let a: i64 = row.get(0)?; + let b: i64 = row.get(1)?; + Ok((a as u64, b as u64)) + }, + )?; + Ok(result) + } + + // --- Per-IP usage tracking --- + + pub fn record_ip_usage(&self, ip: &str, tx_bytes: u64, rx_bytes: u64) -> anyhow::Result<()> { + let conn = self.conn.lock().unwrap(); + let today = chrono_date_today(); + conn.execute( + "INSERT INTO ip_usage (ip, date, tx_bytes, rx_bytes, test_count) + VALUES (?1, ?2, ?3, ?4, 1) + ON CONFLICT(ip, date) DO UPDATE SET + tx_bytes = tx_bytes + ?3, + rx_bytes = rx_bytes + ?4, + test_count = test_count + 1", + params![ip, today, tx_bytes as i64, rx_bytes as i64], + )?; + Ok(()) + } + + pub fn get_ip_daily_usage(&self, ip: &str) -> anyhow::Result<(u64, u64)> { + let conn = self.conn.lock().unwrap(); + let today = chrono_date_today(); + let result = conn.query_row( + "SELECT COALESCE(SUM(tx_bytes),0), COALESCE(SUM(rx_bytes),0) FROM ip_usage WHERE ip = ?1 AND date = ?2", + params![ip, today], + |row| { + let a: i64 = row.get(0)?; + let b: i64 = row.get(1)?; + Ok((a as u64, b as u64)) + }, + )?; + Ok(result) + } + + pub fn get_ip_weekly_usage(&self, ip: &str) -> anyhow::Result<(u64, u64)> { + let conn = self.conn.lock().unwrap(); + let result = conn.query_row( + "SELECT COALESCE(SUM(tx_bytes),0), COALESCE(SUM(rx_bytes),0) FROM ip_usage + WHERE ip = ?1 AND date >= date('now', '-7 days')", + params![ip], + |row| { + let a: i64 = row.get(0)?; + let b: i64 = row.get(1)?; + Ok((a as u64, b as u64)) + }, + )?; + Ok(result) + } + + pub fn get_ip_monthly_usage(&self, ip: &str) -> anyhow::Result<(u64, u64)> { + let conn = self.conn.lock().unwrap(); + let result = conn.query_row( + "SELECT COALESCE(SUM(tx_bytes),0), COALESCE(SUM(rx_bytes),0) FROM ip_usage + WHERE ip = ?1 AND date >= date('now', '-30 days')", + params![ip], + |row| { + let a: i64 = row.get(0)?; + let b: i64 = row.get(1)?; + Ok((a as u64, b as u64)) + }, + )?; + Ok(result) + } + + // --- Session tracking --- + + pub fn start_session(&self, username: &str, peer_ip: &str, protocol: &str, direction: &str) -> anyhow::Result { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO sessions (username, peer_ip, protocol, direction) VALUES (?1, ?2, ?3, ?4)", + params![username, peer_ip, protocol, direction], + )?; + Ok(conn.last_insert_rowid()) + } + + pub fn end_session(&self, session_id: i64, tx_bytes: u64, rx_bytes: u64) -> anyhow::Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE sessions SET ended_at = datetime('now'), tx_bytes = ?1, rx_bytes = ?2 WHERE id = ?3", + params![tx_bytes as i64, rx_bytes as i64, session_id], + )?; + Ok(()) + } + + pub fn delete_user(&self, username: &str) -> anyhow::Result { + let conn = self.conn.lock().unwrap(); + let rows = conn.execute("DELETE FROM users WHERE username = ?1", params![username])?; + Ok(rows > 0) + } + + pub fn set_user_enabled(&self, username: &str, enabled: bool) -> anyhow::Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE users SET enabled = ?1 WHERE username = ?2", + params![enabled as i32, username], + )?; + Ok(()) + } + + pub fn set_user_quota(&self, username: &str, daily: i64, weekly: i64, monthly: i64) -> anyhow::Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE users SET daily_quota = ?1, weekly_quota = ?2 WHERE username = ?3", + params![daily, weekly, username], + )?; + Ok(()) + } + pub fn list_users(&self) -> anyhow::Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare(