Compare commits
2 Commits
v0.1.0
..
df19c60b17
| Author | SHA1 | Date | |
|---|---|---|---|
| df19c60b17 | |||
| a29889b731 |
+90
-21
@@ -64,6 +64,41 @@ class Settings:
|
|||||||
raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
|
raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
|
||||||
|
|
||||||
|
|
||||||
|
def _split_lines(data):
|
||||||
|
"""Split a byte string into lines, normalising CR/CRLF/LF endings."""
|
||||||
|
text = data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
|
||||||
|
lines = text.split(b"\n")
|
||||||
|
if lines and lines[-1] == b"":
|
||||||
|
lines = lines[:-1]
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
|
def format_pop3_body(body, max_body_lines=None):
|
||||||
|
"""Prepare a message for POP3 RETR/TOP transmission (RFC 1939).
|
||||||
|
|
||||||
|
Normalises line endings to CRLF, byte-stuffs any line beginning with ".",
|
||||||
|
and appends the terminating ".\\r\\n" on its own line. When ``max_body_lines``
|
||||||
|
is given, the full header block is kept and the body is truncated to that
|
||||||
|
many lines (TOP semantics).
|
||||||
|
"""
|
||||||
|
lines = _split_lines(body)
|
||||||
|
if max_body_lines is not None:
|
||||||
|
try:
|
||||||
|
separator = lines.index(b"")
|
||||||
|
except ValueError:
|
||||||
|
separator = len(lines)
|
||||||
|
header_lines = lines[: separator + 1]
|
||||||
|
body_lines = lines[separator + 1 :][:max_body_lines]
|
||||||
|
lines = header_lines + body_lines
|
||||||
|
stuffed = []
|
||||||
|
for line in lines:
|
||||||
|
if line.startswith(b"."):
|
||||||
|
line = b"." + line
|
||||||
|
stuffed.append(line)
|
||||||
|
stuffed.append(b".")
|
||||||
|
return b"\r\n".join(stuffed) + b"\r\n"
|
||||||
|
|
||||||
|
|
||||||
class IMAPBackend:
|
class IMAPBackend:
|
||||||
"""Minimal IMAP wrapper for backend mailbox access."""
|
"""Minimal IMAP wrapper for backend mailbox access."""
|
||||||
|
|
||||||
@@ -76,9 +111,13 @@ class IMAPBackend:
|
|||||||
|
|
||||||
def _connect(self):
|
def _connect(self):
|
||||||
if Settings.BACKEND_IMAP_USE_SSL:
|
if Settings.BACKEND_IMAP_USE_SSL:
|
||||||
client = imaplib.IMAP4_SSL(Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT)
|
client = imaplib.IMAP4_SSL(
|
||||||
|
Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
client = imaplib.IMAP4(Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT)
|
client = imaplib.IMAP4(
|
||||||
|
Settings.BACKEND_IMAP_HOST, Settings.BACKEND_IMAP_PORT, timeout=30
|
||||||
|
)
|
||||||
if Settings.BACKEND_IMAP_USE_STARTTLS:
|
if Settings.BACKEND_IMAP_USE_STARTTLS:
|
||||||
client.starttls(ssl_context=ssl.create_default_context())
|
client.starttls(ssl_context=ssl.create_default_context())
|
||||||
return client
|
return client
|
||||||
@@ -116,12 +155,30 @@ class IMAPBackend:
|
|||||||
match = re.search(rb"RFC822\.SIZE (\d+)", response)
|
match = re.search(rb"RFC822\.SIZE (\d+)", response)
|
||||||
return int(match.group(1)) if match else 0
|
return int(match.group(1)) if match else 0
|
||||||
|
|
||||||
|
def fetch_all_sizes(self):
|
||||||
|
"""Return a {uid: size} mapping for all messages in one IMAP round-trip."""
|
||||||
|
sizes = {}
|
||||||
|
typ, data = self.connection.uid("fetch", "1:*", "(RFC822.SIZE)")
|
||||||
|
if typ != "OK" or not data:
|
||||||
|
return sizes
|
||||||
|
for response in data:
|
||||||
|
if isinstance(response, tuple):
|
||||||
|
response = response[0]
|
||||||
|
if not isinstance(response, bytes):
|
||||||
|
continue
|
||||||
|
match = self.UID_PATTERN.search(response)
|
||||||
|
if match:
|
||||||
|
sizes[match.group(1)] = int(match.group(2))
|
||||||
|
return sizes
|
||||||
|
|
||||||
def fetch_message(self, uid):
|
def fetch_message(self, uid):
|
||||||
typ, data = self.connection.uid("fetch", uid, "(RFC822)")
|
typ, data = self.connection.uid("fetch", uid, "(RFC822)")
|
||||||
if typ != "OK" or not data:
|
if typ != "OK" or not data:
|
||||||
raise RuntimeError("IMAP message fetch failed")
|
raise RuntimeError("IMAP message fetch failed")
|
||||||
parts = [chunk for chunk in data if isinstance(chunk, bytes)]
|
for chunk in data:
|
||||||
return b"\r\n".join(parts) + b"\r\n"
|
if isinstance(chunk, tuple) and len(chunk) > 1:
|
||||||
|
return chunk[1]
|
||||||
|
raise RuntimeError("IMAP message fetch returned no body")
|
||||||
|
|
||||||
def mark_deleted(self, uid):
|
def mark_deleted(self, uid):
|
||||||
self.connection.uid("STORE", uid, "+FLAGS.SILENT", "(\\Deleted)")
|
self.connection.uid("STORE", uid, "+FLAGS.SILENT", "(\\Deleted)")
|
||||||
@@ -179,6 +236,8 @@ class POP3Session:
|
|||||||
return await self.handle_list(args)
|
return await self.handle_list(args)
|
||||||
if command == "RETR":
|
if command == "RETR":
|
||||||
return await self.handle_retr(args)
|
return await self.handle_retr(args)
|
||||||
|
if command == "TOP":
|
||||||
|
return await self.handle_top(args)
|
||||||
if command == "DELE":
|
if command == "DELE":
|
||||||
return await self.handle_dele(args)
|
return await self.handle_dele(args)
|
||||||
if command == "NOOP":
|
if command == "NOOP":
|
||||||
@@ -220,29 +279,26 @@ class POP3Session:
|
|||||||
backend = IMAPBackend(username, password)
|
backend = IMAPBackend(username, password)
|
||||||
backend.login()
|
backend.login()
|
||||||
self._imap = backend
|
self._imap = backend
|
||||||
self._refresh_mailbox()
|
# Snapshot the maildrop once; it stays static for the session lifetime
|
||||||
|
# (RFC 1939), so DELE marks are not wiped by later STAT/LIST/UIDL.
|
||||||
def _refresh_mailbox(self):
|
self.message_ids = backend.list_uids()
|
||||||
self.message_ids = self._imap.list_uids()
|
|
||||||
self.deleted.clear()
|
|
||||||
|
|
||||||
async def handle_stat(self):
|
async def handle_stat(self):
|
||||||
self._require_auth()
|
self._require_auth()
|
||||||
self._refresh_mailbox()
|
sizes = await asyncio.to_thread(self._imap.fetch_all_sizes)
|
||||||
count = len(self.message_ids) - len(self.deleted)
|
count = len(self.message_ids) - len(self.deleted)
|
||||||
total_size = sum(self._imap.fetch_message_size(uid) for uid in self.message_ids if uid not in self.deleted)
|
total_size = sum(sizes.get(uid, 0) for uid in self.message_ids if uid not in self.deleted)
|
||||||
return await self.send_line(f"+OK {count} {total_size}")
|
return await self.send_line(f"+OK {count} {total_size}")
|
||||||
|
|
||||||
async def handle_list(self, args):
|
async def handle_list(self, args):
|
||||||
self._require_auth()
|
self._require_auth()
|
||||||
self._refresh_mailbox()
|
sizes = await asyncio.to_thread(self._imap.fetch_all_sizes)
|
||||||
if not args:
|
if not args:
|
||||||
lines = [f"+OK {len(self.message_ids)} messages"]
|
lines = [f"+OK {len(self.message_ids)} messages"]
|
||||||
for index, uid in enumerate(self.message_ids, start=1):
|
for index, uid in enumerate(self.message_ids, start=1):
|
||||||
if uid in self.deleted:
|
if uid in self.deleted:
|
||||||
continue
|
continue
|
||||||
size = self._imap.fetch_message_size(uid)
|
lines.append(f"{index} {sizes.get(uid, 0)}")
|
||||||
lines.append(f"{index} {size}")
|
|
||||||
lines.append(".")
|
lines.append(".")
|
||||||
return await self.send_lines(lines)
|
return await self.send_lines(lines)
|
||||||
if len(args) != 1 or not args[0].isdigit():
|
if len(args) != 1 or not args[0].isdigit():
|
||||||
@@ -253,8 +309,7 @@ class POP3Session:
|
|||||||
uid = self.message_ids[message_number - 1]
|
uid = self.message_ids[message_number - 1]
|
||||||
if uid in self.deleted:
|
if uid in self.deleted:
|
||||||
return await self.send_line("-ERR message deleted")
|
return await self.send_line("-ERR message deleted")
|
||||||
size = self._imap.fetch_message_size(uid)
|
return await self.send_line(f"+OK {message_number} {sizes.get(uid, 0)}")
|
||||||
return await self.send_line(f"+OK {message_number} {size}")
|
|
||||||
|
|
||||||
async def handle_retr(self, args):
|
async def handle_retr(self, args):
|
||||||
self._require_auth()
|
self._require_auth()
|
||||||
@@ -267,9 +322,25 @@ class POP3Session:
|
|||||||
if uid in self.deleted:
|
if uid in self.deleted:
|
||||||
return await self.send_line("-ERR message deleted")
|
return await self.send_line("-ERR message deleted")
|
||||||
message = await asyncio.to_thread(self._imap.fetch_message, uid)
|
message = await asyncio.to_thread(self._imap.fetch_message, uid)
|
||||||
|
payload = format_pop3_body(message)
|
||||||
await self.send_line(f"+OK {len(message)} octets")
|
await self.send_line(f"+OK {len(message)} octets")
|
||||||
await self.send_raw(message)
|
await self.send_raw(payload)
|
||||||
await self.send_line(".")
|
|
||||||
|
async def handle_top(self, args):
|
||||||
|
self._require_auth()
|
||||||
|
if len(args) != 2 or not args[0].isdigit() or not args[1].isdigit():
|
||||||
|
return await self.send_line("-ERR TOP requires message number and line count")
|
||||||
|
index = int(args[0])
|
||||||
|
line_count = int(args[1])
|
||||||
|
if index < 1 or index > len(self.message_ids):
|
||||||
|
return await self.send_line("-ERR no such message")
|
||||||
|
uid = self.message_ids[index - 1]
|
||||||
|
if uid in self.deleted:
|
||||||
|
return await self.send_line("-ERR message deleted")
|
||||||
|
message = await asyncio.to_thread(self._imap.fetch_message, uid)
|
||||||
|
payload = format_pop3_body(message, max_body_lines=line_count)
|
||||||
|
await self.send_line("+OK top of message follows")
|
||||||
|
await self.send_raw(payload)
|
||||||
|
|
||||||
async def handle_dele(self, args):
|
async def handle_dele(self, args):
|
||||||
self._require_auth()
|
self._require_auth()
|
||||||
@@ -284,7 +355,6 @@ class POP3Session:
|
|||||||
|
|
||||||
async def handle_uidl(self, args):
|
async def handle_uidl(self, args):
|
||||||
self._require_auth()
|
self._require_auth()
|
||||||
self._refresh_mailbox()
|
|
||||||
if not args:
|
if not args:
|
||||||
lines = ["+OK UID list follows"]
|
lines = ["+OK UID list follows"]
|
||||||
for index, uid in enumerate(self.message_ids, start=1):
|
for index, uid in enumerate(self.message_ids, start=1):
|
||||||
@@ -346,7 +416,6 @@ class SMTPProxyHandler:
|
|||||||
|
|
||||||
def send_message(self, sender, recipients, data):
|
def send_message(self, sender, recipients, data):
|
||||||
"""Forward a complete SMTP message to the backend SMTP server."""
|
"""Forward a complete SMTP message to the backend SMTP server."""
|
||||||
message = data.decode("utf-8", errors="replace")
|
|
||||||
if Settings.BACKEND_SMTP_USE_SSL:
|
if Settings.BACKEND_SMTP_USE_SSL:
|
||||||
smtp = smtplib.SMTP_SSL(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
|
smtp = smtplib.SMTP_SSL(Settings.BACKEND_SMTP_HOST, Settings.BACKEND_SMTP_PORT, timeout=30)
|
||||||
else:
|
else:
|
||||||
@@ -356,7 +425,7 @@ class SMTPProxyHandler:
|
|||||||
try:
|
try:
|
||||||
if Settings.BACKEND_SMTP_USER and Settings.BACKEND_SMTP_PASS:
|
if Settings.BACKEND_SMTP_USER and Settings.BACKEND_SMTP_PASS:
|
||||||
smtp.login(Settings.BACKEND_SMTP_USER, Settings.BACKEND_SMTP_PASS)
|
smtp.login(Settings.BACKEND_SMTP_USER, Settings.BACKEND_SMTP_PASS)
|
||||||
smtp.sendmail(sender, recipients, message)
|
smtp.sendmail(sender, recipients, data)
|
||||||
finally:
|
finally:
|
||||||
smtp.quit()
|
smtp.quit()
|
||||||
|
|
||||||
|
|||||||
+126
-7
@@ -1,15 +1,24 @@
|
|||||||
|
import asyncio
|
||||||
import imaplib
|
import imaplib
|
||||||
import smtplib
|
import smtplib
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from proxy_server import IMAPBackend, SMTPProxyHandler, Settings, env_bool
|
from proxy_server import (
|
||||||
|
IMAPBackend,
|
||||||
|
POP3Session,
|
||||||
|
SMTPProxyHandler,
|
||||||
|
Settings,
|
||||||
|
env_bool,
|
||||||
|
format_pop3_body,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class DummyIMAP:
|
class DummyIMAP:
|
||||||
def __init__(self, host, port):
|
def __init__(self, host, port, timeout=0):
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
|
self.timeout = timeout
|
||||||
self.logged_in = False
|
self.logged_in = False
|
||||||
self.selected = None
|
self.selected = None
|
||||||
|
|
||||||
@@ -23,9 +32,16 @@ class DummyIMAP:
|
|||||||
if command == "search":
|
if command == "search":
|
||||||
return "OK", [b"1 2 3"]
|
return "OK", [b"1 2 3"]
|
||||||
if command == "fetch" and args[1] == "(RFC822.SIZE)":
|
if command == "fetch" and args[1] == "(RFC822.SIZE)":
|
||||||
|
if args[0] == "1:*":
|
||||||
|
return "OK", [
|
||||||
|
b"1 (UID 1 RFC822.SIZE 1024)",
|
||||||
|
b"2 (UID 2 RFC822.SIZE 2048)",
|
||||||
|
b"3 (UID 3 RFC822.SIZE 512)",
|
||||||
|
]
|
||||||
return "OK", [(b"1 (RFC822.SIZE 1024)", b"")]
|
return "OK", [(b"1 (RFC822.SIZE 1024)", b"")]
|
||||||
if command == "fetch" and args[1] == "(RFC822)":
|
if command == "fetch" and args[1] == "(RFC822)":
|
||||||
return "OK", [b"1 (RFC822 {10}", b"Hello", b" World", b")"]
|
body = b"Subject: Hi\r\n\r\nHello World\r\n"
|
||||||
|
return "OK", [(b"1 (RFC822 {%d}" % len(body), body), b")"]
|
||||||
return "NO", []
|
return "NO", []
|
||||||
|
|
||||||
def logout(self):
|
def logout(self):
|
||||||
@@ -68,14 +84,17 @@ def test_settings_validate_succeeds_with_backends():
|
|||||||
|
|
||||||
|
|
||||||
def test_imap_backend_can_login_and_fetch(monkeypatch):
|
def test_imap_backend_can_login_and_fetch(monkeypatch):
|
||||||
monkeypatch.setattr(imaplib, "IMAP4_SSL", lambda host, port: DummyIMAP(host, port))
|
monkeypatch.setattr(
|
||||||
|
imaplib, "IMAP4_SSL", lambda host, port, timeout=0: DummyIMAP(host, port, timeout)
|
||||||
|
)
|
||||||
backend = IMAPBackend("user", "pass")
|
backend = IMAPBackend("user", "pass")
|
||||||
backend.login()
|
backend.login()
|
||||||
assert backend.connection.logged_in
|
assert backend.connection.logged_in
|
||||||
assert backend.connection.selected == "INBOX"
|
assert backend.connection.selected == "INBOX"
|
||||||
assert backend.list_uids() == [b"1", b"2", b"3"]
|
assert backend.list_uids() == [b"1", b"2", b"3"]
|
||||||
assert backend.fetch_message_size(b"1") == 1024
|
assert backend.fetch_message_size(b"1") == 1024
|
||||||
assert b"Hello" in backend.fetch_message(b"1")
|
assert backend.fetch_all_sizes() == {b"1": 1024, b"2": 2048, b"3": 512}
|
||||||
|
assert b"Hello World" in backend.fetch_message(b"1")
|
||||||
|
|
||||||
|
|
||||||
def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
|
def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
|
||||||
@@ -107,15 +126,115 @@ def test_smtp_proxy_handler_forwards_message_over_ssl(monkeypatch):
|
|||||||
Settings.BACKEND_SMTP_USER = "smtp-user"
|
Settings.BACKEND_SMTP_USER = "smtp-user"
|
||||||
Settings.BACKEND_SMTP_PASS = "smtp-pass"
|
Settings.BACKEND_SMTP_PASS = "smtp-pass"
|
||||||
|
|
||||||
|
# Include an 8-bit byte that would be mangled by a utf-8 decode.
|
||||||
|
raw = b"Subject: Test\r\n\r\nBody \x80\r\n"
|
||||||
handler = SMTPProxyHandler()
|
handler = SMTPProxyHandler()
|
||||||
handler.send_message("from@example.com", ["to@example.com"], b"Subject: Test\r\n\r\nBody\r\n")
|
handler.send_message("from@example.com", ["to@example.com"], raw)
|
||||||
|
|
||||||
assert captured["instance"].host == Settings.BACKEND_SMTP_HOST
|
assert captured["instance"].host == Settings.BACKEND_SMTP_HOST
|
||||||
assert captured["instance"].logged_in is True
|
assert captured["instance"].logged_in is True
|
||||||
assert captured["instance"].sent[0] == "from@example.com"
|
assert captured["instance"].sent[0] == "from@example.com"
|
||||||
assert captured["instance"].sent[1] == ["to@example.com"]
|
assert captured["instance"].sent[1] == ["to@example.com"]
|
||||||
assert "Subject: Test" in captured["instance"].sent[2]
|
# Raw bytes must be forwarded unchanged, not decoded.
|
||||||
|
assert captured["instance"].sent[2] == raw
|
||||||
|
|
||||||
Settings.BACKEND_SMTP_USE_SSL = previous_ssl
|
Settings.BACKEND_SMTP_USE_SSL = previous_ssl
|
||||||
Settings.BACKEND_SMTP_USER = previous_user
|
Settings.BACKEND_SMTP_USER = previous_user
|
||||||
Settings.BACKEND_SMTP_PASS = previous_pass
|
Settings.BACKEND_SMTP_PASS = previous_pass
|
||||||
|
|
||||||
|
|
||||||
|
class FakeWriter:
|
||||||
|
def __init__(self):
|
||||||
|
self.buffer = bytearray()
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
self.buffer.extend(data)
|
||||||
|
|
||||||
|
async def drain(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def wait_closed(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class RecordingIMAP:
|
||||||
|
"""In-memory IMAPBackend stand-in for POP3 session tests."""
|
||||||
|
|
||||||
|
def __init__(self, uids):
|
||||||
|
self.uids = list(uids)
|
||||||
|
self.marked = []
|
||||||
|
self.expunged = False
|
||||||
|
self.logged_out = False
|
||||||
|
|
||||||
|
def list_uids(self):
|
||||||
|
return list(self.uids)
|
||||||
|
|
||||||
|
def fetch_all_sizes(self):
|
||||||
|
return {uid: 100 for uid in self.uids}
|
||||||
|
|
||||||
|
def fetch_message(self, uid):
|
||||||
|
return b"Subject: Hi\r\nHeader: 1\r\n\r\n.dotted line\r\nsecond\r\nthird\r\n"
|
||||||
|
|
||||||
|
def mark_deleted(self, uid):
|
||||||
|
self.marked.append(uid)
|
||||||
|
|
||||||
|
def expunge(self):
|
||||||
|
self.expunged = True
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
self.logged_out = True
|
||||||
|
|
||||||
|
|
||||||
|
def make_session(uids):
|
||||||
|
session = POP3Session(None, FakeWriter())
|
||||||
|
session._imap = RecordingIMAP(uids)
|
||||||
|
session.message_ids = session._imap.list_uids()
|
||||||
|
return session
|
||||||
|
|
||||||
|
|
||||||
|
def test_format_pop3_body_dot_stuffs_and_terminates():
|
||||||
|
payload = format_pop3_body(b".secret\r\nplain\r\n")
|
||||||
|
assert payload == b"..secret\r\nplain\r\n.\r\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_format_pop3_body_top_keeps_headers_and_limits_body():
|
||||||
|
message = b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\nline2\r\nline3\r\n"
|
||||||
|
payload = format_pop3_body(message, max_body_lines=1)
|
||||||
|
assert payload == b"Subject: Hi\r\nHeader: 1\r\n\r\nline1\r\n.\r\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_retr_preserves_leading_dot_line():
|
||||||
|
session = make_session([b"1", b"2"])
|
||||||
|
asyncio.run(session.handle_retr(["1"]))
|
||||||
|
output = bytes(session.writer.buffer)
|
||||||
|
assert b"+OK" in output
|
||||||
|
# The body line ".dotted line" must survive intact after dot-stuffing/parse.
|
||||||
|
assert b"\r\n..dotted line\r\n" in output
|
||||||
|
assert output.endswith(b".\r\n")
|
||||||
|
|
||||||
|
|
||||||
|
def test_top_returns_headers_and_limited_body():
|
||||||
|
session = make_session([b"1"])
|
||||||
|
asyncio.run(session.handle_top(["1", "1"]))
|
||||||
|
output = bytes(session.writer.buffer)
|
||||||
|
assert output.startswith(b"+OK top of message follows\r\n")
|
||||||
|
assert b"Subject: Hi\r\n" in output
|
||||||
|
assert b"\r\n..dotted line\r\n" in output
|
||||||
|
assert b"second" not in output
|
||||||
|
assert output.endswith(b".\r\n")
|
||||||
|
|
||||||
|
|
||||||
|
def test_dele_survives_stat_list_uidl_until_quit():
|
||||||
|
session = make_session([b"1", b"2", b"3"])
|
||||||
|
asyncio.run(session.handle_dele(["2"]))
|
||||||
|
# Multidrop status commands must not clear the deletion mark.
|
||||||
|
asyncio.run(session.handle_stat())
|
||||||
|
asyncio.run(session.handle_list([]))
|
||||||
|
asyncio.run(session.handle_uidl([]))
|
||||||
|
assert b"2" in session.deleted
|
||||||
|
asyncio.run(session.handle_quit())
|
||||||
|
assert session._imap.marked == [b"2"]
|
||||||
|
assert session._imap.expunged is True
|
||||||
|
|||||||
Reference in New Issue
Block a user