#!/usr/bin/env python3 """ Minimal multi-user chat. No dependencies beyond stdlib. Server: python3 chat.py server [port] Client: python3 chat.py [port] The server also exposes a web UI at /chat (HTTP on the same port). """ import asyncio import base64 import curses import hashlib import json import os import struct import sys import time import html import urllib.parse PORT = 9999 VERSION = "5" TUNNEL_TARGET = ("185.208.174.152", 22) # ── Server ────────────────────────────────────────────────────────────── clients: dict[asyncio.StreamWriter, str] = {} # TCP clients sse_queues: list[asyncio.Queue] = [] # web clients history: list[dict] = [] async def broadcast(msg: dict): history.append(msg) line = json.dumps(msg) + "\n" # TCP clients dead = [] for w in clients: try: w.write(line.encode()) await w.drain() except Exception: dead.append(w) for w in dead: clients.pop(w, None) # SSE web clients dead_q = [] for q in sse_queues: try: q.put_nowait(msg) except Exception: dead_q.append(q) for q in dead_q: sse_queues.remove(q) # ── HTML / JS chat page ──────────────────────────────────────────────── CHAT_HTML = r""" Chat
""" # ── HTTP request parser ──────────────────────────────────────────────── async def parse_http_request(reader: asyncio.StreamReader, first_line: str): """Parse an HTTP request, return (method, path, headers, body).""" parts = first_line.split() method = parts[0] path = parts[1] if len(parts) > 1 else "/" headers = {} while True: hline = (await reader.readline()).decode().strip() if not hline: break k, _, v = hline.partition(":") headers[k.strip().lower()] = v.strip() body = b"" clen = int(headers.get("content-length", 0)) if clen > 0: body = await reader.readexactly(clen) return method, path, headers, body # ── WebSocket helpers ─────────────────────────────────────────────────── async def ws_read_frame(reader): """Read one WebSocket frame, return (opcode, payload).""" b0, b1 = struct.unpack("!BB", await reader.readexactly(2)) opcode = b0 & 0x0F masked = b1 & 0x80 length = b1 & 0x7F if length == 126: length = struct.unpack("!H", await reader.readexactly(2))[0] elif length == 127: length = struct.unpack("!Q", await reader.readexactly(8))[0] mask = await reader.readexactly(4) if masked else None data = await reader.readexactly(length) if mask: data = bytes(b ^ mask[i % 4] for i, b in enumerate(data)) return opcode, data def ws_make_frame(opcode, data): """Build an unmasked WebSocket frame.""" frame = bytes([0x80 | opcode]) if len(data) < 126: frame += bytes([len(data)]) elif len(data) < 65536: frame += struct.pack("!BH", 126, len(data)) else: frame += struct.pack("!BQ", 127, len(data)) return frame + data async def handle_ws_tunnel(ws_reader, ws_writer): """Bridge WebSocket frames <-> raw TCP to TUNNEL_TARGET.""" try: ssh_reader, ssh_writer = await asyncio.open_connection(*TUNNEL_TARGET) except Exception as e: ws_writer.write(ws_make_frame(0x8, struct.pack("!H", 1011) + str(e).encode()[:123])) await ws_writer.drain() ws_writer.close() return async def ws_to_ssh(): try: while True: op, data = await ws_read_frame(ws_reader) if op == 0x8: break if op == 0x9: ws_writer.write(ws_make_frame(0xA, data)) await ws_writer.drain() continue if op in (0x1, 0x2): ssh_writer.write(data) await ssh_writer.drain() except Exception: pass finally: ssh_writer.close() async def ssh_to_ws(): try: while True: data = await ssh_reader.read(16384) if not data: break ws_writer.write(ws_make_frame(0x2, data)) await ws_writer.drain() except Exception: pass finally: ws_writer.write(ws_make_frame(0x8, struct.pack("!H", 1000))) try: await ws_writer.drain() except Exception: pass ws_writer.close() await asyncio.gather(ws_to_ssh(), ssh_to_ws()) # ── HTTP handling ─────────────────────────────────────────────────────── async def handle_http(reader, writer, first_line): method, path, headers, body = await parse_http_request(reader, first_line) # GET /version if method == "GET" and path == "/version": resp = json.dumps({"version": VERSION}).encode() writer.write(b"HTTP/1.1 200 OK\r\n") writer.write(b"Content-Type: application/json\r\n") writer.write(f"Content-Length: {len(resp)}\r\n".encode()) writer.write(b"\r\n") writer.write(resp) await writer.drain() writer.close() return # GET /tunnel.py — download the tunnel client if method == "GET" and path == "/tunnel.py": tunnel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tunnel.py") try: source = open(tunnel_path, "rb").read() except FileNotFoundError: writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n") await writer.drain() writer.close() return writer.write(b"HTTP/1.1 200 OK\r\n") writer.write(b"Content-Type: text/plain\r\n") writer.write(f"Content-Length: {len(source)}\r\n".encode()) writer.write(b"Content-Disposition: attachment; filename=\"tunnel.py\"\r\n") writer.write(b"\r\n") writer.write(source) await writer.drain() writer.close() return # GET / or /chat — serve the web UI if method == "GET" and path in ("/", "/chat", "/chat/"): resp = CHAT_HTML.encode() writer.write(b"HTTP/1.1 200 OK\r\n") writer.write(b"Content-Type: text/html; charset=utf-8\r\n") writer.write(f"Content-Length: {len(resp)}\r\n".encode()) writer.write(b"\r\n") writer.write(resp) await writer.drain() writer.close() return # GET /chat/events — SSE stream if method == "GET" and path == "/chat/events": writer.write(b"HTTP/1.1 200 OK\r\n") writer.write(b"Content-Type: text/event-stream\r\n") writer.write(b"Cache-Control: no-cache\r\n") writer.write(b"Connection: keep-alive\r\n") writer.write(b"X-Accel-Buffering: no\r\n") writer.write(b"Access-Control-Allow-Origin: *\r\n") writer.write(b"\r\n") await writer.drain() for msg in history: writer.write(f"data: {json.dumps(msg)}\n\n".encode()) await writer.drain() q: asyncio.Queue = asyncio.Queue() sse_queues.append(q) try: while True: msg = await q.get() writer.write(f"data: {json.dumps(msg)}\n\n".encode()) await writer.drain() except Exception: pass finally: if q in sse_queues: sse_queues.remove(q) writer.close() return # POST /chat/send — send a message from the web UI if method == "POST" and path == "/chat/send": params = urllib.parse.parse_qs(body.decode()) name = params.get("name", ["anon"])[0] text = params.get("text", [""])[0].strip() if text: await broadcast({"ts": time.time(), "user": name, "text": text}) writer.write(b"HTTP/1.1 204 No Content\r\n\r\n") await writer.drain() writer.close() return # GET /tunnel — WebSocket upgrade for SSH tunnel if method == "GET" and path == "/tunnel": ws_key = headers.get("sec-websocket-key", "") if not ws_key: writer.write(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n") await writer.drain() writer.close() return accept = base64.b64encode( hashlib.sha1((ws_key + "258EAFA5-E914-47DA-95CA-5AB9DC65B5F3").encode()).digest() ).decode() writer.write(f"HTTP/1.1 101 Switching Protocols\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Accept: {accept}\r\n" f"\r\n".encode()) await writer.drain() await handle_ws_tunnel(reader, writer) return # 404 writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n") await writer.drain() writer.close() # ── Connection handler (TCP chat + HTTP on same port) ────────────────── async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): first_line = (await reader.readline()).decode().strip() # HTTP request if first_line.startswith(("GET ", "POST ", "PUT ", "HEAD ")): await handle_http(reader, writer, first_line) return # Raw TCP chat client name = first_line clients[writer] = name for msg in history: writer.write((json.dumps(msg) + "\n").encode()) await writer.drain() await broadcast({"ts": time.time(), "user": "***", "text": f"{name} joined"}) print(f"+ {name} connected ({len(clients)} online)") try: while True: data = await reader.readline() if not data: break text = data.decode().strip() if text: await broadcast({"ts": time.time(), "user": name, "text": text}) except Exception: pass finally: clients.pop(writer, None) await broadcast({"ts": time.time(), "user": "***", "text": f"{name} left"}) print(f"- {name} disconnected ({len(clients)} online)") writer.close() async def run_server(port: int): srv = await asyncio.start_server(handle, "0.0.0.0", port) print(f"Chat server listening on :{port}") print(f" Web UI: http://localhost:{port}/chat") print(f" CLI: python3 chat.py localhost {port}") async with srv: await srv.serve_forever() # ── Client TUI ────────────────────────────────────────────────────────── class ChatClient: def __init__(self, host: str, name: str, port: int): self.host = host self.name = name self.port = port self.messages: list[str] = [] self.input_buf = "" self.scroll = 0 self.reader: asyncio.StreamReader | None = None self.writer: asyncio.StreamWriter | None = None self.running = True def fmt(self, msg: dict) -> str: t = time.strftime("%H:%M", time.localtime(msg["ts"])) if msg["user"] == "***": return f" {t} {msg['text']}" return f" {t} {msg['user']}: {msg['text']}" async def recv_loop(self): while self.running: line = await self.reader.readline() if not line: self.messages.append(" *** connection lost") self.running = False break msg = json.loads(line.decode()) self.messages.append(self.fmt(msg)) async def send(self, text: str): self.writer.write((text + "\n").encode()) await self.writer.drain() async def run(self, stdscr): curses.curs_set(1) curses.use_default_colors() curses.init_pair(1, curses.COLOR_CYAN, -1) curses.init_pair(2, curses.COLOR_GREEN, -1) curses.init_pair(3, curses.COLOR_YELLOW, -1) stdscr.nodelay(True) stdscr.timeout(50) self.reader, self.writer = await asyncio.open_connection(self.host, self.port) await self.send(self.name) recv_task = asyncio.create_task(self.recv_loop()) while self.running: h, w = stdscr.getmaxyx() stdscr.erase() sep_y = h - 2 stdscr.addstr(sep_y, 0, "─" * w, curses.color_pair(1)) visible = self.messages[-(sep_y + self.scroll):len(self.messages) - self.scroll if self.scroll else None] for i, line in enumerate(visible[-(sep_y):]): try: stdscr.addnstr(i, 0, line, w - 1) except curses.error: pass prompt = f" {self.name}> " try: stdscr.addstr(h - 1, 0, prompt, curses.color_pair(2)) stdscr.addnstr(h - 1, len(prompt), self.input_buf, w - len(prompt) - 1) stdscr.move(h - 1, min(len(prompt) + len(self.input_buf), w - 1)) except curses.error: pass stdscr.refresh() try: ch = stdscr.get_wch() except curses.error: await asyncio.sleep(0.05) continue if isinstance(ch, str): if ch == "\n": if self.input_buf.strip(): if self.input_buf.strip() == "/quit": break await self.send(self.input_buf) self.input_buf = "" self.scroll = 0 elif ch == "\x7f" or ch == "\b": self.input_buf = self.input_buf[:-1] elif ch == "\x1b": pass elif ch.isprintable(): self.input_buf += ch elif isinstance(ch, int): if ch == curses.KEY_BACKSPACE: self.input_buf = self.input_buf[:-1] elif ch == curses.KEY_PPAGE: self.scroll = min(self.scroll + 5, max(0, len(self.messages) - (h - 2))) elif ch == curses.KEY_NPAGE: self.scroll = max(self.scroll - 5, 0) self.running = False recv_task.cancel() self.writer.close() def run_client(host: str, name: str, port: int): client = ChatClient(host, name, port) async def main(stdscr): await client.run(stdscr) curses.wrapper(lambda stdscr: asyncio.run(main(stdscr))) # ── Main ──────────────────────────────────────────────────────────────── if __name__ == "__main__": if len(sys.argv) < 2: print(__doc__.strip()) sys.exit(1) if sys.argv[1] == "server": port = int(sys.argv[2]) if len(sys.argv) > 2 else PORT asyncio.run(run_server(port)) else: host = sys.argv[1] name = sys.argv[2] if len(sys.argv) > 2 else "anon" port = int(sys.argv[3]) if len(sys.argv) > 3 else PORT run_client(host, name, port)