#!/usr/bin/env python3
"""
Minimal multi-user chat. No dependencies beyond stdlib.

Server:  python3 chat.py server [port]
Client:  python3 chat.py <host> [name] [port]

The server also exposes a web UI at /chat (HTTP on the same port).

CLI client commands:
  /file <path>   Upload a file to the chat
  /color         Reshuffle the colors used for other users
  /quit          Disconnect
"""

import asyncio
import base64
import curses
import hashlib
import hmac
import html
import json
import os
import struct
import sys
import time
import urllib.parse

PORT = 9999
VERSION = "14"

# Named destinations reachable through the /tunnel/<dest> WebSocket bridge.
TUNNEL_TARGETS = {
    "parspack": ("185.208.174.152", 22),
    "mequ": ("188.213.68.133", 2022),
    "alipi": ("10.66.66.2", 22),
}

MAX_FILE_SIZE = 1 * 1024 * 1024  # 1 MB per file
MAX_TOTAL_STORAGE = 50 * 1024 * 1024  # 50 MB total

# asyncio's StreamReader.readline() raises ValueError once a line exceeds the
# stream limit (64 KiB by default).  A TCP file packet is one JSON line holding
# the base64 payload (4/3 of the file size), so the limit must be raised.
STREAM_LIMIT = 2 * MAX_FILE_SIZE

# ── Server ──────────────────────────────────────────────────────────────

# E2E encryption: public key registry (username -> JWK public key JSON string)
KEYS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "keys.json")


def load_keys() -> dict[str, str]:
    """Load user keys from disk.

    Returns an empty registry when the file is missing, is not valid JSON,
    or does not contain a JSON object.
    """
    try:
        with open(KEYS_FILE, "r") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return data if isinstance(data, dict) else {}


def save_keys():
    """Persist user keys to disk (best effort; failures are only reported)."""
    try:
        with open(KEYS_FILE, "w") as f:
            json.dump(user_keys, f)
    except (OSError, TypeError, ValueError) as exc:
        # Persistence is optional: keep serving from memory, but say why.
        print(f"warning: could not save keys to {KEYS_FILE}: {exc}", file=sys.stderr)


user_keys: dict[str, str] = load_keys()

# DM routing: username -> list of queues for SSE delivery
dm_targets: dict[str, list] = {}

uploaded_files: dict[str, bytes] = {}  # file_id -> raw bytes (insertion order)
total_file_bytes = 0


def store_file(file_id: str, data: bytes):
    """Store a file, evicting oldest files if total storage exceeds limit."""
    global total_file_bytes
    # Replacing an existing id must not count its old bytes twice; popping
    # also moves the id to the "newest" end of the insertion order.
    previous = uploaded_files.pop(file_id, None)
    if previous is not None:
        total_file_bytes -= len(previous)
    uploaded_files[file_id] = data
    total_file_bytes += len(data)
    while total_file_bytes > MAX_TOTAL_STORAGE and uploaded_files:
        oldest_id = next(iter(uploaded_files))
        total_file_bytes -= len(uploaded_files.pop(oldest_id))


class Group:
    """A chat group with its own history, clients, and optional password."""

    def __init__(self, name: str):
        self.name = name
        self.password: str | None = None
        self.history: list[dict] = []
        self.clients: dict[asyncio.StreamWriter, str] = {}  # TCP clients
        self.sse_queues: list[asyncio.Queue] = []

    async def broadcast(self, msg: dict):
        """Append *msg* to the history and deliver it to every TCP and SSE client.

        TCP writers whose write/drain fails and SSE queues that reject the
        message are dropped from the group.
        """
        self.history.append(msg)
        payload = (json.dumps(msg) + "\n").encode()

        dead = []
        # Iterate over a snapshot: `await w.drain()` yields to the event loop,
        # and clients joining/leaving meanwhile would otherwise raise
        # "dictionary changed size during iteration".
        for w in list(self.clients):
            if w not in self.clients:  # left while we were awaiting
                continue
            try:
                w.write(payload)
                await w.drain()
            except Exception:
                dead.append(w)
        for w in dead:
            self.clients.pop(w, None)

        dead_q = []
        for q in list(self.sse_queues):
            try:
                q.put_nowait(msg)
            except Exception:
                dead_q.append(q)
        for q in dead_q:
            if q in self.sse_queues:
                self.sse_queues.remove(q)


# All groups keyed by name. Auto-created on first access.
groups: dict[str, Group] = {}
DEFAULT_GROUP = "lobby"


def get_group(name: str) -> Group:
    """Get or create a group by name (case-insensitive, blank -> default)."""
    name = name.lower().strip()
    if not name:
        name = DEFAULT_GROUP
    if name not in groups:
        groups[name] = Group(name)
    return groups[name]


# ── HTML / JS chat page ────────────────────────────────────────────────

# NOTE(review): the markup of this template appears to have been lost from
# the source under review (only text nodes remain).  handle_http() substitutes
# %%GROUP%% and %%HAS_PASSWORD%% into it, and neither placeholder is present
# here — restore the full page from version control before shipping.
CHAT_HTML = r""" Chat
Install as app for notifications & fullscreen

This group is password protected

Wrong password
"""

PWA_MANIFEST = json.dumps({
    "name": "Chat",
    "short_name": "Chat",
    "description": "Minimal multi-user chat",
    "start_url": "/chat",
    "display": "standalone",
    "background_color": "#1a1a2e",
    "theme_color": "#1a1a2e",
    "icons": [
        {"src": "/icon.svg", "sizes": "any", "type": "image/svg+xml", "purpose": "any maskable"}
    ]
})

# Minimal SVG icon (chat bubble)
# NOTE(review): the SVG markup is missing from the source under review.
PWA_ICON = """ """

# NOTE(review): the offline page markup inside Response(...) was missing from
# the source under review (leaving raw newlines inside a JS string literal,
# which is a syntax error); a minimal heading is used here — confirm against
# the original.
SERVICE_WORKER = """
const CACHE = 'chat-v1';
self.addEventListener('install', e => { self.skipWaiting(); });
self.addEventListener('activate', e => { e.waitUntil(clients.claim()); });
self.addEventListener('fetch', e => {
  // Let all requests go to network (chat is real-time, caching would break it)
  // But cache the shell for offline "you're offline" experience
  if (e.request.mode === 'navigate') {
    e.respondWith(
      fetch(e.request).catch(() => new Response(
        '<h2>Offline - connect to the internet</h2>',
        {headers: {'Content-Type': 'text/html'}}
      ))
    );
  }
});
"""


# ── Multipart parser (minimal, for file uploads) ───────────────────────

def parse_multipart(body: bytes, boundary: str) -> dict:
    """Parse multipart/form-data, return {field_name: (filename|None, value)}.

    Parts without a header/body separator or without a ``name`` in their
    Content-Disposition header are skipped.  Values are raw bytes.
    """
    result = {}
    for part in body.split(b"--" + boundary.encode()):
        # Skip the preamble and the closing "--" marker.
        if not part or part.strip() in (b"", b"--"):
            continue
        if b"\r\n\r\n" not in part:
            continue
        header_block, content = part.split(b"\r\n\r\n", 1)
        if content.endswith(b"\r\n"):
            content = content[:-2]  # CRLF that precedes the next boundary
        name = filename = None
        for h in header_block.decode(errors="replace").split("\r\n"):
            if "content-disposition" not in h.lower():
                continue
            for kv in h.split(";"):
                kv = kv.strip()
                if kv.startswith("name="):
                    name = kv.split("=", 1)[1].strip('"')
                if kv.startswith("filename="):
                    filename = kv.split("=", 1)[1].strip('"')
        if name:
            result[name] = (filename, content)
    return result


# ── HTTP request parser ────────────────────────────────────────────────

async def parse_http_request(reader: asyncio.StreamReader, first_line: str):
    """Parse an HTTP request, return (method, path, headers, body).

    *first_line* is the already-consumed request line.  Header names are
    lower-cased.  A missing, non-numeric or non-positive Content-Length
    yields an empty body.
    """
    parts = first_line.split()
    method = parts[0]
    path = parts[1] if len(parts) > 1 else "/"

    headers = {}
    while True:
        hline = (await reader.readline()).decode(errors="replace").strip()
        if not hline:
            break
        k, _, v = hline.partition(":")
        headers[k.strip().lower()] = v.strip()

    try:
        clen = int(headers.get("content-length", 0))
    except ValueError:
        clen = 0  # malformed header: treat as "no body" instead of crashing
    body = await reader.readexactly(clen) if clen > 0 else b""
    return method, path, headers, body


# ── WebSocket helpers ───────────────────────────────────────────────────

async def ws_read_frame(reader):
    """Read one WebSocket frame, return (opcode, payload).

    The payload is unmasked when the frame carries a masking key.
    """
    b0, b1 = struct.unpack("!BB", await reader.readexactly(2))
    opcode = b0 & 0x0F
    masked = b1 & 0x80
    length = b1 & 0x7F
    if length == 126:
        length = struct.unpack("!H", await reader.readexactly(2))[0]
    elif length == 127:
        length = struct.unpack("!Q", await reader.readexactly(8))[0]
    mask = await reader.readexactly(4) if masked else None
    data = await reader.readexactly(length)
    if mask:
        data = bytes(b ^ mask[i % 4] for i, b in enumerate(data))
    return opcode, data


def ws_make_frame(opcode, data):
    """Build an unmasked WebSocket frame (FIN bit set) carrying *data*."""
    frame = bytes([0x80 | opcode])
    if len(data) < 126:
        frame += bytes([len(data)])
    elif len(data) < 65536:
        frame += struct.pack("!BH", 126, len(data))
    else:
        frame += struct.pack("!BQ", 127, len(data))
    return frame + data


async def handle_ws_tunnel(ws_reader, ws_writer, target):
    """Bridge WebSocket frames <-> raw TCP to target (host, port)."""
    try:
        ssh_reader, ssh_writer = await asyncio.open_connection(*target)
    except Exception as e:
        # Close frame: status 1011 + reason (a control payload is max 125 bytes).
        ws_writer.write(ws_make_frame(0x8, struct.pack("!H", 1011) + str(e).encode()[:123]))
        await ws_writer.drain()
        ws_writer.close()
        return

    async def ws_to_ssh():
        try:
            while True:
                op, data = await ws_read_frame(ws_reader)
                if op == 0x8:  # close
                    break
                if op == 0x9:  # ping -> pong
                    ws_writer.write(ws_make_frame(0xA, data))
                    await ws_writer.drain()
                    continue
                if op in (0x1, 0x2):  # text / binary payload
                    ssh_writer.write(data)
                    await ssh_writer.drain()
        except Exception:
            pass
        finally:
            ssh_writer.close()

    async def ssh_to_ws():
        try:
            while True:
                data = await ssh_reader.read(16384)
                if not data:
                    break
                ws_writer.write(ws_make_frame(0x2, data))
                await ws_writer.drain()
        except Exception:
            pass
        finally:
            ws_writer.write(ws_make_frame(0x8, struct.pack("!H", 1000)))
            try:
                await ws_writer.drain()
            except Exception:
                pass
            ws_writer.close()

    await asyncio.gather(ws_to_ssh(), ssh_to_ws())


# ── HTTP handling ───────────────────────────────────────────────────────

def header_safe_filename(name: str) -> str:
    """Return *name* stripped of characters that could break a quoted header value.

    Control characters (including CR/LF), double quotes and backslashes are
    removed; an empty result falls back to ``"file"``.
    """
    cleaned = "".join(c for c in name if c.isprintable() and c not in '"\\')
    return cleaned or "file"


async def _send_response(writer, status, body=None, content_type=None, extra_headers=()):
    """Write a complete HTTP/1.1 response, flush it and close the connection.

    Content-Length is emitted only when *body* is not None (pass ``b""`` for
    an explicit zero length); *extra_headers* is an iterable of (name, value).
    """
    head = [f"HTTP/1.1 {status}\r\n".encode()]
    if content_type is not None:
        head.append(f"Content-Type: {content_type}\r\n".encode())
    if body is not None:
        head.append(f"Content-Length: {len(body)}\r\n".encode())
    for key, value in extra_headers:
        head.append(f"{key}: {value}\r\n".encode())
    head.append(b"\r\n")
    writer.write(b"".join(head))
    if body:
        writer.write(body)
    await writer.drain()
    writer.close()


def _sse_event(msg: dict) -> bytes:
    """Encode *msg* as one Server-Sent-Events ``data:`` record."""
    return f"data: {json.dumps(msg)}\n\n".encode()


def _parse_form(body: bytes, keep_blank_values: bool = False) -> dict:
    """Decode an application/x-www-form-urlencoded body into {key: [values]}."""
    return urllib.parse.parse_qs(body.decode(errors="replace"),
                                 keep_blank_values=keep_blank_values)


async def handle_http(reader, writer, first_line):
    """Serve one HTTP request whose request line has already been read.

    Routes: PWA assets, /version, /tunnel.py, /group/<name>[/action],
    /keys, /dm, /files/<id>/<filename> and the /tunnel/<dest> WebSocket
    bridge.  Anything else gets a 404.
    """
    try:
        method, path, headers, body = await parse_http_request(reader, first_line)
    except (asyncio.IncompleteReadError, ConnectionError):
        # Client went away mid-request; nothing to answer.
        writer.close()
        return

    # PWA assets
    static_assets = {
        "/manifest.json": ("application/manifest+json", PWA_MANIFEST),
        "/icon.svg": ("image/svg+xml", PWA_ICON),
        "/sw.js": ("application/javascript", SERVICE_WORKER),
    }
    if method == "GET" and path in static_assets:
        content_type, text = static_assets[path]
        await _send_response(writer, "200 OK", text.encode(), content_type)
        return

    # GET /version
    if method == "GET" and path == "/version":
        resp = json.dumps({"version": VERSION}).encode()
        await _send_response(writer, "200 OK", resp, "application/json")
        return

    # GET /tunnel.py — download the tunnel client
    if method == "GET" and path == "/tunnel.py":
        tunnel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tunnel.py")
        try:
            with open(tunnel_path, "rb") as f:
                source = f.read()
        except FileNotFoundError:
            await _send_response(writer, "404 Not Found", b"")
            return
        await _send_response(
            writer, "200 OK", source, "text/plain",
            [("Content-Disposition", 'attachment; filename="tunnel.py"')])
        return

    # GET / or /chat → redirect to /group/lobby
    if method == "GET" and path in ("/", "/chat", "/chat/"):
        await _send_response(writer, "302 Found", b"", None, [("Location", "/group/lobby")])
        return

    # ── Group routes: /group/<name>[/action] ──
    if path.startswith("/group/"):
        parts = path[7:].strip("/").split("/", 1)  # strip "/group/"
        group_name = urllib.parse.unquote(parts[0]) if parts[0] else DEFAULT_GROUP
        action = parts[1] if len(parts) > 1 else ""
        grp = get_group(group_name)

        # GET /group/<name> — serve the web UI
        if method == "GET" and action == "":
            has_pw = "true" if grp.password else "false"
            # NOTE(review): group_name comes straight from the URL and is
            # substituted unescaped.  Depending on where %%GROUP%% sits in the
            # template this is an XSS vector — escape it for that context
            # (html.escape / json.dumps) once the template is visible.
            resp = (CHAT_HTML
                    .replace("%%GROUP%%", group_name)
                    .replace("%%HAS_PASSWORD%%", has_pw)
                    .encode())
            await _send_response(writer, "200 OK", resp, "text/html; charset=utf-8")
            return

        # GET /group/<name>/events — SSE stream
        if method == "GET" and action.startswith("events"):
            query = action.split("?", 1)[1] if "?" in action else ""
            qs = urllib.parse.parse_qs(query)

            # Check password (constant-time comparison).
            pw = qs.get("password", [""])[0]
            if grp.password and not hmac.compare_digest(pw.encode(), grp.password.encode()):
                writer.write(b"HTTP/1.1 200 OK\r\n")
                writer.write(b"Content-Type: text/event-stream\r\n")
                writer.write(b"Cache-Control: no-cache\r\n")
                writer.write(b"X-Accel-Buffering: no\r\n")
                writer.write(b"\r\n")
                writer.write(_sse_event({"_auth": "fail"}))
                await writer.drain()
                writer.close()
                return

            writer.write(b"HTTP/1.1 200 OK\r\n")
            writer.write(b"Content-Type: text/event-stream\r\n")
            writer.write(b"Cache-Control: no-cache\r\n")
            writer.write(b"Connection: keep-alive\r\n")
            writer.write(b"X-Accel-Buffering: no\r\n")
            writer.write(b"Access-Control-Allow-Origin: *\r\n")
            writer.write(b"\r\n")

            # Replay history and register the queue with no `await` between
            # them, so a message broadcast concurrently is neither lost nor
            # delivered twice.
            for msg in list(grp.history):
                writer.write(_sse_event(msg))
            q: asyncio.Queue = asyncio.Queue()
            grp.sse_queues.append(q)

            # Register for DM delivery
            dm_name = qs.get("name", [""])[0]
            if dm_name:
                dm_targets.setdefault(dm_name, []).append(q)

            try:
                await writer.drain()
                while True:
                    msg = await q.get()
                    writer.write(_sse_event(msg))
                    await writer.drain()
            except Exception:
                # Any write failure means the client is gone; just clean up.
                pass
            finally:
                if q in grp.sse_queues:
                    grp.sse_queues.remove(q)
                if dm_name and dm_name in dm_targets:
                    try:
                        dm_targets[dm_name].remove(q)
                    except ValueError:
                        pass
                    if not dm_targets[dm_name]:
                        del dm_targets[dm_name]
                writer.close()
            return

        # POST /group/<name>/send
        if method == "POST" and action == "send":
            params = _parse_form(body)
            name = params.get("name", ["anon"])[0]
            text = params.get("text", [""])[0].strip()
            if text:
                await grp.broadcast({"ts": time.time(), "user": name, "text": text})
            await _send_response(writer, "204 No Content")
            return

        # POST /group/<name>/upload
        if method == "POST" and action == "upload":
            ct = headers.get("content-type", "")
            if "multipart/form-data" in ct and "boundary=" in ct:
                # The boundary may be quoted and may be followed by further
                # parameters (e.g. `; charset=...`).
                boundary = ct.split("boundary=", 1)[1].split(";", 1)[0].strip().strip('"')
                fields = parse_multipart(body, boundary)
                name = fields.get("name", (None, b"anon"))[1]
                if isinstance(name, bytes):
                    name = name.decode(errors="replace")
                file_entry = fields.get("file")
                if file_entry and file_entry[0]:
                    filename, file_data = file_entry
                    if len(file_data) <= MAX_FILE_SIZE:
                        file_id = hashlib.sha256(
                            file_data + str(time.time()).encode()).hexdigest()[:16]
                        store_file(file_id, file_data)
                        await grp.broadcast({
                            "ts": time.time(), "user": name,
                            "text": f"[file: {filename}]",
                            "file_id": file_id, "filename": filename,
                            "file_size": len(file_data)
                        })
            await _send_response(writer, "204 No Content")
            return

        # POST /group/<name>/setpass
        if method == "POST" and action == "setpass":
            params = _parse_form(body, keep_blank_values=True)
            pw = params.get("password", [""])[0].strip()
            grp.password = pw if pw else None
            await _send_response(writer, "204 No Content")
            return

    # ── E2E encrypted DM routes ──

    # POST /keys — register public key: body = name=...&key=<JWK JSON>
    if method == "POST" and path == "/keys":
        params = _parse_form(body)
        name = params.get("name", [""])[0]
        key = params.get("key", [""])[0]
        if name and key:
            user_keys[name] = key
            save_keys()
        await _send_response(writer, "204 No Content")
        return

    # GET /keys/<username> — get public key
    if method == "GET" and path.startswith("/keys/"):
        username = urllib.parse.unquote(path[6:])
        key = user_keys.get(username)
        if key:
            await _send_response(writer, "200 OK", key.encode(), "application/json")
        else:
            await _send_response(writer, "404 Not Found", b"")
        return

    # GET /keys — list all registered usernames
    if method == "GET" and path == "/keys":
        resp = json.dumps(list(user_keys.keys())).encode()
        await _send_response(writer, "200 OK", resp, "application/json")
        return

    # POST /dm — relay an encrypted DM: body = from=...&to=...&encrypted=...&nonce=...
    if method == "POST" and path == "/dm":
        params = _parse_form(body)
        sender = params.get("from", [""])[0]
        recipient = params.get("to", [""])[0]
        encrypted = params.get("encrypted", [""])[0]
        nonce = params.get("nonce", [""])[0]
        if sender and recipient and encrypted:
            dm_msg = {
                "ts": time.time(), "user": sender, "dm": True,
                "to": recipient, "encrypted": encrypted, "nonce": nonce
            }
            # Deliver to all SSE queues registered for this recipient ...
            targets = list(dm_targets.get(recipient, []))
            # ... and to the sender so they see their own DM.
            if sender != recipient:
                targets += dm_targets.get(sender, [])
            for q in targets:
                try:
                    q.put_nowait(dm_msg)
                except Exception:
                    pass
        await _send_response(writer, "204 No Content")
        return

    # GET /files/<id>/<filename> — download uploaded file
    if method == "GET" and path.startswith("/files/"):
        parts = path.split("/")
        if len(parts) >= 3:
            file_id = parts[2]
            if file_id in uploaded_files:
                data = uploaded_files[file_id]
                fname = urllib.parse.unquote(parts[3]) if len(parts) > 3 else "file"
                # The name is attacker-controlled (URL): strip CR/LF and quotes
                # so it cannot inject extra response headers.
                fname = header_safe_filename(fname)
                await _send_response(
                    writer, "200 OK", data, "application/octet-stream",
                    [("Content-Disposition", f'attachment; filename="{fname}"')])
                return
        await _send_response(writer, "404 Not Found", b"")
        return

    # GET /tunnel/<dest> — WebSocket upgrade for SSH tunnel
    if method == "GET" and path.startswith("/tunnel"):
        # Parse destination: /tunnel/parspack, /tunnel/mequ, or /tunnel (default: parspack)
        parts = path.strip("/").split("/")
        dest_name = parts[1] if len(parts) > 1 else "parspack"
        target = TUNNEL_TARGETS.get(dest_name)
        if not target:
            names = ", ".join(TUNNEL_TARGETS.keys())
            err = f"Unknown destination '{dest_name}'. Available: {names}".encode()
            await _send_response(writer, "404 Not Found", err)
            return

        ws_key = headers.get("sec-websocket-key", "")
        if not ws_key:
            await _send_response(writer, "400 Bad Request", b"")
            return

        # Handshake accept token: base64(SHA-1(key + fixed GUID)).
        accept = base64.b64encode(
            hashlib.sha1((ws_key + "258EAFA5-E914-47DA-95CA-5AB9DC65B5F3").encode()).digest()
        ).decode()
        writer.write(f"HTTP/1.1 101 Switching Protocols\r\n"
                     f"Upgrade: websocket\r\n"
                     f"Connection: Upgrade\r\n"
                     f"Sec-WebSocket-Accept: {accept}\r\n"
                     f"\r\n".encode())
        await writer.drain()
        await handle_ws_tunnel(reader, writer, target)
        return

    # 404
    await _send_response(writer, "404 Not Found", b"")


# ── Connection handler (TCP chat + HTTP on same port) ──────────────────

def parse_client_line(line: str) -> "dict | None":
    """Interpret one line received from a raw TCP chat client.

    Returns one of:
      {"type": "file", "filename": str, "data": bytes}  — a valid upload
      {"type": "msg", "text": str}                        — a chat message
      None                                                — nothing to send

    A line that is not a JSON object (plain text, or a JSON scalar/array
    such as ``123``) is treated as a plain-text message.  File packets that
    are malformed or exceed MAX_FILE_SIZE are ignored.
    """
    try:
        pkt = json.loads(line)
    except (json.JSONDecodeError, RecursionError):
        pkt = None
    if not isinstance(pkt, dict):
        text = line.strip()
        return {"type": "msg", "text": text} if text else None

    if pkt.get("type") == "file":
        filename = pkt.get("filename")
        encoded = pkt.get("data")
        if not isinstance(filename, str) or not isinstance(encoded, str):
            return None
        try:
            file_data = base64.b64decode(encoded)
        except ValueError:  # binascii.Error is a ValueError
            return None
        if len(file_data) > MAX_FILE_SIZE:
            return None
        return {"type": "file", "filename": filename, "data": file_data}

    text = pkt.get("text", "")
    if not isinstance(text, str):
        text = str(text)
    text = text.strip()
    return {"type": "msg", "text": text} if text else None


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one inbound connection: HTTP request or raw TCP chat session.

    The first line decides the protocol: a line starting with an HTTP method
    is handed to handle_http(); anything else is taken as the user name of a
    raw TCP client, which joins the default group.
    """
    try:
        raw = await reader.readline()
    except (ValueError, ConnectionError):
        # Over-long first line or connection reset before any data.
        writer.close()
        return
    first_line = raw.decode(errors="replace").strip()

    # HTTP request
    if first_line.startswith(("GET ", "POST ", "PUT ", "HEAD ")):
        await handle_http(reader, writer, first_line)
        return

    if not raw:
        # Peer closed without sending a name: don't announce a ghost user.
        writer.close()
        return

    # Raw TCP chat client — first line is the name (uses lobby group)
    name = first_line
    grp = get_group(DEFAULT_GROUP)
    grp.clients[writer] = name

    # Queue the whole history before the first await so a concurrent
    # broadcast cannot be interleaved into (or duplicated in) the replay.
    for msg in list(grp.history):
        writer.write((json.dumps(msg) + "\n").encode())

    print(f"+ {name} connected ({len(grp.clients)} online in {grp.name})")

    try:
        await writer.drain()
        await grp.broadcast({"ts": time.time(), "user": "***", "text": f"{name} joined"})
        while True:
            data = await reader.readline()
            if not data:
                break
            line = data.decode(errors="replace").rstrip("\n")
            if not line:
                continue
            pkt = parse_client_line(line)
            if pkt is None:
                continue
            if pkt["type"] == "file":
                file_data = pkt["data"]
                file_id = hashlib.sha256(
                    file_data + str(time.time()).encode()).hexdigest()[:16]
                store_file(file_id, file_data)
                await grp.broadcast({
                    "ts": time.time(), "user": name,
                    "text": f"[file: {pkt['filename']}]",
                    "file_id": file_id, "filename": pkt["filename"],
                    "file_size": len(file_data)
                })
            else:
                await grp.broadcast({"ts": time.time(), "user": name, "text": pkt["text"]})
    except Exception:
        # Connection-level boundary: any I/O failure ends this session.
        pass
    finally:
        grp.clients.pop(writer, None)
        await grp.broadcast({"ts": time.time(), "user": "***", "text": f"{name} left"})
        print(f"- {name} disconnected ({len(grp.clients)} online in {grp.name})")
        writer.close()


async def run_server(port: int):
    """Listen on all interfaces at *port* and serve until cancelled."""
    # limit= raises the per-line cap so a base64 file packet fits in one line.
    srv = await asyncio.start_server(handle, "0.0.0.0", port, limit=STREAM_LIMIT)
    print(f"Chat server listening on :{port}")
    print(f"  Web UI:  http://localhost:{port}/chat")
    print(f"  CLI:     python3 chat.py localhost <name> {port}")
    async with srv:
        await srv.serve_forever()


# ── Client TUI ──────────────────────────────────────────────────────────

# Curses color pair assignments
CP_SEPARATOR = 1
CP_MY_NAME = 2
CP_SYSTEM = 3
CP_TIMESTAMP = 4
# 10+ for other users

OTHER_CURSES_COLORS = [
    curses.COLOR_YELLOW, curses.COLOR_MAGENTA, curses.COLOR_CYAN,
    curses.COLOR_RED, curses.COLOR_BLUE, curses.COLOR_WHITE,
]


def user_color_pair(username: str, my_name: str, seed: int = 0) -> int:
    """Return curses color pair number for a username.

    "***" (system) and *my_name* get fixed pairs; every other name is hashed
    (starting from *seed*) onto one of the OTHER_CURSES_COLORS pairs (10+).
    """
    if username == "***":
        return CP_SYSTEM
    if username == my_name:
        return CP_MY_NAME
    h = seed
    for c in username:
        h = ((h << 5) - h + ord(c)) & 0xFFFFFFFF  # h * 31 + ord(c), 32-bit
    return 10 + (h % len(OTHER_CURSES_COLORS))


class ChatClient:
    """Curses TUI client speaking the raw TCP line protocol of the server."""

    def __init__(self, host: str, name: str, port: int):
        self.host = host
        self.name = name
        self.port = port
        self.messages: list[tuple[str, int]] = []  # (text, color_pair)
        self.input_buf = ""
        self.scroll = 0
        self.reader: asyncio.StreamReader | None = None
        self.writer: asyncio.StreamWriter | None = None
        self.running = True
        self.color_seed = 0

    def get_color(self, username: str) -> int:
        """Color pair for username, affected by color_seed."""
        return user_color_pair(username, self.name, seed=self.color_seed)

    def notify(self, msg: dict):
        """Send terminal bell + update title for messages from others."""
        if msg["user"] == self.name or msg["user"] == "***":
            return
        # Terminal bell
        sys.stdout.write("\a")
        sys.stdout.flush()
        # OSC title update (works in most terminals: iTerm2, Terminal.app, etc.)
        preview = msg["text"][:50] if not msg.get("file_id") else f"shared {msg['filename']}"
        sys.stdout.write(f"\033]0;[NEW] {msg['user']}: {preview}\007")
        sys.stdout.flush()

    def reset_title(self):
        """Restore the terminal title to the idle "Chat - <name>" form."""
        sys.stdout.write(f"\033]0;Chat - {self.name}\007")
        sys.stdout.flush()

    def add_message(self, msg: dict):
        """Format *msg* into display lines, append them, and notify."""
        t = time.strftime("%H:%M", time.localtime(msg["ts"]))
        cp = self.get_color(msg["user"])
        if msg["user"] == "***":
            self.messages.append((f" {t} {msg['text']}", CP_SYSTEM))
        elif msg.get("file_id"):
            self.messages.append((
                f" {t} {msg['user']}: [file: {msg['filename']} ({msg['file_size']} bytes)]",
                cp))
        else:
            lines = msg["text"].split("\n")
            self.messages.append((f" {t} {msg['user']}: {lines[0]}", cp))
            for extra in lines[1:]:
                self.messages.append((f" {extra}", cp))
        self.notify(msg)

    async def recv_loop(self):
        """Read server lines until EOF, feeding each message to add_message()."""
        while self.running:
            line = await self.reader.readline()
            if not line:
                self.messages.append((" *** connection lost", CP_SYSTEM))
                self.running = False
                break
            try:
                self.add_message(json.loads(line.decode()))
            except (ValueError, KeyError, TypeError):
                # Malformed or incomplete message: skip it rather than let
                # the receive task die silently.
                continue

    async def send_json(self, pkt: dict):
        """Send a JSON packet to the server."""
        self.writer.write((json.dumps(pkt) + "\n").encode())
        await self.writer.drain()

    async def send_msg(self, text: str):
        """Send a chat message (supports multiline)."""
        await self.send_json({"type": "msg", "text": text})

    async def send_file(self, filepath: str):
        """Upload a file to the chat."""
        filepath = os.path.expanduser(filepath)
        if not os.path.isfile(filepath):
            self.messages.append((f" *** file not found: {filepath}", CP_SYSTEM))
            return
        size = os.path.getsize(filepath)
        if size > MAX_FILE_SIZE:
            self.messages.append(
                (f" *** file too large ({size} bytes, max {MAX_FILE_SIZE})", CP_SYSTEM))
            return
        with open(filepath, "rb") as f:
            data = f.read()
        await self.send_json({
            "type": "file",
            "filename": os.path.basename(filepath),
            "data": base64.b64encode(data).decode()
        })

    async def run(self, stdscr):
        """Connect, then run the draw/input loop until /quit or disconnect."""
        curses.curs_set(1)
        curses.use_default_colors()
        curses.init_pair(CP_SEPARATOR, curses.COLOR_CYAN, -1)
        curses.init_pair(CP_MY_NAME, curses.COLOR_GREEN, -1)
        curses.init_pair(CP_SYSTEM, curses.COLOR_CYAN, -1)
        curses.init_pair(CP_TIMESTAMP, curses.COLOR_WHITE, -1)
        for i, color in enumerate(OTHER_CURSES_COLORS):
            curses.init_pair(10 + i, color, -1)

        stdscr.nodelay(True)
        stdscr.timeout(50)

        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
        # Send name as plain first line (server expects it)
        self.writer.write((self.name + "\n").encode())
        await self.writer.drain()

        recv_task = asyncio.create_task(self.recv_loop())
        try:
            while self.running:
                h, w = stdscr.getmaxyx()
                stdscr.erase()

                sep_y = h - 2
                try:
                    stdscr.addstr(sep_y, 0, "─" * w, curses.color_pair(CP_SEPARATOR))
                except curses.error:
                    pass  # terminal too small to draw the separator

                visible = self.messages[
                    -(sep_y + self.scroll):
                    len(self.messages) - self.scroll if self.scroll else None]
                for i, (line, cp) in enumerate(visible[-(sep_y):]):
                    try:
                        stdscr.addnstr(i, 0, line, w - 1, curses.color_pair(cp))
                    except curses.error:
                        pass

                prompt = f" {self.name}> "
                try:
                    stdscr.addstr(h - 1, 0, prompt, curses.color_pair(CP_MY_NAME))
                    stdscr.addnstr(h - 1, len(prompt), self.input_buf, w - len(prompt) - 1)
                    stdscr.move(h - 1, min(len(prompt) + len(self.input_buf), w - 1))
                except curses.error:
                    pass

                stdscr.refresh()

                try:
                    ch = stdscr.get_wch()
                except curses.error:
                    # No key within the timeout: yield so recv_loop can run.
                    await asyncio.sleep(0.05)
                    continue

                self.reset_title()

                if isinstance(ch, str):
                    if ch == "\n":
                        if self.input_buf.strip():
                            cmd = self.input_buf.strip()
                            if cmd == "/quit":
                                break
                            elif cmd in ("/color", "/colors"):
                                self.color_seed += 1
                                self.messages.append((" *** Colors reshuffled!", CP_SYSTEM))
                            elif cmd.startswith("/file "):
                                await self.send_file(cmd[6:].strip())
                            else:
                                await self.send_msg(self.input_buf)
                        self.input_buf = ""
                        self.scroll = 0
                    elif ch == "\x7f" or ch == "\b":
                        self.input_buf = self.input_buf[:-1]
                    elif ch == "\x1b":
                        pass
                    elif ch.isprintable() or ch == "\t":
                        self.input_buf += ch
                elif isinstance(ch, int):
                    if ch == curses.KEY_BACKSPACE:
                        self.input_buf = self.input_buf[:-1]
                    elif ch == curses.KEY_PPAGE:
                        self.scroll = min(self.scroll + 5,
                                          max(0, len(self.messages) - (h - 2)))
                    elif ch == curses.KEY_NPAGE:
                        self.scroll = max(self.scroll - 5, 0)
        finally:
            # Always stop the receiver and close the socket, even on error.
            self.running = False
            recv_task.cancel()
            self.writer.close()


def run_client(host: str, name: str, port: int):
    """Run the curses chat client against *host*:*port* as user *name*."""
    client = ChatClient(host, name, port)

    async def main(stdscr):
        await client.run(stdscr)

    curses.wrapper(lambda stdscr: asyncio.run(main(stdscr)))


# ── Main ────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(__doc__.strip())
        sys.exit(1)

    if sys.argv[1] == "server":
        port = int(sys.argv[2]) if len(sys.argv) > 2 else PORT
        asyncio.run(run_server(port))
    else:
        host = sys.argv[1]
        name = sys.argv[2] if len(sys.argv) > 2 else "anon"
        port = int(sys.argv[3]) if len(sys.argv) > 3 else PORT
        run_client(host, name, port)