Fix CPU reporting: Android support, TCP remote CPU parsing
- Add target_os = "android" to CPU sampler (reads /proc/stat like Linux) - Parse remote CPU from interleaved TCP status messages in BOTH mode - Add dedicated status reader for TX-only mode (reads server's 12-byte status messages to get remote CPU and enable speed adaptation) - Add 3 CPU integration tests: local CPU, TCP BOTH remote, TCP TX-only Fixes: Android always showing cpu: 0%/0%, TCP remote CPU always 0% on all platforms (btest-to-btest and btest-to-MikroTik). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -127,6 +127,12 @@ async fn run_tcp_test_client(stream: TcpStream, cmd: Command, state: Arc<Bandwid
|
|||||||
Some(tokio::spawn(async move {
|
Some(tokio::spawn(async move {
|
||||||
tcp_client_rx_loop(reader, state_rx).await
|
tcp_client_rx_loop(reader, state_rx).await
|
||||||
}))
|
}))
|
||||||
|
} else if client_should_tx {
|
||||||
|
// TX-only: still need to read the server's status messages to get remote CPU.
|
||||||
|
// Don't count these bytes as RX data.
|
||||||
|
Some(tokio::spawn(async move {
|
||||||
|
tcp_client_status_reader(reader, state_rx).await
|
||||||
|
}))
|
||||||
} else {
|
} else {
|
||||||
_reader_keepalive = Some(reader);
|
_reader_keepalive = Some(reader);
|
||||||
None
|
None
|
||||||
@@ -189,10 +195,52 @@ async fn tcp_client_rx_loop(
|
|||||||
Ok(0) | Err(_) => break,
|
Ok(0) | Err(_) => break,
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
||||||
|
// Scan for interleaved 12-byte status messages from the server.
|
||||||
|
// In BOTH mode, the server's TX loop injects status messages into the
|
||||||
|
// data stream. Status starts with 0x07 (STATUS_MSG_TYPE) and byte 1
|
||||||
|
// has the high bit set (0x80 | cpu%). Data packets are all zeros.
|
||||||
|
if n >= STATUS_MSG_SIZE {
|
||||||
|
for i in 0..=(n - STATUS_MSG_SIZE) {
|
||||||
|
if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
|
||||||
|
let cpu = buf[i + 1] & 0x7F;
|
||||||
|
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read only status messages from the server (TX-only mode).
|
||||||
|
/// The server sends 12-byte status messages on the TCP connection even when
|
||||||
|
/// the client is only transmitting. We need to read them to get remote CPU
|
||||||
|
/// and to prevent the TCP receive buffer from filling up.
|
||||||
|
async fn tcp_client_status_reader(
|
||||||
|
mut reader: tokio::net::tcp::OwnedReadHalf,
|
||||||
|
state: Arc<BandwidthState>,
|
||||||
|
) {
|
||||||
|
let mut buf = [0u8; STATUS_MSG_SIZE];
|
||||||
|
while state.running.load(Ordering::Relaxed) {
|
||||||
|
match reader.read_exact(&mut buf).await {
|
||||||
|
Ok(_) => {
|
||||||
|
if buf[0] == STATUS_MSG_TYPE && buf[1] >= 0x80 {
|
||||||
|
let status = StatusMessage::deserialize(&buf);
|
||||||
|
state.remote_cpu.store(status.cpu_load, Ordering::Relaxed);
|
||||||
|
// Use server's bytes_received for TX speed adaptation
|
||||||
|
if status.bytes_received > 0 {
|
||||||
|
let new_speed =
|
||||||
|
((status.bytes_received as u64 * 8 * 3) / 2) as u32;
|
||||||
|
state.tx_speed.store(new_speed, Ordering::Relaxed);
|
||||||
|
state.tx_speed_changed.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// --- UDP Test Client ---
|
// --- UDP Test Client ---
|
||||||
|
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ pub fn get() -> u8 {
|
|||||||
|
|
||||||
// --- Platform-specific implementation ---
|
// --- Platform-specific implementation ---
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||||
fn get_cpu_times() -> (u64, u64) {
|
fn get_cpu_times() -> (u64, u64) {
|
||||||
// Read /proc/stat: cpu user nice system idle iowait irq softirq steal
|
// Read /proc/stat: cpu user nice system idle iowait irq softirq steal
|
||||||
if let Ok(content) = std::fs::read_to_string("/proc/stat") {
|
if let Ok(content) = std::fs::read_to_string("/proc/stat") {
|
||||||
@@ -165,6 +165,7 @@ fn get_cpu_times() -> (u64, u64) {
|
|||||||
|
|
||||||
#[cfg(not(any(
|
#[cfg(not(any(
|
||||||
target_os = "linux",
|
target_os = "linux",
|
||||||
|
target_os = "android",
|
||||||
target_os = "macos",
|
target_os = "macos",
|
||||||
target_os = "windows",
|
target_os = "windows",
|
||||||
target_os = "freebsd",
|
target_os = "freebsd",
|
||||||
@@ -193,6 +194,7 @@ mod tests {
|
|||||||
// On supported platforms, total should be > 0
|
// On supported platforms, total should be > 0
|
||||||
if cfg!(any(
|
if cfg!(any(
|
||||||
target_os = "linux",
|
target_os = "linux",
|
||||||
|
target_os = "android",
|
||||||
target_os = "macos",
|
target_os = "macos",
|
||||||
target_os = "windows",
|
target_os = "windows",
|
||||||
target_os = "freebsd",
|
target_os = "freebsd",
|
||||||
|
|||||||
@@ -336,3 +336,67 @@ async fn test_bandwidth_state_running_flag() {
|
|||||||
state.running.store(false, Ordering::SeqCst);
|
state.running.store(false, Ordering::SeqCst);
|
||||||
assert!(!state.running.load(Ordering::Relaxed));
|
assert!(!state.running.load(Ordering::Relaxed));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- CPU Reporting Tests ---
|
||||||
|
|
||||||
|
/// Helper that returns the full BandwidthState (not just summary) so we can check remote_cpu.
|
||||||
|
async fn run_client_with_state(
|
||||||
|
host: &str, port: u16, transmit: bool, receive: bool, udp: bool,
|
||||||
|
secs: u64,
|
||||||
|
) -> std::sync::Arc<btest_rs::bandwidth::BandwidthState> {
|
||||||
|
let direction = match (transmit, receive) {
|
||||||
|
(true, false) => btest_rs::protocol::CMD_DIR_RX,
|
||||||
|
(false, true) => btest_rs::protocol::CMD_DIR_TX,
|
||||||
|
(true, true) => btest_rs::protocol::CMD_DIR_BOTH,
|
||||||
|
_ => panic!("must specify direction"),
|
||||||
|
};
|
||||||
|
let state = btest_rs::bandwidth::BandwidthState::new();
|
||||||
|
let state_clone = state.clone();
|
||||||
|
let host = host.to_string();
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
btest_rs::client::run_client(
|
||||||
|
&host, port, direction, udp,
|
||||||
|
0, 0, None, None, false, state_clone,
|
||||||
|
).await
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(secs)).await;
|
||||||
|
state.running.store(false, Ordering::SeqCst);
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
|
handle.abort();
|
||||||
|
|
||||||
|
state
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_local_cpu_nonzero() {
|
||||||
|
// CPU sampler should return > 0 on supported platforms after warming up
|
||||||
|
btest_rs::cpu::start_sampler();
|
||||||
|
std::thread::sleep(Duration::from_secs(2));
|
||||||
|
let cpu = btest_rs::cpu::get();
|
||||||
|
// On CI or idle machines, CPU may genuinely be 0, so just check it doesn't panic
|
||||||
|
// and returns a value in range
|
||||||
|
assert!(cpu <= 100, "CPU should be 0-100, got {}", cpu);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_tcp_remote_cpu_both() {
|
||||||
|
let port = BASE_PORT + 20;
|
||||||
|
start_server_noauth(port).await;
|
||||||
|
let state = run_client_with_state("127.0.0.1", port, true, true, false, 3).await;
|
||||||
|
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
|
||||||
|
// On loopback with bidirectional traffic, server CPU should be > 0
|
||||||
|
// The status messages are interleaved in the TCP data stream
|
||||||
|
assert!(remote_cpu > 0, "TCP BOTH: remote CPU should be > 0 on loopback, got {}", remote_cpu);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_tcp_remote_cpu_tx_only() {
|
||||||
|
let port = BASE_PORT + 21;
|
||||||
|
start_server_noauth(port).await;
|
||||||
|
let state = run_client_with_state("127.0.0.1", port, true, false, false, 3).await;
|
||||||
|
let remote_cpu = state.remote_cpu.load(Ordering::Relaxed);
|
||||||
|
// TX-only: server sends status messages that the status reader should parse
|
||||||
|
assert!(remote_cpu > 0, "TCP TX-only: remote CPU should be > 0 on loopback, got {}", remote_cpu);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user