import argparse import os import sys import json import time import asyncio import sqlite3 import logging import mimetypes from datetime import datetime, timedelta, timezone from enum import Enum from typing import List, Dict, Optional, Union, Any from pathlib import Path from urllib.parse import unquote, urlparse # Third-party libraries import nest_asyncio from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP, Context from mcp.types import Annotations, TextContent, ToolAnnotations from mcp.shared.exceptions import McpError from pythonjsonlogger import jsonlogger from telethon import TelegramClient, functions, types, utils from telethon.sessions import StringSession from telethon.tl.types import ( User, Chat, Channel, ChatAdminRights, ChatBannedRights, ChannelParticipantsKicked, ChannelParticipantsAdmins, InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty, InputPeerUser, InputPeerChat, InputPeerChannel, DialogFilter, DialogFilterChatlist, DialogFilterDefault, TextWithEntities, ) import re from functools import wraps import telethon.errors.rpcerrorlist from sanitize import sanitize_user_content, sanitize_name, sanitize_dict, format_tool_result class ValidationError(Exception): """Custom exception for validation errors.""" pass def json_serializer(obj): """Helper function to convert non-serializable objects for JSON serialization.""" if isinstance(obj, datetime): return obj.isoformat() if isinstance(obj, bytes): return obj.decode("utf-8", errors="replace") # Add other non-serializable types as needed raise TypeError(f"Object of type {type(obj)} is not JSON serializable") def get_entity_type(entity: Any) -> str: """Return a normalized, human-readable chat/entity type.""" if isinstance(entity, User): return "User" if isinstance(entity, Chat): return "Group (Basic)" if isinstance(entity, Channel): if getattr(entity, "megagroup", False): return "Supergroup" return "Channel" if getattr(entity, "broadcast", False) else "Group" return type(entity).__name__ def get_marked_id(entity: Any) -> int: """Return a Telethon-compatible marked ID for an entity.""" if isinstance(entity, Channel): return -1000000000000 - entity.id if isinstance(entity, Chat): return -entity.id return entity.id def get_entity_filter_type(entity: Any) -> Optional[str]: """Return list_chats-compatible filter type: user/group/channel.""" entity_type = get_entity_type(entity) if entity_type == "User": return "user" if entity_type in ("Group (Basic)", "Group", "Supergroup"): return "group" if entity_type == "Channel": return "channel" return None load_dotenv() TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID")) TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH") mcp = FastMCP("telegram") # Annotate all tool results with audience=["user"] so MCP clients know # the content is user-generated data, not instructions for the model. # We wrap the low-level request handler (after FastMCP registers it) to inject # annotations into the final CallToolResult, preserving structured output. _USER_AUDIENCE = Annotations(audience=["user"]) def _install_annotation_hook() -> None: from mcp.types import CallToolRequest, ServerResult, CallToolResult original_handler = mcp._mcp_server.request_handlers[CallToolRequest] async def annotated_handler(req): response = await original_handler(req) if isinstance(response, ServerResult) and isinstance(response.root, CallToolResult): content = response.root.content if content: response.root.content = [ ( block.model_copy(update={"annotations": _USER_AUDIENCE}) if isinstance(block, TextContent) and block.annotations is None else block ) for block in content ] return response mcp._mcp_server.request_handlers[CallToolRequest] = annotated_handler _install_annotation_hook() _EXPOSED_TOOLS_MODES = {"all", "read-only"} def _get_exposed_tools_mode(value: Optional[str] = None) -> str: """Return the configured MCP tool exposure mode. ``TELEGRAM_EXPOSED_TOOLS=read-only`` keeps only tools annotated with ``readOnlyHint=True``. The default is ``all`` for backward compatibility. """ raw_value = os.getenv("TELEGRAM_EXPOSED_TOOLS", "all") if value is None else value mode = raw_value.strip().lower() if mode not in _EXPOSED_TOOLS_MODES: accepted = ", ".join(sorted(_EXPOSED_TOOLS_MODES)) raise SystemExit( f"Invalid TELEGRAM_EXPOSED_TOOLS '{raw_value}'. Expected one of: {accepted}." ) return mode def _apply_exposed_tools_mode(server: FastMCP = mcp, mode: Optional[str] = None) -> list[str]: """Prune registered MCP tools according to the configured exposure mode.""" selected_mode = _get_exposed_tools_mode() if mode is None else _get_exposed_tools_mode(mode) if selected_mode == "all": return [] removed: list[str] = [] for tool in list(server._tool_manager.list_tools()): annotations = getattr(tool, "annotations", None) if not getattr(annotations, "readOnlyHint", False): server._tool_manager.remove_tool(tool.name) removed.append(tool.name) return removed # --------------------------------------------------------------------------- # Multi-account configuration # --------------------------------------------------------------------------- _PROXY_TYPES_SOCKS_HTTP = {"socks5", "socks4", "http"} _PROXY_TYPES_ALL = _PROXY_TYPES_SOCKS_HTTP | {"mtproxy"} def _get_proxy_env(name: str, label: str) -> Optional[str]: """Resolve a TELEGRAM_PROXY_* env var with optional ``_