diff --git a/src/client.rs b/src/client.rs index 56ad5bb..2e59e40 100644 --- a/src/client.rs +++ b/src/client.rs @@ -127,6 +127,12 @@ async fn run_tcp_test_client(stream: TcpStream, cmd: Command, state: Arc break, Ok(n) => { state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); + // Scan for interleaved 12-byte status messages from the server. + // In BOTH mode, the server's TX loop injects status messages into the + // data stream. Status starts with 0x07 (STATUS_MSG_TYPE) and byte 1 + // has the high bit set (0x80 | cpu%). Data packets are all zeros. + if n >= STATUS_MSG_SIZE { + for i in 0..=(n - STATUS_MSG_SIZE) { + if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 { + let cpu = buf[i + 1] & 0x7F; + state.remote_cpu.store(cpu.min(100), Ordering::Relaxed); + break; + } + } + } } } } } +/// Read only status messages from the server (TX-only mode). +/// The server sends 12-byte status messages on the TCP connection even when +/// the client is only transmitting. We need to read them to get remote CPU +/// and to prevent the TCP receive buffer from filling up. +async fn tcp_client_status_reader( + mut reader: tokio::net::tcp::OwnedReadHalf, + state: Arc, +) { + let mut buf = [0u8; STATUS_MSG_SIZE]; + while state.running.load(Ordering::Relaxed) { + match reader.read_exact(&mut buf).await { + Ok(_) => { + if buf[0] == STATUS_MSG_TYPE && buf[1] >= 0x80 { + let status = StatusMessage::deserialize(&buf); + state.remote_cpu.store(status.cpu_load, Ordering::Relaxed); + // Use server's bytes_received for TX speed adaptation + if status.bytes_received > 0 { + let new_speed = + ((status.bytes_received as u64 * 8 * 3) / 2) as u32; + state.tx_speed.store(new_speed, Ordering::Relaxed); + state.tx_speed_changed.store(true, Ordering::Relaxed); + } + } + } + Err(_) => break, + } + } +} + // --- UDP Test Client --- async fn run_udp_test_client( diff --git a/src/cpu.rs b/src/cpu.rs index 456bb0c..f56bfa8 100644 --- a/src/cpu.rs +++ b/src/cpu.rs @@ -29,7 +29,7 @@ pub fn get() -> u8 { // --- Platform-specific implementation --- -#[cfg(target_os = "linux")] +#[cfg(any(target_os = "linux", target_os = "android"))] fn get_cpu_times() -> (u64, u64) { // Read /proc/stat: cpu user nice system idle iowait irq softirq steal if let Ok(content) = std::fs::read_to_string("/proc/stat") { @@ -165,6 +165,7 @@ fn get_cpu_times() -> (u64, u64) { #[cfg(not(any( target_os = "linux", + target_os = "android", target_os = "macos", target_os = "windows", target_os = "freebsd", @@ -193,6 +194,7 @@ mod tests { // On supported platforms, total should be > 0 if cfg!(any( target_os = "linux", + target_os = "android", target_os = "macos", target_os = "windows", target_os = "freebsd", diff --git a/tests/full_integration_test.rs b/tests/full_integration_test.rs index 9dc2d23..30cdef9 100644 --- a/tests/full_integration_test.rs +++ b/tests/full_integration_test.rs @@ -336,3 +336,67 @@ async fn test_bandwidth_state_running_flag() { state.running.store(false, Ordering::SeqCst); assert!(!state.running.load(Ordering::Relaxed)); } + +// --- CPU Reporting Tests --- + +/// Helper that returns the full BandwidthState (not just summary) so we can check remote_cpu. +async fn run_client_with_state( + host: &str, port: u16, transmit: bool, receive: bool, udp: bool, + secs: u64, +) -> std::sync::Arc { + let direction = match (transmit, receive) { + (true, false) => btest_rs::protocol::CMD_DIR_RX, + (false, true) => btest_rs::protocol::CMD_DIR_TX, + (true, true) => btest_rs::protocol::CMD_DIR_BOTH, + _ => panic!("must specify direction"), + }; + let state = btest_rs::bandwidth::BandwidthState::new(); + let state_clone = state.clone(); + let host = host.to_string(); + + let handle = tokio::spawn(async move { + btest_rs::client::run_client( + &host, port, direction, udp, + 0, 0, None, None, false, state_clone, + ).await + }); + + tokio::time::sleep(Duration::from_secs(secs)).await; + state.running.store(false, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(500)).await; + handle.abort(); + + state +} + +#[test] +fn test_local_cpu_nonzero() { + // CPU sampler should return > 0 on supported platforms after warming up + btest_rs::cpu::start_sampler(); + std::thread::sleep(Duration::from_secs(2)); + let cpu = btest_rs::cpu::get(); + // On CI or idle machines, CPU may genuinely be 0, so just check it doesn't panic + // and returns a value in range + assert!(cpu <= 100, "CPU should be 0-100, got {}", cpu); +} + +#[tokio::test] +async fn test_tcp_remote_cpu_both() { + let port = BASE_PORT + 20; + start_server_noauth(port).await; + let state = run_client_with_state("127.0.0.1", port, true, true, false, 3).await; + let remote_cpu = state.remote_cpu.load(Ordering::Relaxed); + // On loopback with bidirectional traffic, server CPU should be > 0 + // The status messages are interleaved in the TCP data stream + assert!(remote_cpu > 0, "TCP BOTH: remote CPU should be > 0 on loopback, got {}", remote_cpu); +} + +#[tokio::test] +async fn test_tcp_remote_cpu_tx_only() { + let port = BASE_PORT + 21; + start_server_noauth(port).await; + let state = run_client_with_state("127.0.0.1", port, true, false, false, 3).await; + let remote_cpu = state.remote_cpu.load(Ordering::Relaxed); + // TX-only: server sends status messages that the status reader should parse + assert!(remote_cpu > 0, "TCP TX-only: remote CPU should be > 0 on loopback, got {}", remote_cpu); +}