"""Legacy POP3/SMTP proxy to IMAP/SMTP backends. This module implements a lightweight proxy that exposes legacy, unauthenticated POP3 and SMTP interfaces and translates them to authenticated encrypted backend connections. It is intended to run inside Docker on a supported Python LTS release. """ import asyncio import imaplib import logging import os import re import smtplib import ssl from aiosmtpd.controller import Controller logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s") logger = logging.getLogger(__name__) def env_bool(name, default=False): """Return a boolean environment variable value. Accepts common truthy strings: 1, true, yes, on. """ raw = os.getenv(name) if raw is None: return default return raw.strip().lower() in ("1", "true", "yes", "on") class Settings: """Configuration values read from environment variables.""" POP3_BIND_ADDR = os.getenv("POP3_BIND_ADDR", "0.0.0.0") POP3_BIND_PORT = int(os.getenv("POP3_BIND_PORT", "110")) SMTP_BIND_ADDR = os.getenv("SMTP_BIND_ADDR", "0.0.0.0") SMTP_BIND_PORT = int(os.getenv("SMTP_BIND_PORT", "25")) BACKEND_IMAP_HOST = os.getenv("BACKEND_IMAP_HOST") BACKEND_IMAP_PORT = int(os.getenv("BACKEND_IMAP_PORT", "993")) BACKEND_IMAP_USER = os.getenv("BACKEND_IMAP_USER") BACKEND_IMAP_PASS = os.getenv("BACKEND_IMAP_PASS") BACKEND_IMAP_USE_SSL = env_bool("BACKEND_IMAP_USE_SSL", True) BACKEND_IMAP_USE_STARTTLS = env_bool("BACKEND_IMAP_USE_STARTTLS", False) BACKEND_SMTP_HOST = os.getenv("BACKEND_SMTP_HOST") BACKEND_SMTP_PORT = int(os.getenv("BACKEND_SMTP_PORT", "465")) BACKEND_SMTP_USER = os.getenv("BACKEND_SMTP_USER") BACKEND_SMTP_PASS = os.getenv("BACKEND_SMTP_PASS") BACKEND_SMTP_USE_SSL = env_bool("BACKEND_SMTP_USE_SSL", True) BACKEND_SMTP_USE_TLS = env_bool("BACKEND_SMTP_USE_TLS", False) # When false (default) the proxy will not mutate backend mailboxes # (no STORE +FLAGS / EXPUNGE). Set to true only when deletions should # be propagated to the backend IMAP server. BACKEND_MUTATE = env_bool("BACKEND_MUTATE", False) @classmethod def validate(cls): """Validate that required backend settings are configured.""" missing = [] if not cls.BACKEND_IMAP_HOST: missing.append("BACKEND_IMAP_HOST") if not cls.BACKEND_SMTP_HOST: missing.append("BACKEND_SMTP_HOST") if missing: raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}") def _split_lines(data): """Split a byte string into lines, normalising CR/CRLF/LF endings.""" text = data.replace(b"\r\n", b"\n").replace(b"\r", b"\n") lines = text.split(b"\n") if lines and lines[-1] == b"": lines = lines[:-1] return lines def format_pop3_body(body, max_body_lines=None): """Prepare a message for POP3 RETR/TOP transmission (RFC 1939). Normalises line endings to CRLF, byte-stuffs any line beginning with ".", and appends the terminating ".\\r\\n" on its own line. When ``max_body_lines`` is given, the full header block is kept and the body is truncated to that many lines (TOP semantics). """ lines = _split_lines(body) if max_body_lines is not None: try: separator = lines.index(b"") except ValueError: separator = len(lines) header_lines = lines[: separator + 1] body_lines = lines[separator + 1 :][:max_body_lines] lines = header_lines + body_lines stuffed = [] for line in lines: if line.startswith(b"."): line = b"." + line stuffed.append(line) stuffed.append(b".") return b"\r\n".join(stuffed) + b"\r\n" class IMAPBackend: """Minimal IMAP wrapper for backend mailbox access.""" UID_PATTERN = re.compile(rb"\(UID (\d+) RFC822\.SIZE (\d+)\)") def __init__(self, username, password): self.username = username self.password = password self.connection = None def _connect(self): if Settings.BACKEND_IMAP_USE_SSL: client = imaplib.IMAP4_SSL( Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30 ) else: client = imaplib.IMAP4( Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30 ) if Settings.BACKEND_IMAP_USE_STARTTLS: client.starttls(ssl_context=ssl.create_default_context()) return client def login(self): """Open an IMAP connection and select INBOX.""" client = self._connect() client.login(self.username, self.password) client.select("INBOX") self.connection = client return client def logout(self): """Safely close the IMAP connection.""" if self.connection is not None: try: self.connection.logout() except Exception: pass self.connection = None def list_uids(self): typ, data = self.connection.uid("search", None, "ALL") if typ != "OK": raise RuntimeError("IMAP UID search failed") return data[0].split() if data and data[0] else [] def fetch_message_size(self, uid): typ, data = self.connection.uid("fetch", uid, "(RFC822.SIZE)") if typ != "OK" or not data: raise RuntimeError("IMAP size fetch failed") response = data[0] if isinstance(response, tuple): response = response[0] match = re.search(rb"RFC822\.SIZE (\d+)", response) return int(match.group(1)) if match else 0 def fetch_all_sizes(self): """Return a {uid: size} mapping for all messages in one IMAP round-trip.""" sizes = {} typ, data = self.connection.uid("fetch", "1:*", "(RFC822.SIZE)") if typ != "OK" or not data: return sizes for response in data: if isinstance(response, tuple): response = response[0] if not isinstance(response, bytes): continue match = self.UID_PATTERN.search(response) if match: sizes[match.group(1)] = int(match.group(2)) return sizes def fetch_message(self, uid): typ, data = self.connection.uid("fetch", uid, "(RFC822)") if typ != "OK" or not data: raise RuntimeError("IMAP message fetch failed") for chunk in data: if isinstance(chunk, tuple) and len(chunk) > 1: return chunk[1] raise RuntimeError("IMAP message fetch returned no body") def mark_deleted(self, uid): self.connection.uid("STORE", uid, "+FLAGS.SILENT", "(\\Deleted)") def expunge(self): self.connection.expunge() class POP3Session: """Session state and command handling for a single POP3 client.""" def __init__(self, reader, writer): self.reader = reader self.writer = writer self._authenticated = False self._imap = None self.username = None self.password = None self.message_ids = [] self.deleted = set() async def run(self): """Process POP3 commands until the client disconnects.""" await self.send_line("+OK POP3 proxy ready") while True: line = await self.reader.readline() if not line: break command = line.decode("utf-8", errors="ignore").strip() if not command: continue logger.info("POP3 command: %s", command) try: response = await self.handle_command(command) except Exception as exc: logger.exception("POP3 handling failed") await self.send_line(f"-ERR {exc}") break if response is False: break await self.close() async def handle_command(self, command_line): parts = command_line.split() command = parts[0].upper() args = parts[1:] if command == "USER": return await self.handle_user(args) if command == "PASS": return await self.handle_pass(args) if command == "STAT": return await self.handle_stat() if command == "LIST": return await self.handle_list(args) if command == "RETR": return await self.handle_retr(args) if command == "TOP": return await self.handle_top(args) if command == "DELE": return await self.handle_dele(args) if command == "NOOP": return await self.send_line("+OK") if command == "RSET": self.deleted.clear() return await self.send_line("+OK") if command == "QUIT": await self.handle_quit() return False if command == "UIDL": return await self.handle_uidl(args) return await self.send_line("-ERR Unsupported command") async def handle_user(self, args): # Accept any username. Client credentials are intentionally ignored; # some legacy clients insist on supplying them, so they are accepted # blindly. The backend is always reached with the proxy's own creds. self.username = args[0] if args else None return await self.send_line("+OK") async def handle_pass(self, args): # Accept any password. See handle_user: client credentials are # accepted but never used or validated. self.password = args[0] if args else None await asyncio.to_thread(self.authenticate) return await self.send_line("+OK User authenticated") def authenticate(self): """Authenticate to the IMAP backend using the configured proxy credentials. Client-supplied POP3 credentials are deliberately ignored: the proxy always connects to the backend with ``BACKEND_IMAP_USER`` / ``BACKEND_IMAP_PASS``. This is by design for legacy clients that require credentials to be entered even though the proxy does not use them. """ if not (Settings.BACKEND_IMAP_USER and Settings.BACKEND_IMAP_PASS): raise RuntimeError("Backend IMAP credentials are not configured") backend = IMAPBackend(Settings.BACKEND_IMAP_USER, Settings.BACKEND_IMAP_PASS) backend.login() self._imap = backend # Snapshot the maildrop once; it stays static for the session lifetime # (RFC 1939), so DELE marks are not wiped by later STAT/LIST/UIDL. self.message_ids = backend.list_uids() async def handle_stat(self): self._require_auth() sizes = await asyncio.to_thread(self._imap.fetch_all_sizes) count = len(self.message_ids) - len(self.deleted) total_size = sum(sizes.get(uid, 0) for uid in self.message_ids if uid not in self.deleted) return await self.send_line(f"+OK {count} {total_size}") async def handle_list(self, args): self._require_auth() sizes = await asyncio.to_thread(self._imap.fetch_all_sizes) if not args: lines = [f"+OK {len(self.message_ids)} messages"] for index, uid in enumerate(self.message_ids, start=1): if uid in self.deleted: continue lines.append(f"{index} {sizes.get(uid, 0)}") lines.append(".") return await self.send_lines(lines) if len(args) != 1 or not args[0].isdigit(): return await self.send_line("-ERR LIST takes optional message number") message_number = int(args[0]) if message_number < 1 or message_number > len(self.message_ids): return await self.send_line("-ERR no such message") uid = self.message_ids[message_number - 1] if uid in self.deleted: return await self.send_line("-ERR message deleted") return await self.send_line(f"+OK {message_number} {sizes.get(uid, 0)}") async def handle_retr(self, args): self._require_auth() if len(args) != 1 or not args[0].isdigit(): return await self.send_line("-ERR RETR requires message number") index = int(args[0]) if index < 1 or index > len(self.message_ids): return await self.send_line("-ERR no such message") uid = self.message_ids[index - 1] if uid in self.deleted: return await self.send_line("-ERR message deleted") message = await asyncio.to_thread(self._imap.fetch_message, uid) payload = format_pop3_body(message) await self.send_line(f"+OK {len(message)} octets") await self.send_raw(payload) async def handle_top(self, args): self._require_auth() if len(args) != 2 or not args[0].isdigit() or not args[1].isdigit(): return await self.send_line("-ERR TOP requires message number and line count") index = int(args[0]) line_count = int(args[1]) if index < 1 or index > len(self.message_ids): return await self.send_line("-ERR no such message") uid = self.message_ids[index - 1] if uid in self.deleted: return await self.send_line("-ERR message deleted") message = await asyncio.to_thread(self._imap.fetch_message, uid) payload = format_pop3_body(message, max_body_lines=line_count) await self.send_line("+OK top of message follows") await self.send_raw(payload) async def handle_dele(self, args): self._require_auth() if len(args) != 1 or not args[0].isdigit(): return await self.send_line("-ERR DELE requires message number") index = int(args[0]) if index < 1 or index > len(self.message_ids): return await self.send_line("-ERR no such message") uid = self.message_ids[index - 1] self.deleted.add(uid) return await self.send_line("+OK message marked for deletion") async def handle_uidl(self, args): self._require_auth() if not args: lines = ["+OK UID list follows"] for index, uid in enumerate(self.message_ids, start=1): if uid in self.deleted: continue lines.append(f"{index} {uid.decode('ascii')}") lines.append(".") return await self.send_lines(lines) if len(args) != 1 or not args[0].isdigit(): return await self.send_line("-ERR UIDL takes a single message number") message_number = int(args[0]) if message_number < 1 or message_number > len(self.message_ids): return await self.send_line("-ERR no such message") uid = self.message_ids[message_number - 1] if uid in self.deleted: return await self.send_line("-ERR message deleted") return await self.send_line(f"+OK {message_number} {uid.decode('ascii')}") async def handle_quit(self): if self._imap: # Only propagate deletes to the backend when explicitly enabled. if Settings.BACKEND_MUTATE: for uid in self.deleted: await asyncio.to_thread(self._imap.mark_deleted, uid) await asyncio.to_thread(self._imap.expunge) # Always logout the backend connection. self._imap.logout() await self.send_line("+OK Goodbye") async def send_line(self, line): self.writer.write((line + "\r\n").encode("utf-8")) await self.writer.drain() async def send_lines(self, lines): for line in lines: self.writer.write((line + "\r\n").encode("utf-8")) await self.writer.drain() async def send_raw(self, data): self.writer.write(data) await self.writer.drain() def _require_auth(self): if self._imap is None: raise RuntimeError("Not authenticated") async def close(self): try: self.writer.close() await self.writer.wait_closed() except Exception: pass class SMTPProxyHandler: """SMTP handler that relays inbound messages to the backend SMTP server.""" async def handle_DATA(self, server, session, envelope): logger.info("SMTP message received from %s to %s", envelope.mail_from, envelope.rcpt_tos) await asyncio.to_thread(self.send_message, envelope.mail_from, envelope.rcpt_tos, envelope.content) return "250 Message accepted for delivery" def send_message(self, sender, recipients, data): """Forward a complete SMTP message to the backend SMTP server.""" if Settings.BACKEND_SMTP_USE_SSL: smtp = smtplib.SMTP_SSL(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30) else: smtp = smtplib.SMTP(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30) if Settings.BACKEND_SMTP_USE_TLS: smtp.starttls(context=ssl.create_default_context()) try: if Settings.BACKEND_SMTP_USER and Settings.BACKEND_SMTP_PASS: smtp.login(Settings.BACKEND_SMTP_USER, Settings.BACKEND_SMTP_PASS) smtp.sendmail(sender, recipients, data) finally: smtp.quit() async def handle_pop3_client(reader, writer): """Create and run a POP3 session for a new client connection.""" session = POP3Session(reader, writer) await session.run() async def main(): """Initialize the proxy and start the POP3 and SMTP servers.""" Settings.validate() logger.info("Starting POP3 server on %s:%s", Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT) pop3_server = await asyncio.start_server(handle_pop3_client, Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT) logger.info("Starting SMTP server on %s:%s", Settings.SMTP_BIND_ADDR, Settings.SMTP_BIND_PORT) smtp_controller = Controller(SMTPProxyHandler(), hostname=Settings.SMTP_BIND_ADDR, port=Settings.SMTP_BIND_PORT) smtp_controller.start() async with pop3_server: await pop3_server.serve_forever() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Shutdown requested")