Key findings:
- btest EC-SRP5 uses [len][payload] framing (NO 0x06 handler byte)
- Winbox uses [len][0x06][payload] — that one byte was the difference
- Crypto is identical: Curve25519 Weierstrass, SHA256, SRP-like key exchange
- Python prototype successfully authenticates against MikroTik RouterOS 7.x

Files:
- docs/ecsrp5-research.md: complete protocol spec, captured exchange, impl plan
- proto-test/btest_ecsrp5_client.py: working Python EC-SRP5 btest client
- proto-test/btest_mitm.py: MITM proxy used to discover the framing
- proto-test/elliptic_curves.py: Curve25519 Weierstrass (from MarginResearch)

Based on MarginResearch/mikrotik_authentication (MIT License).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
236 lines
8.1 KiB
Python
236 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
btest EC-SRP5 authentication test client.
|
|
Connects to a MikroTik btest server (RouterOS >= 6.43) and performs
|
|
the EC-SRP5 handshake to authenticate.
|
|
|
|
Usage:
|
|
python3 btest_ecsrp5_client.py -a 172.16.81.1 -u admin -p password
|
|
python3 btest_ecsrp5_client.py -a 172.16.81.1 -u admin -p password --receive
|
|
"""
|
|
import socket
|
|
import secrets
|
|
import hashlib
|
|
import argparse
|
|
import struct
|
|
import sys
|
|
import time
|
|
|
|
import elliptic_curves
|
|
|
|
# Default TCP port for the btest server; used as the default for both the
# client constructor's ``port`` argument and the ``--port`` CLI option.
BTEST_PORT = 2000
|
|
|
|
|
|
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
|
|
|
|
|
|
def hexdump(label: str, data: bytes):
    """Print *label*, the length of *data* and its hex encoding on one line."""
    byte_count = len(data)
    hex_text = data.hex()
    print(" {} ({} bytes): {}".format(label, byte_count, hex_text))
|
|
|
|
|
|
class BtestECSRP5Client:
    """Test client that performs the btest EC-SRP5 handshake over TCP.

    Authentication messages are framed as ``[len][payload]`` with a single
    length byte (no 0x06 handler byte).  Because TCP is a byte stream, every
    frame is read with :meth:`recv_exact` rather than one ``recv`` call, so
    split or coalesced segments cannot corrupt the parsing.
    """

    def __init__(self, host: str, port: int = BTEST_PORT):
        """Store the target address and create the curve helper.

        No network activity happens here; call :meth:`connect` (or
        :meth:`run_test`) to open the socket.
        """
        self.host = host
        self.port = port
        self.w = elliptic_curves.WCurve()
        self.sock = None

    def connect(self):
        """Open the TCP connection (10 s timeout, TCP_NODELAY enabled).

        Raises:
            OSError: if the connection cannot be established.  The socket is
                closed before the error propagates so it does not leak.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(10)
            sock.connect((self.host, self.port))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except OSError:
            sock.close()
            raise
        self.sock = sock
        print(f"Connected to {self.host}:{self.port}")

    def close(self):
        """Close the socket if it is open; safe to call more than once."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None

    def recv_exact(self, n: int) -> bytes:
        """Read exactly *n* bytes from the socket.

        Raises:
            ConnectionError: if the peer closes the connection before *n*
                bytes have arrived.
        """
        buf = bytearray()
        while len(buf) < n:
            chunk = self.sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("Connection closed")
            buf += chunk
        return bytes(buf)

    @staticmethod
    def _frame(payload: bytes) -> bytes:
        """Prefix *payload* with its one-byte length."""
        if len(payload) > 0xFF:
            raise ValueError(
                f"Payload too long for 1-byte length prefix: {len(payload)} bytes"
            )
        return bytes([len(payload)]) + payload

    def _recv_frame(self) -> bytes:
        """Read one ``[len][payload]`` frame and return only the payload."""
        length = self.recv_exact(1)[0]
        return self.recv_exact(length)

    def do_hello_and_command(self, proto=1, direction=2, conn_count=0, tx_size=0x8000):
        """Read the server HELLO, send the command, return the 4-byte auth response.

        Raises:
            ValueError: if the server's HELLO is not ``01 00 00 00``.
            ConnectionError: if the connection closes mid-exchange.
        """
        # Receive HELLO
        hello = self.recv_exact(4)
        hexdump("HELLO", hello)
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if hello != b"\x01\x00\x00\x00":
            raise ValueError(f"Bad HELLO: {hello.hex()}")

        # Send command (16 bytes, little-endian)
        cmd = struct.pack(
            "<BBBBHHII",
            proto, direction, 0, conn_count,
            tx_size, 0, 0, 0,
        )
        hexdump("CMD", cmd)
        self.sock.sendall(cmd)

        # Receive auth response
        resp = self.recv_exact(4)
        hexdump("AUTH_RESP", resp)
        return resp

    def do_ecsrp5_auth(self, username: str, password: str):
        """Perform EC-SRP5 authentication after receiving 03 00 00 00.

        Returns:
            True when the server's confirmation code matches the expected
            value, False on any malformed/missing response or mismatch.

        Raises:
            ValueError: if the username makes MSG1 exceed the 255-byte frame
                limit.
        """
        print("\n=== EC-SRP5 Authentication ===")

        # Step 1: Generate client ephemeral keypair
        s_a = secrets.token_bytes(32)
        x_w_a, x_w_a_parity = self.w.gen_public_key(s_a)
        print(f" Client private: {s_a.hex()}")
        hexdump("Client pubkey (x_w_a)", x_w_a)
        print(f" Client parity: {x_w_a_parity}")

        # Step 2: Send client public key + username
        # btest format: [len][username\0][pubkey:32][parity:1] (NO 0x06 handler byte)
        payload = username.encode("utf-8") + b"\x00" + x_w_a + bytes([x_w_a_parity])
        msg = self._frame(payload)
        hexdump("MSG1 (client pubkey)", msg)
        self.sock.sendall(msg)

        # Step 3: Receive server response
        # btest format: [len][server_pubkey:32][parity:1][salt:16] (NO 0x06)
        try:
            resp_data = self._recv_frame()
        except ConnectionError:
            print("ERROR: Empty server response")
            return False
        hexdump("MSG2 (server challenge)", bytes([len(resp_data)]) + resp_data)

        if not resp_data:
            print("ERROR: Empty server response")
            return False
        if len(resp_data) < 33:
            # Not enough bytes for pubkey + parity; indexing would fail below.
            print(f"ERROR: Server challenge too short ({len(resp_data)} bytes)")
            return False

        x_w_b = resp_data[:32]
        x_w_b_parity = resp_data[32]
        salt = resp_data[33:]

        hexdump("Server pubkey (x_w_b)", x_w_b)
        print(f" Server parity: {x_w_b_parity}")
        hexdump("Salt", salt)

        if len(salt) != 16:
            print(f"ERROR: Expected 16-byte salt, got {len(salt)}")
            return False

        # Step 4: Compute shared secret (ECPESVDP-SRP-A)
        i = self.w.gen_password_validator_priv(username, password, salt)
        x_gamma, gamma_parity = self.w.gen_public_key(i)
        v = self.w.redp1(x_gamma, 1)  # password verifier point

        # Recover server's actual public point (undo verifier blinding)
        w_b = self.w.lift_x(int.from_bytes(x_w_b, "big"), x_w_b_parity)
        w_b = w_b + v  # w_b + V

        # Compute combined hash
        j = sha256(x_w_a + x_w_b)

        # Compute combined scalar: (i * j + s_a) mod r
        scalar = int.from_bytes(i, "big") * int.from_bytes(j, "big")
        scalar += int.from_bytes(s_a, "big")
        scalar = self.w.finite_field_value(scalar)

        # Shared secret point
        Z = scalar * w_b
        z, _ = self.w.to_montgomery(Z)

        hexdump("Shared secret (z)", z)

        # Step 5: Send client confirmation code
        # btest format: [len][cc:32] (NO 0x06)
        client_cc = sha256(j + z)
        msg3 = self._frame(client_cc)
        hexdump("MSG3 (client proof)", msg3)
        self.sock.sendall(msg3)

        # Step 6: Receive server confirmation
        # btest format: [len][sc:32] (NO 0x06)
        # Reading exactly one frame keeps any bytes the server sends right
        # after the proof out of the comparison.
        try:
            server_cc_received = self._recv_frame()
        except ConnectionError:
            print("ERROR: Invalid server proof response")
            return False
        hexdump(
            "MSG4 (server proof)",
            bytes([len(server_cc_received)]) + server_cc_received,
        )

        if not server_cc_received:
            print("ERROR: Invalid server proof response")
            return False

        server_cc_expected = sha256(j + client_cc + z)

        # Constant-time comparison of the confirmation codes.
        if secrets.compare_digest(server_cc_received, server_cc_expected):
            print("\n=== EC-SRP5 Authentication SUCCESSFUL ===")
            return True

        print("\n=== EC-SRP5 Authentication FAILED ===")
        hexdump("Expected server_cc", server_cc_expected)
        hexdump("Received server_cc", server_cc_received)
        return False

    def run_test(self, username: str, password: str, direction: str = "receive"):
        """Connect, send the command and authenticate if the server asks for it.

        Args:
            username: account name sent in MSG1.
            password: password fed into the validator derivation.
            direction: ``"receive"`` selects direction byte 0x02; any other
                value selects 0x01.

        Returns:
            True if the server needs no auth or EC-SRP5 succeeds; False for
            MD5 auth, an unknown response, or a failed handshake.
        """
        self.connect()

        # Direction from server perspective:
        # 0x01 = server RX (client TX / "send")
        # 0x02 = server TX (client RX / "receive")
        dir_byte = 0x02 if direction == "receive" else 0x01

        resp = self.do_hello_and_command(proto=1, direction=dir_byte)

        if resp == b"\x01\x00\x00\x00":
            print("No auth required - server accepted without authentication")
            return True
        if resp == b"\x02\x00\x00\x00":
            print("MD5 auth required (old RouterOS)")
            return False
        if resp == b"\x03\x00\x00\x00":
            print("EC-SRP5 auth required (RouterOS >= 6.43)")
            return self.do_ecsrp5_auth(username, password)

        print(f"Unknown auth response: {resp.hex()}")
        return False
|
|
|
|
|
|
def main():
    """CLI entry point: authenticate against a btest server and report the result.

    Exits with status 0 when authentication succeeds and 1 otherwise
    (including on any unexpected exception).  In the receive direction the
    function additionally reads from the socket for about two seconds to
    confirm that data is flowing.
    """
    parser = argparse.ArgumentParser(description="btest EC-SRP5 auth test client")
    parser.add_argument("-a", "--address", required=True, help="MikroTik IP address")
    parser.add_argument("-u", "--username", default="admin", help="Username")
    parser.add_argument("-p", "--password", default="", help="Password")
    parser.add_argument("-P", "--port", type=int, default=BTEST_PORT, help="Port")
    parser.add_argument("--receive", action="store_true", help="Test receive direction")
    args = parser.parse_args()

    # Both branches used to yield "receive", making the flag a no-op and the
    # send direction unreachable.
    direction = "receive" if args.receive else "send"

    client = BtestECSRP5Client(args.address, args.port)
    try:
        success = client.run_test(args.username, args.password, direction)
        if success and direction == "receive":
            print("\nAuth passed! Reading some data...")
            try:
                data = client.sock.recv(4096)
                hexdump("First data received", data[:64])
                print(f" (total {len(data)} bytes)")

                # Read for ~2 seconds to confirm data flows, and time it so
                # the reported rate is an actual bits-per-second figure.
                client.sock.settimeout(2)
                total = len(data)
                streamed = 0
                started = time.monotonic()
                deadline = started + 2.0
                while time.monotonic() < deadline:
                    try:
                        chunk = client.sock.recv(65536)
                    except socket.timeout:
                        break
                    if not chunk:
                        break  # peer closed the connection
                    streamed += len(chunk)
                elapsed = time.monotonic() - started
                total += streamed
                rate_mbps = streamed * 8 / 1_000_000 / elapsed if elapsed > 0 else 0.0
                print(
                    f"\nTotal received: {total} bytes "
                    f"({rate_mbps:.2f} Mbps over {elapsed:.2f}s)"
                )
            except OSError as e:
                print(f"Data read error: {e}")
        elif success:
            print("\nAuth passed! (send direction: skipping receive check)")
    except Exception as e:
        # Top-level boundary: report, show the traceback, and fail the run.
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Always release the socket, including on the sys.exit() path above.
        if client.sock is not None:
            client.sock.close()

    sys.exit(0 if success else 1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|