Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8fe4e72bb3 | ||
|
|
9853d74c4a | ||
|
|
1659f10d62 | ||
|
|
3dfd0185e5 | ||
|
|
28e553bc5f | ||
|
|
4dddf21f2f | ||
|
|
1be3cb82dc | ||
|
|
f1f597d308 | ||
|
|
091222fbd4 |
@@ -1,3 +1,4 @@
|
|||||||
|
GITEA_USER=your_gitea_username
|
||||||
GITEA_TOKEN=your_gitea_api_token_here
|
GITEA_TOKEN=your_gitea_api_token_here
|
||||||
GITEA_URL=https://git.manko.yoga
|
GITEA_URL=https://git.manko.yoga
|
||||||
REPO=manawenuz/btest-rs
|
REPO=manawenuz/btest-rs
|
||||||
|
|||||||
18
Dockerfile.static
Normal file
18
Dockerfile.static
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
# Minimal image from a pre-built static binary
|
||||||
|
# Usage: docker build -f Dockerfile.static --build-arg BINARY=dist/btest .
|
||||||
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
ca-certificates \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
ARG BINARY=dist/btest
|
||||||
|
COPY ${BINARY} /usr/local/bin/btest
|
||||||
|
RUN chmod +x /usr/local/bin/btest
|
||||||
|
|
||||||
|
EXPOSE 2000/tcp
|
||||||
|
EXPOSE 2001-2100/udp
|
||||||
|
EXPOSE 2257-2356/udp
|
||||||
|
|
||||||
|
ENTRYPOINT ["btest"]
|
||||||
|
CMD ["-s"]
|
||||||
@@ -116,8 +116,10 @@ btest -s -vv # info + debug + trace (hex dumps of status exchange)
|
|||||||
|
|
||||||
### Run btest from MikroTik (connecting to our server)
|
### Run btest from MikroTik (connecting to our server)
|
||||||
|
|
||||||
|
**Important: Set Connection Count to 1** — multi-connection mode is not supported.
|
||||||
|
|
||||||
```
|
```
|
||||||
/tool/bandwidth-test address=<server-ip> direction=both protocol=udp user=admin password=password
|
/tool/bandwidth-test address=<server-ip> direction=both protocol=udp user=admin password=password connection-count=1
|
||||||
```
|
```
|
||||||
|
|
||||||
## Protocol
|
## Protocol
|
||||||
@@ -133,7 +135,7 @@ See the [original protocol documentation](btest-opensource/README.md) for wire-f
|
|||||||
## Known Limitations
|
## Known Limitations
|
||||||
|
|
||||||
- **EC-SRP5 authentication** (RouterOS >= 6.43) is not yet supported for client mode. Server mode works fine with MD5 auth. Disable auth on the MikroTik btest server as a workaround.
|
- **EC-SRP5 authentication** (RouterOS >= 6.43) is not yet supported for client mode. Server mode works fine with MD5 auth. Disable auth on the MikroTik btest server as a workaround.
|
||||||
- **Multi-connection mode** (`Connection Count > 1` on MikroTik client) causes MikroTik's per-connection speed adaptation to throttle each stream independently, resulting in lower aggregate throughput. Use 1 connection for best results.
|
- **Multi-connection UDP** is supported. MikroTik's multi-connection mode sends from multiple source ports which are all accepted by the server.
|
||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
|
|||||||
@@ -45,17 +45,16 @@ btest -s -vv # Show hex dumps of status exchange (for debugging)
|
|||||||
|
|
||||||
### MikroTik Configuration (connecting to our server)
|
### MikroTik Configuration (connecting to our server)
|
||||||
|
|
||||||
|
**Important: Always set Connection Count to 1.** Multi-connection mode is not supported and will cause severely degraded speeds.
|
||||||
|
|
||||||
On the MikroTik device (WinBox or CLI):
|
On the MikroTik device (WinBox or CLI):
|
||||||
|
|
||||||
```
|
```
|
||||||
# CLI
|
# CLI — always include connection-count=1
|
||||||
/tool/bandwidth-test address=<server-ip> direction=both protocol=udp user=admin password=mysecretpassword
|
/tool/bandwidth-test address=<server-ip> direction=both protocol=udp user=admin password=mysecretpassword connection-count=1
|
||||||
|
|
||||||
# For best results, use 1 connection
|
|
||||||
/tool/bandwidth-test address=<server-ip> direction=both protocol=udp connection-count=1
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Or via WinBox: **Tools → Bandwidth Test**, enter server address, credentials, and click Start.
|
Or via WinBox: **Tools → Bandwidth Test**, enter server address, credentials, **set Connection Count to 1**, and click Start.
|
||||||
|
|
||||||
## Client Mode
|
## Client Mode
|
||||||
|
|
||||||
@@ -169,7 +168,7 @@ Options:
|
|||||||
|
|
||||||
## Tips
|
## Tips
|
||||||
|
|
||||||
- **Use 1 connection** when MikroTik connects to your server. Multi-connection mode causes MikroTik's per-connection speed adaptation to throttle.
|
- **Connection Count MUST be 1** when MikroTik connects to your server. Multi-connection mode is not supported and will cause speeds to drop to near zero. Single-connection performance is excellent (1+ Gbps).
|
||||||
- **TCP mode** generally gives more stable results than UDP due to TCP flow control.
|
- **TCP mode** generally gives more stable results than UDP due to TCP flow control.
|
||||||
- **UDP mode** is better for measuring raw link capacity without TCP overhead.
|
- **UDP mode** is better for measuring raw link capacity without TCP overhead.
|
||||||
- **First interval** may show higher or lower numbers as the connection stabilizes. Look at intervals 3+ for steady-state throughput.
|
- **First interval** may show higher or lower numbers as the connection stabilizes. Look at intervals 3+ for steady-state throughput.
|
||||||
@@ -182,5 +181,6 @@ Options:
|
|||||||
| `EC-SRP5 authentication not supported` | Disable auth on MikroTik btest server, or use older RouterOS |
|
| `EC-SRP5 authentication not supported` | Disable auth on MikroTik btest server, or use older RouterOS |
|
||||||
| `Connection refused` | Check port 2000 is open, firewall allows it |
|
| `Connection refused` | Check port 2000 is open, firewall allows it |
|
||||||
| Server shows 0 RX | Check MikroTik is actually sending (direction setting) |
|
| Server shows 0 RX | Check MikroTik is actually sending (direction setting) |
|
||||||
| Speed drops over time (server mode) | MikroTik client behavior — use 1 connection, or use our client mode instead |
|
| Speed drops over time (server mode) | Set Connection Count to 1 on MikroTik. Multi-connection is not supported |
|
||||||
|
| Very low speed with multiple connections | Multi-connection mode is broken — set Connection Count to 1 |
|
||||||
| UDP `lost` packets high | Network congestion or MTU issues, try reducing bandwidth with `-b` |
|
| UDP `lost` packets high | Network congestion or MTU issues, try reducing bandwidth with `-b` |
|
||||||
|
|||||||
75
scripts/debug-capture.sh
Executable file
75
scripts/debug-capture.sh
Executable file
@@ -0,0 +1,75 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# Capture btest traffic for debugging multi-connection issues.
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# # Terminal 1: Start capture
|
||||||
|
# sudo ./scripts/debug-capture.sh capture <interface> [mikrotik_ip]
|
||||||
|
#
|
||||||
|
# # Terminal 2: Run server or client
|
||||||
|
# ./target/release/btest -s -a admin -p password -vv
|
||||||
|
#
|
||||||
|
# # Terminal 1: Stop with Ctrl+C, then analyze
|
||||||
|
# ./scripts/debug-capture.sh analyze
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
cd "$(dirname "$0")/.."
|
||||||
|
|
||||||
|
CMD="${1:?Usage: $0 <capture|analyze> [interface] [mikrotik_ip]}"
|
||||||
|
|
||||||
|
PCAP_FILE="dist/btest-debug.pcap"
|
||||||
|
mkdir -p dist
|
||||||
|
|
||||||
|
case "$CMD" in
|
||||||
|
capture)
|
||||||
|
IFACE="${2:?Specify interface (e.g., en0, eth0)}"
|
||||||
|
MK_IP="${3:-}"
|
||||||
|
|
||||||
|
FILTER="port 2000 or portrange 2001-2100 or portrange 2257-2356"
|
||||||
|
if [[ -n "$MK_IP" ]]; then
|
||||||
|
FILTER="host $MK_IP and ($FILTER)"
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "Capturing btest traffic on $IFACE..."
|
||||||
|
echo "Filter: $FILTER"
|
||||||
|
echo "Output: $PCAP_FILE"
|
||||||
|
echo "Press Ctrl+C to stop"
|
||||||
|
echo ""
|
||||||
|
tcpdump -i "$IFACE" -w "$PCAP_FILE" -s 128 "$FILTER"
|
||||||
|
;;
|
||||||
|
|
||||||
|
analyze)
|
||||||
|
if [[ ! -f "$PCAP_FILE" ]]; then
|
||||||
|
echo "No capture file found at $PCAP_FILE"
|
||||||
|
echo "Run: sudo $0 capture <interface> first"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "=== TCP Control Channel (port 2000) ==="
|
||||||
|
echo ""
|
||||||
|
echo "--- Connection summary ---"
|
||||||
|
tcpdump -r "$PCAP_FILE" -n 'tcp port 2000 and (tcp[tcpflags] & tcp-syn != 0)' 2>/dev/null | head -20
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
echo "--- All TCP control data (first 64 bytes of payload) ---"
|
||||||
|
tcpdump -r "$PCAP_FILE" -n -X 'tcp port 2000 and tcp[tcpflags] & tcp-push != 0' 2>/dev/null | head -100
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
echo "=== UDP Data Ports ==="
|
||||||
|
echo ""
|
||||||
|
echo "--- UDP port usage ---"
|
||||||
|
tcpdump -r "$PCAP_FILE" -n 'udp' 2>/dev/null | awk '{print $3, $5}' | sort | uniq -c | sort -rn | head -20
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
echo "--- Timing of first packets per connection ---"
|
||||||
|
tcpdump -r "$PCAP_FILE" -n -tt 'tcp port 2000 and (tcp[tcpflags] & tcp-syn != 0)' 2>/dev/null | head -20
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
echo "Full capture at: $PCAP_FILE"
|
||||||
|
echo "Open in Wireshark: wireshark $PCAP_FILE"
|
||||||
|
;;
|
||||||
|
|
||||||
|
*)
|
||||||
|
echo "Usage: $0 <capture|analyze> [interface] [mikrotik_ip]"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
@@ -1,6 +1,9 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
# Build and push Docker image to Gitea registry.
|
# Build and push multi-arch Docker images to Gitea registry.
|
||||||
# Run on a machine with Docker (your Mac).
|
#
|
||||||
|
# Prerequisites:
|
||||||
|
# - dist/btest-linux-x86_64.tar.gz (from CI release or scripts/build-linux.sh)
|
||||||
|
# - Native macOS binary (built automatically)
|
||||||
#
|
#
|
||||||
# Usage:
|
# Usage:
|
||||||
# ./scripts/push-docker.sh v0.1.0
|
# ./scripts/push-docker.sh v0.1.0
|
||||||
@@ -16,31 +19,86 @@ if [[ -f .env ]]; then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
TAG="${1:?Usage: $0 <tag> (e.g. v0.1.0)}"
|
TAG="${1:?Usage: $0 <tag> (e.g. v0.1.0)}"
|
||||||
REGISTRY="${GITEA_URL:-https://git.manko.yoga}"
|
REGISTRY_HOST="${GITEA_URL:-https://git.manko.yoga}"
|
||||||
REGISTRY_HOST="${REGISTRY#https://}"
|
REGISTRY_HOST="${REGISTRY_HOST#https://}"
|
||||||
REGISTRY_HOST="${REGISTRY_HOST#http://}"
|
REGISTRY_HOST="${REGISTRY_HOST#http://}"
|
||||||
IMAGE="${REGISTRY_HOST}/manawenuz/btest-rs"
|
IMAGE="${REGISTRY_HOST}/manawenuz/btest-rs"
|
||||||
|
|
||||||
echo "=== Building Docker image for ${IMAGE}:${TAG} ==="
|
# NOTE: Run 'docker login git.manko.yoga' manually first if not authenticated
|
||||||
|
|
||||||
# Build the image
|
mkdir -p dist/docker-amd64 dist/docker-arm64
|
||||||
docker build -t "${IMAGE}:${TAG}" -t "${IMAGE}:latest" .
|
|
||||||
|
|
||||||
echo ""
|
# --- x86_64 binary ---
|
||||||
echo "=== Pushing to ${REGISTRY_HOST} ==="
|
if [[ ! -f dist/docker-amd64/btest ]]; then
|
||||||
|
if [[ -f dist/btest-linux-x86_64.tar.gz ]]; then
|
||||||
# Login if needed (uses GITEA_TOKEN from .env)
|
echo "Extracting x86_64 binary from tarball..."
|
||||||
if [[ -n "${GITEA_TOKEN:-}" ]]; then
|
tar xzf dist/btest-linux-x86_64.tar.gz -C dist/docker-amd64/
|
||||||
echo "${GITEA_TOKEN}" | docker login "${REGISTRY_HOST}" -u token --password-stdin
|
else
|
||||||
|
echo "No x86_64 binary found. Downloading from release ${TAG}..."
|
||||||
|
GITEA_URL_FULL="https://${REGISTRY_HOST}"
|
||||||
|
RELEASE_URL=$(curl -sf \
|
||||||
|
-H "Authorization: token ${GITEA_TOKEN}" \
|
||||||
|
"${GITEA_URL_FULL}/api/v1/repos/manawenuz/btest-rs/releases/tags/${TAG}" \
|
||||||
|
| jq -r '.assets[] | select(.name=="btest-linux-x86_64.tar.gz") | .browser_download_url')
|
||||||
|
if [[ -n "$RELEASE_URL" ]]; then
|
||||||
|
curl -sL "$RELEASE_URL" | tar xz -C dist/docker-amd64/
|
||||||
|
else
|
||||||
|
echo "Error: Cannot find x86_64 binary. Run CI first or scripts/build-linux.sh"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
docker push "${IMAGE}:${TAG}"
|
# --- arm64 binary (native build on Apple Silicon) ---
|
||||||
docker push "${IMAGE}:latest"
|
if [[ ! -f dist/docker-arm64/btest ]]; then
|
||||||
|
echo "Building native arm64 binary..."
|
||||||
|
cargo build --release
|
||||||
|
cp target/release/btest dist/docker-arm64/btest
|
||||||
|
fi
|
||||||
|
|
||||||
echo ""
|
echo ""
|
||||||
echo "Done! Images pushed:"
|
echo "=== Building amd64 image ==="
|
||||||
echo " ${IMAGE}:${TAG}"
|
docker build --platform linux/amd64 -f Dockerfile.static \
|
||||||
echo " ${IMAGE}:latest"
|
--build-arg BINARY=dist/docker-amd64/btest \
|
||||||
|
-t "${IMAGE}:${TAG}-amd64" .
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=== Building arm64 image ==="
|
||||||
|
docker build --platform linux/arm64 -f Dockerfile.static \
|
||||||
|
--build-arg BINARY=dist/docker-arm64/btest \
|
||||||
|
-t "${IMAGE}:${TAG}-arm64" .
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=== Pushing ==="
|
||||||
|
docker push "${IMAGE}:${TAG}-amd64"
|
||||||
|
docker push "${IMAGE}:${TAG}-arm64"
|
||||||
|
|
||||||
|
# Create and push multi-arch manifest
|
||||||
|
echo ""
|
||||||
|
echo "=== Creating multi-arch manifest ==="
|
||||||
|
docker manifest create "${IMAGE}:${TAG}" \
|
||||||
|
"${IMAGE}:${TAG}-amd64" \
|
||||||
|
"${IMAGE}:${TAG}-arm64" 2>/dev/null || \
|
||||||
|
docker manifest create --amend "${IMAGE}:${TAG}" \
|
||||||
|
"${IMAGE}:${TAG}-amd64" \
|
||||||
|
"${IMAGE}:${TAG}-arm64"
|
||||||
|
|
||||||
|
docker manifest push "${IMAGE}:${TAG}"
|
||||||
|
|
||||||
|
# Tag as latest
|
||||||
|
docker manifest create "${IMAGE}:latest" \
|
||||||
|
"${IMAGE}:${TAG}-amd64" \
|
||||||
|
"${IMAGE}:${TAG}-arm64" 2>/dev/null || \
|
||||||
|
docker manifest create --amend "${IMAGE}:latest" \
|
||||||
|
"${IMAGE}:${TAG}-amd64" \
|
||||||
|
"${IMAGE}:${TAG}-arm64"
|
||||||
|
|
||||||
|
docker manifest push "${IMAGE}:latest"
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "Done! Multi-arch images pushed:"
|
||||||
|
echo " ${IMAGE}:${TAG} (amd64 + arm64)"
|
||||||
|
echo " ${IMAGE}:latest (amd64 + arm64)"
|
||||||
echo ""
|
echo ""
|
||||||
echo "Run with:"
|
echo "Run with:"
|
||||||
echo " docker run --rm -p 2000:2000 -p 2001-2100:2001-2100/udp ${IMAGE}:${TAG} -s -v"
|
echo " docker run --rm -p 2000:2000 -p 2001-2100:2001-2100/udp ${IMAGE}:${TAG} -s -v"
|
||||||
|
|||||||
16
src/auth.rs
16
src/auth.rs
@@ -26,34 +26,33 @@ pub fn compute_auth_hash(password: &str, challenge: &[u8; 16]) -> [u8; 16] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Server-side: send auth challenge and verify response.
|
/// Server-side: send auth challenge and verify response.
|
||||||
|
/// `ok_response` is the 4-byte reply on success (normally AUTH_OK = [01,00,00,00]).
|
||||||
|
/// For TCP multi-connection, pass [01,HI,LO,00] with a session token.
|
||||||
/// Returns Ok(()) if auth succeeds or no auth is configured.
|
/// Returns Ok(()) if auth succeeds or no auth is configured.
|
||||||
pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
|
pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
|
||||||
stream: &mut S,
|
stream: &mut S,
|
||||||
username: Option<&str>,
|
username: Option<&str>,
|
||||||
password: Option<&str>,
|
password: Option<&str>,
|
||||||
|
ok_response: &[u8; 4],
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
match (username, password) {
|
match (username, password) {
|
||||||
(None, None) => {
|
(None, None) => {
|
||||||
// No auth required
|
stream.write_all(ok_response).await?;
|
||||||
stream.write_all(&AUTH_OK).await?;
|
|
||||||
stream.flush().await?;
|
stream.flush().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
(_, Some(pass)) => {
|
(_, Some(pass)) => {
|
||||||
// Send auth challenge
|
|
||||||
stream.write_all(&AUTH_REQUIRED).await?;
|
stream.write_all(&AUTH_REQUIRED).await?;
|
||||||
let challenge = generate_challenge();
|
let challenge = generate_challenge();
|
||||||
stream.write_all(&challenge).await?;
|
stream.write_all(&challenge).await?;
|
||||||
stream.flush().await?;
|
stream.flush().await?;
|
||||||
|
|
||||||
// Receive response: 16 bytes hash + 32 bytes username
|
|
||||||
let mut response = [0u8; 48];
|
let mut response = [0u8; 48];
|
||||||
stream.read_exact(&mut response).await?;
|
stream.read_exact(&mut response).await?;
|
||||||
|
|
||||||
let received_hash = &response[0..16];
|
let received_hash = &response[0..16];
|
||||||
let received_user = &response[16..48];
|
let received_user = &response[16..48];
|
||||||
|
|
||||||
// Extract username (null-terminated)
|
|
||||||
let user_end = received_user
|
let user_end = received_user
|
||||||
.iter()
|
.iter()
|
||||||
.position(|&b| b == 0)
|
.position(|&b| b == 0)
|
||||||
@@ -61,7 +60,6 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
|
|||||||
let received_username = std::str::from_utf8(&received_user[..user_end])
|
let received_username = std::str::from_utf8(&received_user[..user_end])
|
||||||
.unwrap_or("");
|
.unwrap_or("");
|
||||||
|
|
||||||
// Verify username if configured
|
|
||||||
if let Some(expected_user) = username {
|
if let Some(expected_user) = username {
|
||||||
if received_username != expected_user {
|
if received_username != expected_user {
|
||||||
tracing::warn!("Auth failed: username mismatch (got '{}')", received_username);
|
tracing::warn!("Auth failed: username mismatch (got '{}')", received_username);
|
||||||
@@ -71,7 +69,6 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify hash
|
|
||||||
let expected_hash = compute_auth_hash(pass, &challenge);
|
let expected_hash = compute_auth_hash(pass, &challenge);
|
||||||
if received_hash != expected_hash {
|
if received_hash != expected_hash {
|
||||||
tracing::warn!("Auth failed: hash mismatch for user '{}'", received_username);
|
tracing::warn!("Auth failed: hash mismatch for user '{}'", received_username);
|
||||||
@@ -81,13 +78,12 @@ pub async fn server_authenticate<S: AsyncReadExt + AsyncWriteExt + Unpin>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!("Auth successful for user '{}'", received_username);
|
tracing::info!("Auth successful for user '{}'", received_username);
|
||||||
stream.write_all(&AUTH_OK).await?;
|
stream.write_all(ok_response).await?;
|
||||||
stream.flush().await?;
|
stream.flush().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
(Some(_), None) => {
|
(Some(_), None) => {
|
||||||
// Username but no password - treat as no auth
|
stream.write_all(ok_response).await?;
|
||||||
stream.write_all(&AUTH_OK).await?;
|
|
||||||
stream.flush().await?;
|
stream.flush().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
213
src/server.rs
213
src/server.rs
@@ -1,3 +1,4 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -5,11 +6,22 @@ use std::time::{Duration, Instant};
|
|||||||
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::net::{TcpListener, TcpStream, UdpSocket};
|
use tokio::net::{TcpListener, TcpStream, UdpSocket};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use crate::auth;
|
use crate::auth;
|
||||||
use crate::bandwidth::{self, BandwidthState};
|
use crate::bandwidth::{self, BandwidthState};
|
||||||
use crate::protocol::*;
|
use crate::protocol::*;
|
||||||
|
|
||||||
|
/// Pending TCP multi-connection session: first connection creates this,
|
||||||
|
/// subsequent connections join via the session token.
|
||||||
|
struct TcpSession {
|
||||||
|
peer_ip: std::net::IpAddr,
|
||||||
|
streams: Vec<TcpStream>,
|
||||||
|
expected: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
type SessionMap = Arc<Mutex<HashMap<u16, TcpSession>>>;
|
||||||
|
|
||||||
pub async fn run_server(
|
pub async fn run_server(
|
||||||
port: u16,
|
port: u16,
|
||||||
auth_user: Option<String>,
|
auth_user: Option<String>,
|
||||||
@@ -20,6 +32,7 @@ pub async fn run_server(
|
|||||||
tracing::info!("btest server listening on {}", addr);
|
tracing::info!("btest server listening on {}", addr);
|
||||||
|
|
||||||
let udp_port_offset = Arc::new(std::sync::atomic::AtomicU16::new(0));
|
let udp_port_offset = Arc::new(std::sync::atomic::AtomicU16::new(0));
|
||||||
|
let sessions: SessionMap = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, peer) = listener.accept().await?;
|
let (stream, peer) = listener.accept().await?;
|
||||||
@@ -28,9 +41,12 @@ pub async fn run_server(
|
|||||||
let auth_user = auth_user.clone();
|
let auth_user = auth_user.clone();
|
||||||
let auth_pass = auth_pass.clone();
|
let auth_pass = auth_pass.clone();
|
||||||
let udp_offset = udp_port_offset.clone();
|
let udp_offset = udp_port_offset.clone();
|
||||||
|
let sessions = sessions.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_client(stream, peer, auth_user, auth_pass, udp_offset).await {
|
if let Err(e) =
|
||||||
|
handle_client(stream, peer, auth_user, auth_pass, udp_offset, sessions).await
|
||||||
|
{
|
||||||
tracing::error!("Client {} error: {}", peer, e);
|
tracing::error!("Client {} error: {}", peer, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -43,14 +59,58 @@ async fn handle_client(
|
|||||||
auth_user: Option<String>,
|
auth_user: Option<String>,
|
||||||
auth_pass: Option<String>,
|
auth_pass: Option<String>,
|
||||||
udp_port_offset: Arc<std::sync::atomic::AtomicU16>,
|
udp_port_offset: Arc<std::sync::atomic::AtomicU16>,
|
||||||
|
sessions: SessionMap,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
stream.set_nodelay(true)?;
|
stream.set_nodelay(true)?;
|
||||||
|
|
||||||
send_hello(&mut stream).await?;
|
send_hello(&mut stream).await?;
|
||||||
|
|
||||||
let cmd = recv_command(&mut stream).await?;
|
// Read 16-byte command (or whatever the client sends)
|
||||||
|
let mut cmd_buf = [0u8; 16];
|
||||||
|
stream.read_exact(&mut cmd_buf).await?;
|
||||||
|
tracing::debug!("Raw command from {}: {:02x?}", peer, cmd_buf);
|
||||||
|
|
||||||
|
// Check if this is a secondary TCP connection joining a session.
|
||||||
|
// Secondary connections send the session token in bytes 0-1 of their "command":
|
||||||
|
// [TOKEN_HI, TOKEN_LO, 0x02, 0x00, ...]
|
||||||
|
// They do NOT do auth — just send them AUTH_OK with the token and they join.
|
||||||
|
{
|
||||||
|
let mut map = sessions.lock().await;
|
||||||
|
let received_token = ((cmd_buf[0] as u16) << 8) | (cmd_buf[1] as u16);
|
||||||
|
if let Some(session) = map.get_mut(&received_token) {
|
||||||
|
if session.peer_ip == peer.ip()
|
||||||
|
&& session.streams.len() < session.expected as usize
|
||||||
|
{
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Client {} command: proto={} dir={} tx_size={} remote_speed={} local_speed={}",
|
"Client {} is secondary TCP connection (token={:04x})",
|
||||||
|
peer, received_token,
|
||||||
|
);
|
||||||
|
|
||||||
|
// No auth for secondary connections — just send OK with token
|
||||||
|
let ok = [0x01, cmd_buf[0], cmd_buf[1], 0x00];
|
||||||
|
stream.write_all(&ok).await?;
|
||||||
|
stream.flush().await?;
|
||||||
|
|
||||||
|
session.streams.push(stream);
|
||||||
|
tracing::info!(
|
||||||
|
"Secondary connection joined ({}/{})",
|
||||||
|
session.streams.len() + 1,
|
||||||
|
session.expected,
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(map);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Primary connection: parse the command normally
|
||||||
|
let cmd = Command::deserialize(&cmd_buf);
|
||||||
|
if cmd.proto > 1 || cmd.direction == 0 || cmd.direction > 3 {
|
||||||
|
return Err(BtestError::InvalidCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"Client {} command: proto={} dir={} conn_count={} tx_size={} remote_speed={} local_speed={}",
|
||||||
peer,
|
peer,
|
||||||
if cmd.is_udp() { "UDP" } else { "TCP" },
|
if cmd.is_udp() { "UDP" } else { "TCP" },
|
||||||
match cmd.direction {
|
match cmd.direction {
|
||||||
@@ -59,20 +119,131 @@ async fn handle_client(
|
|||||||
CMD_DIR_BOTH => "BOTH",
|
CMD_DIR_BOTH => "BOTH",
|
||||||
_ => "?",
|
_ => "?",
|
||||||
},
|
},
|
||||||
|
cmd.tcp_conn_count,
|
||||||
cmd.tx_size,
|
cmd.tx_size,
|
||||||
cmd.remote_tx_speed,
|
cmd.remote_tx_speed,
|
||||||
cmd.local_tx_speed,
|
cmd.local_tx_speed,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Build auth OK response - include session token for TCP multi-connection
|
||||||
|
let is_tcp_multi = !cmd.is_udp() && cmd.tcp_conn_count > 0;
|
||||||
|
let session_token: u16 = if is_tcp_multi {
|
||||||
|
rand::random::<u16>() | 0x0101 // ensure both bytes non-zero
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
let ok_response: [u8; 4] = if is_tcp_multi {
|
||||||
|
// MikroTik expects 01:HI:LO:00 for multi-connection support
|
||||||
|
[0x01, (session_token >> 8) as u8, (session_token & 0xFF) as u8, 0x00]
|
||||||
|
} else {
|
||||||
|
AUTH_OK
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_tcp_multi {
|
||||||
|
tracing::info!(
|
||||||
|
"TCP multi-connection: conn_count={}, session_token={:04x}, ok_response={:02x?}",
|
||||||
|
cmd.tcp_conn_count, session_token, ok_response,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this is a secondary connection joining an existing TCP session
|
||||||
|
if is_tcp_multi {
|
||||||
|
let mut map = sessions.lock().await;
|
||||||
|
for (_token, session) in map.iter_mut() {
|
||||||
|
if session.peer_ip == peer.ip()
|
||||||
|
&& session.streams.len() < session.expected as usize
|
||||||
|
{
|
||||||
|
tracing::info!(
|
||||||
|
"Client {} joining TCP session ({}/{})",
|
||||||
|
peer,
|
||||||
|
session.streams.len() + 1,
|
||||||
|
session.expected,
|
||||||
|
);
|
||||||
|
drop(map);
|
||||||
|
// Secondary connections also do auth with the same session token response
|
||||||
auth::server_authenticate(
|
auth::server_authenticate(
|
||||||
&mut stream,
|
&mut stream,
|
||||||
auth_user.as_deref(),
|
auth_user.as_deref(),
|
||||||
auth_pass.as_deref(),
|
auth_pass.as_deref(),
|
||||||
|
&ok_response,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let mut map = sessions.lock().await;
|
||||||
|
for (_t, s) in map.iter_mut() {
|
||||||
|
if s.peer_ip == peer.ip() && s.streams.len() < s.expected as usize {
|
||||||
|
s.streams.push(stream);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(map);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Primary connection auth
|
||||||
|
auth::server_authenticate(
|
||||||
|
&mut stream,
|
||||||
|
auth_user.as_deref(),
|
||||||
|
auth_pass.as_deref(),
|
||||||
|
&ok_response,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if cmd.is_udp() {
|
if cmd.is_udp() {
|
||||||
run_udp_test_server(&mut stream, peer, &cmd, udp_port_offset).await
|
run_udp_test_server(&mut stream, peer, &cmd, udp_port_offset).await
|
||||||
|
} else if is_tcp_multi {
|
||||||
|
let conn_count = cmd.tcp_conn_count;
|
||||||
|
|
||||||
|
// Register session for secondary connections to find
|
||||||
|
{
|
||||||
|
let mut map = sessions.lock().await;
|
||||||
|
map.insert(session_token, TcpSession {
|
||||||
|
peer_ip: peer.ip(),
|
||||||
|
streams: Vec::new(),
|
||||||
|
expected: conn_count,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for secondary connections
|
||||||
|
let deadline = Instant::now() + Duration::from_secs(10);
|
||||||
|
loop {
|
||||||
|
let count = {
|
||||||
|
let map = sessions.lock().await;
|
||||||
|
map.get(&session_token)
|
||||||
|
.map(|s| s.streams.len())
|
||||||
|
.unwrap_or(0)
|
||||||
|
};
|
||||||
|
if count + 1 >= conn_count as usize {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if Instant::now() > deadline {
|
||||||
|
tracing::warn!(
|
||||||
|
"Timeout waiting for TCP connections ({}/{}), proceeding",
|
||||||
|
count + 1,
|
||||||
|
conn_count,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let extra_streams = {
|
||||||
|
let mut map = sessions.lock().await;
|
||||||
|
map.remove(&session_token)
|
||||||
|
.map(|s| s.streams)
|
||||||
|
.unwrap_or_default()
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"TCP multi-connection: starting with {} total streams",
|
||||||
|
1 + extra_streams.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Run test - primary stream handles data, extras provide parallel TCP bandwidth
|
||||||
|
// For now just use the primary; extras keep the connection alive
|
||||||
|
let _extra_keepalive = extra_streams;
|
||||||
|
run_tcp_test_server(stream, cmd).await
|
||||||
} else {
|
} else {
|
||||||
run_tcp_test_server(stream, cmd).await
|
run_tcp_test_server(stream, cmd).await
|
||||||
}
|
}
|
||||||
@@ -197,7 +368,23 @@ async fn run_udp_test_server(
|
|||||||
let udp = UdpSocket::bind(format!("0.0.0.0:{}", server_udp_port)).await?;
|
let udp = UdpSocket::bind(format!("0.0.0.0:{}", server_udp_port)).await?;
|
||||||
let client_udp_addr: SocketAddr =
|
let client_udp_addr: SocketAddr =
|
||||||
format!("{}:{}", peer.ip(), client_udp_port).parse().unwrap();
|
format!("{}:{}", peer.ip(), client_udp_port).parse().unwrap();
|
||||||
|
|
||||||
|
// When connection_count > 1, MikroTik sends UDP from MULTIPLE source ports
|
||||||
|
// (base_port, base_port+1, ..., base_port+N-1) all to our single server port.
|
||||||
|
// A connect()'d UDP socket only accepts from the one connected address,
|
||||||
|
// silently dropping packets from the other ports.
|
||||||
|
// So: only connect() for single-connection mode (enables send() without addr).
|
||||||
|
// For multi-connection, we leave the socket unconnected and use send_to()/recv_from().
|
||||||
|
let multi_conn = cmd.tcp_conn_count > 0;
|
||||||
|
if !multi_conn {
|
||||||
udp.connect(client_udp_addr).await?;
|
udp.connect(client_udp_addr).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"UDP mode: conn_count={}, socket={}",
|
||||||
|
cmd.tcp_conn_count.max(1),
|
||||||
|
if multi_conn { "unconnected (multi-port RX)" } else { "connected" },
|
||||||
|
);
|
||||||
|
|
||||||
let state = BandwidthState::new();
|
let state = BandwidthState::new();
|
||||||
let tx_size = cmd.tx_size as usize;
|
let tx_size = cmd.tx_size as usize;
|
||||||
@@ -209,9 +396,11 @@ async fn run_udp_test_server(
|
|||||||
|
|
||||||
let state_tx = state.clone();
|
let state_tx = state.clone();
|
||||||
let udp_tx = udp.clone();
|
let udp_tx = udp.clone();
|
||||||
|
let tx_target = client_udp_addr;
|
||||||
|
let is_multi = multi_conn;
|
||||||
let tx_handle = if server_should_tx {
|
let tx_handle = if server_should_tx {
|
||||||
Some(tokio::spawn(async move {
|
Some(tokio::spawn(async move {
|
||||||
udp_tx_loop(&udp_tx, tx_size, tx_speed, state_tx).await
|
udp_tx_loop(&udp_tx, tx_size, tx_speed, state_tx, is_multi, tx_target).await
|
||||||
}))
|
}))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -241,6 +430,8 @@ async fn udp_tx_loop(
|
|||||||
tx_size: usize,
|
tx_size: usize,
|
||||||
initial_tx_speed: u32,
|
initial_tx_speed: u32,
|
||||||
state: Arc<BandwidthState>,
|
state: Arc<BandwidthState>,
|
||||||
|
multi_conn: bool,
|
||||||
|
target: SocketAddr,
|
||||||
) {
|
) {
|
||||||
let mut seq: u32 = 0;
|
let mut seq: u32 = 0;
|
||||||
let mut packet = vec![0u8; tx_size];
|
let mut packet = vec![0u8; tx_size];
|
||||||
@@ -251,7 +442,12 @@ async fn udp_tx_loop(
|
|||||||
while state.running.load(Ordering::Relaxed) {
|
while state.running.load(Ordering::Relaxed) {
|
||||||
packet[0..4].copy_from_slice(&seq.to_be_bytes());
|
packet[0..4].copy_from_slice(&seq.to_be_bytes());
|
||||||
|
|
||||||
match socket.send(&packet).await {
|
let result = if multi_conn {
|
||||||
|
socket.send_to(&packet, target).await
|
||||||
|
} else {
|
||||||
|
socket.send(&packet).await
|
||||||
|
};
|
||||||
|
match result {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
seq = seq.wrapping_add(1);
|
seq = seq.wrapping_add(1);
|
||||||
state.tx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
state.tx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
||||||
@@ -263,7 +459,6 @@ async fn udp_tx_loop(
|
|||||||
tracing::warn!("UDP TX: too many consecutive send errors, stopping");
|
tracing::warn!("UDP TX: too many consecutive send errors, stopping");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// Back off on ENOBUFS/EAGAIN
|
|
||||||
tokio::time::sleep(Duration::from_micros(200)).await;
|
tokio::time::sleep(Duration::from_micros(200)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -302,8 +497,10 @@ async fn udp_rx_loop(socket: &UdpSocket, state: Arc<BandwidthState>) {
|
|||||||
let mut last_seq: Option<u32> = None;
|
let mut last_seq: Option<u32> = None;
|
||||||
|
|
||||||
while state.running.load(Ordering::Relaxed) {
|
while state.running.load(Ordering::Relaxed) {
|
||||||
match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await {
|
// Use recv_from to accept packets from any source port
|
||||||
Ok(Ok(n)) if n >= 4 => {
|
// (multi-connection MikroTik sends from multiple ports)
|
||||||
|
match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
|
||||||
|
Ok(Ok((n, _src))) if n >= 4 => {
|
||||||
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed);
|
||||||
state.rx_packets.fetch_add(1, Ordering::Relaxed);
|
state.rx_packets.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user