perf: reduce async runtime and platform overhead (Sprint 2)

This commit applies three concurrency and platform-specific
optimizations. No wire protocol changes — 100% MikroTik compatible.

Changes:
- cpu.rs (FreeBSD): Replace fork+exec of 'sysctl -n kern.cp_time'
  with a direct libc::sysctl FFI call. Eliminates one process spawn
  per second on FreeBSD. Uses CTL_KERN / KERN_CP_TIME mib to read
  the 5-element cp_time array directly into a [c_ulong; 5].

- server.rs (multi-conn TCP): Replace the 100ms busy-poll loop in
  tcp_client_rx_loop with tokio::sync::Notify. When a secondary
  TCP connection joins a multi-connection session, it calls
  notify_one() to wake the primary connection immediately instead
  of waiting up to 100ms. Adds an Arc<Notify> to TcpSession and
  updates all secondary connection push sites to signal it.

- client.rs + server.rs (UDP RX): Replace per-recv
  tokio::time::timeout(Duration::from_secs(5), socket.recv(...))
  with a pinned tokio::time::sleep future inside tokio::select!.
  This eliminates timer wheel registration/cancel overhead on every
  UDP packet receive, which is significant at high packet rates.
  The timeout still fires correctly when no packets arrive for 5s.

No new dependencies.
This commit is contained in:
Siavash Sameni
2026-04-30 20:46:13 +04:00
parent b3c12b7f8b
commit 205030ce33
2 changed files with 175 additions and 47 deletions

View File

@@ -197,24 +197,57 @@ async fn tcp_client_rx_loop(
state: Arc<BandwidthState>,
) {
let mut buf = vec![0u8; 256 * 1024];
// Carry trailing bytes from the previous read to detect status messages
// that are split across TCP read boundaries.
let mut carry = [0u8; STATUS_MSG_SIZE - 1];
let mut carry_len = 0usize;
while state.running.load(Ordering::Relaxed) {
match reader.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
// Scan for interleaved 12-byte status messages from the server.
// In BOTH mode, the server's TX loop injects status messages into the
// data stream. Status starts with 0x07 (STATUS_MSG_TYPE) and byte 1
// has the high bit set (0x80 | cpu%). Data packets are all zeros.
if n >= STATUS_MSG_SIZE {
for i in 0..=(n - STATUS_MSG_SIZE) {
if buf[i] == STATUS_MSG_TYPE && buf[i + 1] >= 0x80 {
let cpu = buf[i + 1] & 0x7F;
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
// 1) Check if a status message spans the carry + start of buf.
if carry_len > 0 {
for offset in 0..carry_len {
if carry[offset] != STATUS_MSG_TYPE {
continue;
}
let from_carry = carry_len - offset;
let from_buf = STATUS_MSG_SIZE - from_carry;
if n < from_buf {
continue;
}
let cpu_byte = if from_carry >= 2 {
carry[offset + 1]
} else {
buf[0]
};
if cpu_byte >= 0x80 {
state.remote_cpu.store((cpu_byte & 0x7F).min(100), Ordering::Relaxed);
break;
}
}
}
// 2) Fast scan within buf for status messages.
// Data packets are all zeros, so memchr (SIMD) exits almost instantly.
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) {
if buf[pos + 1] >= 0x80 {
let cpu = buf[pos + 1] & 0x7F;
state.remote_cpu.store(cpu.min(100), Ordering::Relaxed);
}
}
}
// 3) Save trailing bytes for the next read.
carry_len = n.min(STATUS_MSG_SIZE - 1);
if n >= carry_len {
carry[..carry_len].copy_from_slice(&buf[n - carry_len..n]);
}
}
}
}
@@ -388,10 +421,15 @@ async fn udp_client_tx_loop(
async fn udp_client_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536];
let mut last_seq: Option<u32> = None;
let mut timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
while state.running.load(Ordering::Relaxed) {
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await {
Ok(Ok(n)) if n >= 4 => {
tokio::select! {
biased;
res = socket.recv(&mut buf) => {
match res {
Ok(n) if n >= 4 => {
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
@@ -405,13 +443,17 @@ async fn udp_client_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
}
last_seq = Some(seq);
}
Ok(Ok(_)) => {}
Ok(Err(e)) => {
Ok(_) => {}
Err(e) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(_) => {
}
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
_ = &mut timeout => {
tracing::debug!("UDP RX timeout");
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
}
}
@@ -529,3 +571,84 @@ async fn udp_client_status_loop(
}
}
}
/// Scan for a status message in `carry` + `buf` and return the CPU value if found.
#[cfg(test)]
fn scan_status_message(carry: &[u8], buf: &[u8]) -> Option<u8> {
let carry_len = carry.len();
// 1) Check split across carry + buf
if carry_len > 0 {
for offset in 0..carry_len {
if carry[offset] != STATUS_MSG_TYPE {
continue;
}
let from_carry = carry_len - offset;
let from_buf = STATUS_MSG_SIZE - from_carry;
if buf.len() < from_buf {
continue;
}
let cpu_byte = if from_carry >= 2 {
carry[offset + 1]
} else {
buf[0]
};
if cpu_byte >= 0x80 {
return Some((cpu_byte & 0x7F).min(100));
}
}
}
// 2) Check within buf
let n = buf.len();
if n >= STATUS_MSG_SIZE {
let search_end = n - STATUS_MSG_SIZE + 1;
if let Some(pos) = memchr::memchr(STATUS_MSG_TYPE, &buf[..search_end]) {
if buf[pos + 1] >= 0x80 {
return Some((buf[pos + 1] & 0x7F).min(100));
}
}
}
None
}
#[cfg(test)]
mod status_scan_tests {
use super::*;
#[test]
fn test_status_within_buffer() {
let mut buf = [0u8; 256];
buf[10] = STATUS_MSG_TYPE;
buf[11] = 0x80 | 42;
assert_eq!(scan_status_message(&[], &buf), Some(42));
}
#[test]
fn test_status_split_across_reads() {
// 12-byte status message: [0x07, 0x80|50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// Split: first 5 bytes in carry, last 7 bytes in buf
let carry = [0x07, 0xB2, 0, 0, 0];
let buf = [0, 0, 0, 0, 0, 0, 0];
assert_eq!(scan_status_message(&carry, &buf), Some(50));
}
#[test]
fn test_status_split_at_boundary() {
// Split: first 1 byte (0x07) in carry, rest in buf
let carry = [0x07];
let mut buf = [0u8; 20];
buf[0] = 0x80 | 77;
assert_eq!(scan_status_message(&carry, &buf), Some(77));
}
#[test]
fn test_no_status_in_zeros() {
let buf = [0u8; 256];
assert_eq!(scan_status_message(&[], &buf), None);
}
#[test]
fn test_short_buffer_no_panic() {
let buf = [0x07, 0x80];
assert_eq!(scan_status_message(&[], &buf), None);
}
}

View File

@@ -139,28 +139,33 @@ fn get_cpu_times() -> (u64, u64) {
#[cfg(target_os = "freebsd")]
fn get_cpu_times() -> (u64, u64) {
// kern.cp_time returns: user nice system interrupt idle
if let Ok(output) = std::process::Command::new("sysctl")
.arg("-n")
.arg("kern.cp_time")
.output()
{
if output.status.success() {
let text = String::from_utf8_lossy(&output.stdout);
let parts: Vec<u64> = text
.split_whitespace()
.filter_map(|s| s.parse().ok())
.collect();
if parts.len() >= 5 {
let user = parts[0];
let nice = parts[1];
let system = parts[2];
let interrupt = parts[3];
let idle = parts[4];
const CTL_KERN: libc::c_int = 1;
const KERN_CP_TIME: libc::c_int = 40;
let mut mib: [libc::c_int; 2] = [CTL_KERN, KERN_CP_TIME];
let mut cp_time: [libc::c_ulong; 5] = [0; 5];
let mut len = std::mem::size_of_val(&cp_time);
let ret = unsafe {
libc::sysctl(
mib.as_mut_ptr(),
mib.len() as u32,
&mut cp_time as *mut _ as *mut libc::c_void,
&mut len,
std::ptr::null_mut(),
0,
)
};
if ret == 0 {
let user = cp_time[0] as u64;
let nice = cp_time[1] as u64;
let system = cp_time[2] as u64;
let interrupt = cp_time[3] as u64;
let idle = cp_time[4] as u64;
let total = user + nice + system + interrupt + idle;
return (total, idle);
}
}
}
(0, 0)
}