commit d9007dc1698f0e9b71bf15915ee996b9d85b8fa9 Author: Siavash Sameni Date: Tue Mar 31 11:56:34 2026 +0400 Initial commit: MikroTik btest server & client in Rust Full reimplementation of the MikroTik Bandwidth Test protocol: - Server mode: accepts connections from MikroTik devices on port 2000 - Client mode: connects to MikroTik btest servers - TCP and UDP protocols with bidirectional support - MD5 challenge-response authentication - Dynamic speed adjustment (1.5x algorithm) - Status exchange matching original C pselect() behavior - Docker support with multi-stage build Tested against MikroTik RouterOS achieving: - 1.05 Gbps server RX (single connection) - 530 Mbps client TCP download - 840 Mbps client TCP upload - 433 Mbps client UDP download Co-Authored-By: Claude Opus 4.6 (1M context) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..194c6d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +btest_original +.claude/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..32de8b0 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "btest-opensource"] + path = btest-opensource + url = https://github.com/samm-git/btest-opensource diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..6209c9a --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,746 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "clap" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "mikrotik-btest" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "clap", + "md-5", + "rand", + "socket2 0.5.10", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "mio" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.61.2", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2 0.6.3", + "tokio-macros", + "windows-sys 0.61.2", +] + +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..0a248fc --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "mikrotik-btest" +version = "0.1.0" +edition = "2021" +description = "MikroTik Bandwidth Test (btest) server and client implementation in Rust" +license = "MIT" + +[lib] +name = "mikrotik_btest" +path = "src/lib.rs" + +[[bin]] +name = "btest" +path = "src/main.rs" + +[dependencies] +tokio = { version = "1", features = ["full"] } +clap = { version = "4", features = ["derive"] } +md-5 = "0.10" +bytes = "1" +thiserror = "2" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +rand = "0.8" +socket2 = "0.5" +anyhow = "1.0.102" + +[profile.release] +opt-level = 3 +lto = true +strip = true +codegen-units = 1 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..36cb10e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,28 @@ +# Stage 1: Build +FROM rust:1.82-slim AS builder + +WORKDIR /build +COPY Cargo.toml Cargo.lock ./ +COPY src/ src/ + +RUN cargo build --release + +# Stage 2: Runtime (minimal image) +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/target/release/btest /usr/local/bin/btest + +# btest control port +EXPOSE 2000/tcp +# UDP data ports range +EXPOSE 2001-2100/udp +# UDP client ports range +EXPOSE 2257-2356/udp + +ENTRYPOINT ["btest"] +# Default: run as server +CMD ["-s"] diff --git a/btest-opensource b/btest-opensource new file mode 160000 index 0000000..5040a01 --- /dev/null +++ b/btest-opensource @@ -0,0 +1 @@ +Subproject commit 5040a01267c3578d97bb14fa09ccf54de0f179bc diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a28b61e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,22 @@ +services: + btest-server: + build: . + container_name: btest-server + ports: + - "2000:2000/tcp" + - "2001-2100:2001-2100/udp" + - "2257-2356:2257-2356/udp" + command: ["-s", "-v"] + restart: unless-stopped + + # Server with authentication enabled + btest-server-auth: + build: . + container_name: btest-server-auth + ports: + - "2010:2000/tcp" + - "2101-2200:2001-2100/udp" + command: ["-s", "-a", "admin", "-p", "password", "-v"] + restart: unless-stopped + profiles: + - auth diff --git a/scripts/test-docker.sh b/scripts/test-docker.sh new file mode 100755 index 0000000..6c86c99 --- /dev/null +++ b/scripts/test-docker.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +# Test using Docker +set -euo pipefail + +echo "=== Building Docker image ===" +docker build -t btest . + +echo "" +echo "=== Running btest server in Docker ===" +docker run -d --name btest-test -p 2000:2000/tcp -p 2001-2100:2001-2100/udp -p 2257-2356:2257-2356/udp btest -s -v +sleep 2 + +cleanup() { + echo "Stopping Docker container..." + docker stop btest-test 2>/dev/null || true + docker rm btest-test 2>/dev/null || true +} +trap cleanup EXIT + +BTEST="./target/release/btest" +cargo build --release + +run_timed() { + local desc="$1"; shift + echo "" + echo "--- $desc ---" + $BTEST "$@" & + local pid=$! + sleep 5 + kill $pid 2>/dev/null || true + wait $pid 2>/dev/null || true +} + +run_timed "TCP Download from Docker server" -c 127.0.0.1 -r +run_timed "TCP Upload to Docker server" -c 127.0.0.1 -t +run_timed "TCP Bidirectional" -c 127.0.0.1 -t -r + +echo "" +echo "=== Docker server logs ===" +docker logs btest-test + +echo "" +echo "=== Docker tests completed ===" diff --git a/scripts/test-local.sh b/scripts/test-local.sh new file mode 100755 index 0000000..390fe72 --- /dev/null +++ b/scripts/test-local.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash +# Local loopback tests - run server and client against each other +set -euo pipefail + +BTEST="cargo run --release --" +PORT=2000 + +echo "=== Building release binary ===" +cargo build --release + +BTEST="./target/release/btest" + +cleanup() { + echo "Stopping server..." + kill $SERVER_PID 2>/dev/null || true + wait $SERVER_PID 2>/dev/null || true +} + +echo "" +echo "=== Starting btest server on port $PORT ===" +$BTEST -s -P $PORT -v & +SERVER_PID=$! +trap cleanup EXIT +sleep 1 + +TIMEOUT_CMD="timeout" +if ! command -v timeout &>/dev/null; then + if command -v gtimeout &>/dev/null; then + TIMEOUT_CMD="gtimeout" + else + # Fallback: background + sleep + kill + TIMEOUT_CMD="" + fi +fi + +run_test() { + local desc="$1" + shift + echo "" + echo "--- Test: $desc ---" + if [ -n "$TIMEOUT_CMD" ]; then + $TIMEOUT_CMD 5 $BTEST "$@" || true + else + $BTEST "$@" & + local pid=$! + sleep 5 + kill $pid 2>/dev/null || true + wait $pid 2>/dev/null || true + fi + echo "--- Done: $desc ---" + sleep 1 +} + +run_test "TCP Download (RX)" -c 127.0.0.1 -P $PORT -r +run_test "TCP Upload (TX)" -c 127.0.0.1 -P $PORT -t +run_test "TCP Bidirectional" -c 127.0.0.1 -P $PORT -t -r +run_test "TCP Download 100Mbps limited" -c 127.0.0.1 -P $PORT -r -b 100M +run_test "UDP Download" -c 127.0.0.1 -P $PORT -r -u +run_test "UDP Upload" -c 127.0.0.1 -P $PORT -t -u +run_test "UDP Bidirectional" -c 127.0.0.1 -P $PORT -t -r -u + +echo "" +echo "=== All local tests completed ===" diff --git a/scripts/test-mikrotik.sh b/scripts/test-mikrotik.sh new file mode 100755 index 0000000..907c9c4 --- /dev/null +++ b/scripts/test-mikrotik.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# Test against a MikroTik device +# Usage: ./scripts/test-mikrotik.sh [username] [password] +set -euo pipefail + +MIKROTIK_IP="${1:?Usage: $0 [username] [password]}" +USERNAME="${2:-}" +PASSWORD="${3:-}" +BTEST="./target/release/btest" + +echo "=== Building release binary ===" +cargo build --release + +AUTH_ARGS="" +if [ -n "$USERNAME" ]; then + AUTH_ARGS="-a $USERNAME" +fi +if [ -n "$PASSWORD" ]; then + AUTH_ARGS="$AUTH_ARGS -p $PASSWORD" +fi + +echo "" +echo "=== Testing against MikroTik at $MIKROTIK_IP ===" +echo "" + +TIMEOUT_CMD="timeout" +if ! command -v timeout &>/dev/null; then + if command -v gtimeout &>/dev/null; then + TIMEOUT_CMD="gtimeout" + else + TIMEOUT_CMD="" + fi +fi + +run_test() { + local desc="$1" + shift + echo "--- $desc ---" + if [ -n "$TIMEOUT_CMD" ]; then + $TIMEOUT_CMD 10 $BTEST "$@" $AUTH_ARGS || echo "(test ended)" + else + $BTEST "$@" $AUTH_ARGS & + local pid=$! + sleep 10 + kill $pid 2>/dev/null || true + wait $pid 2>/dev/null || true + echo "(test ended)" + fi + echo "" + sleep 1 +} + +echo "=== Mode 1: Our client -> MikroTik btest server ===" +echo "(Make sure btest server is enabled on your MikroTik: /tool/bandwidth-server set enabled=yes)" +echo "" + +run_test "TCP Download from MikroTik" -c "$MIKROTIK_IP" -r +run_test "TCP Upload to MikroTik" -c "$MIKROTIK_IP" -t +run_test "TCP Bidirectional with MikroTik" -c "$MIKROTIK_IP" -t -r +run_test "UDP Download from MikroTik" -c "$MIKROTIK_IP" -r -u +run_test "UDP Upload to MikroTik" -c "$MIKROTIK_IP" -t -u + +echo "" +echo "=== Mode 2: Our server <- MikroTik connects to us ===" +echo "To test this mode:" +echo " 1. Run: $BTEST -s -v $AUTH_ARGS" +echo " 2. On MikroTik, run:" +echo " /tool/bandwidth-test address= direction=both protocol=tcp" +echo "" +echo "=== All MikroTik tests completed ===" diff --git a/src/auth.rs b/src/auth.rs new file mode 100644 index 0000000..69f92e0 --- /dev/null +++ b/src/auth.rs @@ -0,0 +1,168 @@ +use md5::{Digest, Md5}; +use rand::RngCore; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use crate::protocol::{self, BtestError, Result, AUTH_FAILED, AUTH_OK, AUTH_REQUIRED}; + +pub fn generate_challenge() -> [u8; 16] { + let mut nonce = [0u8; 16]; + rand::thread_rng().fill_bytes(&mut nonce); + nonce +} + +/// Compute the double-MD5 response: MD5(password + MD5(password + challenge)) +pub fn compute_auth_hash(password: &str, challenge: &[u8; 16]) -> [u8; 16] { + // hash1 = MD5(password + challenge) + let mut hasher = Md5::new(); + hasher.update(password.as_bytes()); + hasher.update(challenge); + let hash1 = hasher.finalize(); + + // hash2 = MD5(password + hash1) + let mut hasher = Md5::new(); + hasher.update(password.as_bytes()); + hasher.update(&hash1); + hasher.finalize().into() +} + +/// Server-side: send auth challenge and verify response. +/// Returns Ok(()) if auth succeeds or no auth is configured. +pub async fn server_authenticate( + stream: &mut S, + username: Option<&str>, + password: Option<&str>, +) -> Result<()> { + match (username, password) { + (None, None) => { + // No auth required + stream.write_all(&AUTH_OK).await?; + stream.flush().await?; + Ok(()) + } + (_, Some(pass)) => { + // Send auth challenge + stream.write_all(&AUTH_REQUIRED).await?; + let challenge = generate_challenge(); + stream.write_all(&challenge).await?; + stream.flush().await?; + + // Receive response: 16 bytes hash + 32 bytes username + let mut response = [0u8; 48]; + stream.read_exact(&mut response).await?; + + let received_hash = &response[0..16]; + let received_user = &response[16..48]; + + // Extract username (null-terminated) + let user_end = received_user + .iter() + .position(|&b| b == 0) + .unwrap_or(32); + let received_username = std::str::from_utf8(&received_user[..user_end]) + .unwrap_or(""); + + // Verify username if configured + if let Some(expected_user) = username { + if received_username != expected_user { + tracing::warn!("Auth failed: username mismatch (got '{}')", received_username); + stream.write_all(&AUTH_FAILED).await?; + stream.flush().await?; + return Err(BtestError::AuthFailed); + } + } + + // Verify hash + let expected_hash = compute_auth_hash(pass, &challenge); + if received_hash != expected_hash { + tracing::warn!("Auth failed: hash mismatch for user '{}'", received_username); + stream.write_all(&AUTH_FAILED).await?; + stream.flush().await?; + return Err(BtestError::AuthFailed); + } + + tracing::info!("Auth successful for user '{}'", received_username); + stream.write_all(&AUTH_OK).await?; + stream.flush().await?; + Ok(()) + } + (Some(_), None) => { + // Username but no password - treat as no auth + stream.write_all(&AUTH_OK).await?; + stream.flush().await?; + Ok(()) + } + } +} + +/// Client-side: respond to auth challenge if required. +pub async fn client_authenticate( + stream: &mut S, + resp: [u8; 4], + username: &str, + password: &str, +) -> Result<()> { + if resp == AUTH_OK { + return Ok(()); + } + + if resp == AUTH_REQUIRED { + // Read 16-byte challenge + let mut challenge = [0u8; 16]; + stream.read_exact(&mut challenge).await?; + + // Compute response + let hash = compute_auth_hash(password, &challenge); + + // Build 48-byte response: 16 hash + 32 username + let mut response = [0u8; 48]; + response[0..16].copy_from_slice(&hash); + let user_bytes = username.as_bytes(); + let copy_len = user_bytes.len().min(32); + response[16..16 + copy_len].copy_from_slice(&user_bytes[..copy_len]); + + stream.write_all(&response).await?; + stream.flush().await?; + + // Read auth result + let result = protocol::recv_response(stream).await?; + if result == AUTH_OK { + tracing::info!("Authentication successful"); + Ok(()) + } else { + Err(BtestError::AuthFailed) + } + } else if resp == [0x03, 0x00, 0x00, 0x00] { + Err(BtestError::Protocol( + "EC-SRP5 authentication (RouterOS >= 6.43) is not supported".into(), + )) + } else { + Err(BtestError::Protocol(format!( + "Unexpected auth response: {:02x?}", + resp + ))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_auth_hash_known_vector() { + // From the Perl reference: password "test", challenge as hex "ad32d6f94d28161625f2f390bb895637" + let challenge: [u8; 16] = [ + 0xad, 0x32, 0xd6, 0xf9, 0x4d, 0x28, 0x16, 0x16, 0x25, 0xf2, 0xf3, 0x90, 0xbb, 0x89, + 0x56, 0x37, + ]; + let hash = compute_auth_hash("test", &challenge); + let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect(); + assert_eq!(hex, "3c968565bc0314f281a6da1571cf7255"); + } + + #[test] + fn test_empty_password() { + let challenge = generate_challenge(); + let hash = compute_auth_hash("", &challenge); + assert_eq!(hash.len(), 16); + } +} diff --git a/src/bandwidth.rs b/src/bandwidth.rs new file mode 100644 index 0000000..a83caad --- /dev/null +++ b/src/bandwidth.rs @@ -0,0 +1,146 @@ +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64}; +use std::sync::Arc; +use std::time::Duration; + +/// Shared state for bandwidth tracking between TX/RX threads and status reporter. +#[derive(Debug)] +pub struct BandwidthState { + pub tx_bytes: AtomicU64, + pub rx_bytes: AtomicU64, + pub tx_speed: AtomicU32, + pub tx_speed_changed: AtomicBool, + pub running: AtomicBool, + pub rx_packets: AtomicU64, + pub rx_lost_packets: AtomicU64, + pub last_udp_seq: AtomicU32, +} + +impl BandwidthState { + pub fn new() -> Arc { + Arc::new(Self { + tx_bytes: AtomicU64::new(0), + rx_bytes: AtomicU64::new(0), + tx_speed: AtomicU32::new(0), + tx_speed_changed: AtomicBool::new(false), + running: AtomicBool::new(true), + rx_packets: AtomicU64::new(0), + rx_lost_packets: AtomicU64::new(0), + last_udp_seq: AtomicU32::new(0), + }) + } +} + +/// Calculate the sleep interval between packets to achieve target bandwidth. +/// Returns None if speed is unlimited (0). +pub fn calc_send_interval(tx_speed_bps: u32, tx_size: u16) -> Option { + if tx_speed_bps == 0 { + return None; + } + + let bits_per_packet = tx_size as u64 * 8; + let interval_ns = (1_000_000_000u64 * bits_per_packet) / tx_speed_bps as u64; + + // Replicate MikroTik behavior: if interval > 500ms, clamp to 1 second + if interval_ns > 500_000_000 { + Some(Duration::from_secs(1)) + } else { + Some(Duration::from_nanos(interval_ns.max(1))) + } +} + +/// Format a bandwidth value in human-readable form. +pub fn format_bandwidth(bits_per_sec: f64) -> String { + if bits_per_sec >= 1_000_000_000.0 { + format!("{:.2} Gbps", bits_per_sec / 1_000_000_000.0) + } else if bits_per_sec >= 1_000_000.0 { + format!("{:.2} Mbps", bits_per_sec / 1_000_000.0) + } else if bits_per_sec >= 1_000.0 { + format!("{:.2} Kbps", bits_per_sec / 1_000.0) + } else { + format!("{:.0} bps", bits_per_sec) + } +} + +/// Parse bandwidth string like "100M", "1G", "500K", "1000000" +pub fn parse_bandwidth(s: &str) -> std::result::Result { + let s = s.trim(); + if s.is_empty() { + return Err(anyhow::anyhow!("Empty bandwidth string")); + } + + let (num_str, multiplier) = match s.as_bytes().last() { + Some(b'G' | b'g') => (&s[..s.len() - 1], 1_000_000_000u64), + Some(b'M' | b'm') => (&s[..s.len() - 1], 1_000_000u64), + Some(b'K' | b'k') => (&s[..s.len() - 1], 1_000u64), + _ => (s, 1u64), + }; + + let num: f64 = num_str + .parse() + .map_err(|e| anyhow::anyhow!("Invalid bandwidth number '{}': {}", num_str, e))?; + let result = (num * multiplier as f64) as u64; + if result > u32::MAX as u64 { + Err(anyhow::anyhow!("Bandwidth {} exceeds maximum (4 Gbps)", s)) + } else { + Ok(result as u32) + } +} + +/// Print a status line for a reporting interval. +pub fn print_status( + interval_num: u32, + direction: &str, + bytes: u64, + elapsed: Duration, + lost_packets: Option, +) { + let secs = elapsed.as_secs_f64(); + let bits = bytes as f64 * 8.0; + let bw = if secs > 0.0 { bits / secs } else { 0.0 }; + + let loss_str = match lost_packets { + Some(lost) if lost > 0 => format!(" lost: {}", lost), + _ => String::new(), + }; + + println!( + "[{:4}] {:>3} {} ({} bytes){}", + interval_num, + direction, + format_bandwidth(bw), + bytes, + loss_str, + ); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_bandwidth() { + assert_eq!(parse_bandwidth("100M").unwrap(), 100_000_000); + assert_eq!(parse_bandwidth("1G").unwrap(), 1_000_000_000); + assert_eq!(parse_bandwidth("500K").unwrap(), 500_000); + assert_eq!(parse_bandwidth("1000000").unwrap(), 1_000_000); + assert_eq!(parse_bandwidth("1.5M").unwrap(), 1_500_000); + } + + #[test] + fn test_calc_interval() { + // 100Mbps with 1500 byte packets + let interval = calc_send_interval(100_000_000, 1500).unwrap(); + // Expected: (1e9 * 1500 * 8) / 100_000_000 = 120_000 ns = 120 us + assert_eq!(interval.as_nanos(), 120_000); + + // Unlimited + assert!(calc_send_interval(0, 1500).is_none()); + } + + #[test] + fn test_format_bandwidth() { + assert_eq!(format_bandwidth(100_000_000.0), "100.00 Mbps"); + assert_eq!(format_bandwidth(1_500_000_000.0), "1.50 Gbps"); + assert_eq!(format_bandwidth(500_000.0), "500.00 Kbps"); + } +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..e8cfad4 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,440 @@ +use std::net::SocketAddr; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpStream, UdpSocket}; + +use crate::auth; +use crate::bandwidth::{self, BandwidthState}; +use crate::protocol::*; + +pub async fn run_client( + host: &str, + port: u16, + direction: u8, + use_udp: bool, + tx_speed: u32, + rx_speed: u32, + auth_user: Option, + auth_pass: Option, + nat_mode: bool, +) -> Result<()> { + let addr = format!("{}:{}", host, port); + tracing::info!("Connecting to {}...", addr); + let mut stream = TcpStream::connect(&addr).await?; + stream.set_nodelay(true)?; + + recv_hello(&mut stream).await?; + tracing::info!("Connected to server"); + + let proto = if use_udp { CMD_PROTO_UDP } else { CMD_PROTO_TCP }; + let mut cmd = Command::new(proto, direction); + cmd.local_tx_speed = tx_speed; + cmd.remote_tx_speed = rx_speed; + + send_command(&mut stream, &cmd).await?; + + let resp = recv_response(&mut stream).await?; + match (auth_user.as_deref(), auth_pass.as_deref()) { + (Some(user), Some(pass)) => { + auth::client_authenticate(&mut stream, resp, user, pass).await?; + } + _ => { + if resp == AUTH_REQUIRED { + return Err(BtestError::Protocol( + "Server requires authentication but no credentials provided".into(), + )); + } + if resp == [0x03, 0x00, 0x00, 0x00] { + return Err(BtestError::Protocol( + "Server requires EC-SRP5 authentication (RouterOS >= 6.43) which is not yet supported. \ + Try disabling authentication on the MikroTik btest server, or provide -a/-p credentials".into(), + )); + } + if resp != AUTH_OK { + return Err(BtestError::Protocol(format!( + "Unexpected server response: {:02x?}", + resp + ))); + } + } + } + + tracing::info!( + "Starting {} {} test", + if use_udp { "UDP" } else { "TCP" }, + match direction { + CMD_DIR_RX => "upload (client TX)", + CMD_DIR_TX => "download (client RX)", + CMD_DIR_BOTH => "bidirectional", + _ => "unknown", + }, + ); + + if use_udp { + run_udp_test_client(&mut stream, host, &cmd, nat_mode).await + } else { + run_tcp_test_client(stream, cmd).await + } +} + +// --- TCP Test Client --- + +async fn run_tcp_test_client(stream: TcpStream, cmd: Command) -> Result<()> { + let state = BandwidthState::new(); + let tx_size = cmd.tx_size as usize; + let client_should_tx = cmd.client_tx(); + let client_should_rx = cmd.client_rx(); + let tx_speed = cmd.local_tx_speed; + + let (reader, writer) = stream.into_split(); + + // IMPORTANT: Do NOT drop unused halves - dropping OwnedWriteHalf sends TCP FIN, + // causing the peer to think we disconnected. Use Option to conditionally move. + let mut _writer_keepalive = None; + let mut _reader_keepalive = None; + + let state_tx = state.clone(); + let tx_handle = if client_should_tx { + Some(tokio::spawn(async move { + tcp_client_tx_loop(writer, tx_size, tx_speed, state_tx).await + })) + } else { + _writer_keepalive = Some(writer); + None + }; + + let state_rx = state.clone(); + let rx_handle = if client_should_rx { + Some(tokio::spawn(async move { + tcp_client_rx_loop(reader, state_rx).await + })) + } else { + _reader_keepalive = Some(reader); + None + }; + + client_status_loop(&cmd, &state).await; + + state.running.store(false, Ordering::SeqCst); + if let Some(h) = tx_handle { let _ = h.await; } + if let Some(h) = rx_handle { let _ = h.await; } + Ok(()) +} + +async fn tcp_client_tx_loop( + mut writer: tokio::net::tcp::OwnedWriteHalf, + tx_size: usize, + tx_speed: u32, + state: Arc, +) { + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut packet = vec![0u8; tx_size]; + packet[0] = STATUS_MSG_TYPE; + let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16); + let mut next_send = Instant::now(); + + while state.running.load(Ordering::Relaxed) { + if writer.write_all(&packet).await.is_err() { + break; + } + state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed); + + if state.tx_speed_changed.load(Ordering::Relaxed) { + state.tx_speed_changed.store(false, Ordering::Relaxed); + let new_speed = state.tx_speed.load(Ordering::Relaxed); + interval = bandwidth::calc_send_interval(new_speed, tx_size as u16); + next_send = Instant::now(); + } + + match interval { + Some(iv) => { + next_send += iv; + let now = Instant::now(); + if next_send > now { + tokio::time::sleep(next_send - now).await; + } + } + None => { + tokio::task::yield_now().await; + } + } + } +} + +async fn tcp_client_rx_loop( + mut reader: tokio::net::tcp::OwnedReadHalf, + state: Arc, +) { + let mut buf = vec![0u8; 65536]; + while state.running.load(Ordering::Relaxed) { + match reader.read(&mut buf).await { + Ok(0) | Err(_) => break, + Ok(n) => { + state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); + } + } + } +} + +// --- UDP Test Client --- + +async fn run_udp_test_client( + stream: &mut TcpStream, + host: &str, + cmd: &Command, + nat_mode: bool, +) -> Result<()> { + let mut port_buf = [0u8; 2]; + stream.read_exact(&mut port_buf).await?; + let server_udp_port = u16::from_be_bytes(port_buf); + let client_udp_port = server_udp_port + BTEST_PORT_CLIENT_OFFSET; + + tracing::info!( + "UDP ports: server={}, client={}", + server_udp_port, client_udp_port, + ); + + let udp = UdpSocket::bind(format!("0.0.0.0:{}", client_udp_port)).await?; + let server_udp_addr: SocketAddr = + format!("{}:{}", host, server_udp_port).parse().unwrap(); + udp.connect(server_udp_addr).await?; + + if nat_mode { + tracing::info!("NAT mode: sending probe packet"); + udp.send(&[]).await?; + } + + let state = BandwidthState::new(); + let tx_size = cmd.tx_size as usize; + let client_should_tx = cmd.client_tx(); + let client_should_rx = cmd.client_rx(); + let tx_speed = cmd.local_tx_speed; + let udp = Arc::new(udp); + + let state_tx = state.clone(); + let udp_tx = udp.clone(); + let tx_handle = if client_should_tx { + Some(tokio::spawn(async move { + udp_client_tx_loop(&udp_tx, tx_size, tx_speed, state_tx).await + })) + } else { + None + }; + + let state_rx = state.clone(); + let udp_rx = udp.clone(); + let rx_handle = if client_should_rx { + Some(tokio::spawn(async move { + udp_client_rx_loop(&udp_rx, state_rx).await + })) + } else { + None + }; + + udp_client_status_loop(stream, cmd, &state).await; + + state.running.store(false, Ordering::SeqCst); + if let Some(h) = tx_handle { let _ = h.await; } + if let Some(h) = rx_handle { let _ = h.await; } + Ok(()) +} + +async fn udp_client_tx_loop( + socket: &UdpSocket, + tx_size: usize, + initial_tx_speed: u32, + state: Arc, +) { + let mut seq: u32 = 0; + let mut packet = vec![0u8; tx_size]; + let mut interval = bandwidth::calc_send_interval(initial_tx_speed, tx_size as u16); + let mut next_send = Instant::now(); + let mut consecutive_errors: u32 = 0; + + while state.running.load(Ordering::Relaxed) { + packet[0..4].copy_from_slice(&seq.to_be_bytes()); + + match socket.send(&packet).await { + Ok(n) => { + seq = seq.wrapping_add(1); + state.tx_bytes.fetch_add(n as u64, Ordering::Relaxed); + consecutive_errors = 0; + } + Err(_) => { + consecutive_errors += 1; + if consecutive_errors > 1000 { + tracing::warn!("UDP TX: too many consecutive send errors, stopping"); + break; + } + tokio::time::sleep(Duration::from_micros(200)).await; + continue; + } + } + + if state.tx_speed_changed.load(Ordering::Relaxed) { + state.tx_speed_changed.store(false, Ordering::Relaxed); + let new_speed = state.tx_speed.load(Ordering::Relaxed); + interval = bandwidth::calc_send_interval(new_speed, tx_size as u16); + next_send = Instant::now(); + tracing::debug!("TX speed adjusted to {} bps ({:.2} Mbps)", + new_speed, new_speed as f64 / 1_000_000.0); + } + + match interval { + Some(iv) => { + next_send += iv; + let now = Instant::now(); + if next_send > now { + tokio::time::sleep(next_send - now).await; + } + } + None => { + if seq % 64 == 0 { + tokio::task::yield_now().await; + } + } + } + } +} + +async fn udp_client_rx_loop(socket: &UdpSocket, state: Arc) { + let mut buf = vec![0u8; 65536]; + let mut last_seq: Option = None; + + while state.running.load(Ordering::Relaxed) { + match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await { + Ok(Ok(n)) if n >= 4 => { + state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); + state.rx_packets.fetch_add(1, Ordering::Relaxed); + + let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); + if let Some(last) = last_seq { + let expected = last.wrapping_add(1); + if seq > expected { + let lost = seq - expected; + state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed); + } + } + last_seq = Some(seq); + } + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::debug!("UDP recv error: {}", e); + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(_) => { + tracing::debug!("UDP RX timeout"); + } + } + } +} + +// --- Status Loops --- + +async fn client_status_loop(cmd: &Command, state: &BandwidthState) { + let mut seq: u32 = 0; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + loop { + interval.tick().await; + if !state.running.load(Ordering::Relaxed) { + break; + } + + seq += 1; + + if cmd.client_tx() { + let tx = state.tx_bytes.swap(0, Ordering::Relaxed); + bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); + } + + if cmd.client_rx() { + let rx = state.rx_bytes.swap(0, Ordering::Relaxed); + bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), None); + } + } +} + +/// UDP status exchange - sequential like C pselect(): +/// 1. Wait up to 1 second for server status +/// 2. Read and process if available +/// 3. ALWAYS send our status +async fn udp_client_status_loop( + stream: &mut TcpStream, + cmd: &Command, + state: &BandwidthState, +) { + let mut seq: u32 = 0; + let (mut reader, mut writer) = tokio::io::split(stream); + let mut status_buf = [0u8; STATUS_MSG_SIZE]; + let mut next_status = Instant::now() + Duration::from_secs(1); + + loop { + if !state.running.load(Ordering::Relaxed) { + break; + } + + let now = Instant::now(); + let wait_time = if next_status > now { + next_status - now + } else { + Duration::ZERO + }; + + match tokio::time::timeout(wait_time, reader.read_exact(&mut status_buf)).await { + Ok(Ok(_)) => { + let server_status = StatusMessage::deserialize(&status_buf); + + if server_status.bytes_received > 0 && cmd.client_tx() { + let new_speed = + ((server_status.bytes_received as u64 * 8 * 3) / 2) as u32; + state.tx_speed.store(new_speed, Ordering::Relaxed); + state.tx_speed_changed.store(true, Ordering::Relaxed); + tracing::debug!( + "Server received {} bytes → TX {:.2} Mbps", + server_status.bytes_received, + new_speed as f64 / 1_000_000.0, + ); + } + + if Instant::now() < next_status { + continue; + } + } + Ok(Err(_)) => { + state.running.store(false, Ordering::SeqCst); + break; + } + Err(_) => {} + } + + // ALWAYS send our status every 1 second + seq += 1; + next_status = Instant::now() + Duration::from_secs(1); + + let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed); + let tx_bytes = state.tx_bytes.swap(0, Ordering::Relaxed); + let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed); + + let status = StatusMessage { + seq, + bytes_received: rx_bytes as u32, + }; + if writer.write_all(&status.serialize()).await.is_err() { + state.running.store(false, Ordering::SeqCst); + break; + } + let _ = writer.flush().await; + + if cmd.client_tx() { + bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None); + } + if cmd.client_rx() { + bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost)); + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..97d18eb --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,5 @@ +pub mod auth; +pub mod bandwidth; +pub mod client; +pub mod protocol; +pub mod server; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..381fa21 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,136 @@ +mod auth; +mod bandwidth; +mod client; +mod protocol; +mod server; + +use clap::Parser; +use tracing_subscriber::EnvFilter; + +use crate::protocol::*; + +#[derive(Parser, Debug)] +#[command( + name = "btest", + about = "MikroTik Bandwidth Test (btest) - server and client", + version, + long_about = "Compatible bandwidth testing tool for MikroTik RouterOS devices.\n\ + Supports TCP and UDP modes with optional authentication." +)] +struct Cli { + /// Run in server mode + #[arg(short = 's', long = "server", conflicts_with = "client")] + server: bool, + + /// Run in client mode, connecting to the specified host + #[arg(short = 'c', long = "client", conflicts_with = "server")] + client: Option, + + /// Client transmits data (upload test) + #[arg(short = 't', long = "transmit")] + transmit: bool, + + /// Client receives data (download test) + #[arg(short = 'r', long = "receive")] + receive: bool, + + /// Use UDP instead of TCP + #[arg(short = 'u', long = "udp")] + udp: bool, + + /// Target bandwidth (e.g., 100M, 1G, 500K) + #[arg(short = 'b', long = "bandwidth")] + bandwidth: Option, + + /// Listen/connect port (default: 2000) + #[arg(short = 'P', long = "port", default_value_t = BTEST_PORT)] + port: u16, + + /// Authentication username + #[arg(short = 'a', long = "authuser")] + auth_user: Option, + + /// Authentication password + #[arg(short = 'p', long = "authpass")] + auth_pass: Option, + + /// NAT mode - send probe packet to open firewall + #[arg(short = 'n', long = "nat")] + nat: bool, + + /// Verbose logging (repeat for more: -v, -vv, -vvv) + #[arg(short = 'v', long = "verbose", action = clap::ArgAction::Count)] + verbose: u8, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + + // Set up logging based on verbosity + let filter = match cli.verbose { + 0 => "info", + 1 => "debug", + _ => "trace", + }; + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(filter)), + ) + .with_target(false) + .init(); + + if cli.server { + // Server mode + tracing::info!("Starting btest server on port {}", cli.port); + server::run_server(cli.port, cli.auth_user, cli.auth_pass).await?; + } else if let Some(host) = cli.client { + // Client mode - must specify at least one direction + if !cli.transmit && !cli.receive { + eprintln!("Error: Client mode requires at least one of -t (transmit) or -r (receive)"); + std::process::exit(1); + } + + // Direction tells SERVER what to do (C client convention): + // client transmit → CMD_DIR_RX (server receives) + // client receive → CMD_DIR_TX (server transmits) + let direction = match (cli.transmit, cli.receive) { + (true, false) => CMD_DIR_RX, + (false, true) => CMD_DIR_TX, + (true, true) => CMD_DIR_BOTH, + _ => unreachable!(), + }; + + let bw = match &cli.bandwidth { + Some(b) => bandwidth::parse_bandwidth(b)?, + None => 0, + }; + + // For client: local_tx_speed controls upload, remote_tx_speed controls download + let (tx_speed, rx_speed) = match direction { + CMD_DIR_TX => (bw, 0), + CMD_DIR_RX => (0, bw), + CMD_DIR_BOTH => (bw, bw), + _ => (0, 0), + }; + + client::run_client( + &host, + cli.port, + direction, + cli.udp, + tx_speed, + rx_speed, + cli.auth_user, + cli.auth_pass, + cli.nat, + ) + .await?; + } else { + eprintln!("Error: Must specify either -s (server) or -c (client)"); + eprintln!("Run with --help for usage information."); + std::process::exit(1); + } + + Ok(()) +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..95f126b --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,205 @@ +use std::io; +use thiserror::Error; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +// --- Constants --- + +pub const BTEST_PORT: u16 = 2000; +pub const BTEST_UDP_PORT_START: u16 = 2001; +pub const BTEST_PORT_CLIENT_OFFSET: u16 = 256; + +pub const CMD_PROTO_UDP: u8 = 0x00; +pub const CMD_PROTO_TCP: u8 = 0x01; + +pub const CMD_DIR_RX: u8 = 0x01; +pub const CMD_DIR_TX: u8 = 0x02; +pub const CMD_DIR_BOTH: u8 = 0x03; + +pub const DEFAULT_TCP_TX_SIZE: u16 = 0x8000; // 32768 +pub const DEFAULT_UDP_TX_SIZE: u16 = 0x05DC; // 1500 + +pub const HELLO: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; +pub const AUTH_OK: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; +pub const AUTH_REQUIRED: [u8; 4] = [0x02, 0x00, 0x00, 0x00]; +pub const AUTH_FAILED: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; + +pub const STATUS_MSG_TYPE: u8 = 0x07; +pub const STATUS_MSG_SIZE: usize = 12; + +// --- Error Types --- + +#[derive(Error, Debug)] +pub enum BtestError { + #[error("I/O error: {0}")] + Io(#[from] io::Error), + #[error("Protocol error: {0}")] + Protocol(String), + #[error("Authentication failed")] + AuthFailed, + #[error("Invalid command")] + InvalidCommand, +} + +pub type Result = std::result::Result; + +// --- Command Structure --- + +#[derive(Debug, Clone)] +pub struct Command { + pub proto: u8, + pub direction: u8, + pub random_data: u8, + pub tcp_conn_count: u8, + pub tx_size: u16, + pub client_buf_size: u16, + pub remote_tx_speed: u32, + pub local_tx_speed: u32, +} + +impl Command { + pub fn new(proto: u8, direction: u8) -> Self { + let tx_size = if proto == CMD_PROTO_UDP { + DEFAULT_UDP_TX_SIZE + } else { + DEFAULT_TCP_TX_SIZE + }; + Self { + proto, + direction, + random_data: 0, + tcp_conn_count: 0, + tx_size, + client_buf_size: 0, + remote_tx_speed: 0, + local_tx_speed: 0, + } + } + + pub fn serialize(&self) -> [u8; 16] { + let mut buf = [0u8; 16]; + buf[0] = self.proto; + buf[1] = self.direction; + buf[2] = self.random_data; + buf[3] = self.tcp_conn_count; + buf[4..6].copy_from_slice(&self.tx_size.to_le_bytes()); + buf[6..8].copy_from_slice(&self.client_buf_size.to_le_bytes()); + buf[8..12].copy_from_slice(&self.remote_tx_speed.to_le_bytes()); + buf[12..16].copy_from_slice(&self.local_tx_speed.to_le_bytes()); + buf + } + + pub fn deserialize(buf: &[u8; 16]) -> Self { + Self { + proto: buf[0], + direction: buf[1], + random_data: buf[2], + tcp_conn_count: buf[3], + tx_size: u16::from_le_bytes([buf[4], buf[5]]), + client_buf_size: u16::from_le_bytes([buf[6], buf[7]]), + remote_tx_speed: u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]), + local_tx_speed: u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]), + } + } + + pub fn is_udp(&self) -> bool { + self.proto == CMD_PROTO_UDP + } + + // Direction bits are from SERVER's perspective: + // CMD_DIR_RX (0x01) = server receives + // CMD_DIR_TX (0x02) = server transmits + // Client inverts when building: client TX → CMD_DIR_RX, client RX → CMD_DIR_TX + + /// Server should transmit (CMD_DIR_TX bit set) + pub fn server_tx(&self) -> bool { + self.direction & CMD_DIR_TX != 0 + } + + /// Server should receive (CMD_DIR_RX bit set) + pub fn server_rx(&self) -> bool { + self.direction & CMD_DIR_RX != 0 + } + + /// Client should transmit (inverse: CMD_DIR_RX bit = server receives our data) + pub fn client_tx(&self) -> bool { + self.direction & CMD_DIR_RX != 0 + } + + /// Client should receive (inverse: CMD_DIR_TX bit = server sends us data) + pub fn client_rx(&self) -> bool { + self.direction & CMD_DIR_TX != 0 + } +} + +// --- Status Message --- + +#[derive(Debug, Clone, Default)] +pub struct StatusMessage { + pub seq: u32, + pub bytes_received: u32, +} + +impl StatusMessage { + pub fn serialize(&self) -> [u8; STATUS_MSG_SIZE] { + let mut buf = [0u8; STATUS_MSG_SIZE]; + buf[0] = STATUS_MSG_TYPE; + buf[1..5].copy_from_slice(&self.seq.to_be_bytes()); + buf[5] = 0; + buf[6] = 0; + buf[7] = 0; + buf[8..12].copy_from_slice(&self.bytes_received.to_le_bytes()); + buf + } + + pub fn deserialize(buf: &[u8; STATUS_MSG_SIZE]) -> Self { + Self { + seq: u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]), + bytes_received: u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]), + } + } +} + +// --- Protocol Helpers --- + +pub async fn send_hello(writer: &mut W) -> Result<()> { + writer.write_all(&HELLO).await?; + writer.flush().await?; + Ok(()) +} + +pub async fn recv_hello(reader: &mut R) -> Result<()> { + let mut buf = [0u8; 4]; + reader.read_exact(&mut buf).await?; + if buf != HELLO { + return Err(BtestError::Protocol(format!( + "Expected HELLO {:02x?}, got {:02x?}", + HELLO, buf + ))); + } + Ok(()) +} + +pub async fn send_command( + writer: &mut W, + cmd: &Command, +) -> Result<()> { + writer.write_all(&cmd.serialize()).await?; + writer.flush().await?; + Ok(()) +} + +pub async fn recv_command(reader: &mut R) -> Result { + let mut buf = [0u8; 16]; + reader.read_exact(&mut buf).await?; + let cmd = Command::deserialize(&buf); + if cmd.proto > 1 || cmd.direction == 0 || cmd.direction > 3 { + return Err(BtestError::InvalidCommand); + } + Ok(cmd) +} + +pub async fn recv_response(reader: &mut R) -> Result<[u8; 4]> { + let mut buf = [0u8; 4]; + reader.read_exact(&mut buf).await?; + Ok(buf) +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..531d801 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,456 @@ +use std::net::SocketAddr; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; + +use crate::auth; +use crate::bandwidth::{self, BandwidthState}; +use crate::protocol::*; + +pub async fn run_server( + port: u16, + auth_user: Option, + auth_pass: Option, +) -> Result<()> { + let addr = format!("0.0.0.0:{}", port); + let listener = TcpListener::bind(&addr).await?; + tracing::info!("btest server listening on {}", addr); + + let udp_port_offset = Arc::new(std::sync::atomic::AtomicU16::new(0)); + + loop { + let (stream, peer) = listener.accept().await?; + tracing::info!("New connection from {}", peer); + + let auth_user = auth_user.clone(); + let auth_pass = auth_pass.clone(); + let udp_offset = udp_port_offset.clone(); + + tokio::spawn(async move { + if let Err(e) = handle_client(stream, peer, auth_user, auth_pass, udp_offset).await { + tracing::error!("Client {} error: {}", peer, e); + } + }); + } +} + +async fn handle_client( + mut stream: TcpStream, + peer: SocketAddr, + auth_user: Option, + auth_pass: Option, + udp_port_offset: Arc, +) -> Result<()> { + stream.set_nodelay(true)?; + + send_hello(&mut stream).await?; + + let cmd = recv_command(&mut stream).await?; + tracing::info!( + "Client {} command: proto={} dir={} tx_size={} remote_speed={} local_speed={}", + peer, + if cmd.is_udp() { "UDP" } else { "TCP" }, + match cmd.direction { + CMD_DIR_RX => "RX", + CMD_DIR_TX => "TX", + CMD_DIR_BOTH => "BOTH", + _ => "?", + }, + cmd.tx_size, + cmd.remote_tx_speed, + cmd.local_tx_speed, + ); + + auth::server_authenticate( + &mut stream, + auth_user.as_deref(), + auth_pass.as_deref(), + ) + .await?; + + if cmd.is_udp() { + run_udp_test_server(&mut stream, peer, &cmd, udp_port_offset).await + } else { + run_tcp_test_server(stream, cmd).await + } +} + +// --- TCP Test Server --- + +async fn run_tcp_test_server(stream: TcpStream, cmd: Command) -> Result<()> { + let state = BandwidthState::new(); + let tx_size = cmd.tx_size as usize; + let server_should_tx = cmd.server_tx(); + let server_should_rx = cmd.server_rx(); + let tx_speed = cmd.remote_tx_speed; + + let (reader, writer) = stream.into_split(); + + // IMPORTANT: Do NOT drop unused halves - dropping sends TCP FIN + let mut _writer_keepalive = None; + let mut _reader_keepalive = None; + + let state_tx = state.clone(); + let tx_handle = if server_should_tx { + Some(tokio::spawn(async move { + tcp_tx_loop(writer, tx_size, tx_speed, state_tx).await + })) + } else { + _writer_keepalive = Some(writer); + None + }; + + let state_rx = state.clone(); + let rx_handle = if server_should_rx { + Some(tokio::spawn(async move { + tcp_rx_loop(reader, state_rx).await + })) + } else { + _reader_keepalive = Some(reader); + None + }; + + status_report_loop(&cmd, &state).await; + + state.running.store(false, Ordering::SeqCst); + if let Some(h) = tx_handle { let _ = h.await; } + if let Some(h) = rx_handle { let _ = h.await; } + Ok(()) +} + +async fn tcp_tx_loop( + mut writer: tokio::net::tcp::OwnedWriteHalf, + tx_size: usize, + tx_speed: u32, + state: Arc, +) { + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut packet = vec![0u8; tx_size]; + packet[0] = STATUS_MSG_TYPE; + let mut interval = bandwidth::calc_send_interval(tx_speed, tx_size as u16); + let mut next_send = Instant::now(); + + while state.running.load(Ordering::Relaxed) { + if writer.write_all(&packet).await.is_err() { + break; + } + state.tx_bytes.fetch_add(tx_size as u64, Ordering::Relaxed); + + if state.tx_speed_changed.load(Ordering::Relaxed) { + state.tx_speed_changed.store(false, Ordering::Relaxed); + let new_speed = state.tx_speed.load(Ordering::Relaxed); + interval = bandwidth::calc_send_interval(new_speed, tx_size as u16); + next_send = Instant::now(); + } + + match interval { + Some(iv) => { + next_send += iv; + let now = Instant::now(); + if next_send > now { + tokio::time::sleep(next_send - now).await; + } + } + None => { + tokio::task::yield_now().await; + } + } + } +} + +async fn tcp_rx_loop(mut reader: tokio::net::tcp::OwnedReadHalf, state: Arc) { + let mut buf = vec![0u8; 65536]; + while state.running.load(Ordering::Relaxed) { + match reader.read(&mut buf).await { + Ok(0) | Err(_) => break, + Ok(n) => { + state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); + } + } + } +} + +// --- UDP Test Server --- + +async fn run_udp_test_server( + stream: &mut TcpStream, + peer: SocketAddr, + cmd: &Command, + udp_port_offset: Arc, +) -> Result<()> { + let offset = udp_port_offset.fetch_add(1, Ordering::SeqCst); + let server_udp_port = BTEST_UDP_PORT_START + offset; + let client_udp_port = server_udp_port + BTEST_PORT_CLIENT_OFFSET; + + stream.write_all(&server_udp_port.to_be_bytes()).await?; + stream.flush().await?; + + tracing::info!( + "UDP test: server_port={}, client_port={}, peer={}", + server_udp_port, client_udp_port, peer, + ); + + let udp = UdpSocket::bind(format!("0.0.0.0:{}", server_udp_port)).await?; + let client_udp_addr: SocketAddr = + format!("{}:{}", peer.ip(), client_udp_port).parse().unwrap(); + udp.connect(client_udp_addr).await?; + + let state = BandwidthState::new(); + let tx_size = cmd.tx_size as usize; + let server_should_tx = cmd.server_tx(); + let server_should_rx = cmd.server_rx(); + let tx_speed = cmd.remote_tx_speed; + + let udp = Arc::new(udp); + + let state_tx = state.clone(); + let udp_tx = udp.clone(); + let tx_handle = if server_should_tx { + Some(tokio::spawn(async move { + udp_tx_loop(&udp_tx, tx_size, tx_speed, state_tx).await + })) + } else { + None + }; + + let state_rx = state.clone(); + let udp_rx = udp.clone(); + let rx_handle = if server_should_rx { + Some(tokio::spawn(async move { + udp_rx_loop(&udp_rx, state_rx).await + })) + } else { + None + }; + + // Status exchange using select! to match C pselect() behavior + udp_status_loop(stream, cmd, &state).await; + + state.running.store(false, Ordering::SeqCst); + if let Some(h) = tx_handle { let _ = h.await; } + if let Some(h) = rx_handle { let _ = h.await; } + Ok(()) +} + +async fn udp_tx_loop( + socket: &UdpSocket, + tx_size: usize, + initial_tx_speed: u32, + state: Arc, +) { + let mut seq: u32 = 0; + let mut packet = vec![0u8; tx_size]; + let mut interval = bandwidth::calc_send_interval(initial_tx_speed, tx_size as u16); + let mut next_send = Instant::now(); + let mut consecutive_errors: u32 = 0; + + while state.running.load(Ordering::Relaxed) { + packet[0..4].copy_from_slice(&seq.to_be_bytes()); + + match socket.send(&packet).await { + Ok(n) => { + seq = seq.wrapping_add(1); + state.tx_bytes.fetch_add(n as u64, Ordering::Relaxed); + consecutive_errors = 0; + } + Err(_) => { + consecutive_errors += 1; + if consecutive_errors > 1000 { + tracing::warn!("UDP TX: too many consecutive send errors, stopping"); + break; + } + // Back off on ENOBUFS/EAGAIN + tokio::time::sleep(Duration::from_micros(200)).await; + continue; + } + } + + // Pick up dynamic speed changes from status loop + if state.tx_speed_changed.load(Ordering::Relaxed) { + state.tx_speed_changed.store(false, Ordering::Relaxed); + let new_speed = state.tx_speed.load(Ordering::Relaxed); + interval = bandwidth::calc_send_interval(new_speed, tx_size as u16); + next_send = Instant::now(); + tracing::debug!("TX speed adjusted to {} bps ({:.2} Mbps)", + new_speed, new_speed as f64 / 1_000_000.0); + } + + match interval { + Some(iv) => { + next_send += iv; + let now = Instant::now(); + if next_send > now { + tokio::time::sleep(next_send - now).await; + } + } + None => { + // Unlimited: yield every 64 packets to keep system responsive + if seq % 64 == 0 { + tokio::task::yield_now().await; + } + } + } + } +} + +async fn udp_rx_loop(socket: &UdpSocket, state: Arc) { + let mut buf = vec![0u8; 65536]; + let mut last_seq: Option = None; + + while state.running.load(Ordering::Relaxed) { + match tokio::time::timeout(Duration::from_secs(5), socket.recv(&mut buf)).await { + Ok(Ok(n)) if n >= 4 => { + state.rx_bytes.fetch_add(n as u64, Ordering::Relaxed); + state.rx_packets.fetch_add(1, Ordering::Relaxed); + + let seq = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); + if let Some(last) = last_seq { + let expected = last.wrapping_add(1); + if seq > expected { + let lost = seq - expected; + state.rx_lost_packets.fetch_add(lost as u64, Ordering::Relaxed); + } + } + last_seq = Some(seq); + state.last_udp_seq.store(seq, Ordering::Relaxed); + } + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::debug!("UDP recv error: {}", e); + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(_) => { + tracing::debug!("UDP RX timeout"); + } + } + } +} + +// --- Status Reporting --- + +async fn status_report_loop(cmd: &Command, state: &BandwidthState) { + let mut seq: u32 = 0; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + loop { + interval.tick().await; + if !state.running.load(Ordering::Relaxed) { + break; + } + + seq += 1; + + if cmd.server_tx() { + let tx = state.tx_bytes.swap(0, Ordering::Relaxed); + bandwidth::print_status(seq, "TX", tx, Duration::from_secs(1), None); + } + + if cmd.server_rx() { + let rx = state.rx_bytes.swap(0, Ordering::Relaxed); + let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed); + let lost_opt = if cmd.is_udp() { Some(lost) } else { None }; + bandwidth::print_status(seq, "RX", rx, Duration::from_secs(1), lost_opt); + } + } +} + +/// UDP status exchange loop - matches C pselect() behavior exactly: +/// 1. Wait up to 1 second for client status (like pselect with 1s timeout) +/// 2. If client sent status, read and process it +/// 3. ALWAYS send our status (unconditional, matching C line 1048) +/// 4. Reset counters and print stats +/// This sequential approach prevents the ticker from being starved. +async fn udp_status_loop( + stream: &mut TcpStream, + cmd: &Command, + state: &BandwidthState, +) { + let mut seq: u32 = 0; + let (mut reader, mut writer) = tokio::io::split(stream); + let mut status_buf = [0u8; STATUS_MSG_SIZE]; + let mut next_status = Instant::now() + Duration::from_secs(1); + + loop { + if !state.running.load(Ordering::Relaxed) { + break; + } + + // Step 1: Wait for client status OR timeout (like C pselect) + let now = Instant::now(); + let wait_time = if next_status > now { + next_status - now + } else { + Duration::ZERO + }; + + // Try to read client's status within the remaining time window + match tokio::time::timeout(wait_time, reader.read_exact(&mut status_buf)).await { + Ok(Ok(_)) => { + let client_status = StatusMessage::deserialize(&status_buf); + tracing::debug!( + "RECV status: raw={:02x?} seq={} bytes_received={}", + &status_buf, client_status.seq, client_status.bytes_received, + ); + + if client_status.bytes_received > 0 && cmd.server_tx() { + let new_speed = + ((client_status.bytes_received as u64 * 8 * 3) / 2) as u32; + state.tx_speed.store(new_speed, Ordering::Relaxed); + state.tx_speed_changed.store(true, Ordering::Relaxed); + tracing::debug!( + "Speed adjust: client got {} bytes → our TX {:.2} Mbps", + client_status.bytes_received, + new_speed as f64 / 1_000_000.0, + ); + } + + if Instant::now() < next_status { + continue; + } + } + Ok(Err(e)) => { + tracing::debug!("Client TCP read error: {}", e); + state.running.store(false, Ordering::SeqCst); + break; + } + Err(_) => { + // Timeout - 1 second elapsed + } + } + + // Step 2: ALWAYS send our status every 1 second + seq += 1; + next_status = Instant::now() + Duration::from_secs(1); + + let rx_bytes = state.rx_bytes.swap(0, Ordering::Relaxed); + let tx_bytes = state.tx_bytes.swap(0, Ordering::Relaxed); + let lost = state.rx_lost_packets.swap(0, Ordering::Relaxed); + + let status = StatusMessage { + seq, + bytes_received: rx_bytes as u32, + }; + let serialized = status.serialize(); + tracing::debug!( + "SEND status: raw={:02x?} seq={} bytes_received={} ({:.2} Mbps)", + &serialized, seq, rx_bytes, rx_bytes as f64 * 8.0 / 1_000_000.0, + ); + if writer.write_all(&serialized).await.is_err() { + state.running.store(false, Ordering::SeqCst); + break; + } + let _ = writer.flush().await; + + // Print local stats + if cmd.server_tx() { + bandwidth::print_status(seq, "TX", tx_bytes, Duration::from_secs(1), None); + } + if cmd.server_rx() { + bandwidth::print_status(seq, "RX", rx_bytes, Duration::from_secs(1), Some(lost)); + } + } +} diff --git a/tests/integration_test.rs b/tests/integration_test.rs new file mode 100644 index 0000000..e02a13d --- /dev/null +++ b/tests/integration_test.rs @@ -0,0 +1,234 @@ +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +const SERVER_PORT: u16 = 12000; + +async fn start_test_server(port: u16, auth_user: Option<&str>, auth_pass: Option<&str>) { + let user = auth_user.map(String::from); + let pass = auth_pass.map(String::from); + tokio::spawn(async move { + let _ = mikrotik_btest::server::run_server(port, user, pass).await; + }); + tokio::time::sleep(Duration::from_millis(100)).await; +} + +#[tokio::test] +async fn test_server_hello() { + let port = SERVER_PORT; + start_test_server(port, None, None).await; + + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)) + .await + .expect("Failed to connect"); + + let mut buf = [0u8; 4]; + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x01, 0x00, 0x00, 0x00], "Expected HELLO response"); +} + +#[tokio::test] +async fn test_server_command_and_noauth() { + let port = SERVER_PORT + 1; + start_test_server(port, None, None).await; + + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)) + .await + .expect("Failed to connect"); + + let mut buf = [0u8; 4]; + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x01, 0x00, 0x00, 0x00]); + + // CMD_DIR_TX (0x02) = server should transmit data to us + let cmd = mikrotik_btest::protocol::Command::new( + mikrotik_btest::protocol::CMD_PROTO_TCP, + mikrotik_btest::protocol::CMD_DIR_TX, + ); + stream.write_all(&cmd.serialize()).await.unwrap(); + stream.flush().await.unwrap(); + + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x01, 0x00, 0x00, 0x00], "Expected AUTH_OK"); + + // Server should start sending data + tokio::time::sleep(Duration::from_millis(500)).await; + let mut data = vec![0u8; 4096]; + let n = stream.read(&mut data).await.unwrap(); + assert!(n > 0, "Expected to receive data from server"); +} + +#[tokio::test] +async fn test_server_auth_challenge() { + let port = SERVER_PORT + 2; + start_test_server(port, Some("admin"), Some("test")).await; + + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)) + .await + .expect("Failed to connect"); + + let mut buf = [0u8; 4]; + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x01, 0x00, 0x00, 0x00]); + + // CMD_DIR_TX = server transmits + let cmd = mikrotik_btest::protocol::Command::new( + mikrotik_btest::protocol::CMD_PROTO_TCP, + mikrotik_btest::protocol::CMD_DIR_TX, + ); + stream.write_all(&cmd.serialize()).await.unwrap(); + stream.flush().await.unwrap(); + + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x02, 0x00, 0x00, 0x00], "Expected AUTH_REQUIRED"); + + let mut challenge = [0u8; 16]; + stream.read_exact(&mut challenge).await.unwrap(); + + let hash = mikrotik_btest::auth::compute_auth_hash("test", &challenge); + let mut response = [0u8; 48]; + response[0..16].copy_from_slice(&hash); + response[16..21].copy_from_slice(b"admin"); + + stream.write_all(&response).await.unwrap(); + stream.flush().await.unwrap(); + + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x01, 0x00, 0x00, 0x00], "Expected AUTH_OK"); +} + +#[tokio::test] +async fn test_server_auth_failure() { + let port = SERVER_PORT + 3; + start_test_server(port, Some("admin"), Some("test")).await; + + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)) + .await + .expect("Failed to connect"); + + let mut buf = [0u8; 4]; + stream.read_exact(&mut buf).await.unwrap(); + + let cmd = mikrotik_btest::protocol::Command::new( + mikrotik_btest::protocol::CMD_PROTO_TCP, + mikrotik_btest::protocol::CMD_DIR_TX, + ); + stream.write_all(&cmd.serialize()).await.unwrap(); + stream.flush().await.unwrap(); + + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x02, 0x00, 0x00, 0x00]); + + let mut challenge = [0u8; 16]; + stream.read_exact(&mut challenge).await.unwrap(); + + let hash = mikrotik_btest::auth::compute_auth_hash("wrongpassword", &challenge); + let mut response = [0u8; 48]; + response[0..16].copy_from_slice(&hash); + response[16..21].copy_from_slice(b"admin"); + + stream.write_all(&response).await.unwrap(); + stream.flush().await.unwrap(); + + stream.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [0x00, 0x00, 0x00, 0x00], "Expected AUTH_FAILED"); +} + +// Loopback tests use run_client which builds direction correctly +// (client transmit → CMD_DIR_RX, client receive → CMD_DIR_TX) + +#[tokio::test] +async fn test_loopback_tcp_rx() { + let port = SERVER_PORT + 4; + start_test_server(port, None, None).await; + + let handle = tokio::spawn(async move { + mikrotik_btest::client::run_client( + "127.0.0.1", + port, + mikrotik_btest::protocol::CMD_DIR_TX, // server TX = client RX + false, + 0, + 0, + None, + None, + false, + ) + .await + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + handle.abort(); +} + +#[tokio::test] +async fn test_loopback_tcp_tx() { + let port = SERVER_PORT + 5; + start_test_server(port, None, None).await; + + let handle = tokio::spawn(async move { + mikrotik_btest::client::run_client( + "127.0.0.1", + port, + mikrotik_btest::protocol::CMD_DIR_RX, // server RX = client TX + false, + 0, + 0, + None, + None, + false, + ) + .await + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + handle.abort(); +} + +#[tokio::test] +async fn test_loopback_tcp_both() { + let port = SERVER_PORT + 6; + start_test_server(port, None, None).await; + + let handle = tokio::spawn(async move { + mikrotik_btest::client::run_client( + "127.0.0.1", + port, + mikrotik_btest::protocol::CMD_DIR_BOTH, + false, + 0, + 0, + None, + None, + false, + ) + .await + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + handle.abort(); +} + +#[tokio::test] +async fn test_loopback_tcp_with_auth() { + let port = SERVER_PORT + 7; + start_test_server(port, Some("admin"), Some("secret")).await; + + let handle = tokio::spawn(async move { + mikrotik_btest::client::run_client( + "127.0.0.1", + port, + mikrotik_btest::protocol::CMD_DIR_TX, // server TX = client RX + false, + 0, + 0, + Some("admin".into()), + Some("secret".into()), + false, + ) + .await + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + handle.abort(); +}