Add CPU usage monitoring, remove btest-opensource submodule
All checks were successful
CI / test (push) Successful in 2m16s

CPU usage feature:
- New cpu.rs module: background sampler thread, cross-platform (macOS + Linux)
- Status message byte 1 now carries CPU load (0-100%), matching MikroTik format
- Status format corrected: [type][cpu][00][00][seq:4 LE][bytes:4 LE]
- Client and server exchange CPU in every status message
- Display format: "cpu: 40%/12%" (local/remote), "!" warning if > 70%
- Both client and server show local + remote CPU per interval
- Syslog TEST_END does not yet report CPU averages (planned as a future enhancement)

Removed btest-opensource submodule — we've fully reimplemented the protocol
with EC-SRP5 auth, multi-connection, IPv6, syslog, CSV, and CPU monitoring.
The original project is still credited in LICENSE and README.

58 tests, all passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-01 10:53:00 +04:00
parent 10dd0c3835
commit 24f634170d
9 changed files with 200 additions and 24 deletions

3
.gitmodules vendored
View File

@@ -1,3 +0,0 @@
[submodule "btest-opensource"]
path = btest-opensource
url = https://github.com/samm-git/btest-opensource

Submodule btest-opensource deleted from 5040a01267

View File

@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64};
use std::sync::Arc;
use std::time::Duration;
@@ -18,6 +18,8 @@ pub struct BandwidthState {
pub total_rx_bytes: AtomicU64,
pub total_lost_packets: AtomicU64,
pub intervals: AtomicU32,
/// Remote peer's CPU usage (received via status messages)
pub remote_cpu: AtomicU8,
}
impl BandwidthState {
@@ -35,6 +37,7 @@ impl BandwidthState {
total_rx_bytes: AtomicU64::new(0),
total_lost_packets: AtomicU64::new(0),
intervals: AtomicU32::new(0),
remote_cpu: AtomicU8::new(0),
})
}
@@ -122,6 +125,18 @@ pub fn print_status(
bytes: u64,
elapsed: Duration,
lost_packets: Option<u64>,
) {
print_status_with_cpu(interval_num, direction, bytes, elapsed, lost_packets, None, None);
}
pub fn print_status_with_cpu(
interval_num: u32,
direction: &str,
bytes: u64,
elapsed: Duration,
lost_packets: Option<u64>,
local_cpu: Option<u8>,
remote_cpu: Option<u8>,
) {
if crate::csv_output::is_quiet() {
return;
@@ -136,13 +151,26 @@ pub fn print_status(
_ => String::new(),
};
let cpu_str = match (local_cpu, remote_cpu) {
(Some(l), Some(r)) => {
let warn = if l > 70 || r > 70 { " !" } else { "" };
format!(" cpu: {}%/{}%{}", l, r, warn)
}
(Some(l), None) => {
let warn = if l > 70 { " !" } else { "" };
format!(" cpu: {}%{}", l, warn)
}
_ => String::new(),
};
println!(
"[{:4}] {:>3} {} ({} bytes){}",
"[{:4}] {:>3} {} ({} bytes){}{}",
interval_num,
direction,
format_bandwidth(bw),
bytes,
loss_str,
cpu_str,
);
}

View File

@@ -382,11 +382,13 @@ async fn client_status_loop(cmd: &Command, state: &BandwidthState) {
let rx = if cmd.client_rx() { state.rx_bytes.swap(0, Ordering::Relaxed) } else { 0 };
state.record_interval(tx, rx, 0);
let local_cpu = crate::cpu::get();
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
if cmd.client_tx() {
bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None);
bandwidth::print_status_with_cpu(seq, "TX", tx, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu));
}
if cmd.client_rx() {
bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), None);
bandwidth::print_status_with_cpu(seq, "RX", rx, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu));
}
}
}
@@ -420,6 +422,7 @@ async fn udp_client_status_loop(
match tokio::time::timeout(wait_time, reader.read_exact(&mut status_buf)).await {
Ok(Ok(_)) => {
let server_status = StatusMessage::deserialize(&status_buf);
state.remote_cpu.store(server_status.cpu_load, Ordering::Relaxed);
if server_status.bytes_received > 0 && cmd.client_tx() {
let new_speed =
@@ -453,7 +456,7 @@ async fn udp_client_status_loop(
let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed);
state.record_interval(tx_bytes, rx_bytes, lost);
let status = StatusMessage {
let status = StatusMessage { cpu_load: crate::cpu::get(),
seq,
bytes_received: rx_bytes as u32,
};
@@ -463,11 +466,13 @@ async fn udp_client_status_loop(
}
let _ = writer.flush().await;
let local_cpu = crate::cpu::get();
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
if cmd.client_tx() {
bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None);
bandwidth::print_status_with_cpu(seq, "TX", tx_bytes, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu));
}
if cmd.client_rx() {
bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost));
bandwidth::print_status_with_cpu(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost), Some(local_cpu), Some(remote_cpu));
}
}
}

132
src/cpu.rs Normal file
View File

@@ -0,0 +1,132 @@
//! Lightweight CPU usage measurement.
//!
//! Returns the system-wide CPU usage as a percentage (0-100).
//! Works on macOS and Linux without external dependencies.
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Duration;
static CURRENT_CPU: AtomicU8 = AtomicU8::new(0);
/// Spawn a detached background thread that refreshes the CPU reading once per second.
///
/// The thread runs for the life of the process; readers observe the latest
/// sample via [`get`].
pub fn start_sampler() {
    std::thread::spawn(|| {
        // Keep the previous tick snapshot so each interval's delta can be computed.
        let mut last_sample = get_cpu_times();
        loop {
            std::thread::sleep(Duration::from_secs(1));
            let sample = get_cpu_times();
            // Publish the latest percentage for lock-free readers.
            CURRENT_CPU.store(compute_usage(&last_sample, &sample), Ordering::Relaxed);
            last_sample = sample;
        }
    });
}
/// Get the current CPU usage percentage (0-100).
///
/// Lock-free read of the value last published by the sampler thread; returns 0
/// until the sampler's first one-second interval completes (or if
/// `start_sampler` was never called).
pub fn get() -> u8 {
    CURRENT_CPU.load(Ordering::Relaxed)
}
// --- Platform-specific implementation ---
#[cfg(target_os = "linux")]
fn get_cpu_times() -> (u64, u64) {
    // Parse the aggregate "cpu" line of /proc/stat:
    //   cpu user nice system idle iowait irq softirq steal [guest guest_nice]
    // Returns (total_ticks, idle_ticks); (0, 0) on any read/parse failure so
    // the sampler degrades to reporting 0% instead of panicking.
    if let Ok(content) = std::fs::read_to_string("/proc/stat") {
        if let Some(line) = content.lines().next() {
            let parts: Vec<u64> = line
                .split_whitespace()
                .skip(1) // skip the "cpu" label
                .filter_map(|s| s.parse().ok())
                .collect();
            if parts.len() >= 4 {
                // Count iowait (field 5, when present) as idle, matching the
                // convention of top/htop: a core waiting on I/O is not busy.
                let idle = parts[3] + parts.get(4).copied().unwrap_or(0);
                let total: u64 = parts.iter().sum();
                return (total, idle);
            }
        }
    }
    (0, 0)
}
#[cfg(target_os = "macos")]
fn get_cpu_times() -> (u64, u64) {
    // Use host_statistics to get CPU ticks
    use std::mem::MaybeUninit;

    // Minimal hand-rolled Mach bindings — avoids pulling in the `libc`/`mach`
    // crates for just two calls.
    extern "C" {
        fn mach_host_self() -> u32;
        fn host_statistics(
            host: u32,
            flavor: i32,
            info: *mut i32,
            count: *mut u32,
        ) -> i32;
    }

    // Constants mirror <mach/host_info.h> — TODO confirm they match the SDK in use.
    const HOST_CPU_LOAD_INFO: i32 = 3;
    const CPU_STATE_MAX: usize = 4;

    // SAFETY: `info` points to a buffer of CPU_STATE_MAX u32 words and `count`
    // tells the kernel its capacity in words. The buffer is only read
    // (assume_init) after host_statistics reports success (ret == 0).
    unsafe {
        let host = mach_host_self();
        let mut info = MaybeUninit::<[u32; CPU_STATE_MAX]>::uninit();
        let mut count: u32 = CPU_STATE_MAX as u32;
        let ret = host_statistics(
            host,
            HOST_CPU_LOAD_INFO,
            info.as_mut_ptr() as *mut i32,
            &mut count,
        );
        if ret == 0 {
            let ticks = info.assume_init();
            // ticks: [user, system, idle, nice]
            // Widen to u64 before summing so the tick counters can't overflow u32 math.
            let user = ticks[0] as u64;
            let system = ticks[1] as u64;
            let idle = ticks[2] as u64;
            let nice = ticks[3] as u64;
            let total = user + system + idle + nice;
            return (total, idle);
        }
    }
    // Fall through on failure: report zeros so the sampler degrades to 0%.
    (0, 0)
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_cpu_times() -> (u64, u64) {
    // No portable tick source on this platform; reporting zeros makes
    // compute_usage() yield a constant 0% rather than garbage.
    (0, 0) // Unsupported platform
}
/// Convert two `(total, idle)` tick snapshots into a busy percentage (0-100).
///
/// All subtractions saturate so that counter resets or out-of-order samples
/// (e.g. after suspend/resume) clamp to 0% instead of panicking on underflow
/// in debug builds. The percentage is computed in u128 so `busy * 100` cannot
/// overflow for very large tick deltas.
fn compute_usage(prev: &(u64, u64), curr: &(u64, u64)) -> u8 {
    let total_diff = curr.0.saturating_sub(prev.0);
    let idle_diff = curr.1.saturating_sub(prev.1);
    if total_diff == 0 {
        return 0;
    }
    // idle_diff can exceed total_diff if the kernel reports the fields
    // non-atomically; saturate rather than underflow.
    let busy = total_diff.saturating_sub(idle_diff);
    ((busy as u128 * 100) / total_diff as u128).min(100) as u8
}
#[cfg(test)]
mod tests {
    use super::*;

    /// On supported platforms the tick source must report real, consistent values.
    #[test]
    fn test_cpu_times_returns_nonzero() {
        let (total, idle) = get_cpu_times();
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            assert!(total > 0, "CPU total ticks should be > 0");
            assert!(idle <= total, "idle should be <= total");
        }
    }

    /// Table-driven check of the busy-percentage arithmetic.
    #[test]
    fn test_compute_usage() {
        let cases: [((u64, u64), u8); 4] = [
            ((100, 20), 80),  // 80% busy
            ((100, 100), 0),  // fully idle
            ((100, 0), 100),  // fully busy
            ((0, 0), 0),      // zero delta guard
        ];
        for (curr, expected) in cases {
            assert_eq!(compute_usage(&(0, 0), &curr), expected);
        }
    }
}

View File

@@ -1,7 +1,8 @@
pub mod auth;
pub mod csv_output;
pub mod bandwidth;
pub mod client;
pub mod cpu;
pub mod csv_output;
pub mod ecsrp5;
pub mod protocol;
pub mod server;

View File

@@ -1,6 +1,7 @@
mod auth;
mod bandwidth;
mod client;
mod cpu;
pub mod csv_output;
mod ecsrp5;
mod protocol;
@@ -100,6 +101,9 @@ struct Cli {
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
// Start CPU usage sampler
cpu::start_sampler();
// Set up logging based on verbosity
let filter = match cli.verbose {
0 => "info",

View File

@@ -137,23 +137,28 @@ impl Command {
pub struct StatusMessage {
pub seq: u32,
pub bytes_received: u32,
pub cpu_load: u8,
}
impl StatusMessage {
pub fn serialize(&self) -> [u8; STATUS_MSG_SIZE] {
let mut buf = [0u8; STATUS_MSG_SIZE];
buf[0] = STATUS_MSG_TYPE;
buf[1..5].copy_from_slice(&self.seq.to_be_bytes());
buf[5] = 0;
buf[6] = 0;
buf[7] = 0;
// Byte 1: CPU load percentage (0-100)
buf[1] = self.cpu_load;
buf[2] = 0;
buf[3] = 0;
// Bytes 4-7: sequence number (LE)
buf[4..8].copy_from_slice(&self.seq.to_le_bytes());
// Bytes 8-11: bytes received (LE)
buf[8..12].copy_from_slice(&self.bytes_received.to_le_bytes());
buf
}
pub fn deserialize(buf: &[u8; STATUS_MSG_SIZE]) -> Self {
Self {
seq: u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]),
cpu_load: buf[1],
seq: u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]),
bytes_received: u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]),
}
}

View File

@@ -536,7 +536,7 @@ async fn tcp_tx_loop_inner(
if send_status && Instant::now() >= next_status {
status_seq += 1;
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage {
let status = StatusMessage { cpu_load: crate::cpu::get(),
seq: status_seq,
bytes_received: rx_bytes as u32,
};
@@ -615,7 +615,7 @@ async fn tcp_status_sender(
// Swap to get bytes received this interval (atomic reset)
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage {
let status = StatusMessage { cpu_load: crate::cpu::get(),
seq,
bytes_received: rx_bytes as u32,
};
@@ -925,9 +925,10 @@ async fn udp_status_loop(
match tokio::time::timeout(wait_time, reader.read_exact(&mut status_buf)).await {
Ok(Ok(_)) => {
let client_status = StatusMessage::deserialize(&status_buf);
state.remote_cpu.store(client_status.cpu_load, Ordering::Relaxed);
tracing::debug!(
"RECV status: raw={:02x?} seq={} bytes_received={}",
&status_buf, client_status.seq, client_status.bytes_received,
"RECV status: raw={:02x?} seq={} bytes_received={} cpu={}%",
&status_buf, client_status.seq, client_status.bytes_received, client_status.cpu_load,
);
if client_status.bytes_received > 0 && cmd.server_tx() {
@@ -972,7 +973,7 @@ async fn udp_status_loop(
} else {
rx_bytes
};
let status = StatusMessage {
let status = StatusMessage { cpu_load: crate::cpu::get(),
seq,
bytes_received: report_bytes as u32,
};
@@ -990,10 +991,14 @@ async fn udp_status_loop(
// Print local stats and record totals
state.record_interval(tx_bytes, rx_bytes, lost);
if cmd.server_tx() {
bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None);
let local_cpu = crate::cpu::get();
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
bandwidth::print_status_with_cpu(seq, "TX", tx_bytes, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu));
}
if cmd.server_rx() {
bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost));
let local_cpu = crate::cpu::get();
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
bandwidth::print_status_with_cpu(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost), Some(local_cpu), Some(remote_cpu));
}
}
}