diff --git a/src/client.rs b/src/client.rs index 7366987..226f209 100644 --- a/src/client.rs +++ b/src/client.rs @@ -197,24 +197,57 @@ async fn tcp_client_rx_loop( state: Arc, ) { let mut buf = vec![0u8; 256 * 1024]; + // Carry trailing bytes from the previous read to detect status messages + // that are split across TCP read boundaries. + let mut carry = [0u8; STATUS_MSG_SIZE - 1]; + let mut carry_len = 0usize; + while state.running.load(Ordering::Relaxed) { match reader.read(&mut buf).await { Ok(0) | Err(_) => break, Ok(n) => { state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); - // Scan for interleaved 12-byte status messages from the server. - // In BOTH mode, the server's TX loop injects status messages into the - // data stream. Status starts with 0x07 (STATUS_MSG_TYPE) and byte 1 - // has the high bit set (0x80 | cpu%). Data packets are all zeros. - if n >= STATUS_MSG_SIZE { - for i in 0..=(n - STATUS_MSG_SIZE) { - if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 { - let cpu = buf[i + 1] & 0x7F; - state.remote_cpu.store(cpu.min(100), Ordering::Relaxed); + + // 1) Check if a status message spans the carry + start of buf. + if carry_len > 0 { + for offset in 0..carry_len { + if carry[offset] != STATUS_MSG_TYPE { + continue; + } + let from_carry = carry_len - offset; + let from_buf = STATUS_MSG_SIZE - from_carry; + if n < from_buf { + continue; + } + let cpu_byte = if from_carry >= 2 { + carry[offset + 1] + } else { + buf[0] + }; + if cpu_byte >= 0x80 { + state.remote_cpu.store((cpu_byte & 0x7F).min(100), Ordering::Relaxed); break; } } } + + // 2) Fast scan within buf for status messages. + // Data packets are all zeros, so memchr (SIMD) exits almost instantly. + if n >= STATUS_MSG_SIZE { + let search_end = n - STATUS_MSG_SIZE + 1; + if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) { + if buf[pos + 1] >= 0x80 { + let cpu = buf[pos + 1] & 0x7F; + state.remote_cpu.store(cpu.min(100), Ordering::Relaxed); + } + } + } + + // 3) Save trailing bytes for the next read. + carry_len = n.min(STATUS_MSG_SIZE - 1); + if n >= carry_len { + carry[..carry_len].copy_from_slice(&buf[n - carry_len..n]); + } } } } @@ -388,30 +421,39 @@ async fn udp_client_tx_loop( async fn udp_client_rx_loop(socket: &UdpSocket, state: Arc) { let mut buf = vec![0u8; 65536]; let mut last_seq: Option = None; + let mut timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); while state.running.load(Ordering::Relaxed) { - match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await { - Ok(Ok(n)) if n >= 4 => { - state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); - state.rx_packets.fetch_add(1, Ordering::Relaxed); + tokio::select! { + biased; + res = socket.recv(&mut buf) => { + match res { + Ok(n) if n >= 4 => { + state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); + state.rx_packets.fetch_add(1, Ordering::Relaxed); - let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); - if let Some(last) = last_seq { - let expected = last.wrapping_add(1); - if seq > expected { - let lost = seq - expected; - state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed); + let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); + if let Some(last) = last_seq { + let expected = last.wrapping_add(1); + if seq > expected { + let lost = seq - expected; + state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed); + } + } + last_seq = Some(seq); + } + Ok(_) => {} + Err(e) => { + tracing::debug!("UDP recv error: {}", e); + tokio::time::sleep(Duration::from_millis(10)).await; } } - last_seq = Some(seq); + timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5)); } - Ok(Ok(_)) => {} - Ok(Err(e)) => { - tracing::debug!("UDP recv error: {}", e); - tokio::time::sleep(Duration::from_millis(10)).await; - } - Err(_) => { + _ = &mut timeout => { tracing::debug!("UDP RX timeout"); + timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5)); } } } @@ -529,3 +571,84 @@ async fn udp_client_status_loop( } } } + +/// Scan for a status message in `carry` + `buf` and return the CPU value if found. +#[cfg(test)] +fn scan_status_message(carry: &[u8], buf: &[u8]) -> Option { + let carry_len = carry.len(); + // 1) Check split across carry + buf + if carry_len > 0 { + for offset in 0..carry_len { + if carry[offset] != STATUS_MSG_TYPE { + continue; + } + let from_carry = carry_len - offset; + let from_buf = STATUS_MSG_SIZE - from_carry; + if buf.len() < from_buf { + continue; + } + let cpu_byte = if from_carry >= 2 { + carry[offset + 1] + } else { + buf[0] + }; + if cpu_byte >= 0x80 { + return Some((cpu_byte & 0x7F).min(100)); + } + } + } + // 2) Check within buf + let n = buf.len(); + if n >= STATUS_MSG_SIZE { + let search_end = n - STATUS_MSG_SIZE + 1; + if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) { + if buf[pos + 1] >= 0x80 { + return Some((buf[pos + 1] & 0x7F).min(100)); + } + } + } + None +} + +#[cfg(test)] +mod status_scan_tests { + use super::*; + + #[test] + fn test_status_within_buffer() { + let mut buf = [0u8; 256]; + buf[10] = STATUS_MSG_TYPE; + buf[11] = 0x80 | 42; + assert_eq!(scan_status_message(&[], &buf), Some(42)); + } + + #[test] + fn test_status_split_across_reads() { + // 12-byte status message: [0x07, 0x80|50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + // Split: first 5 bytes in carry, last 7 bytes in buf + let carry = [0x07, 0xB2, 0, 0, 0]; + let buf = [0, 0, 0, 0, 0, 0, 0]; + assert_eq!(scan_status_message(&carry, &buf), Some(50)); + } + + #[test] + fn test_status_split_at_boundary() { + // Split: first 1 byte (0x07) in carry, rest in buf + let carry = [0x07]; + let mut buf = [0u8; 20]; + buf[0] = 0x80 | 77; + assert_eq!(scan_status_message(&carry, &buf), Some(77)); + } + + #[test] + fn test_no_status_in_zeros() { + let buf = [0u8; 256]; + assert_eq!(scan_status_message(&[], &buf), None); + } + + #[test] + fn test_short_buffer_no_panic() { + let buf = [0x07, 0x80]; + assert_eq!(scan_status_message(&[], &buf), None); + } +} diff --git a/src/cpu.rs b/src/cpu.rs index f4fb043..e3350b8 100644 --- a/src/cpu.rs +++ b/src/cpu.rs @@ -139,27 +139,32 @@ fn get_cpu_times() -> (u64, u64) { #[cfg(target_os = "freebsd")] fn get_cpu_times() -> (u64, u64) { // kern.cp_time returns: user nice system interrupt idle - if let Ok(output) = std::process::Command::new("sysctl") - .arg("-n") - .arg("kern.cp_time") - .output() - { - if output.status.success() { - let text = String::from_utf8_lossy(&output.stdout); - let parts: Vec = text - .split_whitespace() - .filter_map(|s| s.parse().ok()) - .collect(); - if parts.len() >= 5 { - let user = parts[0]; - let nice = parts[1]; - let system = parts[2]; - let interrupt = parts[3]; - let idle = parts[4]; - let total = user + nice + system + interrupt + idle; - return (total, idle); - } - } + const CTL_KERN: libc::c_int = 1; + const KERN_CP_TIME: libc::c_int = 40; + + let mut mib: [libc::c_int; 2] = [CTL_KERN, KERN_CP_TIME]; + let mut cp_time: [libc::c_ulong; 5] = [0; 5]; + let mut len = std::mem::size_of_val(&cp_time); + + let ret = unsafe { + libc::sysctl( + mib.as_mut_ptr(), + mib.len() as u32, + &mut cp_time as *mut _ as *mut libc::c_void, + &mut len, + std::ptr::null_mut(), + 0, + ) + }; + + if ret == 0 { + let user = cp_time[0] as u64; + let nice = cp_time[1] as u64; + let system = cp_time[2] as u64; + let interrupt = cp_time[3] as u64; + let idle = cp_time[4] as u64; + let total = user + nice + system + interrupt + idle; + return (total, idle); } (0, 0) }