perf: eliminate redundant allocations and computations (Sprint 1)

This commit applies eight low-risk internal optimizations identified
in the performance audit. No wire protocol changes — 100% MikroTik
compatible.

Changes:
- ecsrp5.rs: Cache WCurve in a global LazyLock, eliminating the
  expensive BigUint modular square root recomputation on every
  EC-SRP5 authentication. Also optimize the local hex::encode
  module to use a single pre-allocated String instead of N format!
  allocations.

- server.rs: Deduplicate Instant::now() calls in the TCP TX hot
  loop, caching the result at the top of each iteration.

- csv_output.rs: Hold the CSV file handle open in a static
  Mutex<Option<(String, File)>> instead of reopening the file on
  every write_result call. Add explicit flush after each write.

- server_pro/user_db.rs: Replace hand-rolled Gregorian calendar
  math (30+ lines looping from 1970) with chrono::Local::now().
  Optimize hash_password() to write username:password directly
  into the SHA256 hasher and hex-encode with a pre-allocated
  String.

- server_pro/enforcer.rs: Replace allocating error string matching
  (format!("{}", e).as_str().contains(...)) with direct
  QuotaError variant matching. Pass ip_str into flush_to_db()
  to avoid a per-call ip.to_string().

- syslog_logger.rs: Move timestamp formatting outside the global
  std::sync::Mutex to reduce lock hold time. Replace manual
  calendar arithmetic with chrono::Local::now().format().

New dependency: chrono (already pulled in transitively by rusqlite).
This commit is contained in:
Siavash Sameni
2026-04-30 20:45:56 +04:00
parent a655d3bbe8
commit b3c12b7f8b
7 changed files with 104 additions and 119 deletions

View File

@@ -4,6 +4,8 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::Mutex;
@@ -18,6 +20,7 @@ struct TcpSession {
peer_ip: std::net::IpAddr,
streams: Vec<TcpStream>,
expected: u8,
notify: Arc<Notify>,
}
type SessionMap = Arc<Mutex<HashMap<u16, TcpSession>>>;
@@ -169,6 +172,7 @@ async fn handle_client(
stream.flush().await?;
session.streams.push(stream);
session.notify.notify_one();
tracing::info!(
"Secondary connection joined ({}/{})",
session.streams.len() + 1,
@@ -249,6 +253,7 @@ async fn handle_client(
for (_t, s) in map.iter_mut() {
if s.peer_ip == peer.ip() && s.streams.len() < s.expected as usize {
s.streams.push(stream);
s.notify.notify_one();
return Ok(());
}
}
@@ -299,12 +304,14 @@ async fn handle_client(
let conn_count = cmd.tcp_conn_count;
// Register session for secondary connections to find
let notify = Arc::new(Notify::new());
{
let mut map = sessions.lock().await;
map.insert(session_token, TcpSession {
peer_ip: peer.ip(),
streams: Vec::new(),
expected: conn_count,
notify: notify.clone(),
});
}
@@ -320,7 +327,8 @@ async fn handle_client(
if count + 1 >= conn_count as usize {
break;
}
if Instant::now() > deadline {
let now = Instant::now();
if now >= deadline {
tracing::warn!(
"Timeout waiting for TCP connections ({}/{}), proceeding",
count + 1,
@@ -328,7 +336,17 @@ async fn handle_client(
);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
match tokio::time::timeout(deadline - now, notify.notified()).await {
Ok(()) => continue,
Err(_) => {
tracing::warn!(
"Timeout waiting for TCP connections ({}/{}), proceeding",
count + 1,
conn_count,
);
break;
}
}
}
let extra_streams = {
@@ -589,8 +607,10 @@ async fn tcp_tx_loop_inner(
let mut status_seq: u32 = 0;
while state.running.load(Ordering::Relaxed) {
let now = Instant::now();
// Inject status message every ~1 second if in bidirectional mode
if send_status && Instant::now() >= next_status {
if send_status && now >= next_status {
status_seq += 1;
let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed);
let status = StatusMessage { cpu_load: crate::cpu::get(),
@@ -603,7 +623,7 @@ async fn tcp_tx_loop_inner(
}
state.record_interval(0, rx_bytes, 0);
bandwidth::print_status(status_seq, "RX", rx_bytes, Duration::from_secs(1), None);
next_status = Instant::now() + Duration::from_secs(1);
next_status = now + Duration::from_secs(1);
}
if !state.spend_budget(effective_size as u64) {
@@ -619,12 +639,11 @@ async fn tcp_tx_loop_inner(
state.tx_speed_changed.store(false, Ordering::Relaxed);
let new_speed = state.tx_speed.load(Ordering::Relaxed);
interval = bandwidth::calc_send_interval(new_speed, tx_size as u16);
next_send = Instant::now();
next_send = now;
}
match interval {
Some(iv) => {
let now = Instant::now();
if let Some(delay) = bandwidth::advance_next_send(&mut next_send, iv, now) {
tokio::time::sleep(delay).await;
}
@@ -918,36 +937,43 @@ async fn udp_tx_loop(
async fn udp_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
let mut buf = vec![0u8; 65536];
let mut last_seq: Option<u32> = None;
let mut timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
while state.running.load(Ordering::Relaxed) {
// Use recv_from to accept packets from any source port
// (multi-connection MikroTik sends from multiple ports)
match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
Ok(Ok((n, _src))) if n >= 4 => {
if !state.spend_budget(n as u64) {
break;
}
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
tokio::select! {
biased;
res = socket.recv_from(&mut buf) => {
match res {
Ok((n, _src)) if n >= 4 => {
if !state.spend_budget(n as u64) {
return;
}
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
state.rx_packets.fetch_add(1, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
if seq > expected {
let lost = seq - expected;
state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed);
}
}
last_seq = Some(seq);
state.last_udp_seq.store(seq, Ordering::Relaxed);
}
Ok(_) => {}
Err(e) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
last_seq = Some(seq);
state.last_udp_seq.store(seq, Ordering::Relaxed);
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::debug!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(_) => {
_ = &mut timeout => {
tracing::debug!("UDP RX timeout");
timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(5));
}
}
}