diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 32de8b0..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "btest-opensource"] - path = btest-opensource - url = https://github.com/samm-git/btest-opensource diff --git a/btest-opensource b/btest-opensource deleted file mode 160000 index 5040a01..0000000 --- a/btest-opensource +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 5040a01267c3578d97bb14fa09ccf54de0f179bc diff --git a/src/bandwidth.rs b/src/bandwidth.rs index d65f0e6..b895be5 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64}; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64}; use std::sync::Arc; use std::time::Duration; @@ -18,6 +18,8 @@ pub struct BandwidthState { pub total_rx_bytes: AtomicU64, pub total_lost_packets: AtomicU64, pub intervals: AtomicU32, + /// Remote peer's CPU usage (received via status messages) + pub remote_cpu: AtomicU8, } impl BandwidthState { @@ -35,6 +37,7 @@ impl BandwidthState { total_rx_bytes: AtomicU64::new(0), total_lost_packets: AtomicU64::new(0), intervals: AtomicU32::new(0), + remote_cpu: AtomicU8::new(0), }) } @@ -122,6 +125,18 @@ pub fn print_status( bytes: u64, elapsed: Duration, lost_packets: Option, +) { + print_status_with_cpu(interval_num, direction, bytes, elapsed, lost_packets, None, None); +} + +pub fn print_status_with_cpu( + interval_num: u32, + direction: &str, + bytes: u64, + elapsed: Duration, + lost_packets: Option, + local_cpu: Option, + remote_cpu: Option, ) { if crate::csv_output::is_quiet() { return; @@ -136,13 +151,26 @@ pub fn print_status( _ => String::new(), }; + let cpu_str = match (local_cpu, remote_cpu) { + (Some(l), Some(r)) => { + let warn = if l > 70 || r > 70 { " !" } else { "" }; + format!(" cpu: {}%/{}%{}", l, r, warn) + } + (Some(l), None) => { + let warn = if l > 70 { " !" } else { "" }; + format!(" cpu: {}%{}", l, warn) + } + _ => String::new(), + }; + println!( - "[{:4}] {:>3} {} ({} bytes){}", + "[{:4}] {:>3} {} ({} bytes){}{}", interval_num, direction, format_bandwidth(bw), bytes, loss_str, + cpu_str, ); } diff --git a/src/client.rs b/src/client.rs index 1fe21dd..4424d81 100644 --- a/src/client.rs +++ b/src/client.rs @@ -382,11 +382,13 @@ async fn client_status_loop(cmd: &Command, state: &BandwidthState) { let rx = if cmd.client_rx() { state.rx_bytes.swap(0, Ordering::Relaxed) } else { 0 }; state.record_interval(tx, rx, 0); + let local_cpu = crate::cpu::get(); + let remote_cpu = state.remote_cpu.load(Ordering::Relaxed); if cmd.client_tx() { - bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); + bandwidth::print_status_with_cpu(seq, "TX", tx, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu)); } if cmd.client_rx() { - bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), None); + bandwidth::print_status_with_cpu(seq, "RX", rx, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu)); } } } @@ -420,6 +422,7 @@ async fn udp_client_status_loop( match tokio::time::timeout(wait_time, reader.read_exact(&mut status_buf)).await { Ok(Ok(_)) => { let server_status = StatusMessage::deserialize(&status_buf); + state.remote_cpu.store(server_status.cpu_load, Ordering::Relaxed); if server_status.bytes_received > 0 && cmd.client_tx() { let new_speed = @@ -453,7 +456,7 @@ async fn udp_client_status_loop( let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed); state.record_interval(tx_bytes, rx_bytes, lost); - let status = StatusMessage { + let status = StatusMessage { cpu_load: crate::cpu::get(), seq, bytes_received: rx_bytes as u32, }; @@ -463,11 +466,13 @@ async fn udp_client_status_loop( } let _ = writer.flush().await; + let local_cpu = crate::cpu::get(); + let remote_cpu = state.remote_cpu.load(Ordering::Relaxed); if cmd.client_tx() { - bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None); + bandwidth::print_status_with_cpu(seq, "TX", tx_bytes, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu)); } if cmd.client_rx() { - bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost)); + bandwidth::print_status_with_cpu(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost), Some(local_cpu), Some(remote_cpu)); } } } diff --git a/src/cpu.rs b/src/cpu.rs new file mode 100644 index 0000000..ab86655 --- /dev/null +++ b/src/cpu.rs @@ -0,0 +1,132 @@ +//! Lightweight CPU usage measurement. +//! +//! Returns the system-wide CPU usage as a percentage (0-100). +//! Works on macOS and Linux without external dependencies. + +use std::sync::atomic::{AtomicU8, Ordering}; +use std::time::Duration; + +static CURRENT_CPU: AtomicU8 = AtomicU8::new(0); + +/// Start a background thread that samples CPU usage every second. +pub fn start_sampler() { + std::thread::spawn(|| { + let mut prev = get_cpu_times(); + loop { + std::thread::sleep(Duration::from_secs(1)); + let curr = get_cpu_times(); + let usage = compute_usage(&prev, &curr); + CURRENT_CPU.store(usage, Ordering::Relaxed); + prev = curr; + } + }); +} + +/// Get the current CPU usage percentage (0-100). +pub fn get() -> u8 { + CURRENT_CPU.load(Ordering::Relaxed) +} + +// --- Platform-specific implementation --- + +#[cfg(target_os = "linux")] +fn get_cpu_times() -> (u64, u64) { + // Read /proc/stat: cpu user nice system idle iowait irq softirq steal + if let Ok(content) = std::fs::read_to_string("/proc/stat") { + if let Some(line) = content.lines().next() { + let parts: Vec = line + .split_whitespace() + .skip(1) // skip "cpu" + .filter_map(|s| s.parse().ok()) + .collect(); + if parts.len() >= 4 { + let idle = parts[3]; + let total: u64 = parts.iter().sum(); + return (total, idle); + } + } + } + (0, 0) +} + +#[cfg(target_os = "macos")] +fn get_cpu_times() -> (u64, u64) { + // Use host_statistics to get CPU ticks + use std::mem::MaybeUninit; + + extern "C" { + fn mach_host_self() -> u32; + fn host_statistics( + host: u32, + flavor: i32, + info: *mut i32, + count: *mut u32, + ) -> i32; + } + + const HOST_CPU_LOAD_INFO: i32 = 3; + const CPU_STATE_MAX: usize = 4; + + unsafe { + let host = mach_host_self(); + let mut info = MaybeUninit::<[u32; CPU_STATE_MAX]>::uninit(); + let mut count: u32 = CPU_STATE_MAX as u32; + + let ret = host_statistics( + host, + HOST_CPU_LOAD_INFO, + info.as_mut_ptr() as *mut i32, + &mut count, + ); + + if ret == 0 { + let ticks = info.assume_init(); + // ticks: [user, system, idle, nice] + let user = ticks[0] as u64; + let system = ticks[1] as u64; + let idle = ticks[2] as u64; + let nice = ticks[3] as u64; + let total = user + system + idle + nice; + return (total, idle); + } + } + (0, 0) +} + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +fn get_cpu_times() -> (u64, u64) { + (0, 0) // Unsupported platform +} + +fn compute_usage(prev: &(u64, u64), curr: &(u64, u64)) -> u8 { + let total_diff = curr.0.saturating_sub(prev.0); + let idle_diff = curr.1.saturating_sub(prev.1); + if total_diff == 0 { + return 0; + } + let busy = total_diff - idle_diff; + ((busy * 100) / total_diff).min(100) as u8 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cpu_times_returns_nonzero() { + let (total, idle) = get_cpu_times(); + // On supported platforms, total should be > 0 + if cfg!(any(target_os = "linux", target_os = "macos")) { + assert!(total > 0, "CPU total ticks should be > 0"); + assert!(idle <= total, "idle should be <= total"); + } + } + + #[test] + fn test_compute_usage() { + assert_eq!(compute_usage(&(0, 0), &(100, 20)), 80); + assert_eq!(compute_usage(&(0, 0), &(100, 100)), 0); + assert_eq!(compute_usage(&(0, 0), &(100, 0)), 100); + assert_eq!(compute_usage(&(0, 0), &(0, 0)), 0); + } +} diff --git a/src/lib.rs b/src/lib.rs index 73694dd..297ff9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,8 @@ pub mod auth; -pub mod csv_output; pub mod bandwidth; pub mod client; +pub mod cpu; +pub mod csv_output; pub mod ecsrp5; pub mod protocol; pub mod server; diff --git a/src/main.rs b/src/main.rs index 8ebae03..956706d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod auth; mod bandwidth; mod client; +mod cpu; pub mod csv_output; mod ecsrp5; mod protocol; @@ -100,6 +101,9 @@ struct Cli { async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); + // Start CPU usage sampler + cpu::start_sampler(); + // Set up logging based on verbosity let filter = match cli.verbose { 0 => "info", diff --git a/src/protocol.rs b/src/protocol.rs index 7346f35..a7e8e73 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -137,23 +137,28 @@ impl Command { pub struct StatusMessage { pub seq: u32, pub bytes_received: u32, + pub cpu_load: u8, } impl StatusMessage { pub fn serialize(&self) -> [u8; STATUS_MSG_SIZE] { let mut buf = [0u8; STATUS_MSG_SIZE]; buf[0] = STATUS_MSG_TYPE; - buf[1..5].copy_from_slice(&self.seq.to_be_bytes()); - buf[5] = 0; - buf[6] = 0; - buf[7] = 0; + // Byte 1: CPU load percentage (0-100) + buf[1] = self.cpu_load; + buf[2] = 0; + buf[3] = 0; + // Bytes 4-7: sequence number (LE) + buf[4..8].copy_from_slice(&self.seq.to_le_bytes()); + // Bytes 8-11: bytes received (LE) buf[8..12].copy_from_slice(&self.bytes_received.to_le_bytes()); buf } pub fn deserialize(buf: &[u8; STATUS_MSG_SIZE]) -> Self { Self { - seq: u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]), + cpu_load: buf[1], + seq: u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]), bytes_received: u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]), } } diff --git a/src/server.rs b/src/server.rs index 0a39fbc..771d0bf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -536,7 +536,7 @@ async fn tcp_tx_loop_inner( if send_status && Instant::now() >= next_status { status_seq += 1; let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed); - let status = StatusMessage { + let status = StatusMessage { cpu_load: crate::cpu::get(), seq: status_seq, bytes_received: rx_bytes as u32, }; @@ -615,7 +615,7 @@ async fn tcp_status_sender( // Swap to get bytes received this interval (atomic reset) let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed); - let status = StatusMessage { + let status = StatusMessage { cpu_load: crate::cpu::get(), seq, bytes_received: rx_bytes as u32, }; @@ -925,9 +925,10 @@ async fn udp_status_loop( match tokio::time::timeout(wait_time, reader.read_exact(&mut status_buf)).await { Ok(Ok(_)) => { let client_status = StatusMessage::deserialize(&status_buf); + state.remote_cpu.store(client_status.cpu_load, Ordering::Relaxed); tracing::debug!( - "RECV status: raw={:02x?} seq={} bytes_received={}", - &status_buf, client_status.seq, client_status.bytes_received, + "RECV status: raw={:02x?} seq={} bytes_received={} cpu={}%", + &status_buf, client_status.seq, client_status.bytes_received, client_status.cpu_load, ); if client_status.bytes_received > 0 && cmd.server_tx() { @@ -972,7 +973,7 @@ async fn udp_status_loop( } else { rx_bytes }; - let status = StatusMessage { + let status = StatusMessage { cpu_load: crate::cpu::get(), seq, bytes_received: report_bytes as u32, }; @@ -990,10 +991,14 @@ async fn udp_status_loop( // Print local stats and record totals state.record_interval(tx_bytes, rx_bytes, lost); if cmd.server_tx() { - bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None); + let local_cpu = crate::cpu::get(); + let remote_cpu = state.remote_cpu.load(Ordering::Relaxed); + bandwidth::print_status_with_cpu(seq, "TX", tx_bytes, Duration::from_secs(1), None, Some(local_cpu), Some(remote_cpu)); } if cmd.server_rx() { - bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost)); + let local_cpu = crate::cpu::get(); + let remote_cpu = state.remote_cpu.load(Ordering::Relaxed); + bandwidth::print_status_with_cpu(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost), Some(local_cpu), Some(remote_cpu)); } } }