Files

394 lines
12 KiB
Python
Raw Permalink Normal View History

import asyncio
import imaplib
import smtplib
import pytest
from proxy_server import (
IMAPBackend,
POP3Session,
SMTPProxyHandler,
Settings,
env_bool,
format_pop3_body,
)
class DummyIMAP:
    """Stub IMAP client that answers ``uid`` commands with canned responses.

    It records the constructor arguments, whether ``login`` was called, and
    the last mailbox passed to ``select`` so tests can inspect them.
    """

    def __init__(self, host, port, timeout=0):
        self.host, self.port, self.timeout = host, port, timeout
        self.logged_in = False
        self.selected = None

    def login(self, username, password):
        """Mark the connection as logged in; the credentials are ignored."""
        self.logged_in = True

    def select(self, mailbox):
        """Remember the selected mailbox name."""
        self.selected = mailbox

    def uid(self, command, *args):
        """Return a ``(status, data)`` pair for the given UID command.

        ``search`` yields three UIDs; ``fetch`` answers ``(RFC822.SIZE)`` and
        ``(RFC822)`` requests; anything else yields ``("NO", [])``.
        """
        if command == "search":
            return "OK", [b"1 2 3"]
        if command != "fetch":
            return "NO", []

        requested = args[1]
        if requested == "(RFC822.SIZE)":
            if args[0] != "1:*":
                # Single-message size query.
                return "OK", [(b"1 (RFC822.SIZE 1024)", b"")]
            # Bulk size query covering the whole mailbox.
            sizes = [
                b"1 (UID 1 RFC822.SIZE 1024)",
                b"2 (UID 2 RFC822.SIZE 2048)",
                b"3 (UID 3 RFC822.SIZE 512)",
            ]
            return "OK", sizes
        if requested == "(RFC822)":
            message = b"Subject: Hi\r\n\r\nHello World\r\n"
            envelope = b"1 (RFC822 {%d}" % len(message)
            return "OK", [(envelope, message), b")"]
        return "NO", []

    def logout(self):
        """No-op."""
        return None

    def expunge(self):
        """No-op."""
        return None
def test_env_bool_interprets_truthy_values(monkeypatch):
    """``env_bool`` maps "true"/"1" to True, "off" to False, and honours ``default``."""
    variable = "FEATURE_ENABLED"
    cases = (("true", True), ("1", True), ("off", False))
    for raw_value, expected in cases:
        monkeypatch.setenv(variable, raw_value)
        assert env_bool(variable) is expected

    # With the variable absent, the supplied default is returned.
    monkeypatch.delenv(variable, raising=False)
    assert env_bool(variable, default=True) is True
def test_settings_validate_requires_backend_hosts():
    """``Settings.validate`` raises ``RuntimeError`` when both backend hosts are unset.

    The host attributes on ``Settings`` are overridden for the duration of the
    check and restored in ``finally``, so a failing assertion cannot leak the
    ``None`` values into later tests.
    """
    original_imap = Settings.BACKEND_IMAP_HOST
    original_smtp = Settings.BACKEND_SMTP_HOST
    try:
        Settings.BACKEND_IMAP_HOST = None
        Settings.BACKEND_SMTP_HOST = None
        with pytest.raises(RuntimeError, match="Missing required environment variables"):
            Settings.validate()
    finally:
        Settings.BACKEND_IMAP_HOST = original_imap
        Settings.BACKEND_SMTP_HOST = original_smtp
def test_settings_validate_succeeds_with_backends():
    """``Settings.validate`` does not raise when both backend hosts are set.

    The original host values are restored in ``finally`` so that an
    unexpected exception from ``validate`` does not leave the example hosts
    in place for later tests.
    """
    original_imap = Settings.BACKEND_IMAP_HOST
    original_smtp = Settings.BACKEND_SMTP_HOST
    try:
        Settings.BACKEND_IMAP_HOST = "imap.example.com"
        Settings.BACKEND_SMTP_HOST = "smtp.example.com"
        Settings.validate()
    finally:
        Settings.BACKEND_IMAP_HOST = original_imap
        Settings.BACKEND_SMTP_HOST = original_smtp
def test_imap_backend_can_login_and_fetch(monkeypatch):
    """``IMAPBackend`` logs in, selects INBOX and reads UIDs, sizes and a body.

    ``imaplib.IMAP4_SSL`` is replaced by a factory returning ``DummyIMAP`` so
    no network connection is made.
    """

    def fake_imap4_ssl(host, port, timeout=0):
        return DummyIMAP(host, port, timeout)

    monkeypatch.setattr(imaplib, "IMAP4_SSL", fake_imap4_ssl)

    backend = IMAPBackend("user", "pass")
    backend.login()

    # Connection state after login.
    assert backend.connection.logged_in
    assert backend.connection.selected == "INBOX"

    # Values parsed from the canned DummyIMAP responses.
    expected_uids = [b"1", b"2", b"3"]
    expected_sizes = {b"1": 1024, b"2": 2048, b"3": 512}
    assert backend.list_uids() == expected_uids
    assert backend.fetch_message_size(b"1") == 1024
    assert backend.fetch_all_sizes() == expected_sizes
    assert b"Hello World" in backend.fetch_message(b"1")
def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
    """``SMTPProxyHandler.send_message`` logs in and forwards the raw bytes via ``SMTP_SSL``.

    ``smtplib.SMTP_SSL`` is replaced by a recording stub. The ``Settings``
    overrides go through ``monkeypatch`` so they are undone automatically,
    even when an assertion fails.
    """
    captured = {}

    class DummySMTP:
        """Recording stand-in for ``smtplib.SMTP_SSL``."""

        def __init__(self, host, port, timeout=0):
            captured["instance"] = self
            self.host = host
            self.port = port
            self.timeout = timeout
            self.logged_in = False
            self.sent = None

        def login(self, user, password):
            self.logged_in = True

        def sendmail(self, sender, recipients, message):
            self.sent = (sender, recipients, message)

        def quit(self):
            captured["quit"] = True

    monkeypatch.setattr(smtplib, "SMTP_SSL", DummySMTP)
    monkeypatch.setattr(Settings, "BACKEND_SMTP_USE_SSL", True)
    monkeypatch.setattr(Settings, "BACKEND_SMTP_USER", "smtp-user")
    monkeypatch.setattr(Settings, "BACKEND_SMTP_PASS", "smtp-pass")

    # Include an 8-bit byte that would be mangled by a utf-8 decode.
    raw = b"Subject: Test\r\n\r\nBody \x80\r\n"
    handler = SMTPProxyHandler()
    handler.send_message("from@example.com", ["to@example.com"], raw)

    instance = captured["instance"]
    assert instance.host == Settings.BACKEND_SMTP_HOST
    assert instance.logged_in is True
    assert instance.sent[0] == "from@example.com"
    assert instance.sent[1] == ["to@example.com"]
    # Raw bytes must be forwarded unchanged, not decoded.
    assert instance.sent[2] == raw
def test_pop3_quit_does_not_mutate_backend_by_default():
    """QUIT with ``BACKEND_MUTATE`` off logs out without deleting or expunging.

    The flag is forced to ``False`` for the call and restored in ``finally``
    so a failure inside ``handle_quit`` cannot leak the override.
    """

    class DummyBackend:
        """Records which mutating calls the session makes."""

        def __init__(self):
            self.deleted_called = []
            self.expunge_called = False
            self.logged_out = False

        def mark_deleted(self, uid):
            self.deleted_called.append(uid)

        def expunge(self):
            self.expunge_called = True

        def logout(self):
            self.logged_out = True

    session = POP3Session(None, None)
    backend = DummyBackend()
    session._imap = backend
    session.deleted = {b"1", b"2"}
    # Provide an in-memory writer so the reply can be sent without a socket.
    session.writer = DummyWriter()

    # Ensure default is not to mutate.
    previous_mutate = Settings.BACKEND_MUTATE
    Settings.BACKEND_MUTATE = False
    try:
        asyncio.run(session.handle_quit())
    finally:
        Settings.BACKEND_MUTATE = previous_mutate

    assert backend.deleted_called == []
    assert backend.expunge_called is False
    assert backend.logged_out is True
def test_pop3_quit_mutates_backend_when_enabled():
    """QUIT with ``BACKEND_MUTATE`` on deletes the marked UIDs, expunges and logs out.

    The flag is restored in ``finally``; otherwise a failure here would leave
    mutation enabled for every test that runs afterwards.
    """

    class DummyBackend:
        """Records which mutating calls the session makes."""

        def __init__(self):
            self.deleted_called = []
            self.expunge_called = False
            self.logged_out = False

        def mark_deleted(self, uid):
            self.deleted_called.append(uid)

        def expunge(self):
            self.expunge_called = True

        def logout(self):
            self.logged_out = True

    session = POP3Session(None, None)
    backend = DummyBackend()
    session._imap = backend
    session.deleted = {b"1", b"2"}
    session.writer = DummyWriter()

    previous_mutate = Settings.BACKEND_MUTATE
    Settings.BACKEND_MUTATE = True
    try:
        asyncio.run(session.handle_quit())
    finally:
        Settings.BACKEND_MUTATE = previous_mutate

    # Compare as a set: session.deleted is a set, so call order is not fixed.
    assert set(backend.deleted_called) == {b"1", b"2"}
    assert backend.expunge_called is True
    assert backend.logged_out is True
class FakeWriter:
    """In-memory writer that accumulates everything written into ``buffer``."""

    def __init__(self):
        self.buffer = bytearray()

    def write(self, data):
        """Append ``data`` to the buffer."""
        self.buffer.extend(data)

    async def drain(self):
        """No-op."""
        return None

    def close(self):
        """No-op."""
        return None

    async def wait_closed(self):
        """No-op."""
        return None
class DummyWriter:
    """Minimal writer for tests that only need write/drain/close.

    Written bytes are concatenated into ``buf``; ``closed`` flips to True once
    ``close`` has been called.
    """

    def __init__(self):
        self.closed = False
        self.buf = b""

    def write(self, data):
        """Append ``data`` to ``buf``."""
        self.buf = self.buf + data

    async def drain(self):
        """No-op."""

    def close(self):
        """Record that the writer was closed."""
        self.closed = True

    async def wait_closed(self):
        """No-op."""
class RecordingIMAP:
    """In-memory substitute for ``IMAPBackend`` used by the POP3 session tests.

    It serves a fixed UID list, a constant size of 100 per message and one
    canned message body, and records delete/expunge/logout calls.
    """

    def __init__(self, uids):
        self.uids = [*uids]
        self.marked = []
        self.expunged = False
        self.logged_out = False

    def list_uids(self):
        """Return a copy of the UID list."""
        return self.uids[:]

    def fetch_all_sizes(self):
        """Map every UID to a size of 100."""
        return dict.fromkeys(self.uids, 100)

    def fetch_message(self, uid):
        """Return the same canned message for any UID.

        The first body line starts with a dot, which exercises dot-stuffing.
        """
        headers = b"Subject: Hi\r\nHeader: 1\r\n\r\n"
        body = b".dotted line\r\nsecond\r\nthird\r\n"
        return headers + body

    def mark_deleted(self, uid):
        """Record a UID that was marked for deletion."""
        self.marked.append(uid)

    def expunge(self):
        """Record that expunge was requested."""
        self.expunged = True

    def logout(self):
        """Record that logout was requested."""
        self.logged_out = True
def make_session(uids):
    """Build a ``POP3Session`` wired to a ``FakeWriter`` and a ``RecordingIMAP``.

    ``message_ids`` is pre-populated from the backend's UID list.
    """
    writer = FakeWriter()
    pop3 = POP3Session(None, writer)
    pop3._imap = RecordingIMAP(uids)
    pop3.message_ids = pop3._imap.list_uids()
    return pop3
def test_format_pop3_body_dot_stuffs_and_terminates():
    """A leading-dot line is doubled and the payload ends with the ``.`` line."""
    raw_message = b".secret\r\nplain\r\n"
    expected = b"..secret\r\nplain\r\n.\r\n"
    assert format_pop3_body(raw_message) == expected
def test_format_pop3_body_top_keeps_headers_and_limits_body():
    """With ``max_body_lines=1`` all headers are kept but only one body line."""
    headers = b"Subject: Hi\r\nHeader: 1\r\n\r\n"
    body = b"line1\r\nline2\r\nline3\r\n"
    truncated = format_pop3_body(headers + body, max_body_lines=1)
    assert truncated == b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\n.\r\n"
def test_retr_preserves_leading_dot_line():
    """RETR sends the message with its leading-dot body line dot-stuffed."""
    pop3 = make_session([b"1", b"2"])
    asyncio.run(pop3.handle_retr(["1"]))
    sent = bytes(pop3.writer.buffer)

    assert b"+OK" in sent
    # ".dotted line" must appear on the wire as "..dotted line", not be dropped.
    assert b"\r\n..dotted line\r\n" in sent
    assert sent.endswith(b".\r\n")
def test_top_returns_headers_and_limited_body():
    """TOP with a one-line limit sends the headers plus only the first body line."""
    pop3 = make_session([b"1"])
    asyncio.run(pop3.handle_top(["1", "1"]))
    sent = bytes(pop3.writer.buffer)

    assert sent.startswith(b"+OK top of message follows\r\n")
    assert b"Subject: Hi\r\n" in sent
    # The single permitted body line is present (dot-stuffed); later ones are not.
    assert b"\r\n..dotted line\r\n" in sent
    assert b"second" not in sent
    assert sent.endswith(b".\r\n")
def test_authenticate_ignores_client_credentials(monkeypatch):
    """``authenticate`` logs in to IMAP with the configured backend credentials.

    ``IMAPBackend.login`` is replaced by a recorder that captures the
    username and password the backend was built with. The ``Settings``
    overrides go through ``monkeypatch`` so they are undone automatically,
    even when an assertion fails.
    """
    captured = {}

    def fake_login(self):
        captured["username"] = self.username
        captured["password"] = self.password

    monkeypatch.setattr(IMAPBackend, "login", fake_login)
    monkeypatch.setattr(IMAPBackend, "list_uids", lambda self: [])
    monkeypatch.setattr(Settings, "BACKEND_IMAP_USER", "backend-user")
    monkeypatch.setattr(Settings, "BACKEND_IMAP_PASS", "backend-pass")

    session = POP3Session(None, FakeWriter())
    session.username = "client-user"
    session.password = "client-pass"
    session.authenticate()

    # The proxy must connect with its own credentials, never the client's.
    assert captured["username"] == "backend-user"
    assert captured["password"] == "backend-pass"
def test_authenticate_requires_backend_credentials(monkeypatch):
    """``authenticate`` raises ``RuntimeError`` when no backend credentials are set.

    The credentials are cleared through ``monkeypatch`` so the original
    values come back automatically, even if the expected exception is not
    raised.
    """
    monkeypatch.setattr(Settings, "BACKEND_IMAP_USER", None)
    monkeypatch.setattr(Settings, "BACKEND_IMAP_PASS", None)

    session = POP3Session(None, FakeWriter())
    session.username = "client-user"
    session.password = "client-pass"
    with pytest.raises(RuntimeError, match="Backend IMAP credentials are not configured"):
        session.authenticate()
def test_dele_survives_stat_list_uidl_until_quit():
    """A DELE mark survives STAT/LIST/UIDL, and QUIT leaves the backend untouched.

    ``Settings.BACKEND_MUTATE`` is pinned to ``False`` for the session, so the
    result does not depend on the environment or on a value left behind by
    another test. It is restored in ``finally``.
    """
    session = make_session([b"1", b"2", b"3"])
    previous_mutate = Settings.BACKEND_MUTATE
    Settings.BACKEND_MUTATE = False
    try:
        asyncio.run(session.handle_dele(["2"]))
        # Multidrop status commands must not clear the deletion mark.
        asyncio.run(session.handle_stat())
        asyncio.run(session.handle_list([]))
        asyncio.run(session.handle_uidl([]))
        assert b"2" in session.deleted
        # With mutation disabled, QUIT must not touch the backend mailbox.
        asyncio.run(session.handle_quit())
    finally:
        Settings.BACKEND_MUTATE = previous_mutate

    assert session._imap.marked == []
    assert session._imap.expunged is False