Compare commits
11 Commits
a41fbf04da
..
v0.3.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 4bde4f884d | |||
| 9a4bab33e2 | |||
| e51740b8db | |||
| 4ab12f8ce6 | |||
| 7930235efd | |||
| bde999185a | |||
| e05f08995e | |||
| 91d70cf10c | |||
| aa746b780d | |||
| 739280f930 | |||
| e6eca290ba |
@@ -0,0 +1,6 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.pyd
|
||||
.env
|
||||
*.swp
|
||||
@@ -0,0 +1,167 @@
|
||||
name: Build and publish container
|
||||
|
||||
on:
|
||||
# On merge to main, only build/release when image-affecting files change;
|
||||
# CI-config, Renovate-config and docs changes do not produce a new image.
|
||||
push:
|
||||
branches: [main]
|
||||
paths:
|
||||
- 'Dockerfile'
|
||||
- '.dockerignore'
|
||||
- 'proxy_server.py'
|
||||
- 'requirements.txt'
|
||||
# Pull requests always run (the build is a required check); no path filter.
|
||||
pull_request:
|
||||
branches: [main]
|
||||
workflow_dispatch:
|
||||
|
||||
# A newer run cancels an older in-flight run in the same group (keyed by ref),
|
||||
# so a fresh merge to main supersedes the previous build and only the latest
|
||||
# release is produced, avoiding tags that would be immediately replaced. Each
|
||||
# pull request likewise supersedes only its own earlier runs.
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
packages: write
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
|
||||
with:
|
||||
# Full history and tags are required to derive the next version
|
||||
# from the conventional-commit messages since the last release.
|
||||
fetch-depth: 0
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.12
|
||||
|
||||
- name: Cache pip dependencies
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: ~/.cache/pip
|
||||
key: "${{ runner.os }}-pip-${{ hashFiles('requirements-dev.txt') }}"
|
||||
restore-keys: |
|
||||
${{ runner.os }}-pip-
|
||||
|
||||
- name: Install test dependencies
|
||||
run: python -m pip install --upgrade pip && pip install -r requirements-dev.txt
|
||||
|
||||
- name: Run unit tests
|
||||
run: python -m pytest -q
|
||||
|
||||
- name: Determine registry host
|
||||
run: echo "REGISTRY=${GITHUB_SERVER_URL#*://}" >> "$GITHUB_ENV"
|
||||
|
||||
# Derive the release version from conventional commits since the last
|
||||
# v* tag: feat -> minor, fix/perf -> patch, ! or BREAKING CHANGE -> major.
|
||||
# Anything else (chore, ci, docs, build) produces no release; those builds
|
||||
# are published under a sha-<short> tag only.
|
||||
- name: Compute version and image tags
|
||||
id: version
|
||||
run: |
|
||||
set -euo pipefail
|
||||
image="${REGISTRY}/${GITHUB_REPOSITORY,,}"
|
||||
|
||||
last_tag="$(git tag --list 'v*' --sort=-v:refname | head -n1 || true)"
|
||||
if [ -n "$last_tag" ]; then
|
||||
range="${last_tag}..HEAD"
|
||||
base="${last_tag#v}"
|
||||
else
|
||||
range=""
|
||||
base="0.0.0"
|
||||
fi
|
||||
|
||||
subjects="$(git log ${range} --format='%s')"
|
||||
bodies="$(git log ${range} --format='%B')"
|
||||
|
||||
bump="none"
|
||||
if printf '%s\n' "$bodies" | grep -qiE 'BREAKING[ -]CHANGE' \
|
||||
|| printf '%s\n' "$subjects" | grep -qE '^[a-z]+([(][^)]*[)])?!:'; then
|
||||
bump="major"
|
||||
elif printf '%s\n' "$subjects" | grep -qE '^feat([(][^)]*[)])?:'; then
|
||||
bump="minor"
|
||||
elif printf '%s\n' "$subjects" | grep -qE '^(fix|perf)([(][^)]*[)])?:'; then
|
||||
bump="patch"
|
||||
fi
|
||||
|
||||
major="${base%%.*}"
|
||||
rest="${base#*.}"
|
||||
minor="${rest%%.*}"
|
||||
patch="${rest##*.}"
|
||||
|
||||
release="false"
|
||||
if [ "${GITHUB_EVENT_NAME}" = "push" ] && [ "$bump" != "none" ]; then
|
||||
release="true"
|
||||
case "$bump" in
|
||||
major) major=$((major + 1)); minor=0; patch=0 ;;
|
||||
minor) minor=$((minor + 1)); patch=0 ;;
|
||||
patch) patch=$((patch + 1)) ;;
|
||||
esac
|
||||
version="${major}.${minor}.${patch}"
|
||||
{
|
||||
echo "tags<<__EOT__"
|
||||
echo "${image}:${version}"
|
||||
echo "${image}:${major}.${minor}"
|
||||
echo "${image}:${major}"
|
||||
echo "${image}:latest"
|
||||
echo "__EOT__"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
echo "version=${version}" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
short="$(git rev-parse --short HEAD)"
|
||||
{
|
||||
echo "tags<<__EOT__"
|
||||
echo "${image}:sha-${short}"
|
||||
echo "__EOT__"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
echo "release=${release}" >> "$GITHUB_OUTPUT"
|
||||
echo "Computed bump=${bump}, release=${release}, base=${base}"
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@06116385d9baf250c9f4dcb4858b16962ea869c3 # v4
|
||||
|
||||
- name: Set up Buildx
|
||||
uses: docker/setup-buildx-action@d7f5e7f509e45cec5c76c4d5afdd7de93d0b3df5 # v4
|
||||
|
||||
- name: Log in to the Gitea container registry
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.PACKAGES_TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@f9f3042f7e2789586610d6e8b85c8f03e5195baf # v7
|
||||
with:
|
||||
context: .
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.version.outputs.tags }}
|
||||
labels: |
|
||||
org.opencontainers.image.source=${{ github.server_url }}/${{ github.repository }}
|
||||
org.opencontainers.image.revision=${{ github.sha }}
|
||||
|
||||
# Record the release as an annotated git tag so the next run computes the
|
||||
# following version from it. This push does not re-trigger the workflow,
|
||||
# which only listens on the main branch and pull requests.
|
||||
- name: Tag the release
|
||||
if: steps.version.outputs.release == 'true'
|
||||
run: |
|
||||
set -euo pipefail
|
||||
v="v${{ steps.version.outputs.version }}"
|
||||
git config user.name "${{ github.actor }}"
|
||||
git config user.email "${{ github.actor }}@users.noreply.${REGISTRY}"
|
||||
git tag -a "$v" -m "$v"
|
||||
git push origin "$v"
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /app
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
|
||||
# Create a dedicated non-root user and group to run the proxy.
|
||||
RUN groupadd --system appuser && useradd --system --gid appuser appuser
|
||||
|
||||
COPY requirements.txt ./
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY --chown=appuser:appuser proxy_server.py ./
|
||||
|
||||
EXPOSE 110 25
|
||||
|
||||
USER appuser
|
||||
|
||||
CMD ["python", "proxy_server.py"]
|
||||
@@ -0,0 +1,78 @@
|
||||
# Legacy Email Proxy
|
||||
|
||||
Proxy an unauthenticated, unencrypted POP3 / SMTP server to authenticated IMAPS and SMTPS backends.
|
||||
|
||||
## Features
|
||||
|
||||
- Exposes legacy `POP3` on `0.0.0.0:110` and legacy `SMTP` on `0.0.0.0:25`
|
||||
- Forwards POP3 mailbox access to an IMAP backend
|
||||
- Forwards SMTP submissions to an SMTPS backend
|
||||
- Backend host, ports, and credentials are configured via environment variables
|
||||
|
||||
## Environment Variables
|
||||
|
||||
- `POP3_BIND_ADDR` (default `0.0.0.0`)
|
||||
- `POP3_BIND_PORT` (default `110`)
|
||||
- `SMTP_BIND_ADDR` (default `0.0.0.0`)
|
||||
- `SMTP_BIND_PORT` (default `25`)
|
||||
|
||||
- `BACKEND_IMAP_HOST`
|
||||
- `BACKEND_IMAP_PORT` (default `993`)
|
||||
- `BACKEND_IMAP_USER`
|
||||
- `BACKEND_IMAP_PASS`
|
||||
- `BACKEND_IMAP_USE_SSL` (default `true`)
|
||||
- `BACKEND_IMAP_USE_STARTTLS` (default `false`)
|
||||
|
||||
- `BACKEND_SMTP_HOST`
|
||||
- `BACKEND_SMTP_PORT` (default `465`)
|
||||
- `BACKEND_SMTP_USER`
|
||||
- `BACKEND_SMTP_PASS`
|
||||
- `BACKEND_SMTP_USE_SSL` (default `true`)
|
||||
- `BACKEND_SMTP_USE_TLS` (default `false`)
|
||||
|
||||
- `BACKEND_MUTATE` (default `false`) - when `true`, POP3 deletions are propagated to the
|
||||
backend IMAP server (STORE +FLAGS / EXPUNGE). Default behaviour is to never mutate
|
||||
the backend mailbox on POP client deletions; the proxy only hides messages for the
|
||||
duration of the client session.
|
||||
|
||||
## Build and run
|
||||
|
||||
This project targets the latest Python LTS release. The included `Dockerfile` uses `python:3.12-slim`, which is compatible with Python 3.12 and later LTS releases.
|
||||
|
||||
```bash
|
||||
docker build -t legacy-email-proxy .
|
||||
docker run --rm -p 110:110 -p 25:25 \
|
||||
-e BACKEND_IMAP_HOST=imap.example.com \
|
||||
-e BACKEND_IMAP_PORT=993 \
|
||||
-e BACKEND_IMAP_USER=imap-user \
|
||||
-e BACKEND_IMAP_PASS=imap-pass \
|
||||
-e BACKEND_SMTP_HOST=smtp.example.com \
|
||||
-e BACKEND_SMTP_PORT=465 \
|
||||
-e BACKEND_SMTP_USER=smtp-user \
|
||||
-e BACKEND_SMTP_PASS=smtp-pass \
|
||||
legacy-email-proxy
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
||||
Install development dependencies and run the test suite:
|
||||
|
||||
```bash
|
||||
python -m venv .venv
|
||||
source .venv/bin/activate
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements-dev.txt
|
||||
pytest -q
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
This implementation begins the proxy with a minimal POP3 command set and SMTP delivery path. It is designed to start development on the required application architecture.
|
||||
|
||||
## Security
|
||||
|
||||
By design, the front-end POP3 (port 110) and SMTP (port 25) listeners are **unencrypted** and **unauthenticated**. Anyone who can reach port 110 obtains full mailbox access, and anyone who can reach port 25 can relay mail through the configured backend SMTP credentials, which is an open relay from the network's perspective.
|
||||
|
||||
Because of this, the listeners **must** be bound to a trusted internal network only, such as a private Docker bridge, a VPN interface, or localhost, and **must not** be exposed to untrusted networks or the public internet.
|
||||
|
||||
Operators who need to restrict the bind address can set `POP3_BIND_ADDR` / `SMTP_BIND_ADDR` to a specific internal interface instead of `0.0.0.0`.
|
||||
+465
@@ -0,0 +1,465 @@
|
||||
"""Legacy POP3/SMTP proxy to IMAP/SMTP backends.
|
||||
|
||||
This module implements a lightweight proxy that exposes legacy, unauthenticated
|
||||
POP3 and SMTP interfaces and translates them to authenticated encrypted backend
|
||||
connections. It is intended to run inside Docker on a supported Python LTS
|
||||
release.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import imaplib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import smtplib
|
||||
import ssl
|
||||
from aiosmtpd.controller import Controller
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def env_bool(name, default=False):
|
||||
"""Return a boolean environment variable value.
|
||||
|
||||
Accepts common truthy strings: 1, true, yes, on.
|
||||
"""
|
||||
raw = os.getenv(name)
|
||||
if raw is None:
|
||||
return default
|
||||
return raw.strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
class Settings:
|
||||
"""Configuration values read from environment variables."""
|
||||
|
||||
POP3_BIND_ADDR = os.getenv("POP3_BIND_ADDR", "0.0.0.0")
|
||||
POP3_BIND_PORT = int(os.getenv("POP3_BIND_PORT", "110"))
|
||||
SMTP_BIND_ADDR = os.getenv("SMTP_BIND_ADDR", "0.0.0.0")
|
||||
SMTP_BIND_PORT = int(os.getenv("SMTP_BIND_PORT", "25"))
|
||||
|
||||
BACKEND_IMAP_HOST = os.getenv("BACKEND_IMAP_HOST")
|
||||
BACKEND_IMAP_PORT = int(os.getenv("BACKEND_IMAP_PORT", "993"))
|
||||
BACKEND_IMAP_USER = os.getenv("BACKEND_IMAP_USER")
|
||||
BACKEND_IMAP_PASS = os.getenv("BACKEND_IMAP_PASS")
|
||||
BACKEND_IMAP_USE_SSL = env_bool("BACKEND_IMAP_USE_SSL", True)
|
||||
BACKEND_IMAP_USE_STARTTLS = env_bool("BACKEND_IMAP_USE_STARTTLS", False)
|
||||
|
||||
BACKEND_SMTP_HOST = os.getenv("BACKEND_SMTP_HOST")
|
||||
BACKEND_SMTP_PORT = int(os.getenv("BACKEND_SMTP_PORT", "465"))
|
||||
BACKEND_SMTP_USER = os.getenv("BACKEND_SMTP_USER")
|
||||
BACKEND_SMTP_PASS = os.getenv("BACKEND_SMTP_PASS")
|
||||
BACKEND_SMTP_USE_SSL = env_bool("BACKEND_SMTP_USE_SSL", True)
|
||||
BACKEND_SMTP_USE_TLS = env_bool("BACKEND_SMTP_USE_TLS", False)
|
||||
# When false (default) the proxy will not mutate backend mailboxes
|
||||
# (no STORE +FLAGS / EXPUNGE). Set to true only when deletions should
|
||||
# be propagated to the backend IMAP server.
|
||||
BACKEND_MUTATE = env_bool("BACKEND_MUTATE", False)
|
||||
|
||||
@classmethod
|
||||
def validate(cls):
|
||||
"""Validate that required backend settings are configured."""
|
||||
missing = []
|
||||
if not cls.BACKEND_IMAP_HOST:
|
||||
missing.append("BACKEND_IMAP_HOST")
|
||||
if not cls.BACKEND_SMTP_HOST:
|
||||
missing.append("BACKEND_SMTP_HOST")
|
||||
if missing:
|
||||
raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
|
||||
|
||||
|
||||
def _split_lines(data):
|
||||
"""Split a byte string into lines, normalising CR/CRLF/LF endings."""
|
||||
text = data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
|
||||
lines = text.split(b"\n")
|
||||
if lines and lines[-1] == b"":
|
||||
lines = lines[:-1]
|
||||
return lines
|
||||
|
||||
|
||||
def format_pop3_body(body, max_body_lines=None):
|
||||
"""Prepare a message for POP3 RETR/TOP transmission (RFC 1939).
|
||||
|
||||
Normalises line endings to CRLF, byte-stuffs any line beginning with ".",
|
||||
and appends the terminating ".\\r\\n" on its own line. When ``max_body_lines``
|
||||
is given, the full header block is kept and the body is truncated to that
|
||||
many lines (TOP semantics).
|
||||
"""
|
||||
lines = _split_lines(body)
|
||||
if max_body_lines is not None:
|
||||
try:
|
||||
separator = lines.index(b"")
|
||||
except ValueError:
|
||||
separator = len(lines)
|
||||
header_lines = lines[: separator + 1]
|
||||
body_lines = lines[separator + 1 :][:max_body_lines]
|
||||
lines = header_lines + body_lines
|
||||
stuffed = []
|
||||
for line in lines:
|
||||
if line.startswith(b"."):
|
||||
line = b"." + line
|
||||
stuffed.append(line)
|
||||
stuffed.append(b".")
|
||||
return b"\r\n".join(stuffed) + b"\r\n"
|
||||
|
||||
|
||||
class IMAPBackend:
|
||||
"""Minimal IMAP wrapper for backend mailbox access."""
|
||||
|
||||
UID_PATTERN = re.compile(rb"\(UID (\d+) RFC822\.SIZE (\d+)\)")
|
||||
|
||||
def __init__(self, username, password):
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.connection = None
|
||||
|
||||
def _connect(self):
|
||||
if Settings.BACKEND_IMAP_USE_SSL:
|
||||
client = imaplib.IMAP4_SSL(
|
||||
Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30
|
||||
)
|
||||
else:
|
||||
client = imaplib.IMAP4(
|
||||
Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30
|
||||
)
|
||||
if Settings.BACKEND_IMAP_USE_STARTTLS:
|
||||
client.starttls(ssl_context=ssl.create_default_context())
|
||||
return client
|
||||
|
||||
def login(self):
|
||||
"""Open an IMAP connection and select INBOX."""
|
||||
client = self._connect()
|
||||
client.login(self.username, self.password)
|
||||
client.select("INBOX")
|
||||
self.connection = client
|
||||
return client
|
||||
|
||||
def logout(self):
|
||||
"""Safely close the IMAP connection."""
|
||||
if self.connection is not None:
|
||||
try:
|
||||
self.connection.logout()
|
||||
except Exception:
|
||||
pass
|
||||
self.connection = None
|
||||
|
||||
def list_uids(self):
|
||||
typ, data = self.connection.uid("search", None, "ALL")
|
||||
if typ != "OK":
|
||||
raise RuntimeError("IMAP UID search failed")
|
||||
return data[0].split() if data and data[0] else []
|
||||
|
||||
def fetch_message_size(self, uid):
|
||||
typ, data = self.connection.uid("fetch", uid, "(RFC822.SIZE)")
|
||||
if typ != "OK" or not data:
|
||||
raise RuntimeError("IMAP size fetch failed")
|
||||
response = data[0]
|
||||
if isinstance(response, tuple):
|
||||
response = response[0]
|
||||
match = re.search(rb"RFC822\.SIZE (\d+)", response)
|
||||
return int(match.group(1)) if match else 0
|
||||
|
||||
def fetch_all_sizes(self):
|
||||
"""Return a {uid: size} mapping for all messages in one IMAP round-trip."""
|
||||
sizes = {}
|
||||
typ, data = self.connection.uid("fetch", "1:*", "(RFC822.SIZE)")
|
||||
if typ != "OK" or not data:
|
||||
return sizes
|
||||
for response in data:
|
||||
if isinstance(response, tuple):
|
||||
response = response[0]
|
||||
if not isinstance(response, bytes):
|
||||
continue
|
||||
match = self.UID_PATTERN.search(response)
|
||||
if match:
|
||||
sizes[match.group(1)] = int(match.group(2))
|
||||
return sizes
|
||||
|
||||
def fetch_message(self, uid):
|
||||
typ, data = self.connection.uid("fetch", uid, "(RFC822)")
|
||||
if typ != "OK" or not data:
|
||||
raise RuntimeError("IMAP message fetch failed")
|
||||
for chunk in data:
|
||||
if isinstance(chunk, tuple) and len(chunk) > 1:
|
||||
return chunk[1]
|
||||
raise RuntimeError("IMAP message fetch returned no body")
|
||||
|
||||
def mark_deleted(self, uid):
|
||||
self.connection.uid("STORE", uid, "+FLAGS.SILENT", "(\\Deleted)")
|
||||
|
||||
def expunge(self):
|
||||
self.connection.expunge()
|
||||
|
||||
|
||||
class POP3Session:
|
||||
"""Session state and command handling for a single POP3 client."""
|
||||
|
||||
def __init__(self, reader, writer):
|
||||
self.reader = reader
|
||||
self.writer = writer
|
||||
self._authenticated = False
|
||||
self._imap = None
|
||||
self.username = None
|
||||
self.password = None
|
||||
self.message_ids = []
|
||||
self.deleted = set()
|
||||
|
||||
async def run(self):
|
||||
"""Process POP3 commands until the client disconnects."""
|
||||
await self.send_line("+OK POP3 proxy ready")
|
||||
while True:
|
||||
line = await self.reader.readline()
|
||||
if not line:
|
||||
break
|
||||
command = line.decode("utf-8", errors="ignore").strip()
|
||||
if not command:
|
||||
continue
|
||||
logger.info("POP3 command: %s", command)
|
||||
try:
|
||||
response = await self.handle_command(command)
|
||||
except Exception as exc:
|
||||
logger.exception("POP3 handling failed")
|
||||
await self.send_line(f"-ERR {exc}")
|
||||
break
|
||||
if response is False:
|
||||
break
|
||||
await self.close()
|
||||
|
||||
async def handle_command(self, command_line):
|
||||
parts = command_line.split()
|
||||
command = parts[0].upper()
|
||||
args = parts[1:]
|
||||
|
||||
if command == "USER":
|
||||
return await self.handle_user(args)
|
||||
if command == "PASS":
|
||||
return await self.handle_pass(args)
|
||||
if command == "STAT":
|
||||
return await self.handle_stat()
|
||||
if command == "LIST":
|
||||
return await self.handle_list(args)
|
||||
if command == "RETR":
|
||||
return await self.handle_retr(args)
|
||||
if command == "TOP":
|
||||
return await self.handle_top(args)
|
||||
if command == "DELE":
|
||||
return await self.handle_dele(args)
|
||||
if command == "NOOP":
|
||||
return await self.send_line("+OK")
|
||||
if command == "RSET":
|
||||
self.deleted.clear()
|
||||
return await self.send_line("+OK")
|
||||
if command == "QUIT":
|
||||
await self.handle_quit()
|
||||
return False
|
||||
if command == "UIDL":
|
||||
return await self.handle_uidl(args)
|
||||
return await self.send_line("-ERR Unsupported command")
|
||||
|
||||
async def handle_user(self, args):
|
||||
# Accept any username. Client credentials are intentionally ignored;
|
||||
# some legacy clients insist on supplying them, so they are accepted
|
||||
# blindly. The backend is always reached with the proxy's own creds.
|
||||
self.username = args[0] if args else None
|
||||
return await self.send_line("+OK")
|
||||
|
||||
async def handle_pass(self, args):
|
||||
# Accept any password. See handle_user: client credentials are
|
||||
# accepted but never used or validated.
|
||||
self.password = args[0] if args else None
|
||||
await asyncio.to_thread(self.authenticate)
|
||||
return await self.send_line("+OK User authenticated")
|
||||
|
||||
def authenticate(self):
|
||||
"""Authenticate to the IMAP backend using the configured proxy credentials.
|
||||
|
||||
Client-supplied POP3 credentials are deliberately ignored: the proxy
|
||||
always connects to the backend with ``BACKEND_IMAP_USER`` /
|
||||
``BACKEND_IMAP_PASS``. This is by design for legacy clients that require
|
||||
credentials to be entered even though the proxy does not use them.
|
||||
"""
|
||||
if not (Settings.BACKEND_IMAP_USER and Settings.BACKEND_IMAP_PASS):
|
||||
raise RuntimeError("Backend IMAP credentials are not configured")
|
||||
|
||||
backend = IMAPBackend(Settings.BACKEND_IMAP_USER, Settings.BACKEND_IMAP_PASS)
|
||||
backend.login()
|
||||
self._imap = backend
|
||||
# Snapshot the maildrop once; it stays static for the session lifetime
|
||||
# (RFC 1939), so DELE marks are not wiped by later STAT/LIST/UIDL.
|
||||
self.message_ids = backend.list_uids()
|
||||
|
||||
async def handle_stat(self):
|
||||
self._require_auth()
|
||||
sizes = await asyncio.to_thread(self._imap.fetch_all_sizes)
|
||||
count = len(self.message_ids) - len(self.deleted)
|
||||
total_size = sum(sizes.get(uid, 0) for uid in self.message_ids if uid not in self.deleted)
|
||||
return await self.send_line(f"+OK {count} {total_size}")
|
||||
|
||||
async def handle_list(self, args):
|
||||
self._require_auth()
|
||||
sizes = await asyncio.to_thread(self._imap.fetch_all_sizes)
|
||||
if not args:
|
||||
lines = [f"+OK {len(self.message_ids)} messages"]
|
||||
for index, uid in enumerate(self.message_ids, start=1):
|
||||
if uid in self.deleted:
|
||||
continue
|
||||
lines.append(f"{index} {sizes.get(uid, 0)}")
|
||||
lines.append(".")
|
||||
return await self.send_lines(lines)
|
||||
if len(args) != 1 or not args[0].isdigit():
|
||||
return await self.send_line("-ERR LIST takes optional message number")
|
||||
message_number = int(args[0])
|
||||
if message_number < 1 or message_number > len(self.message_ids):
|
||||
return await self.send_line("-ERR no such message")
|
||||
uid = self.message_ids[message_number - 1]
|
||||
if uid in self.deleted:
|
||||
return await self.send_line("-ERR message deleted")
|
||||
return await self.send_line(f"+OK {message_number} {sizes.get(uid, 0)}")
|
||||
|
||||
async def handle_retr(self, args):
|
||||
self._require_auth()
|
||||
if len(args) != 1 or not args[0].isdigit():
|
||||
return await self.send_line("-ERR RETR requires message number")
|
||||
index = int(args[0])
|
||||
if index < 1 or index > len(self.message_ids):
|
||||
return await self.send_line("-ERR no such message")
|
||||
uid = self.message_ids[index - 1]
|
||||
if uid in self.deleted:
|
||||
return await self.send_line("-ERR message deleted")
|
||||
message = await asyncio.to_thread(self._imap.fetch_message, uid)
|
||||
payload = format_pop3_body(message)
|
||||
await self.send_line(f"+OK {len(message)} octets")
|
||||
await self.send_raw(payload)
|
||||
|
||||
async def handle_top(self, args):
|
||||
self._require_auth()
|
||||
if len(args) != 2 or not args[0].isdigit() or not args[1].isdigit():
|
||||
return await self.send_line("-ERR TOP requires message number and line count")
|
||||
index = int(args[0])
|
||||
line_count = int(args[1])
|
||||
if index < 1 or index > len(self.message_ids):
|
||||
return await self.send_line("-ERR no such message")
|
||||
uid = self.message_ids[index - 1]
|
||||
if uid in self.deleted:
|
||||
return await self.send_line("-ERR message deleted")
|
||||
message = await asyncio.to_thread(self._imap.fetch_message, uid)
|
||||
payload = format_pop3_body(message, max_body_lines=line_count)
|
||||
await self.send_line("+OK top of message follows")
|
||||
await self.send_raw(payload)
|
||||
|
||||
async def handle_dele(self, args):
|
||||
self._require_auth()
|
||||
if len(args) != 1 or not args[0].isdigit():
|
||||
return await self.send_line("-ERR DELE requires message number")
|
||||
index = int(args[0])
|
||||
if index < 1 or index > len(self.message_ids):
|
||||
return await self.send_line("-ERR no such message")
|
||||
uid = self.message_ids[index - 1]
|
||||
self.deleted.add(uid)
|
||||
return await self.send_line("+OK message marked for deletion")
|
||||
|
||||
async def handle_uidl(self, args):
|
||||
self._require_auth()
|
||||
if not args:
|
||||
lines = ["+OK UID list follows"]
|
||||
for index, uid in enumerate(self.message_ids, start=1):
|
||||
if uid in self.deleted:
|
||||
continue
|
||||
lines.append(f"{index} {uid.decode('ascii')}")
|
||||
lines.append(".")
|
||||
return await self.send_lines(lines)
|
||||
if len(args) != 1 or not args[0].isdigit():
|
||||
return await self.send_line("-ERR UIDL takes a single message number")
|
||||
message_number = int(args[0])
|
||||
if message_number < 1 or message_number > len(self.message_ids):
|
||||
return await self.send_line("-ERR no such message")
|
||||
uid = self.message_ids[message_number - 1]
|
||||
if uid in self.deleted:
|
||||
return await self.send_line("-ERR message deleted")
|
||||
return await self.send_line(f"+OK {message_number} {uid.decode('ascii')}")
|
||||
|
||||
async def handle_quit(self):
|
||||
if self._imap:
|
||||
# Only propagate deletes to the backend when explicitly enabled.
|
||||
if Settings.BACKEND_MUTATE:
|
||||
for uid in self.deleted:
|
||||
await asyncio.to_thread(self._imap.mark_deleted, uid)
|
||||
await asyncio.to_thread(self._imap.expunge)
|
||||
# Always logout the backend connection.
|
||||
self._imap.logout()
|
||||
await self.send_line("+OK Goodbye")
|
||||
|
||||
async def send_line(self, line):
|
||||
self.writer.write((line + "\r\n").encode("utf-8"))
|
||||
await self.writer.drain()
|
||||
|
||||
async def send_lines(self, lines):
|
||||
for line in lines:
|
||||
self.writer.write((line + "\r\n").encode("utf-8"))
|
||||
await self.writer.drain()
|
||||
|
||||
async def send_raw(self, data):
|
||||
self.writer.write(data)
|
||||
await self.writer.drain()
|
||||
|
||||
def _require_auth(self):
|
||||
if self._imap is None:
|
||||
raise RuntimeError("Not authenticated")
|
||||
|
||||
async def close(self):
|
||||
try:
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class SMTPProxyHandler:
|
||||
"""SMTP handler that relays inbound messages to the backend SMTP server."""
|
||||
|
||||
async def handle_DATA(self, server, session, envelope):
|
||||
logger.info("SMTP message received from %s to %s", envelope.mail_from, envelope.rcpt_tos)
|
||||
await asyncio.to_thread(self.send_message, envelope.mail_from, envelope.rcpt_tos, envelope.content)
|
||||
return "250 Message accepted for delivery"
|
||||
|
||||
def send_message(self, sender, recipients, data):
|
||||
"""Forward a complete SMTP message to the backend SMTP server."""
|
||||
if Settings.BACKEND_SMTP_USE_SSL:
|
||||
smtp = smtplib.SMTP_SSL(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
|
||||
else:
|
||||
smtp = smtplib.SMTP(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
|
||||
if Settings.BACKEND_SMTP_USE_TLS:
|
||||
smtp.starttls(context=ssl.create_default_context())
|
||||
try:
|
||||
if Settings.BACKEND_SMTP_USER and Settings.BACKEND_SMTP_PASS:
|
||||
smtp.login(Settings.BACKEND_SMTP_USER, Settings.BACKEND_SMTP_PASS)
|
||||
smtp.sendmail(sender, recipients, data)
|
||||
finally:
|
||||
smtp.quit()
|
||||
|
||||
|
||||
async def handle_pop3_client(reader, writer):
|
||||
"""Create and run a POP3 session for a new client connection."""
|
||||
session = POP3Session(reader, writer)
|
||||
await session.run()
|
||||
|
||||
|
||||
async def main():
|
||||
"""Initialize the proxy and start the POP3 and SMTP servers."""
|
||||
Settings.validate()
|
||||
logger.info("Starting POP3 server on %s:%s", Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT)
|
||||
pop3_server = await asyncio.start_server(handle_pop3_client, Settings.POP3_BIND_ADDR, Settings.POP3_BIND_PORT)
|
||||
|
||||
logger.info("Starting SMTP server on %s:%s", Settings.SMTP_BIND_ADDR, Settings.SMTP_BIND_PORT)
|
||||
smtp_controller = Controller(SMTPProxyHandler(), hostname=Settings.SMTP_BIND_ADDR, port=Settings.SMTP_BIND_PORT)
|
||||
smtp_controller.start()
|
||||
|
||||
async with pop3_server:
|
||||
await pop3_server.serve_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Shutdown requested")
|
||||
@@ -0,0 +1,5 @@
|
||||
[pytest]
|
||||
minversion = 7.0
|
||||
testpaths = tests
|
||||
python_files = test_*.py
|
||||
addopts = -q
|
||||
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"extends": ["config:base"],
|
||||
"timezone": "UTC",
|
||||
"packageRules": [
|
||||
{
|
||||
"matchPackagePatterns": ["*"],
|
||||
"groupName": "all dependencies",
|
||||
"enabled": true
|
||||
}
|
||||
],
|
||||
"labels": ["dependencies"],
|
||||
"automerge": false,
|
||||
"prHourlyLimit": 2,
|
||||
"rangeStrategy": "bump",
|
||||
"postUpdateOptions": ["gomodTidy"],
|
||||
"ignorePaths": ["**/node_modules/**"]
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
-r requirements.txt
|
||||
pytest>=8.0.0
|
||||
@@ -0,0 +1 @@
|
||||
aiosmtpd>=1.4.6,<1.5
|
||||
@@ -0,0 +1,7 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Ensure the project root is on sys.path when running tests.
|
||||
ROOT = os.path.dirname(os.path.dirname(__file__))
|
||||
if ROOT not in sys.path:
|
||||
sys.path.insert(0, ROOT)
|
||||
@@ -0,0 +1,393 @@
|
||||
import asyncio
|
||||
import imaplib
|
||||
import smtplib
|
||||
|
||||
import pytest
|
||||
|
||||
from proxy_server import (
|
||||
IMAPBackend,
|
||||
POP3Session,
|
||||
SMTPProxyHandler,
|
||||
Settings,
|
||||
env_bool,
|
||||
format_pop3_body,
|
||||
)
|
||||
|
||||
|
||||
class DummyIMAP:
|
||||
def __init__(self, host, port, timeout=0):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self.logged_in = False
|
||||
self.selected = None
|
||||
|
||||
def login(self, username, password):
|
||||
self.logged_in = True
|
||||
|
||||
def select(self, mailbox):
|
||||
self.selected = mailbox
|
||||
|
||||
def uid(self, command, *args):
|
||||
if command == "search":
|
||||
return "OK", [b"1 2 3"]
|
||||
if command == "fetch" and args[1] == "(RFC822.SIZE)":
|
||||
if args[0] == "1:*":
|
||||
return "OK", [
|
||||
b"1 (UID 1 RFC822.SIZE 1024)",
|
||||
b"2 (UID 2 RFC822.SIZE 2048)",
|
||||
b"3 (UID 3 RFC822.SIZE 512)",
|
||||
]
|
||||
return "OK", [(b"1 (RFC822.SIZE 1024)", b"")]
|
||||
if command == "fetch" and args[1] == "(RFC822)":
|
||||
body = b"Subject: Hi\r\n\r\nHello World\r\n"
|
||||
return "OK", [(b"1 (RFC822 {%d}" % len(body), body), b")"]
|
||||
return "NO", []
|
||||
|
||||
def logout(self):
|
||||
pass
|
||||
|
||||
def expunge(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_env_bool_interprets_truthy_values(monkeypatch):
|
||||
monkeypatch.setenv("FEATURE_ENABLED", "true")
|
||||
assert env_bool("FEATURE_ENABLED") is True
|
||||
monkeypatch.setenv("FEATURE_ENABLED", "1")
|
||||
assert env_bool("FEATURE_ENABLED") is True
|
||||
monkeypatch.setenv("FEATURE_ENABLED", "off")
|
||||
assert env_bool("FEATURE_ENABLED") is False
|
||||
monkeypatch.delenv("FEATURE_ENABLED", raising=False)
|
||||
assert env_bool("FEATURE_ENABLED", default=True) is True
|
||||
|
||||
|
||||
def test_settings_validate_requires_backend_hosts():
|
||||
original_imap = Settings.BACKEND_IMAP_HOST
|
||||
original_smtp = Settings.BACKEND_SMTP_HOST
|
||||
Settings.BACKEND_IMAP_HOST = None
|
||||
Settings.BACKEND_SMTP_HOST = None
|
||||
with pytest.raises(RuntimeError, match="Missing required environment variables"):
|
||||
Settings.validate()
|
||||
Settings.BACKEND_IMAP_HOST = original_imap
|
||||
Settings.BACKEND_SMTP_HOST = original_smtp
|
||||
|
||||
|
||||
def test_settings_validate_succeeds_with_backends():
|
||||
original_imap = Settings.BACKEND_IMAP_HOST
|
||||
original_smtp = Settings.BACKEND_SMTP_HOST
|
||||
Settings.BACKEND_IMAP_HOST = "imap.example.com"
|
||||
Settings.BACKEND_SMTP_HOST = "smtp.example.com"
|
||||
Settings.validate()
|
||||
Settings.BACKEND_IMAP_HOST = original_imap
|
||||
Settings.BACKEND_SMTP_HOST = original_smtp
|
||||
|
||||
|
||||
def test_imap_backend_can_login_and_fetch(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
imaplib, "IMAP4_SSL", lambda host, port, timeout=0: DummyIMAP(host, port, timeout)
|
||||
)
|
||||
backend = IMAPBackend("user", "pass")
|
||||
backend.login()
|
||||
assert backend.connection.logged_in
|
||||
assert backend.connection.selected == "INBOX"
|
||||
assert backend.list_uids() == [b"1", b"2", b"3"]
|
||||
assert backend.fetch_message_size(b"1") == 1024
|
||||
assert backend.fetch_all_sizes() == {b"1": 1024, b"2": 2048, b"3": 512}
|
||||
assert b"Hello World" in backend.fetch_message(b"1")
|
||||
|
||||
|
||||
def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
|
||||
captured = {}
|
||||
|
||||
class DummySMTP:
|
||||
def __init__(self, host, port, timeout=0):
|
||||
captured["instance"] = self
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self.logged_in = False
|
||||
self.sent = None
|
||||
|
||||
def login(self, user, password):
|
||||
self.logged_in = True
|
||||
|
||||
def sendmail(self, sender, recipients, message):
|
||||
self.sent = (sender, recipients, message)
|
||||
|
||||
def quit(self):
|
||||
captured["quit"] = True
|
||||
|
||||
monkeypatch.setattr(smtplib, "SMTP_SSL", DummySMTP)
|
||||
previous_ssl = Settings.BACKEND_SMTP_USE_SSL
|
||||
previous_user = Settings.BACKEND_SMTP_USER
|
||||
previous_pass = Settings.BACKEND_SMTP_PASS
|
||||
Settings.BACKEND_SMTP_USE_SSL = True
|
||||
Settings.BACKEND_SMTP_USER = "smtp-user"
|
||||
Settings.BACKEND_SMTP_PASS = "smtp-pass"
|
||||
|
||||
# Include an 8-bit byte that would be mangled by a utf-8 decode.
|
||||
raw = b"Subject: Test\r\n\r\nBody \x80\r\n"
|
||||
handler = SMTPProxyHandler()
|
||||
handler.send_message("from@example.com", ["to@example.com"], raw)
|
||||
|
||||
assert captured["instance"].host == Settings.BACKEND_SMTP_HOST
|
||||
assert captured["instance"].logged_in is True
|
||||
assert captured["instance"].sent[0] == "from@example.com"
|
||||
assert captured["instance"].sent[1] == ["to@example.com"]
|
||||
# Raw bytes must be forwarded unchanged, not decoded.
|
||||
assert captured["instance"].sent[2] == raw
|
||||
|
||||
Settings.BACKEND_SMTP_USE_SSL = previous_ssl
|
||||
Settings.BACKEND_SMTP_USER = previous_user
|
||||
Settings.BACKEND_SMTP_PASS = previous_pass
|
||||
|
||||
|
||||
def test_pop3_quit_does_not_mutate_backend_by_default():
|
||||
import asyncio
|
||||
|
||||
class DummyBackend:
|
||||
def __init__(self):
|
||||
self.deleted_called = []
|
||||
self.expunge_called = False
|
||||
self.logged_out = False
|
||||
|
||||
def mark_deleted(self, uid):
|
||||
self.deleted_called.append(uid)
|
||||
|
||||
def expunge(self):
|
||||
self.expunge_called = True
|
||||
|
||||
def logout(self):
|
||||
self.logged_out = True
|
||||
|
||||
session = POP3Session(None, None)
|
||||
backend = DummyBackend()
|
||||
session._imap = backend
|
||||
session.deleted = {b"1", b"2"}
|
||||
|
||||
# Provide a dummy writer so send_line can be awaited without a socket.
|
||||
class DummyWriter:
|
||||
def __init__(self):
|
||||
self.buf = b""
|
||||
self.closed = False
|
||||
|
||||
def write(self, data):
|
||||
self.buf += data
|
||||
|
||||
async def drain(self):
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
async def wait_closed(self):
|
||||
return None
|
||||
|
||||
session.writer = DummyWriter()
|
||||
|
||||
# Ensure default is not to mutate
|
||||
previous_mutate = Settings.BACKEND_MUTATE
|
||||
Settings.BACKEND_MUTATE = False
|
||||
asyncio.run(session.handle_quit())
|
||||
Settings.BACKEND_MUTATE = previous_mutate
|
||||
|
||||
assert backend.deleted_called == []
|
||||
assert backend.expunge_called is False
|
||||
assert backend.logged_out is True
|
||||
|
||||
|
||||
def test_pop3_quit_mutates_backend_when_enabled():
|
||||
import asyncio
|
||||
|
||||
class DummyBackend:
|
||||
def __init__(self):
|
||||
self.deleted_called = []
|
||||
self.expunge_called = False
|
||||
self.logged_out = False
|
||||
|
||||
def mark_deleted(self, uid):
|
||||
self.deleted_called.append(uid)
|
||||
|
||||
def expunge(self):
|
||||
self.expunge_called = True
|
||||
|
||||
def logout(self):
|
||||
self.logged_out = True
|
||||
|
||||
session = POP3Session(None, None)
|
||||
backend = DummyBackend()
|
||||
session._imap = backend
|
||||
session.deleted = {b"1", b"2"}
|
||||
|
||||
session.writer = DummyWriter()
|
||||
|
||||
previous_mutate = Settings.BACKEND_MUTATE
|
||||
Settings.BACKEND_MUTATE = True
|
||||
asyncio.run(session.handle_quit())
|
||||
Settings.BACKEND_MUTATE = previous_mutate
|
||||
|
||||
assert set(backend.deleted_called) == {b"1", b"2"}
|
||||
assert backend.expunge_called is True
|
||||
assert backend.logged_out is True
|
||||
|
||||
|
||||
class FakeWriter:
|
||||
def __init__(self):
|
||||
self.buffer = bytearray()
|
||||
|
||||
def write(self, data):
|
||||
self.buffer.extend(data)
|
||||
|
||||
async def drain(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
async def wait_closed(self):
|
||||
pass
|
||||
|
||||
|
||||
class DummyWriter:
|
||||
"""Simple writer used by tests where we only need write/drain/close."""
|
||||
|
||||
def __init__(self):
|
||||
self.buf = b""
|
||||
self.closed = False
|
||||
|
||||
def write(self, data):
|
||||
self.buf += data
|
||||
|
||||
async def drain(self):
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
async def wait_closed(self):
|
||||
return None
|
||||
|
||||
|
||||
class RecordingIMAP:
|
||||
"""In-memory IMAPBackend stand-in for POP3 session tests."""
|
||||
|
||||
def __init__(self, uids):
|
||||
self.uids = list(uids)
|
||||
self.marked = []
|
||||
self.expunged = False
|
||||
self.logged_out = False
|
||||
|
||||
def list_uids(self):
|
||||
return list(self.uids)
|
||||
|
||||
def fetch_all_sizes(self):
|
||||
return {uid: 100 for uid in self.uids}
|
||||
|
||||
def fetch_message(self, uid):
|
||||
return b"Subject: Hi\r\nHeader: 1\r\n\r\n.dotted line\r\nsecond\r\nthird\r\n"
|
||||
|
||||
def mark_deleted(self, uid):
|
||||
self.marked.append(uid)
|
||||
|
||||
def expunge(self):
|
||||
self.expunged = True
|
||||
|
||||
def logout(self):
|
||||
self.logged_out = True
|
||||
|
||||
|
||||
def make_session(uids):
|
||||
session = POP3Session(None, FakeWriter())
|
||||
session._imap = RecordingIMAP(uids)
|
||||
session.message_ids = session._imap.list_uids()
|
||||
return session
|
||||
|
||||
|
||||
def test_format_pop3_body_dot_stuffs_and_terminates():
|
||||
payload = format_pop3_body(b".secret\r\nplain\r\n")
|
||||
assert payload == b"..secret\r\nplain\r\n.\r\n"
|
||||
|
||||
|
||||
def test_format_pop3_body_top_keeps_headers_and_limits_body():
|
||||
message = b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\nline2\r\nline3\r\n"
|
||||
payload = format_pop3_body(message, max_body_lines=1)
|
||||
assert payload == b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\n.\r\n"
|
||||
|
||||
|
||||
def test_retr_preserves_leading_dot_line():
|
||||
session = make_session([b"1", b"2"])
|
||||
asyncio.run(session.handle_retr(["1"]))
|
||||
output = bytes(session.writer.buffer)
|
||||
assert b"+OK" in output
|
||||
# The body line ".dotted line" must survive intact after dot-stuffing/parse.
|
||||
assert b"\r\n..dotted line\r\n" in output
|
||||
assert output.endswith(b".\r\n")
|
||||
|
||||
|
||||
def test_top_returns_headers_and_limited_body():
|
||||
session = make_session([b"1"])
|
||||
asyncio.run(session.handle_top(["1", "1"]))
|
||||
output = bytes(session.writer.buffer)
|
||||
assert output.startswith(b"+OK top of message follows\r\n")
|
||||
assert b"Subject: Hi\r\n" in output
|
||||
assert b"\r\n..dotted line\r\n" in output
|
||||
assert b"second" not in output
|
||||
assert output.endswith(b".\r\n")
|
||||
|
||||
|
||||
def test_authenticate_ignores_client_credentials(monkeypatch):
|
||||
captured = {}
|
||||
|
||||
def fake_login(self):
|
||||
captured["username"] = self.username
|
||||
captured["password"] = self.password
|
||||
|
||||
monkeypatch.setattr(IMAPBackend, "login", fake_login)
|
||||
monkeypatch.setattr(IMAPBackend, "list_uids", lambda self: [])
|
||||
previous_user = Settings.BACKEND_IMAP_USER
|
||||
previous_pass = Settings.BACKEND_IMAP_PASS
|
||||
Settings.BACKEND_IMAP_USER = "backend-user"
|
||||
Settings.BACKEND_IMAP_PASS = "backend-pass"
|
||||
|
||||
session = POP3Session(None, FakeWriter())
|
||||
session.username = "client-user"
|
||||
session.password = "client-pass"
|
||||
session.authenticate()
|
||||
|
||||
# The proxy must connect with its own credentials, never the client's.
|
||||
assert captured["username"] == "backend-user"
|
||||
assert captured["password"] == "backend-pass"
|
||||
|
||||
Settings.BACKEND_IMAP_USER = previous_user
|
||||
Settings.BACKEND_IMAP_PASS = previous_pass
|
||||
|
||||
|
||||
def test_authenticate_requires_backend_credentials(monkeypatch):
|
||||
previous_user = Settings.BACKEND_IMAP_USER
|
||||
previous_pass = Settings.BACKEND_IMAP_PASS
|
||||
Settings.BACKEND_IMAP_USER = None
|
||||
Settings.BACKEND_IMAP_PASS = None
|
||||
|
||||
session = POP3Session(None, FakeWriter())
|
||||
session.username = "client-user"
|
||||
session.password = "client-pass"
|
||||
with pytest.raises(RuntimeError, match="Backend IMAP credentials are not configured"):
|
||||
session.authenticate()
|
||||
|
||||
Settings.BACKEND_IMAP_USER = previous_user
|
||||
Settings.BACKEND_IMAP_PASS = previous_pass
|
||||
|
||||
|
||||
def test_dele_survives_stat_list_uidl_until_quit():
|
||||
session = make_session([b"1", b"2", b"3"])
|
||||
asyncio.run(session.handle_dele(["2"]))
|
||||
# Multidrop status commands must not clear the deletion mark.
|
||||
asyncio.run(session.handle_stat())
|
||||
asyncio.run(session.handle_list([]))
|
||||
asyncio.run(session.handle_uidl([]))
|
||||
assert b"2" in session.deleted
|
||||
# Default behaviour is not to mutate the backend mailbox on QUIT.
|
||||
asyncio.run(session.handle_quit())
|
||||
assert session._imap.marked == []
|
||||
assert session._imap.expunged is False
|
||||
Reference in New Issue
Block a user