Files
featherChat/chat.py
Siavash Sameni 1d0b87b509 v9: multi-destination tunnel support (parspack, mequ, alipi)
Server:
- /tunnel/<dest> routes: parspack (185.208.174.152:22),
  mequ (188.213.68.133:2022), alipi (10.66.66.2:22)
- /tunnel without dest defaults to parspack

Client (tunnel.py):
- --destination / -d flag to pick target
- Lists available destinations in --help

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 15:03:23 +04:00

898 lines
32 KiB
Python

#!/usr/bin/env python3
"""
Minimal multi-user chat. No dependencies beyond stdlib.
Server: python3 chat.py server [port]
Client: python3 chat.py <host> <name> [port]
The server also exposes a web UI at /chat (HTTP on the same port).
CLI client commands:
/file <path> Upload a file to the chat
/quit Disconnect
"""
import asyncio
import base64
import curses
import hashlib
import json
import os
import struct
import sys
import time
import html
import urllib.parse
PORT = 9999
VERSION = "9"
TUNNEL_TARGETS = {
"parspack": ("185.208.174.152", 22),
"mequ": ("188.213.68.133", 2022),
"alipi": ("10.66.66.2", 22),
}
MAX_FILE_SIZE = 1 * 1024 * 1024 # 1 MB per file
MAX_TOTAL_STORAGE = 50 * 1024 * 1024 # 50 MB total
# ── Server ──────────────────────────────────────────────────────────────
clients: dict[asyncio.StreamWriter, str] = {} # TCP clients
sse_queues: list[asyncio.Queue] = [] # web clients
history: list[dict] = []
uploaded_files: dict[str, bytes] = {} # file_id -> raw bytes (insertion order)
total_file_bytes = 0
def store_file(file_id: str, data: bytes):
"""Store a file, evicting oldest files if total storage exceeds limit."""
global total_file_bytes
uploaded_files[file_id] = data
total_file_bytes += len(data)
while total_file_bytes > MAX_TOTAL_STORAGE and uploaded_files:
oldest_id = next(iter(uploaded_files))
total_file_bytes -= len(uploaded_files.pop(oldest_id))
async def broadcast(msg: dict):
history.append(msg)
line = json.dumps(msg) + "\n"
# TCP clients
dead = []
for w in clients:
try:
w.write(line.encode())
await w.drain()
except Exception:
dead.append(w)
for w in dead:
clients.pop(w, None)
# SSE web clients
dead_q = []
for q in sse_queues:
try:
q.put_nowait(msg)
except Exception:
dead_q.append(q)
for q in dead_q:
sse_queues.remove(q)
# ── HTML / JS chat page ────────────────────────────────────────────────
CHAT_HTML = r"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<title>Chat</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: #1a1a2e; color: #e0e0e0; font-family: 'Courier New', monospace;
display: flex; flex-direction: column; height: 100vh; }
#messages { flex: 1; overflow-y: auto; padding: 12px; }
.msg { padding: 3px 0; white-space: pre-wrap; word-wrap: break-word; }
.msg code { background: #2a2a4a; padding: 1px 5px; border-radius: 3px; color: #f8c555; }
.msg pre { background: #12122a; border: 1px solid #333; border-radius: 4px;
padding: 8px; margin: 4px 0; overflow-x: auto; }
.msg pre code { background: none; padding: 0; color: #e0e0e0; }
.msg strong { color: #fff; }
.msg em { color: #ccc; }
.msg a.auto-link { color: #67c7eb; }
.ts { color: #666; }
.sys { color: #5e9ca0; font-style: italic; }
.file-link { display: inline-block; background: #0f3460; border: 1px solid #444;
padding: 4px 10px; border-radius: 4px; margin: 2px 0; color: #67c7eb;
text-decoration: none; }
.file-link:hover { background: #1a4a80; }
#bottom { display: flex; padding: 8px; gap: 8px; border-top: 1px solid #333;
background: #16213e; align-items: flex-end; }
#name { width: 100px; padding: 8px; background: #0f3460; border: 1px solid #444;
color: #e0e0e0; border-radius: 4px; align-self: flex-end; }
#input { flex: 1; padding: 8px; background: #0f3460; border: 1px solid #444;
color: #e0e0e0; border-radius: 4px; resize: none; min-height: 38px;
max-height: 200px; font-family: inherit; font-size: inherit; line-height: 1.4; }
#send { padding: 8px 16px; background: #e94560; border: none; color: #fff;
border-radius: 4px; cursor: pointer; align-self: flex-end; }
#send:hover { background: #c73e54; }
#file-btn { padding: 8px 10px; background: #0f3460; border: 1px solid #444; color: #e0e0e0;
border-radius: 4px; cursor: pointer; align-self: flex-end; font-size: 1.1em; }
#file-btn:hover { background: #1a4a80; }
#file-input { display: none; }
.hint { color: #555; font-size: 0.75em; padding: 2px 12px; }
</style>
</head>
<body>
<div id="messages"></div>
<div class="hint">Shift+Enter for newline · Enter to send</div>
<div id="bottom">
<input id="name" placeholder="Name" value="">
<textarea id="input" placeholder="Type a message…" rows="1" autofocus
autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"></textarea>
<label id="file-btn" title="Upload file">&#128206;<input type="file" id="file-input"></label>
<button id="send">Send</button>
</div>
<script>
const $msg = document.getElementById('messages');
const $input = document.getElementById('input');
const $name = document.getElementById('name');
const $send = document.getElementById('send');
const $file = document.getElementById('file-input');
$name.value = 'user' + Math.floor(Math.random() * 1000);
const USER_COLORS = [
'#e6a23c', '#f56c9d', '#67c7eb', '#b39ddb',
'#ff8a65', '#81c784', '#ce93d8', '#4fc3f7',
'#ffb74d', '#aed581', '#f06292', '#4dd0e1'
];
function userColor(name) {
if (name === $name.value.trim()) return '#4ade80';
let h = 0;
for (let i = 0; i < name.length; i++) h = ((h << 5) - h + name.charCodeAt(i)) | 0;
return USER_COLORS[Math.abs(h) % USER_COLORS.length];
}
function esc(s) {
const d = document.createElement('div');
d.textContent = s;
return d.innerHTML;
}
function renderMd(raw) {
// We escape first, then apply formatting on the escaped text.
let s = esc(raw);
// Fenced code blocks: ```lang\ncode\n```
s = s.replace(/```(\w*)\n([\s\S]*?)```/g, function(_, lang, code) {
return '<pre><code>' + code + '</code></pre>';
});
// Also handle ``` without language or newline
s = s.replace(/```([\s\S]*?)```/g, function(_, code) {
return '<pre><code>' + code + '</code></pre>';
});
// Inline code: `...` (but not inside <pre>)
s = s.replace(/`([^`\n]+)`/g, '<code>$1</code>');
// Bold: **...**
s = s.replace(/\*\*(.+?)\*\*/g, '<strong>$1</strong>');
// Italic: *...* (not preceded/followed by *)
s = s.replace(/(?<!\*)\*([^*]+)\*(?!\*)/g, '<em>$1</em>');
// Auto-link URLs
s = s.replace(/(https?:\/\/[^\s<&]+)/g, '<a class="auto-link" href="$1" target="_blank" rel="noopener">$1</a>');
return s;
}
function formatSize(n) {
if (n < 1024) return n + ' B';
if (n < 1048576) return (n/1024).toFixed(1) + ' KB';
return (n/1048576).toFixed(1) + ' MB';
}
function addMsg(data) {
const d = document.createElement('div');
d.className = 'msg';
const t = new Date(data.ts * 1000).toLocaleTimeString([], {hour:'2-digit',minute:'2-digit'});
if (data.user === '***') {
d.innerHTML = '<span class="ts">' + t + '</span> <span class="sys">' + esc(data.text) + '</span>';
} else if (data.file_id) {
const c = userColor(data.user);
d.innerHTML = '<span class="ts">' + t + '</span> <span style="color:' + c + ';font-weight:bold">'
+ esc(data.user) + '</span>: '
+ '<a class="file-link" href="/files/' + esc(data.file_id) + '/' + encodeURIComponent(data.filename)
+ '" download="' + esc(data.filename) + '">&#128206; ' + esc(data.filename)
+ ' (' + formatSize(data.file_size) + ')</a>';
} else {
const c = userColor(data.user);
d.innerHTML = '<span class="ts">' + t + '</span> <span style="color:' + c + ';font-weight:bold">'
+ esc(data.user) + '</span>: ' + renderMd(data.text);
}
$msg.appendChild(d);
$msg.scrollTop = $msg.scrollHeight;
}
function send() {
const text = $input.value.trimEnd();
const name = $name.value.trim() || 'anon';
if (!text) return;
fetch('/chat/send', {
method: 'POST',
headers: {'Content-Type': 'application/x-www-form-urlencoded'},
body: 'name=' + encodeURIComponent(name) + '&text=' + encodeURIComponent(text)
});
$input.value = '';
$input.style.height = 'auto';
}
// auto-resize textarea
$input.addEventListener('input', function() {
this.style.height = 'auto';
this.style.height = Math.min(this.scrollHeight, 200) + 'px';
});
$send.onclick = send;
$input.onkeydown = function(e) {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
send();
}
};
// file upload
$file.onchange = function() {
if (!this.files.length) return;
const f = this.files[0];
const name = $name.value.trim() || 'anon';
const fd = new FormData();
fd.append('name', name);
fd.append('file', f);
fetch('/chat/upload', { method: 'POST', body: fd });
this.value = '';
};
// Notifications
let notifEnabled = false;
if ('Notification' in window) {
if (Notification.permission === 'granted') { notifEnabled = true; }
else if (Notification.permission !== 'denied') {
Notification.requestPermission().then(p => { notifEnabled = (p === 'granted'); });
}
}
let unreadCount = 0;
const baseTitle = document.title;
function notify(data) {
// Only notify when tab is not focused and it's not our own message
if (document.hasFocus()) return;
if (data.user === $name.value.trim()) return;
if (data.user === '***') return;
unreadCount++;
document.title = '(' + unreadCount + ') ' + baseTitle;
if (notifEnabled) {
const body = data.file_id
? data.user + ' shared a file: ' + data.filename
: data.user + ': ' + data.text.substring(0, 100);
const n = new Notification('Chat', { body: body, tag: 'chat-msg' });
setTimeout(() => n.close(), 5000);
}
}
window.addEventListener('focus', function() {
unreadCount = 0;
document.title = baseTitle;
});
// SSE
const es = new EventSource('/chat/events');
es.onmessage = function(e) {
const data = JSON.parse(e.data);
addMsg(data);
notify(data);
};
es.onerror = function() { addMsg({ts: Date.now()/1000, user: '***', text: 'Connection lost. Retrying…'}); };
</script>
</body>
</html>
"""
# ── Multipart parser (minimal, for file uploads) ───────────────────────
def parse_multipart(body: bytes, boundary: str) -> dict:
"""Parse multipart/form-data, return {field_name: (filename|None, value)}."""
parts = body.split(b"--" + boundary.encode())
result = {}
for part in parts:
if not part or part.strip() in (b"", b"--"):
continue
if b"\r\n\r\n" not in part:
continue
header_block, content = part.split(b"\r\n\r\n", 1)
if content.endswith(b"\r\n"):
content = content[:-2]
headers_str = header_block.decode(errors="replace")
name = filename = None
for h in headers_str.split("\r\n"):
if "content-disposition" in h.lower():
for kv in h.split(";"):
kv = kv.strip()
if kv.startswith("name="):
name = kv.split("=", 1)[1].strip('"')
if kv.startswith("filename="):
filename = kv.split("=", 1)[1].strip('"')
if name:
result[name] = (filename, content)
return result
# ── HTTP request parser ────────────────────────────────────────────────
async def parse_http_request(reader: asyncio.StreamReader, first_line: str):
"""Parse an HTTP request, return (method, path, headers, body)."""
parts = first_line.split()
method = parts[0]
path = parts[1] if len(parts) > 1 else "/"
headers = {}
while True:
hline = (await reader.readline()).decode().strip()
if not hline:
break
k, _, v = hline.partition(":")
headers[k.strip().lower()] = v.strip()
body = b""
clen = int(headers.get("content-length", 0))
if clen > 0:
body = await reader.readexactly(clen)
return method, path, headers, body
# ── WebSocket helpers ───────────────────────────────────────────────────
async def ws_read_frame(reader):
"""Read one WebSocket frame, return (opcode, payload)."""
b0, b1 = struct.unpack("!BB", await reader.readexactly(2))
opcode = b0 & 0x0F
masked = b1 & 0x80
length = b1 & 0x7F
if length == 126:
length = struct.unpack("!H", await reader.readexactly(2))[0]
elif length == 127:
length = struct.unpack("!Q", await reader.readexactly(8))[0]
mask = await reader.readexactly(4) if masked else None
data = await reader.readexactly(length)
if mask:
data = bytes(b ^ mask[i % 4] for i, b in enumerate(data))
return opcode, data
def ws_make_frame(opcode, data):
"""Build an unmasked WebSocket frame."""
frame = bytes([0x80 | opcode])
if len(data) < 126:
frame += bytes([len(data)])
elif len(data) < 65536:
frame += struct.pack("!BH", 126, len(data))
else:
frame += struct.pack("!BQ", 127, len(data))
return frame + data
async def handle_ws_tunnel(ws_reader, ws_writer, target):
"""Bridge WebSocket frames <-> raw TCP to target (host, port)."""
try:
ssh_reader, ssh_writer = await asyncio.open_connection(*target)
except Exception as e:
ws_writer.write(ws_make_frame(0x8, struct.pack("!H", 1011) + str(e).encode()[:123]))
await ws_writer.drain()
ws_writer.close()
return
async def ws_to_ssh():
try:
while True:
op, data = await ws_read_frame(ws_reader)
if op == 0x8:
break
if op == 0x9:
ws_writer.write(ws_make_frame(0xA, data))
await ws_writer.drain()
continue
if op in (0x1, 0x2):
ssh_writer.write(data)
await ssh_writer.drain()
except Exception:
pass
finally:
ssh_writer.close()
async def ssh_to_ws():
try:
while True:
data = await ssh_reader.read(16384)
if not data:
break
ws_writer.write(ws_make_frame(0x2, data))
await ws_writer.drain()
except Exception:
pass
finally:
ws_writer.write(ws_make_frame(0x8, struct.pack("!H", 1000)))
try:
await ws_writer.drain()
except Exception:
pass
ws_writer.close()
await asyncio.gather(ws_to_ssh(), ssh_to_ws())
# ── HTTP handling ───────────────────────────────────────────────────────
async def handle_http(reader, writer, first_line):
method, path, headers, body = await parse_http_request(reader, first_line)
# GET /version
if method == "GET" and path == "/version":
resp = json.dumps({"version": VERSION}).encode()
writer.write(b"HTTP/1.1 200 OK\r\n")
writer.write(b"Content-Type: application/json\r\n")
writer.write(f"Content-Length: {len(resp)}\r\n".encode())
writer.write(b"\r\n")
writer.write(resp)
await writer.drain()
writer.close()
return
# GET /tunnel.py — download the tunnel client
if method == "GET" and path == "/tunnel.py":
tunnel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tunnel.py")
try:
source = open(tunnel_path, "rb").read()
except FileNotFoundError:
writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n")
await writer.drain()
writer.close()
return
writer.write(b"HTTP/1.1 200 OK\r\n")
writer.write(b"Content-Type: text/plain\r\n")
writer.write(f"Content-Length: {len(source)}\r\n".encode())
writer.write(b"Content-Disposition: attachment; filename=\"tunnel.py\"\r\n")
writer.write(b"\r\n")
writer.write(source)
await writer.drain()
writer.close()
return
# GET / or /chat — serve the web UI
if method == "GET" and path in ("/", "/chat", "/chat/"):
resp = CHAT_HTML.encode()
writer.write(b"HTTP/1.1 200 OK\r\n")
writer.write(b"Content-Type: text/html; charset=utf-8\r\n")
writer.write(f"Content-Length: {len(resp)}\r\n".encode())
writer.write(b"\r\n")
writer.write(resp)
await writer.drain()
writer.close()
return
# GET /chat/events — SSE stream
if method == "GET" and path == "/chat/events":
writer.write(b"HTTP/1.1 200 OK\r\n")
writer.write(b"Content-Type: text/event-stream\r\n")
writer.write(b"Cache-Control: no-cache\r\n")
writer.write(b"Connection: keep-alive\r\n")
writer.write(b"X-Accel-Buffering: no\r\n")
writer.write(b"Access-Control-Allow-Origin: *\r\n")
writer.write(b"\r\n")
await writer.drain()
for msg in history:
writer.write(f"data: {json.dumps(msg)}\n\n".encode())
await writer.drain()
q: asyncio.Queue = asyncio.Queue()
sse_queues.append(q)
try:
while True:
msg = await q.get()
writer.write(f"data: {json.dumps(msg)}\n\n".encode())
await writer.drain()
except Exception:
pass
finally:
if q in sse_queues:
sse_queues.remove(q)
writer.close()
return
# POST /chat/send — send a message (supports multiline via JSON text field)
if method == "POST" and path == "/chat/send":
params = urllib.parse.parse_qs(body.decode())
name = params.get("name", ["anon"])[0]
text = params.get("text", [""])[0].strip()
if text:
await broadcast({"ts": time.time(), "user": name, "text": text})
writer.write(b"HTTP/1.1 204 No Content\r\n\r\n")
await writer.drain()
writer.close()
return
# POST /chat/upload — file upload (multipart/form-data)
if method == "POST" and path == "/chat/upload":
ct = headers.get("content-type", "")
if "multipart/form-data" in ct and "boundary=" in ct:
boundary = ct.split("boundary=")[1].strip()
fields = parse_multipart(body, boundary)
name = fields.get("name", (None, b"anon"))[1]
if isinstance(name, bytes):
name = name.decode()
file_entry = fields.get("file")
if file_entry and file_entry[0]:
filename = file_entry[0]
file_data = file_entry[1]
if len(file_data) <= MAX_FILE_SIZE:
file_id = hashlib.sha256(file_data + str(time.time()).encode()).hexdigest()[:16]
store_file(file_id, file_data)
await broadcast({
"ts": time.time(), "user": name,
"text": f"[file: {filename}]",
"file_id": file_id, "filename": filename,
"file_size": len(file_data)
})
writer.write(b"HTTP/1.1 204 No Content\r\n\r\n")
await writer.drain()
writer.close()
return
# GET /files/<id>/<filename> — download uploaded file
if method == "GET" and path.startswith("/files/"):
parts = path.split("/")
if len(parts) >= 3:
file_id = parts[2]
if file_id in uploaded_files:
data = uploaded_files[file_id]
fname = urllib.parse.unquote(parts[3]) if len(parts) > 3 else "file"
writer.write(b"HTTP/1.1 200 OK\r\n")
writer.write(b"Content-Type: application/octet-stream\r\n")
writer.write(f"Content-Length: {len(data)}\r\n".encode())
writer.write(f"Content-Disposition: attachment; filename=\"{fname}\"\r\n".encode())
writer.write(b"\r\n")
writer.write(data)
await writer.drain()
writer.close()
return
writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n")
await writer.drain()
writer.close()
return
# GET /tunnel/<dest> — WebSocket upgrade for SSH tunnel
if method == "GET" and path.startswith("/tunnel"):
# Parse destination: /tunnel/parspack, /tunnel/mequ, or /tunnel (default: parspack)
parts = path.strip("/").split("/")
dest_name = parts[1] if len(parts) > 1 else "parspack"
target = TUNNEL_TARGETS.get(dest_name)
if not target:
names = ", ".join(TUNNEL_TARGETS.keys())
err = f"Unknown destination '{dest_name}'. Available: {names}".encode()
writer.write(b"HTTP/1.1 404 Not Found\r\n")
writer.write(f"Content-Length: {len(err)}\r\n".encode())
writer.write(b"\r\n")
writer.write(err)
await writer.drain()
writer.close()
return
ws_key = headers.get("sec-websocket-key", "")
if not ws_key:
writer.write(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n")
await writer.drain()
writer.close()
return
accept = base64.b64encode(
hashlib.sha1((ws_key + "258EAFA5-E914-47DA-95CA-5AB9DC65B5F3").encode()).digest()
).decode()
writer.write(f"HTTP/1.1 101 Switching Protocols\r\n"
f"Upgrade: websocket\r\n"
f"Connection: Upgrade\r\n"
f"Sec-WebSocket-Accept: {accept}\r\n"
f"\r\n".encode())
await writer.drain()
await handle_ws_tunnel(reader, writer, target)
return
# 404
writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n")
await writer.drain()
writer.close()
# ── Connection handler (TCP chat + HTTP on same port) ──────────────────
async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
first_line = (await reader.readline()).decode().strip()
# HTTP request
if first_line.startswith(("GET ", "POST ", "PUT ", "HEAD ")):
await handle_http(reader, writer, first_line)
return
# Raw TCP chat client — first line is the name
name = first_line
clients[writer] = name
for msg in history:
writer.write((json.dumps(msg) + "\n").encode())
await writer.drain()
await broadcast({"ts": time.time(), "user": "***", "text": f"{name} joined"})
print(f"+ {name} connected ({len(clients)} online)")
try:
while True:
data = await reader.readline()
if not data:
break
line = data.decode().rstrip("\n")
if not line:
continue
# Try JSON (new protocol: supports multiline + files)
try:
pkt = json.loads(line)
if pkt.get("type") == "file":
file_data = base64.b64decode(pkt["data"])
if len(file_data) > MAX_FILE_SIZE:
continue
file_id = hashlib.sha256(file_data + str(time.time()).encode()).hexdigest()[:16]
store_file(file_id, file_data)
await broadcast({
"ts": time.time(), "user": name,
"text": f"[file: {pkt['filename']}]",
"file_id": file_id, "filename": pkt["filename"],
"file_size": len(file_data)
})
else:
text = pkt.get("text", "").strip()
if text:
await broadcast({"ts": time.time(), "user": name, "text": text})
except (json.JSONDecodeError, KeyError):
# Legacy plain text (single line, backwards compat)
if line.strip():
await broadcast({"ts": time.time(), "user": name, "text": line.strip()})
except Exception:
pass
finally:
clients.pop(writer, None)
await broadcast({"ts": time.time(), "user": "***", "text": f"{name} left"})
print(f"- {name} disconnected ({len(clients)} online)")
writer.close()
async def run_server(port: int):
srv = await asyncio.start_server(handle, "0.0.0.0", port)
print(f"Chat server listening on :{port}")
print(f" Web UI: http://localhost:{port}/chat")
print(f" CLI: python3 chat.py localhost <name> {port}")
async with srv:
await srv.serve_forever()
# ── Client TUI ──────────────────────────────────────────────────────────
# Curses color pair assignments
CP_SEPARATOR = 1
CP_MY_NAME = 2
CP_SYSTEM = 3
CP_TIMESTAMP = 4
# 10+ for other users
OTHER_CURSES_COLORS = [
curses.COLOR_YELLOW,
curses.COLOR_MAGENTA,
curses.COLOR_CYAN,
curses.COLOR_RED,
curses.COLOR_BLUE,
curses.COLOR_WHITE,
]
def user_color_pair(username: str, my_name: str) -> int:
"""Return curses color pair number for a username."""
if username == "***":
return CP_SYSTEM
if username == my_name:
return CP_MY_NAME
h = 0
for c in username:
h = ((h << 5) - h + ord(c)) & 0xFFFFFFFF
return 10 + (h % len(OTHER_CURSES_COLORS))
class ChatClient:
def __init__(self, host: str, name: str, port: int):
self.host = host
self.name = name
self.port = port
self.messages: list[tuple[str, int]] = [] # (text, color_pair)
self.input_buf = ""
self.scroll = 0
self.reader: asyncio.StreamReader | None = None
self.writer: asyncio.StreamWriter | None = None
self.running = True
def notify(self, msg: dict):
"""Send terminal bell + update title for messages from others."""
if msg["user"] == self.name or msg["user"] == "***":
return
# Terminal bell
sys.stdout.write("\a")
sys.stdout.flush()
# OSC title update (works in most terminals: iTerm2, Terminal.app, etc.)
preview = msg["text"][:50] if not msg.get("file_id") else f"shared {msg['filename']}"
sys.stdout.write(f"\033]0;[NEW] {msg['user']}: {preview}\007")
sys.stdout.flush()
def reset_title(self):
sys.stdout.write(f"\033]0;Chat - {self.name}\007")
sys.stdout.flush()
def add_message(self, msg: dict):
t = time.strftime("%H:%M", time.localtime(msg["ts"]))
cp = user_color_pair(msg["user"], self.name)
if msg["user"] == "***":
self.messages.append((f" {t} {msg['text']}", CP_SYSTEM))
elif msg.get("file_id"):
self.messages.append((
f" {t} {msg['user']}: [file: {msg['filename']} ({msg['file_size']} bytes)]", cp))
else:
text = msg["text"]
lines = text.split("\n")
self.messages.append((f" {t} {msg['user']}: {lines[0]}", cp))
for extra in lines[1:]:
self.messages.append((f" {extra}", cp))
self.notify(msg)
async def recv_loop(self):
while self.running:
line = await self.reader.readline()
if not line:
self.messages.append((" *** connection lost", CP_SYSTEM))
self.running = False
break
msg = json.loads(line.decode())
self.add_message(msg)
async def send_json(self, pkt: dict):
"""Send a JSON packet to the server."""
self.writer.write((json.dumps(pkt) + "\n").encode())
await self.writer.drain()
async def send_msg(self, text: str):
"""Send a chat message (supports multiline)."""
await self.send_json({"type": "msg", "text": text})
async def send_file(self, filepath: str):
"""Upload a file to the chat."""
filepath = os.path.expanduser(filepath)
if not os.path.isfile(filepath):
self.messages.append((f" *** file not found: {filepath}", CP_SYSTEM))
return
size = os.path.getsize(filepath)
if size > MAX_FILE_SIZE:
self.messages.append((f" *** file too large ({size} bytes, max {MAX_FILE_SIZE})", CP_SYSTEM))
return
with open(filepath, "rb") as f:
data = f.read()
await self.send_json({
"type": "file",
"filename": os.path.basename(filepath),
"data": base64.b64encode(data).decode()
})
async def run(self, stdscr):
curses.curs_set(1)
curses.use_default_colors()
curses.init_pair(CP_SEPARATOR, curses.COLOR_CYAN, -1)
curses.init_pair(CP_MY_NAME, curses.COLOR_GREEN, -1)
curses.init_pair(CP_SYSTEM, curses.COLOR_CYAN, -1)
curses.init_pair(CP_TIMESTAMP, curses.COLOR_WHITE, -1)
for i, color in enumerate(OTHER_CURSES_COLORS):
curses.init_pair(10 + i, color, -1)
stdscr.nodelay(True)
stdscr.timeout(50)
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
# Send name as plain first line (server expects it)
self.writer.write((self.name + "\n").encode())
await self.writer.drain()
recv_task = asyncio.create_task(self.recv_loop())
while self.running:
h, w = stdscr.getmaxyx()
stdscr.erase()
sep_y = h - 2
stdscr.addstr(sep_y, 0, "" * w, curses.color_pair(CP_SEPARATOR))
visible = self.messages[-(sep_y + self.scroll):len(self.messages) - self.scroll if self.scroll else None]
for i, (line, cp) in enumerate(visible[-(sep_y):]):
try:
stdscr.addnstr(i, 0, line, w - 1, curses.color_pair(cp))
except curses.error:
pass
prompt = f" {self.name}> "
try:
stdscr.addstr(h - 1, 0, prompt, curses.color_pair(CP_MY_NAME))
stdscr.addnstr(h - 1, len(prompt), self.input_buf, w - len(prompt) - 1)
stdscr.move(h - 1, min(len(prompt) + len(self.input_buf), w - 1))
except curses.error:
pass
stdscr.refresh()
try:
ch = stdscr.get_wch()
except curses.error:
await asyncio.sleep(0.05)
continue
self.reset_title()
if isinstance(ch, str):
if ch == "\n":
if self.input_buf.strip():
cmd = self.input_buf.strip()
if cmd == "/quit":
break
elif cmd.startswith("/file "):
await self.send_file(cmd[6:].strip())
else:
await self.send_msg(self.input_buf)
self.input_buf = ""
self.scroll = 0
elif ch == "\x7f" or ch == "\b":
self.input_buf = self.input_buf[:-1]
elif ch == "\x1b":
pass
elif ch.isprintable() or ch == "\t":
self.input_buf += ch
elif isinstance(ch, int):
if ch == curses.KEY_BACKSPACE:
self.input_buf = self.input_buf[:-1]
elif ch == curses.KEY_PPAGE:
self.scroll = min(self.scroll + 5, max(0, len(self.messages) - (h - 2)))
elif ch == curses.KEY_NPAGE:
self.scroll = max(self.scroll - 5, 0)
self.running = False
recv_task.cancel()
self.writer.close()
def run_client(host: str, name: str, port: int):
client = ChatClient(host, name, port)
async def main(stdscr):
await client.run(stdscr)
curses.wrapper(lambda stdscr: asyncio.run(main(stdscr)))
# ── Main ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
if len(sys.argv) < 2:
print(__doc__.strip())
sys.exit(1)
if sys.argv[1] == "server":
port = int(sys.argv[2]) if len(sys.argv) > 2 else PORT
asyncio.run(run_server(port))
else:
host = sys.argv[1]
name = sys.argv[2] if len(sys.argv) > 2 else "anon"
port = int(sys.argv[3]) if len(sys.argv) > 3 else PORT
run_client(host, name, port)