Files
2026-06-17 18:43:22 +01:00

466 lines
18 KiB
Python

"""Legacy POP3/SMTP proxy to IMAP/SMTP backends.
This module implements a lightweight proxy that exposes legacy, unauthenticated
POP3 and SMTP interfaces and translates them to authenticated encrypted backend
connections. It is intended to run inside Docker on a supported Python LTS
release.
"""
import asyncio
import imaplib
import logging
import os
import re
import smtplib
import ssl
from aiosmtpd.controller import Controller
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
def env_bool(name, default=False):
"""Return a boolean environment variable value.
Accepts common truthy strings: 1, true, yes, on.
"""
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in ("1", "true", "yes", "on")
class Settings:
"""Configuration values read from environment variables."""
POP3_BIND_ADDR = os.getenv("POP3_BIND_ADDR", "0.0.0.0")
POP3_BIND_PORT = int(os.getenv("POP3_BIND_PORT", "110"))
SMTP_BIND_ADDR = os.getenv("SMTP_BIND_ADDR", "0.0.0.0")
SMTP_BIND_PORT = int(os.getenv("SMTP_BIND_PORT", "25"))
BACKEND_IMAP_HOST = os.getenv("BACKEND_IMAP_HOST")
BACKEND_IMAP_PORT = int(os.getenv("BACKEND_IMAP_PORT", "993"))
BACKEND_IMAP_USER = os.getenv("BACKEND_IMAP_USER")
BACKEND_IMAP_PASS = os.getenv("BACKEND_IMAP_PASS")
BACKEND_IMAP_USE_SSL = env_bool("BACKEND_IMAP_USE_SSL", True)
BACKEND_IMAP_USE_STARTTLS = env_bool("BACKEND_IMAP_USE_STARTTLS", False)
BACKEND_SMTP_HOST = os.getenv("BACKEND_SMTP_HOST")
BACKEND_SMTP_PORT = int(os.getenv("BACKEND_SMTP_PORT", "465"))
BACKEND_SMTP_USER = os.getenv("BACKEND_SMTP_USER")
BACKEND_SMTP_PASS = os.getenv("BACKEND_SMTP_PASS")
BACKEND_SMTP_USE_SSL = env_bool("BACKEND_SMTP_USE_SSL", True)
BACKEND_SMTP_USE_TLS = env_bool("BACKEND_SMTP_USE_TLS", False)
# When false (default) the proxy will not mutate backend mailboxes
# (no STORE +FLAGS / EXPUNGE). Set to true only when deletions should
# be propagated to the backend IMAP server.
BACKEND_MUTATE = env_bool("BACKEND_MUTATE", False)
@classmethod
def validate(cls):
"""Validate that required backend settings are configured."""
missing = []
if not cls.BACKEND_IMAP_HOST:
missing.append("BACKEND_IMAP_HOST")
if not cls.BACKEND_SMTP_HOST:
missing.append("BACKEND_SMTP_HOST")
if missing:
raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
def _split_lines(data):
"""Split a byte string into lines, normalising CR/CRLF/LF endings."""
text = data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
lines = text.split(b"\n")
if lines and lines[-1] == b"":
lines = lines[:-1]
return lines
def format_pop3_body(body, max_body_lines=None):
"""Prepare a message for POP3 RETR/TOP transmission (RFC 1939).
Normalises line endings to CRLF, byte-stuffs any line beginning with ".",
and appends the terminating ".\\r\\n" on its own line. When ``max_body_lines``
is given, the full header block is kept and the body is truncated to that
many lines (TOP semantics).
"""
lines = _split_lines(body)
if max_body_lines is not None:
try:
separator = lines.index(b"")
except ValueError:
separator = len(lines)
header_lines = lines[: separator + 1]
body_lines = lines[separator + 1 :][:max_body_lines]
lines = header_lines + body_lines
stuffed = []
for line in lines:
if line.startswith(b"."):
line = b"." + line
stuffed.append(line)
stuffed.append(b".")
return b"\r\n".join(stuffed) + b"\r\n"
class IMAPBackend:
"""Minimal IMAP wrapper for backend mailbox access."""
UID_PATTERN = re.compile(rb"\(UID (\d+) RFC822\.SIZE (\d+)\)")
def __init__(self, username, password):
self.username = username
self.password = password
self.connection = None
def _connect(self):
if Settings.BACKEND_IMAP_USE_SSL:
client = imaplib.IMAP4_SSL(
Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30
)
else:
client = imaplib.IMAP4(
Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30
)
if Settings.BACKEND_IMAP_USE_STARTTLS:
client.starttls(ssl_context=ssl.create_default_context())
return client
def login(self):
"""Open an IMAP connection and select INBOX."""
client = self._connect()
client.login(self.username, self.password)
client.select("INBOX")
self.connection = client
return client
def logout(self):
"""Safely close the IMAP connection."""
if self.connection is not None:
try:
self.connection.logout()
except Exception:
pass
self.connection = None
def list_uids(self):
typ, data = self.connection.uid("search", None, "ALL")
if typ != "OK":
raise RuntimeError("IMAP UID search failed")
return data[0].split() if data and data[0] else []
def fetch_message_size(self, uid):
typ, data = self.connection.uid("fetch", uid, "(RFC822.SIZE)")
if typ != "OK" or not data:
raise RuntimeError("IMAP size fetch failed")
response = data[0]
if isinstance(response, tuple):
response = response[0]
match = re.search(rb"RFC822\.SIZE (\d+)", response)
return int(match.group(1)) if match else 0
def fetch_all_sizes(self):
"""Return a {uid: size} mapping for all messages in one IMAP round-trip."""
sizes = {}
typ, data = self.connection.uid("fetch", "1:*", "(RFC822.SIZE)")
if typ != "OK" or not data:
return sizes
for response in data:
if isinstance(response, tuple):
response = response[0]
if not isinstance(response, bytes):
continue
match = self.UID_PATTERN.search(response)
if match:
sizes[match.group(1)] = int(match.group(2))
return sizes
def fetch_message(self, uid):
typ, data = self.connection.uid("fetch", uid, "(RFC822)")
if typ != "OK" or not data:
raise RuntimeError("IMAP message fetch failed")
for chunk in data:
if isinstance(chunk, tuple) and len(chunk) > 1:
return chunk[1]
raise RuntimeError("IMAP message fetch returned no body")
def mark_deleted(self, uid):
self.connection.uid("STORE", uid, "+FLAGS.SILENT", "(\\Deleted)")
def expunge(self):
self.connection.expunge()
class POP3Session:
"""Session state and command handling for a single POP3 client."""
def __init__(self, reader, writer):
self.reader = reader
self.writer = writer
self._authenticated = False
self._imap = None
self.username = None
self.password = None
self.message_ids = []
self.deleted = set()
async def run(self):
"""Process POP3 commands until the client disconnects."""
await self.send_line("+OK POP3 proxy ready")
while True:
line = await self.reader.readline()
if not line:
break
command = line.decode("utf-8", errors="ignore").strip()
if not command:
continue
logger.info("POP3 command: %s", command)
try:
response = await self.handle_command(command)
except Exception as exc:
logger.exception("POP3 handling failed")
await self.send_line(f"-ERR {exc}")
break
if response is False:
break
await self.close()
async def handle_command(self, command_line):
parts = command_line.split()
command = parts[0].upper()
args = parts[1:]
if command == "USER":
return await self.handle_user(args)
if command == "PASS":
return await self.handle_pass(args)
if command == "STAT":
return await self.handle_stat()
if command == "LIST":
return await self.handle_list(args)
if command == "RETR":
return await self.handle_retr(args)
if command == "TOP":
return await self.handle_top(args)
if command == "DELE":
return await self.handle_dele(args)
if command == "NOOP":
return await self.send_line("+OK")
if command == "RSET":
self.deleted.clear()
return await self.send_line("+OK")
if command == "QUIT":
await self.handle_quit()
return False
if command == "UIDL":
return await self.handle_uidl(args)
return await self.send_line("-ERR Unsupported command")
async def handle_user(self, args):
# Accept any username. Client credentials are intentionally ignored;
# some legacy clients insist on supplying them, so they are accepted
# blindly. The backend is always reached with the proxy's own creds.
self.username = args[0] if args else None
return await self.send_line("+OK")
async def handle_pass(self, args):
# Accept any password. See handle_user: client credentials are
# accepted but never used or validated.
self.password = args[0] if args else None
await asyncio.to_thread(self.authenticate)
return await self.send_line("+OK User authenticated")
def authenticate(self):
"""Authenticate to the IMAP backend using the configured proxy credentials.
Client-supplied POP3 credentials are deliberately ignored: the proxy
always connects to the backend with ``BACKEND_IMAP_USER`` /
``BACKEND_IMAP_PASS``. This is by design for legacy clients that require
credentials to be entered even though the proxy does not use them.
"""
if not (Settings.BACKEND_IMAP_USER and Settings.BACKEND_IMAP_PASS):
raise RuntimeError("Backend IMAP credentials are not configured")
backend = IMAPBackend(Settings.BACKEND_IMAP_USER, Settings.BACKEND_IMAP_PASS)
backend.login()
self._imap = backend
# Snapshot the maildrop once; it stays static for the session lifetime
# (RFC 1939), so DELE marks are not wiped by later STAT/LIST/UIDL.
self.message_ids = backend.list_uids()
async def handle_stat(self):
self._require_auth()
sizes = await asyncio.to_thread(self._imap.fetch_all_sizes)
count = len(self.message_ids) - len(self.deleted)
total_size = sum(sizes.get(uid, 0) for uid in self.message_ids if uid not in self.deleted)
return await self.send_line(f"+OK {count} {total_size}")
async def handle_list(self, args):
self._require_auth()
sizes = await asyncio.to_thread(self._imap.fetch_all_sizes)
if not args:
lines = [f"+OK {len(self.message_ids)} messages"]
for index, uid in enumerate(self.message_ids, start=1):
if uid in self.deleted:
continue
lines.append(f"{index} {sizes.get(uid, 0)}")
lines.append(".")
return await self.send_lines(lines)
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR LIST takes optional message number")
message_number = int(args[0])
if message_number < 1 or message_number > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[message_number - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
return await self.send_line(f"+OK {message_number} {sizes.get(uid, 0)}")
async def handle_retr(self, args):
self._require_auth()
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR RETR requires message number")
index = int(args[0])
if index < 1 or index > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[index - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
message = await asyncio.to_thread(self._imap.fetch_message, uid)
payload = format_pop3_body(message)
await self.send_line(f"+OK {len(message)} octets")
await self.send_raw(payload)
async def handle_top(self, args):
self._require_auth()
if len(args) != 2 or not args[0].isdigit() or not args[1].isdigit():
return await self.send_line("-ERR TOP requires message number and line count")
index = int(args[0])
line_count = int(args[1])
if index < 1 or index > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[index - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
message = await asyncio.to_thread(self._imap.fetch_message, uid)
payload = format_pop3_body(message, max_body_lines=line_count)
await self.send_line("+OK top of message follows")
await self.send_raw(payload)
async def handle_dele(self, args):
self._require_auth()
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR DELE requires message number")
index = int(args[0])
if index < 1 or index > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[index - 1]
self.deleted.add(uid)
return await self.send_line("+OK message marked for deletion")
async def handle_uidl(self, args):
self._require_auth()
if not args:
lines = ["+OK UID list follows"]
for index, uid in enumerate(self.message_ids, start=1):
if uid in self.deleted:
continue
lines.append(f"{index} {uid.decode('ascii')}")
lines.append(".")
return await self.send_lines(lines)
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR UIDL takes a single message number")
message_number = int(args[0])
if message_number < 1 or message_number > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[message_number - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
return await self.send_line(f"+OK {message_number} {uid.decode('ascii')}")
async def handle_quit(self):
if self._imap:
# Only propagate deletes to the backend when explicitly enabled.
if Settings.BACKEND_MUTATE:
for uid in self.deleted:
await asyncio.to_thread(self._imap.mark_deleted, uid)
await asyncio.to_thread(self._imap.expunge)
# Always logout the backend connection.
self._imap.logout()
await self.send_line("+OK Goodbye")
async def send_line(self, line):
self.writer.write((line + "\r\n").encode("utf-8"))
await self.writer.drain()
async def send_lines(self, lines):
for line in lines:
self.writer.write((line + "\r\n").encode("utf-8"))
await self.writer.drain()
async def send_raw(self, data):
self.writer.write(data)
await self.writer.drain()
def _require_auth(self):
if self._imap is None:
raise RuntimeError("Not authenticated")
async def close(self):
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
pass
class SMTPProxyHandler:
"""SMTP handler that relays inbound messages to the backend SMTP server."""
async def handle_DATA(self, server, session, envelope):
logger.info("SMTP message received from %s to %s", envelope.mail_from, envelope.rcpt_tos)
await asyncio.to_thread(self.send_message, envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 Message accepted for delivery"
def send_message(self, sender, recipients, data):
"""Forward a complete SMTP message to the backend SMTP server."""
if Settings.BACKEND_SMTP_USE_SSL:
smtp = smtplib.SMTP_SSL(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
else:
smtp = smtplib.SMTP(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
if Settings.BACKEND_SMTP_USE_TLS:
smtp.starttls(context=ssl.create_default_context())
try:
if Settings.BACKEND_SMTP_USER and Settings.BACKEND_SMTP_PASS:
smtp.login(Settings.BACKEND_SMTP_USER, Settings.BACKEND_SMTP_PASS)
smtp.sendmail(sender, recipients, data)
finally:
smtp.quit()
async def handle_pop3_client(reader, writer):
"""Create and run a POP3 session for a new client connection."""
session = POP3Session(reader, writer)
await session.run()
async def main():
"""Initialize the proxy and start the POP3 and SMTP servers."""
Settings.validate()
logger.info("Starting POP3 server on %s:%s", Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT)
pop3_server = await asyncio.start_server(handle_pop3_client, Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT)
logger.info("Starting SMTP server on %s:%s", Settings.SMTP_BIND_ADDR, Settings.SMTP_BIND_PORT)
smtp_controller = Controller(SMTPProxyHandler(), hostname=Settings.SMTP_BIND_ADDR, port=Settings.SMTP_BIND_PORT)
smtp_controller.start()
async with pop3_server:
await pop3_server.serve_forever()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Shutdown requested")