# arniemailbufferserver v3 # copyright 2021 Andrew Stuart andrew.stuart@supercoders.com.au # MIT licensed import asyncio import email import email.utils import json import logging import os import re import signal import shutil import sys import time import uuid from configparser import ConfigParser from dataclasses import dataclass from functools import partial from pathlib import Path from typing import Dict, List, Optional, Set import aiofiles import aiosmtplib from aiosmtpd.smtp import SMTP as SMTPProtocol from aiosmtpd.handlers import Handler from aiosmtpd.smtp import Envelope, Session # --- Structured Logging Setup --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - [%(levelname)s] - %(name)s - %(message)s' ) logger = logging.getLogger(__name__) # --- Constants --- MAX_MESSAGE_SIZE_BYTES = 100 * 1024 * 1024 # 100 MB DEFAULT_QUEUE_SIZE = 1000 MAX_RETRY_ATTEMPTS = 5 INITIAL_RETRY_DELAY = 60 MAX_RETRY_DELAY = 3600 # --- Rate Limiting --- # All access is on the single main event loop -- no locking needed. 
class RateLimiter:
    """Per-client sliding-window (one hour) limits on message count and bytes.

    All access is on the single main event loop -- no locking needed.
    """

    def __init__(self, max_messages_per_hour: int = 100,
                 max_bytes_per_hour: int = 50 * 1024 * 1024):
        self.max_messages = max_messages_per_hour
        self.max_bytes = max_bytes_per_hour
        self.clients: Dict[str, dict] = {}
        self.cleanup_interval = 3600
        self.last_cleanup = time.time()

    def _cleanup_old_entries(self):
        """Prune per-client history older than one hour, at most once per interval."""
        now = time.time()
        if now - self.last_cleanup <= self.cleanup_interval:
            return
        cutoff = now - 3600
        for ip in list(self.clients):
            fresh = [(stamp, size) for stamp, size in self.clients[ip]['messages']
                     if stamp > cutoff]
            if fresh:
                self.clients[ip]['messages'] = fresh
            else:
                del self.clients[ip]
        self.last_cleanup = now

    def check(self, client_ip: str, message_size: int) -> bool:
        """Check if a message can be accepted WITHOUT recording it."""
        self._cleanup_old_entries()
        client_data = self.clients.get(client_ip)
        if client_data is None:
            return True
        cutoff = time.time() - 3600
        recent = [(stamp, size) for stamp, size in client_data['messages'] if stamp > cutoff]
        if len(recent) >= self.max_messages:
            return False
        return sum(size for _, size in recent) + message_size <= self.max_bytes

    def record(self, client_ip: str, message_size: int):
        """Record a successfully accepted message. Call only after confirmed queued."""
        bucket = self.clients.setdefault(client_ip, {'messages': []})
        bucket['messages'].append((time.time(), message_size))


# --- Input Validation ---

def is_valid_uuid(value: str) -> bool:
    """True when *value* parses as a UUID in any accepted textual form."""
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


def sanitize_filename(filename: str) -> str:
    """Strip directory components and every character outside [a-zA-Z0-9.-]."""
    cleaned = re.sub(r'[^a-zA-Z0-9\-\.]', '', os.path.basename(filename))
    return cleaned or 'invalid'


def validate_email_address(email_addr: str) -> bool:
    """Permissive validation -- accepts localhost, IP literals, apostrophes, etc."""
    if not email_addr or len(email_addr) > 254:
        return False
    # Reject header-injection characters outright.
    for forbidden in ('\n', '\r', '\0'):
        if forbidden in email_addr:
            return False
    addr = email.utils.parseaddr(email_addr)[1]
    if not addr or '@' not in addr:
        return False
    # rpartition: everything before the LAST '@' is the local part.
    local, _, domain = addr.rpartition('@')
    if not local or not domain:
        return False
    return len(local) <= 64 and len(domain) <= 253


def validate_message_content(content: bytes) -> bool:
    """Basic size validation. Allows null bytes for BINARYMIME compatibility."""
    return bool(content) and len(content) <= MAX_MESSAGE_SIZE_BYTES


# --- Retry Configuration ---

@dataclass
class RetryConfig:
    """Parameters for exponential-backoff retry scheduling."""
    max_attempts: int = MAX_RETRY_ATTEMPTS
    initial_delay: int = INITIAL_RETRY_DELAY
    max_delay: int = MAX_RETRY_DELAY
    backoff_multiplier: float = 2.0


# --- Circuit Breaker ---
# All access is on the single main event loop -- no locking needed.
class CircuitBreaker:
    """Circuit breaker guarding the downstream SMTP relay.

    States: CLOSED (normal), OPEN (rejecting until `timeout` elapses),
    HALF_OPEN (a single probe attempt is allowed after the cooldown).
    All access is on the single main event loop -- no locking needed.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 300):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = 'CLOSED'

    def can_execute(self) -> bool:
        """Return True if a send attempt is currently allowed."""
        if self.state == 'CLOSED':
            return True
        elif self.state == 'OPEN':
            if time.time() - self.last_failure_time >= self.timeout:
                # Cooldown elapsed: let one probe through.
                self.state = 'HALF_OPEN'
                return True
            return False
        else:
            # HALF_OPEN: the probe attempt itself is allowed.
            return True

    def record_success(self):
        """Reset the breaker after a successful send."""
        self.failure_count = 0
        self.state = 'CLOSED'

    def record_failure(self):
        """Count a failure; trip OPEN on threshold, or on a failed HALF_OPEN probe."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == 'HALF_OPEN' or (self.state == 'CLOSED' and self.failure_count >= self.failure_threshold):
            self.state = 'OPEN'


# --- Task Manager for Proper Cleanup ---

class TaskManager:
    """Tracks background tasks so they can all be cancelled at shutdown.

    Holding a strong reference also prevents tasks from being garbage
    collected mid-flight; the done-callback drops the reference afterwards.
    """

    def __init__(self):
        self.tasks: Set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        """Create and track a task; the reference is released when it completes."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def cancel_all(self):
        """Cancel every tracked task and wait for all of them to finish."""
        if self.tasks:
            logger.info(f"Cancelling {len(self.tasks)} background tasks...")
            for task in list(self.tasks):
                if not task.done():
                    task.cancel()
            try:
                # return_exceptions=True: CancelledError results are collected, not raised.
                await asyncio.gather(*self.tasks, return_exceptions=True)
            except Exception as e:
                logger.error(f"Error during task cancellation: {e}")
            finally:
                self.tasks.clear()
                logger.info("All background tasks cancelled.")


# --- Async File Operations (non-blocking) ---

async def atomic_write(filepath: Path, content: bytes):
    """Write content atomically: write to .tmp then rename.

    Relies on os.rename being atomic on the same filesystem (POSIX guarantee).
    Raises the original exception if the write or rename fails; the partial
    temp file is removed on a best-effort basis first.
    """
    temp_path = filepath.with_suffix(filepath.suffix + '.tmp')
    try:
        async with aiofiles.open(temp_path, 'wb') as f:
            await f.write(content)
        await asyncio.to_thread(os.rename, temp_path, filepath)
    except Exception:
        # Single unlink (instead of exists()+unlink()) avoids the TOCTOU race;
        # a missing temp file just raises FileNotFoundError, an OSError.
        try:
            await asyncio.to_thread(os.unlink, temp_path)
        except OSError:
            pass
        # Bare raise preserves the original traceback (idiomatic vs `raise e`).
        raise


async def atomic_move(src: Path, dest: Path):
    """Atomic file move via os.rename. src and dest MUST be on the same filesystem."""
    await asyncio.to_thread(dest.parent.mkdir, parents=True, exist_ok=True)
    try:
        await asyncio.to_thread(os.rename, src, dest)
    except OSError as e:
        logger.error(f"Failed to move {src} -> {dest}: {e}")
        raise


async def safe_unlink(path: Path):
    """Best-effort delete: a missing file is fine; other errors are logged, not raised."""
    try:
        # Direct unlink removes the exists()/unlink() TOCTOU window.
        await asyncio.to_thread(os.unlink, path)
    except FileNotFoundError:
        pass
    except Exception as e:
        logger.error(f"Failed to delete file {path}: {e}")


async def path_exists(path: Path) -> bool:
    """Non-blocking wrapper around Path.exists()."""
    return await asyncio.to_thread(path.exists)


# --- Envelope Metadata Persistence ---
# The SMTP envelope (MAIL FROM / RCPT TO) can differ from message headers.
# We persist it alongside the .eml so recovery after crash does not lose routing info.
# Retry count is also persisted so poison messages eventually reach the failed directory.
async def save_envelope_meta(dirs: Dict[str, Path], message_id: str, mail_from: str,
                             rcpt_tos: List[str], retry_count: int):
    """Persist the SMTP envelope and retry count next to the queued .eml file."""
    meta = {'mail_from': mail_from, 'rcpt_tos': rcpt_tos, 'retry_count': retry_count}
    meta_path = dirs['outbox'] / f"{sanitize_filename(message_id)}.meta.json"
    await atomic_write(meta_path, json.dumps(meta).encode('utf-8'))


async def load_envelope_meta(meta_path: Path) -> Optional[dict]:
    """Load persisted envelope metadata; None if missing or unreadable.

    Best-effort on purpose: a corrupt meta file makes recovery fall back
    to header parsing rather than failing.
    """
    try:
        async with aiofiles.open(meta_path, 'r') as f:
            return json.loads(await f.read())
    except Exception:
        return None


def meta_name_for(message_id: str) -> str:
    """Return the metadata filename that accompanies a message's .eml file."""
    return f"{sanitize_filename(message_id)}.meta.json"


# --- Configuration with Validation ---

def load_config() -> ConfigParser:
    """Load config.ini from this script's directory, creating defaults if absent.

    Validates that use_tls and start_tls are not both enabled (mutually
    exclusive in aiosmtplib) and expands ~ in the files directory.
    """
    config = ConfigParser()
    config['server'] = {
        'host': '127.0.0.1',
        'port': '8025',
        'files_directory': '~/.arniesmtpbufferserver',
        'max_message_size': str(MAX_MESSAGE_SIZE_BYTES),
        'queue_size': str(DEFAULT_QUEUE_SIZE),
        'disk_health_check_interval': '30',
        'disk_high_watermark_percent': '95',
        'disk_low_watermark_percent': '90',
        'max_messages_per_hour': '100',
        'max_bytes_per_hour': str(50 * 1024 * 1024),
    }
    config['sender'] = {
        'smtp_host': '127.0.0.1',
        'smtp_port': '1025',
        'smtp_user': '',
        'smtp_password': '',
        'smtp_timeout': '60',
        'use_tls': 'false',
        'start_tls': 'true',
        'save_sent_mail': 'true',
        'max_retry_attempts': str(MAX_RETRY_ATTEMPTS),
        'initial_retry_delay': str(INITIAL_RETRY_DELAY),
        'max_retry_delay': str(MAX_RETRY_DELAY),
        'circuit_breaker_threshold': '5',
        'circuit_breaker_timeout': '300',
        'max_concurrent_retries': '10',
    }
    script_dir = Path(__file__).resolve().parent
    config_path = script_dir / 'config.ini'
    if config_path.exists():
        config.read(config_path)
    else:
        with open(config_path, 'w') as f:
            config.write(f)
        logger.info(f"Created default config at {config_path}")
    # use_tls (implicit TLS) and start_tls (STARTTLS upgrade) are mutually exclusive.
    use_tls = config.getboolean('sender', 'use_tls')
    start_tls = config.getboolean('sender', 'start_tls')
    if use_tls and start_tls:
        logger.error("Config error: use_tls and start_tls cannot both be true. Disabling start_tls.")
        config.set('sender', 'start_tls', 'false')
    files_dir_raw = config.get('server', 'files_directory')
    config.set('server', 'files_directory', os.path.expanduser(files_dir_raw))
    return config


# --- Health State with Metrics ---
# Single event loop -- plain attributes, no locks.

class HealthState:
    """Tracks overall health (disk-based) plus simple operational counters."""

    def __init__(self):
        self._is_healthy = True
        self.metrics = {
            'messages_received': 0,
            'messages_sent': 0,
            'messages_failed': 0,
            'queue_size': 0,
            'disk_usage_percent': 0.0,
            'inode_usage_percent': 0.0,
        }

    def get_is_healthy(self) -> bool:
        return self._is_healthy

    def set_is_healthy(self, value: bool) -> None:
        # Log only on transitions, not on every re-assertion of the same state.
        if self._is_healthy != value:
            logger.warning(f"Health state changed to: {'HEALTHY' if value else 'UNHEALTHY'}")
        self._is_healthy = value

    def update_metric(self, key: str, value):
        self.metrics[key] = value

    def increment_metric(self, key: str):
        self.metrics[key] = self.metrics.get(key, 0) + 1

    def get_metrics(self) -> dict:
        return self.metrics.copy()


# --- Disk Health Monitor ---

class DiskHealthMonitor:
    """Periodically samples disk space and inode usage and flips health state.

    Uses hysteresis: unhealthy at/above the high watermark, healthy again
    only below the low watermark.
    """

    def __init__(self, config: ConfigParser, health_state: HealthState,
                 task_manager: TaskManager, shutdown_event: asyncio.Event):
        self.path = Path(config.get('server', 'files_directory'))
        self.interval = config.getint('server', 'disk_health_check_interval')
        self.high_watermark = config.getint('server', 'disk_high_watermark_percent')
        self.low_watermark = config.getint('server', 'disk_low_watermark_percent')
        self.health_state = health_state
        self.task_manager = task_manager
        self.shutdown_event = shutdown_event

    async def run(self):
        """Sampling loop; exits on shutdown or if the monitored path disappears."""
        logger.info(f"Disk health monitor started for {self.path}.")
        while not self.shutdown_event.is_set():
            try:
                usage = await asyncio.to_thread(shutil.disk_usage, self.path)
                percent_used = (usage.used / usage.total) * 100
                self.health_state.update_metric('disk_usage_percent', round(percent_used, 2))
                worst = percent_used
                try:
                    # statvfs is POSIX-only; AttributeError covers platforms without it.
                    stat = await asyncio.to_thread(os.statvfs, self.path)
                    if stat.f_files > 0:
                        inode_pct = ((stat.f_files - stat.f_ffree) / stat.f_files) * 100
                        self.health_state.update_metric('inode_usage_percent', round(inode_pct, 2))
                        worst = max(worst, inode_pct)
                except (OSError, AttributeError):
                    pass
                is_healthy = self.health_state.get_is_healthy()
                if worst >= self.high_watermark and is_healthy:
                    logger.warning(f"Resource usage ({worst:.1f}%) exceeds high watermark ({self.high_watermark}%). Marking unhealthy.")
                    self.health_state.set_is_healthy(False)
                elif worst < self.low_watermark and not is_healthy:
                    logger.info(f"Resource usage ({worst:.1f}%) below low watermark ({self.low_watermark}%). Marking healthy.")
                    self.health_state.set_is_healthy(True)
                await asyncio.sleep(self.interval)
            except FileNotFoundError:
                logger.error(f"Monitored directory {self.path} not found. Stopping monitor.")
                self.health_state.set_is_healthy(False)
                break
            except Exception as e:
                logger.error(f"Error in disk health monitor: {e}")
                await asyncio.sleep(self.interval)


# --- Periodic Metrics Logger ---

class MetricsLogger:
    """Logs the metrics snapshot every `interval` seconds until shutdown."""

    def __init__(self, health_state: HealthState, shutdown_event: asyncio.Event,
                 interval: int = 300):
        self.health_state = health_state
        self.shutdown_event = shutdown_event
        self.interval = interval

    async def run(self):
        while not self.shutdown_event.is_set():
            try:
                await asyncio.sleep(self.interval)
                logger.info(f"Metrics: {self.health_state.get_metrics()}")
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in metrics logger: {e}")


# --- SMTP Handler ---

class BufferSMTPHandler(Handler):
    """aiosmtpd handler: validates, rate-limits, persists, and queues messages."""

    def __init__(self, dirs: Dict[str, Path], health_state: HealthState,
                 sender_queue: asyncio.Queue, rate_limiter: RateLimiter):
        self.dirs = dirs
        self.health_state = health_state
        self.sender_queue = sender_queue
        self.rate_limiter = rate_limiter
        super().__init__()

    async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
        """Per-recipient check: health, address validity, queue capacity."""
        if not self.health_state.get_is_healthy():
            return '452 4.3.1 Insufficient system storage'
        if not validate_email_address(address):
            return '550 5.1.1 Invalid recipient address'
        if self.sender_queue.full():
            return '452 4.3.1 Server busy, try again later'
        # Do NOT manually append -- aiosmtpd appends on 250
        return '250 OK'

    async def handle_DATA(self, server, session: Session, envelope: Envelope):
        """Accept the message: write .eml + meta atomically, then enqueue.

        The rate limiter is charged only after the message is confirmed in
        the queue, so rejected messages do not consume quota.
        """
        if not self.health_state.get_is_healthy():
            return '452 4.3.1 Insufficient system storage'
        client_ip = session.peer[0] if session.peer else 'unknown'
        if envelope.mail_from and not validate_email_address(envelope.mail_from):
            return '550 5.1.7 Invalid sender address'
        if not validate_message_content(envelope.content):
            return '554 5.6.0 Invalid message content'
        if not self.rate_limiter.check(client_ip, len(envelope.content)):
            return '450 4.7.1 Rate limit exceeded, try again later'
        message_id = str(uuid.uuid4())
        filename = f"{sanitize_filename(message_id)}.eml"
        final_filepath = self.dirs['outbox'] / filename
        try:
            await atomic_write(final_filepath, envelope.content)
            valid_rcpts = [rcpt for rcpt in envelope.rcpt_tos if validate_email_address(rcpt)]
            if not valid_rcpts:
                await safe_unlink(final_filepath)
                return '550 5.1.1 No valid recipients'
            await save_envelope_meta(self.dirs, message_id, envelope.mail_from, valid_rcpts, 0)
            queue_item = (message_id, envelope.mail_from, valid_rcpts, 0)
            try:
                self.sender_queue.put_nowait(queue_item)
            except asyncio.QueueFull:
                await safe_unlink(final_filepath)
                await safe_unlink(self.dirs['outbox'] / meta_name_for(message_id))
                return '452 4.3.1 Queue full, try again later'
            # Record rate limit only AFTER message is confirmed in queue
            self.rate_limiter.record(client_ip, len(envelope.content))
            self.health_state.increment_metric('messages_received')
            self.health_state.update_metric('queue_size', self.sender_queue.qsize())
            logger.info(f"Message {message_id} queued ({len(envelope.content)} bytes)")
            return '250 OK: Message accepted for delivery'
        except Exception as e:
            logger.error(f"Error handling message {message_id}: {e}")
            await safe_unlink(final_filepath)
            await safe_unlink(self.dirs['outbox'] / meta_name_for(message_id))
            return '451 4.3.0 Temporary failure'


# --- Sender ---

class Sender:
    """Drains the queue and relays messages to the upstream SMTP server.

    Delivery pipeline per message: outbox/ -> processing/ -> sent|failed|outbox,
    guarded by a circuit breaker and exponential-backoff retries.
    """

    def __init__(self, config: ConfigParser, dirs: Dict[str, Path],
                 shutdown_event: asyncio.Event, queue: asyncio.Queue,
                 health_state: HealthState, task_manager: TaskManager):
        self.config = config['sender']
        self.dirs = dirs
        self.shutdown_event = shutdown_event
        self.queue = queue
        self.health_state = health_state
        self.task_manager = task_manager
        self.retry_config = RetryConfig(
            max_attempts=self.config.getint('max_retry_attempts'),
            initial_delay=self.config.getint('initial_retry_delay'),
            max_delay=self.config.getint('max_retry_delay')
        )
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=self.config.getint('circuit_breaker_threshold'),
            timeout=self.config.getint('circuit_breaker_timeout')
        )
        self.smtp_settings = {
            'hostname': self.config['smtp_host'],
            'port': self.config.getint('smtp_port'),
            'username': self.config.get('smtp_user') or None,
            'password': self.config.get('smtp_password') or None,
            'timeout': self.config.getint('smtp_timeout'),
            'use_tls': self.config.getboolean('use_tls'),
            'start_tls': self.config.getboolean('start_tls')
        }
        # Caps the number of retry-sleep tasks alive at once.
        self.retry_semaphore = asyncio.Semaphore(self.config.getint('max_concurrent_retries'))

    async def run(self) -> None:
        """Main dequeue loop; each message is processed in its own task."""
        logger.info("Sender started with circuit breaker and retry limits")
        while not self.shutdown_event.is_set():
            try:
                # 1s timeout lets the loop re-check the shutdown event.
                queue_item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
                message_id, mail_from, rcpt_tos, retry_count = queue_item
                self.task_manager.create_task(
                    self.process_one_file(message_id, mail_from, rcpt_tos, retry_count)
                )
                self.health_state.update_metric('queue_size', self.queue.qsize())
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Error in sender main loop: {e}")
        logger.info("Sender run loop finished.")

    async def recover_messages_on_startup(self) -> None:
        """Recover queued messages from a previous session.

        Uses persisted .meta.json for envelope info and retry count.
        Falls back to header parsing if no metadata file exists.
        """
        outbox_dir = self.dirs['outbox']
        if not await path_exists(outbox_dir):
            return
        recovered = 0
        skipped = 0
        logger.info("Starting recovery of messages from outbox...")
        try:
            filenames = await asyncio.to_thread(os.listdir, outbox_dir)
            eml_files = sorted(f for f in filenames if f.endswith('.eml'))
            for filename in eml_files:
                filepath = outbox_dir / filename
                message_id = filename[:-4]  # strip ".eml"
                if not is_valid_uuid(message_id):
                    logger.warning(f"Invalid message ID {message_id}, moving to failed.")
                    try:
                        await atomic_move(filepath, self.dirs['failed'] / filename)
                    except Exception:
                        pass
                    continue
                meta_path = outbox_dir / meta_name_for(message_id)
                meta = await load_envelope_meta(meta_path)
                if meta and meta.get('mail_from') and meta.get('rcpt_tos'):
                    mail_from = meta['mail_from']
                    rcpt_tos = meta['rcpt_tos']
                    retry_count = meta.get('retry_count', 0)
                else:
                    # Fallback: parse headers (lossy -- envelope != headers)
                    try:
                        async with aiofiles.open(filepath, 'rb') as f:
                            content = await f.read()
                        if not validate_message_content(content):
                            logger.warning(f"Invalid content in {filename}, moving to failed.")
                            await atomic_move(filepath, self.dirs['failed'] / filename)
                            continue
                        msg = email.message_from_bytes(content)
                        mail_from = msg.get('Return-Path', '').strip('<>') or msg.get('From', '')
                        if not validate_email_address(mail_from):
                            logger.error(f"Invalid sender in {filename}, moving to failed.")
                            await atomic_move(filepath, self.dirs['failed'] / filename)
                            continue
                        rcpt_tos = []
                        for hdr in ['To', 'Cc']:  # Don't parse Bcc -- usually stripped
                            val = msg.get(hdr)
                            if val:
                                for _, addr in email.utils.getaddresses([val]):
                                    if validate_email_address(addr):
                                        rcpt_tos.append(addr)
                        if not rcpt_tos:
                            logger.warning(f"No valid recipients in {filename}, moving to failed.")
                            await atomic_move(filepath, self.dirs['failed'] / filename)
                            continue
                        retry_count = 0
                    except Exception as e:
                        logger.error(f"Failed to recover {filename}: {e}")
                        try:
                            await atomic_move(filepath, self.dirs['failed'] / filename)
                        except Exception:
                            pass
                        continue
                try:
                    self.queue.put_nowait((message_id, mail_from, rcpt_tos, retry_count))
                    recovered += 1
                except asyncio.QueueFull:
                    skipped += 1
                    logger.warning(f"Queue full, {filename} stays in outbox for next send cycle.")
        except Exception as e:
            logger.error(f"Error during message recovery: {e}")
        if recovered:
            logger.info(f"Recovered {recovered} messages.")
        if skipped:
            logger.warning(f"{skipped} messages remain in outbox (queue full).")
        if not recovered and not skipped:
            logger.info("No messages to recover from outbox.")

    async def process_one_file(self, message_id: str, mail_from: str,
                               rcpt_tos: List[str], retry_count: int) -> None:
        """Move a message to processing/, attempt delivery, and route the outcome."""
        if not is_valid_uuid(message_id):
            logger.error(f"Invalid message ID {message_id}, skipping.")
            return
        filename = f"{sanitize_filename(message_id)}.eml"
        mfn = meta_name_for(message_id)
        outbox_path = self.dirs['outbox'] / filename
        processing_path = self.dirs['processing'] / filename
        if not await path_exists(outbox_path):
            logger.warning(f"File {filename} not in outbox, likely already processed.")
            return
        # Atomic move into processing/ -- this IS the dequeue operation
        try:
            await atomic_move(outbox_path, processing_path)
            outbox_meta = self.dirs['outbox'] / mfn
            processing_meta = self.dirs['processing'] / mfn
            if await path_exists(outbox_meta):
                await atomic_move(outbox_meta, processing_meta)
        except Exception as e:
            logger.error(f"Could not move {message_id} to processing: {e}")
            return
        if not self.circuit_breaker.can_execute():
            # Not a send failure: requeue without incrementing the retry count.
            logger.warning(f"Circuit breaker OPEN, delaying message {message_id}")
            await self._handle_failure(processing_path, message_id, mail_from,
                                       rcpt_tos, retry_count, is_send_failure=False)
            return
        try:
            success = await self._send_message(processing_path, mail_from, rcpt_tos)
            if success:
                self.circuit_breaker.record_success()
                await self._handle_success(processing_path, message_id)
            else:
                self.circuit_breaker.record_failure()
                await self._handle_failure(processing_path, message_id,
                                           mail_from, rcpt_tos, retry_count)
        except Exception as e:
            logger.error(f"Error processing message {message_id}: {e}")
            self.circuit_breaker.record_failure()
            await self._handle_failure(processing_path, message_id, mail_from,
                                       rcpt_tos, retry_count)

    async def _send_message(self, msg_path: Path, mail_from: str,
                            rcpt_tos: List[str]) -> bool:
        """Relay one message upstream. Returns True on success, False on any error."""
        try:
            async with aiofiles.open(msg_path, 'rb') as f:
                message_content = await f.read()
            await aiosmtplib.send(
                message_content,
                sender=mail_from,
                recipients=rcpt_tos,
                **self.smtp_settings
            )
            self.health_state.increment_metric('messages_sent')
            return True
        except Exception as e:
            logger.warning(f"Failed to send {msg_path.name}: {e}")
            self.health_state.increment_metric('messages_failed')
            return False

    async def _handle_success(self, msg_path: Path, message_id: str) -> None:
        """Archive (or delete) a delivered message and drop its metadata."""
        mfn = meta_name_for(message_id)
        try:
            logger.info(f"Successfully sent message {message_id}")
            if self.config.getboolean('save_sent_mail'):
                await atomic_move(msg_path, self.dirs['sent'] / msg_path.name)
            else:
                await safe_unlink(msg_path)
            await safe_unlink(self.dirs['processing'] / mfn)
        except Exception as e:
            logger.error(f"Error handling success for {message_id}: {e}")

    async def _handle_failure(self, msg_path: Path, message_id: str, mail_from: str,
                              rcpt_tos: List[str], retry_count: int,
                              is_send_failure: bool = True) -> None:
        """Route a failed message: to failed/ if retries exhausted, else back to outbox.

        is_send_failure=False (circuit breaker open) requeues without
        charging a retry attempt.
        """
        mfn = meta_name_for(message_id)
        try:
            if is_send_failure:
                retry_count += 1
            if retry_count >= self.retry_config.max_attempts:
                await atomic_move(msg_path, self.dirs['failed'] / msg_path.name)
                processing_meta = self.dirs['processing'] / mfn
                if await path_exists(processing_meta):
                    await atomic_move(processing_meta, self.dirs['failed'] / mfn)
                logger.error(f"Message {message_id} failed after {retry_count} attempts, moved to failed.")
            else:
                await atomic_move(msg_path, self.dirs['outbox'] / msg_path.name)
                # Re-persist meta with the updated retry count so a crash
                # between now and the retry does not reset the counter.
                await save_envelope_meta(self.dirs, message_id, mail_from, rcpt_tos, retry_count)
                processing_meta = self.dirs['processing'] / mfn
                if await path_exists(processing_meta):
                    await safe_unlink(processing_meta)
                await self._schedule_retry(message_id, mail_from, rcpt_tos, retry_count)
        except Exception as e:
            logger.error(f"Error handling failure for {message_id}: {e}")

    async def _schedule_retry(self, message_id: str, mail_from: str,
                              rcpt_tos: List[str], retry_count: int) -> None:
        """Schedule a delayed requeue with capped exponential backoff."""
        delay = min(
            self.retry_config.initial_delay * (self.retry_config.backoff_multiplier ** retry_count),
            self.retry_config.max_delay
        )
        logger.info(f"Retry {retry_count}/{self.retry_config.max_attempts} for {message_id} in {delay:.0f}s")
        self.task_manager.create_task(
            self._delayed_requeue(message_id, mail_from, rcpt_tos, retry_count, delay)
        )

    async def _delayed_requeue(self, message_id: str, mail_from: str,
                               rcpt_tos: List[str], retry_count: int,
                               delay: float) -> None:
        """Wait then re-queue. Uses put_nowait to avoid blocking on shutdown."""
        async with self.retry_semaphore:
            try:
                await asyncio.sleep(delay)
                if not self.shutdown_event.is_set():
                    try:
                        self.queue.put_nowait((message_id, mail_from, rcpt_tos, retry_count))
                    except asyncio.QueueFull:
                        logger.warning(f"Queue full during requeue of {message_id}, leaving in outbox.")
            except asyncio.CancelledError:
                logger.debug(f"Requeue cancelled for {message_id}")
            except Exception as e:
                logger.error(f"Error during delayed requeue for {message_id}: {e}")


# --- Main Application ---

async def main():
    """Wire everything together and run until a shutdown signal arrives."""
    try:
        config = load_config()
        base_dir = Path(config.get('server', 'files_directory'))
        base_dir.mkdir(parents=True, exist_ok=True)
        dirs = {
            'outbox': base_dir / 'outbox',
            'processing': base_dir / 'processing',
            'sent': base_dir / 'sent',
            'failed': base_dir / 'failed',
        }
        for dir_path in dirs.values():
            dir_path.mkdir(exist_ok=True)
        shutdown_event = asyncio.Event()
        task_manager = TaskManager()
        health_state = HealthState()
        rate_limiter = RateLimiter(
            max_messages_per_hour=config.getint('server', 'max_messages_per_hour'),
            max_bytes_per_hour=config.getint('server', 'max_bytes_per_hour')
        )
        queue_size = config.getint('server', 'queue_size')
        sender_queue = asyncio.Queue(maxsize=queue_size)

        def signal_handler():
            if not shutdown_event.is_set():
                logger.info("Shutdown signal received")
                shutdown_event.set()

        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, signal_handler)
        smtp_server = None
        try:
            sender = Sender(config, dirs, shutdown_event, sender_queue, health_state, task_manager)
            disk_monitor = DiskHealthMonitor(config, health_state, task_manager, shutdown_event)
            metrics_logger = MetricsLogger(health_state, shutdown_event)
            # Recover before accepting new mail so old messages keep queue priority.
            await sender.recover_messages_on_startup()
            task_manager.create_task(sender.run())
            task_manager.create_task(disk_monitor.run())
            task_manager.create_task(metrics_logger.run())
            handler = BufferSMTPHandler(dirs, health_state, sender_queue, rate_limiter)
            listen_host = config.get('server', 'host')
            listen_port = config.getint('server', 'port')
            data_size_limit = config.getint('server', 'max_message_size')
            # Run SMTP protocol directly on the main event loop -- no threads
            smtp_factory = partial(
                SMTPProtocol,
                handler,
                data_size_limit=data_size_limit,
                hostname=listen_host,
            )
            smtp_server = await loop.create_server(smtp_factory, host=listen_host, port=listen_port)
            logger.info(f"SMTP server listening on {listen_host}:{listen_port}")
            await shutdown_event.wait()
        finally:
            logger.info("Shutting down...")
            if smtp_server is not None:
                smtp_server.close()
                await smtp_server.wait_closed()
            await task_manager.cancel_all()
            for sig in (signal.SIGINT, signal.SIGTERM):
                try:
                    loop.remove_signal_handler(sig)
                except Exception:
                    pass
            try:
                logger.info(f"Final metrics: {health_state.get_metrics()}")
            except Exception as e:
                logger.error(f"Error getting final metrics: {e}")
            logger.info("Shutdown complete")
    except Exception as e:
        logger.critical(f"Fatal error in main: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Interrupted by user")