chore: add documented POP3/SMTP proxy scaffold and Renovate config

This commit is contained in:
2026-06-17 16:08:06 +01:00
parent a41fbf04da
commit e6eca290ba
6 changed files with 478 additions and 0 deletions
+388
View File
@@ -0,0 +1,388 @@
"""Legacy POP3/SMTP proxy to IMAP/SMTP backends.
This module implements a lightweight proxy that exposes legacy, unauthenticated
POP3 and SMTP interfaces and translates them to authenticated encrypted backend
connections. It is intended to run inside Docker on a supported Python LTS
release.
"""
import asyncio
import imaplib
import logging
import os
import re
import smtplib
import ssl
from aiosmtpd.controller import Controller
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
def env_bool(name, default=False):
"""Return a boolean environment variable value.
Accepts common truthy strings: 1, true, yes, on.
"""
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in ("1", "true", "yes", "on")
class Settings:
"""Configuration values read from environment variables."""
POP3_BIND_ADDR = os.getenv("POP3_BIND_ADDR", "0.0.0.0")
POP3_BIND_PORT = int(os.getenv("POP3_BIND_PORT", "110"))
SMTP_BIND_ADDR = os.getenv("SMTP_BIND_ADDR", "0.0.0.0")
SMTP_BIND_PORT = int(os.getenv("SMTP_BIND_PORT", "25"))
BACKEND_IMAP_HOST = os.getenv("BACKEND_IMAP_HOST")
BACKEND_IMAP_PORT = int(os.getenv("BACKEND_IMAP_PORT", "993"))
BACKEND_IMAP_USER = os.getenv("BACKEND_IMAP_USER")
BACKEND_IMAP_PASS = os.getenv("BACKEND_IMAP_PASS")
BACKEND_IMAP_USE_SSL = env_bool("BACKEND_IMAP_USE_SSL", True)
BACKEND_IMAP_USE_STARTTLS = env_bool("BACKEND_IMAP_USE_STARTTLS", False)
BACKEND_SMTP_HOST = os.getenv("BACKEND_SMTP_HOST")
BACKEND_SMTP_PORT = int(os.getenv("BACKEND_SMTP_PORT", "465"))
BACKEND_SMTP_USER = os.getenv("BACKEND_SMTP_USER")
BACKEND_SMTP_PASS = os.getenv("BACKEND_SMTP_PASS")
BACKEND_SMTP_USE_SSL = env_bool("BACKEND_SMTP_USE_SSL", True)
BACKEND_SMTP_USE_TLS = env_bool("BACKEND_SMTP_USE_TLS", False)
@classmethod
def validate(cls):
"""Validate that required backend settings are configured."""
missing = []
if not cls.BACKEND_IMAP_HOST:
missing.append("BACKEND_IMAP_HOST")
if not cls.BACKEND_SMTP_HOST:
missing.append("BACKEND_SMTP_HOST")
if missing:
raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
class IMAPBackend:
"""Minimal IMAP wrapper for backend mailbox access."""
UID_PATTERN = re.compile(rb"\(UID (\d+) RFC822\.SIZE (\d+)\)")
def __init__(self, username, password):
self.username = username
self.password = password
self.connection = None
def _connect(self):
if Settings.BACKEND_IMAP_USE_SSL:
client = imaplib.IMAP4_SSL(Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT)
else:
client = imaplib.IMAP4(Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT)
if Settings.BACKEND_IMAP_USE_STARTTLS:
client.starttls(ssl_context=ssl.create_default_context())
return client
def login(self):
"""Open an IMAP connection and select INBOX."""
client = self._connect()
client.login(self.username, self.password)
client.select("INBOX")
self.connection = client
return client
def logout(self):
"""Safely close the IMAP connection."""
if self.connection is not None:
try:
self.connection.logout()
except Exception:
pass
self.connection = None
def list_uids(self):
typ, data = self.connection.uid("search", None, "ALL")
if typ != "OK":
raise RuntimeError("IMAP UID search failed")
return data[0].split() if data and data[0] else []
def fetch_message_size(self, uid):
typ, data = self.connection.uid("fetch", uid, "(RFC822.SIZE)")
if typ != "OK" or not data:
raise RuntimeError("IMAP size fetch failed")
response = data[0]
if isinstance(response, tuple):
response = response[0]
match = re.search(rb"RFC822\.SIZE (\d+)", response)
return int(match.group(1)) if match else 0
def fetch_message(self, uid):
typ, data = self.connection.uid("fetch", uid, "(RFC822)")
if typ != "OK" or not data:
raise RuntimeError("IMAP message fetch failed")
parts = [chunk for chunk in data if isinstance(chunk, bytes)]
return b"\r\n".join(parts) + b"\r\n"
def mark_deleted(self, uid):
self.connection.uid("STORE", uid, "+FLAGS.SILENT", "(\\Deleted)")
def expunge(self):
self.connection.expunge()
class POP3Session:
"""Session state and command handling for a single POP3 client."""
def __init__(self, reader, writer):
self.reader = reader
self.writer = writer
self._authenticated = False
self._imap = None
self.username = None
self.password = None
self.message_ids = []
self.deleted = set()
async def run(self):
"""Process POP3 commands until the client disconnects."""
await self.send_line("+OK POP3 proxy ready")
while True:
line = await self.reader.readline()
if not line:
break
command = line.decode("utf-8", errors="ignore").strip()
if not command:
continue
logger.info("POP3 command: %s", command)
try:
response = await self.handle_command(command)
except Exception as exc:
logger.exception("POP3 handling failed")
await self.send_line(f"-ERR {exc}")
break
if response is False:
break
await self.close()
async def handle_command(self, command_line):
parts = command_line.split()
command = parts[0].upper()
args = parts[1:]
if command == "USER":
return await self.handle_user(args)
if command == "PASS":
return await self.handle_pass(args)
if command == "STAT":
return await self.handle_stat()
if command == "LIST":
return await self.handle_list(args)
if command == "RETR":
return await self.handle_retr(args)
if command == "DELE":
return await self.handle_dele(args)
if command == "NOOP":
return await self.send_line("+OK")
if command == "RSET":
self.deleted.clear()
return await self.send_line("+OK")
if command == "QUIT":
await self.handle_quit()
return False
if command == "UIDL":
return await self.handle_uidl(args)
return await self.send_line("-ERR Unsupported command")
async def handle_user(self, args):
if len(args) != 1:
return await self.send_line("-ERR USER requires username")
self.username = args[0]
return await self.send_line("+OK")
async def handle_pass(self, args):
if len(args) != 1:
return await self.send_line("-ERR PASS requires password")
self.password = args[0]
await asyncio.to_thread(self.authenticate)
return await self.send_line("+OK User authenticated")
def authenticate(self):
"""Authenticate to the IMAP backend using configured credentials."""
if Settings.BACKEND_IMAP_USER and Settings.BACKEND_IMAP_PASS:
username = Settings.BACKEND_IMAP_USER
password = Settings.BACKEND_IMAP_PASS
elif self.username and self.password:
username = self.username
password = self.password
else:
raise RuntimeError("No IMAP credentials available")
backend = IMAPBackend(username, password)
backend.login()
self._imap = backend
self._refresh_mailbox()
def _refresh_mailbox(self):
self.message_ids = self._imap.list_uids()
self.deleted.clear()
async def handle_stat(self):
self._require_auth()
self._refresh_mailbox()
count = len(self.message_ids) - len(self.deleted)
total_size = sum(self._imap.fetch_message_size(uid) for uid in self.message_ids if uid not in self.deleted)
return await self.send_line(f"+OK {count} {total_size}")
async def handle_list(self, args):
self._require_auth()
self._refresh_mailbox()
if not args:
lines = [f"+OK {len(self.message_ids)} messages"]
for index, uid in enumerate(self.message_ids, start=1):
if uid in self.deleted:
continue
size = self._imap.fetch_message_size(uid)
lines.append(f"{index} {size}")
lines.append(".")
return await self.send_lines(lines)
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR LIST takes optional message number")
message_number = int(args[0])
if message_number < 1 or message_number > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[message_number - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
size = self._imap.fetch_message_size(uid)
return await self.send_line(f"+OK {message_number} {size}")
async def handle_retr(self, args):
self._require_auth()
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR RETR requires message number")
index = int(args[0])
if index < 1 or index > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[index - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
message = await asyncio.to_thread(self._imap.fetch_message, uid)
await self.send_line(f"+OK {len(message)} octets")
await self.send_raw(message)
await self.send_line(".")
async def handle_dele(self, args):
self._require_auth()
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR DELE requires message number")
index = int(args[0])
if index < 1 or index > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[index - 1]
self.deleted.add(uid)
return await self.send_line("+OK message marked for deletion")
async def handle_uidl(self, args):
self._require_auth()
self._refresh_mailbox()
if not args:
lines = ["+OK UID list follows"]
for index, uid in enumerate(self.message_ids, start=1):
if uid in self.deleted:
continue
lines.append(f"{index} {uid.decode('ascii')}")
lines.append(".")
return await self.send_lines(lines)
if len(args) != 1 or not args[0].isdigit():
return await self.send_line("-ERR UIDL takes a single message number")
message_number = int(args[0])
if message_number < 1 or message_number > len(self.message_ids):
return await self.send_line("-ERR no such message")
uid = self.message_ids[message_number - 1]
if uid in self.deleted:
return await self.send_line("-ERR message deleted")
return await self.send_line(f"+OK {message_number} {uid.decode('ascii')}")
async def handle_quit(self):
if self._imap:
for uid in self.deleted:
await asyncio.to_thread(self._imap.mark_deleted, uid)
await asyncio.to_thread(self._imap.expunge)
self._imap.logout()
await self.send_line("+OK Goodbye")
async def send_line(self, line):
self.writer.write((line + "\r\n").encode("utf-8"))
await self.writer.drain()
async def send_lines(self, lines):
for line in lines:
self.writer.write((line + "\r\n").encode("utf-8"))
await self.writer.drain()
async def send_raw(self, data):
self.writer.write(data)
await self.writer.drain()
def _require_auth(self):
if self._imap is None:
raise RuntimeError("Not authenticated")
async def close(self):
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
pass
class SMTPProxyHandler:
"""SMTP handler that relays inbound messages to the backend SMTP server."""
async def handle_DATA(self, server, session, envelope):
logger.info("SMTP message received from %s to %s", envelope.mail_from, envelope.rcpt_tos)
await asyncio.to_thread(self.send_message, envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 Message accepted for delivery"
def send_message(self, sender, recipients, data):
"""Forward a complete SMTP message to the backend SMTP server."""
message = data.decode("utf-8", errors="replace")
if Settings.BACKEND_SMTP_USE_SSL:
smtp = smtplib.SMTP_SSL(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
else:
smtp = smtplib.SMTP(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
if Settings.BACKEND_SMTP_USE_TLS:
smtp.starttls(context=ssl.create_default_context())
try:
if Settings.BACKEND_SMTP_USER and Settings.BACKEND_SMTP_PASS:
smtp.login(Settings.BACKEND_SMTP_USER, Settings.BACKEND_SMTP_PASS)
smtp.sendmail(sender, recipients, message)
finally:
smtp.quit()
async def handle_pop3_client(reader, writer):
"""Create and run a POP3 session for a new client connection."""
session = POP3Session(reader, writer)
await session.run()
async def main():
"""Initialize the proxy and start the POP3 and SMTP servers."""
Settings.validate()
logger.info("Starting POP3 server on %s:%s", Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT)
pop3_server = await asyncio.start_server(handle_pop3_client, Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT)
logger.info("Starting SMTP server on %s:%s", Settings.SMTP_BIND_ADDR, Settings.SMTP_BIND_PORT)
smtp_controller = Controller(SMTPProxyHandler(), hostname=Settings.SMTP_BIND_ADDR, port=Settings.SMTP_BIND_PORT)
smtp_controller.start()
async with pop3_server:
await pop3_server.serve_forever()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Shutdown requested")