fix: correct POP3 RETR/TOP, static maildrop, and IMAP fetching
Fix several POP3/IMAP proxy correctness defects: - RETR returned an empty body because fetch_message kept only top-level bytes from the imaplib FETCH response; extract the RFC822 literal from the response tuple instead. - DELE marks were wiped mid-session because STAT/LIST/UIDL refreshed the mailbox and cleared the deleted set. Snapshot the UID list once at authentication and keep the maildrop static for the session lifetime. - RETR/TOP output now normalises line endings to CRLF, byte-stuffs lines beginning with ".", and emits the terminating ".\r\n" per RFC 1939. - STAT/LIST batch message sizes via a single threaded UID FETCH and the IMAP client now uses a 30s socket timeout, keeping blocking work off the event loop. - Implement the POP3 TOP command (headers plus first n body lines). Fixes #1 Fixes #2 Fixes #3 Fixes #5 Fixes #6 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+121
-5
@@ -1,15 +1,24 @@
|
||||
import asyncio
|
||||
import imaplib
|
||||
import smtplib
|
||||
|
||||
import pytest
|
||||
|
||||
from proxy_server import IMAPBackend, SMTPProxyHandler, Settings, env_bool
|
||||
from proxy_server import (
|
||||
IMAPBackend,
|
||||
POP3Session,
|
||||
SMTPProxyHandler,
|
||||
Settings,
|
||||
env_bool,
|
||||
format_pop3_body,
|
||||
)
|
||||
|
||||
|
||||
class DummyIMAP:
|
||||
def __init__(self, host, port):
|
||||
def __init__(self, host, port, timeout=0):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self.logged_in = False
|
||||
self.selected = None
|
||||
|
||||
@@ -23,9 +32,16 @@ class DummyIMAP:
|
||||
if command == "search":
|
||||
return "OK", [b"1 2 3"]
|
||||
if command == "fetch" and args[1] == "(RFC822.SIZE)":
|
||||
if args[0] == "1:*":
|
||||
return "OK", [
|
||||
b"1 (UID 1 RFC822.SIZE 1024)",
|
||||
b"2 (UID 2 RFC822.SIZE 2048)",
|
||||
b"3 (UID 3 RFC822.SIZE 512)",
|
||||
]
|
||||
return "OK", [(b"1 (RFC822.SIZE 1024)", b"")]
|
||||
if command == "fetch" and args[1] == "(RFC822)":
|
||||
return "OK", [b"1 (RFC822 {10}", b"Hello", b" World", b")"]
|
||||
body = b"Subject: Hi\r\n\r\nHello World\r\n"
|
||||
return "OK", [(b"1 (RFC822 {%d}" % len(body), body), b")"]
|
||||
return "NO", []
|
||||
|
||||
def logout(self):
|
||||
@@ -68,14 +84,17 @@ def test_settings_validate_succeeds_with_backends():
|
||||
|
||||
|
||||
def test_imap_backend_can_login_and_fetch(monkeypatch):
|
||||
monkeypatch.setattr(imaplib, "IMAP4_SSL", lambda host, port: DummyIMAP(host, port))
|
||||
monkeypatch.setattr(
|
||||
imaplib, "IMAP4_SSL", lambda host, port, timeout=0: DummyIMAP(host, port, timeout)
|
||||
)
|
||||
backend = IMAPBackend("user", "pass")
|
||||
backend.login()
|
||||
assert backend.connection.logged_in
|
||||
assert backend.connection.selected == "INBOX"
|
||||
assert backend.list_uids() == [b"1", b"2", b"3"]
|
||||
assert backend.fetch_message_size(b"1") == 1024
|
||||
assert b"Hello" in backend.fetch_message(b"1")
|
||||
assert backend.fetch_all_sizes() == {b"1": 1024, b"2": 2048, b"3": 512}
|
||||
assert b"Hello World" in backend.fetch_message(b"1")
|
||||
|
||||
|
||||
def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
|
||||
@@ -119,3 +138,100 @@ def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
|
||||
Settings.BACKEND_SMTP_USE_SSL = previous_ssl
|
||||
Settings.BACKEND_SMTP_USER = previous_user
|
||||
Settings.BACKEND_SMTP_PASS = previous_pass
|
||||
|
||||
|
||||
class FakeWriter:
|
||||
def __init__(self):
|
||||
self.buffer = bytearray()
|
||||
|
||||
def write(self, data):
|
||||
self.buffer.extend(data)
|
||||
|
||||
async def drain(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
async def wait_closed(self):
|
||||
pass
|
||||
|
||||
|
||||
class RecordingIMAP:
|
||||
"""In-memory IMAPBackend stand-in for POP3 session tests."""
|
||||
|
||||
def __init__(self, uids):
|
||||
self.uids = list(uids)
|
||||
self.marked = []
|
||||
self.expunged = False
|
||||
self.logged_out = False
|
||||
|
||||
def list_uids(self):
|
||||
return list(self.uids)
|
||||
|
||||
def fetch_all_sizes(self):
|
||||
return {uid: 100 for uid in self.uids}
|
||||
|
||||
def fetch_message(self, uid):
|
||||
return b"Subject: Hi\r\nHeader: 1\r\n\r\n.dotted line\r\nsecond\r\nthird\r\n"
|
||||
|
||||
def mark_deleted(self, uid):
|
||||
self.marked.append(uid)
|
||||
|
||||
def expunge(self):
|
||||
self.expunged = True
|
||||
|
||||
def logout(self):
|
||||
self.logged_out = True
|
||||
|
||||
|
||||
def make_session(uids):
|
||||
session = POP3Session(None, FakeWriter())
|
||||
session._imap = RecordingIMAP(uids)
|
||||
session.message_ids = session._imap.list_uids()
|
||||
return session
|
||||
|
||||
|
||||
def test_format_pop3_body_dot_stuffs_and_terminates():
|
||||
payload = format_pop3_body(b".secret\r\nplain\r\n")
|
||||
assert payload == b"..secret\r\nplain\r\n.\r\n"
|
||||
|
||||
|
||||
def test_format_pop3_body_top_keeps_headers_and_limits_body():
|
||||
message = b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\nline2\r\nline3\r\n"
|
||||
payload = format_pop3_body(message, max_body_lines=1)
|
||||
assert payload == b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\n.\r\n"
|
||||
|
||||
|
||||
def test_retr_preserves_leading_dot_line():
|
||||
session = make_session([b"1", b"2"])
|
||||
asyncio.run(session.handle_retr(["1"]))
|
||||
output = bytes(session.writer.buffer)
|
||||
assert b"+OK" in output
|
||||
# The body line ".dotted line" must survive intact after dot-stuffing/parse.
|
||||
assert b"\r\n..dotted line\r\n" in output
|
||||
assert output.endswith(b".\r\n")
|
||||
|
||||
|
||||
def test_top_returns_headers_and_limited_body():
|
||||
session = make_session([b"1"])
|
||||
asyncio.run(session.handle_top(["1", "1"]))
|
||||
output = bytes(session.writer.buffer)
|
||||
assert output.startswith(b"+OK top of message follows\r\n")
|
||||
assert b"Subject: Hi\r\n" in output
|
||||
assert b"\r\n..dotted line\r\n" in output
|
||||
assert b"second" not in output
|
||||
assert output.endswith(b".\r\n")
|
||||
|
||||
|
||||
def test_dele_survives_stat_list_uidl_until_quit():
|
||||
session = make_session([b"1", b"2", b"3"])
|
||||
asyncio.run(session.handle_dele(["2"]))
|
||||
# Multidrop status commands must not clear the deletion mark.
|
||||
asyncio.run(session.handle_stat())
|
||||
asyncio.run(session.handle_list([]))
|
||||
asyncio.run(session.handle_uidl([]))
|
||||
assert b"2" in session.deleted
|
||||
asyncio.run(session.handle_quit())
|
||||
assert session._imap.marked == [b"2"]
|
||||
assert session._imap.expunged is True
|
||||
|
||||
Reference in New Issue
Block a user