"""Create an MCP server that proxies requests through an MCP client. This server is created independent of any transport mechanism. """ import logging import typing as t from mcp import server, types from mcp.client.session import ClientSession from mcp.server.lowlevel.server import request_ctx logger = logging.getLogger(__name__) async def create_proxy_server(remote_app: ClientSession) -> server.Server[object]: # noqa: C901, PLR0915 """Create a server instance from a remote app.""" logger.debug("Sending initialization request to remote MCP server...") response = await remote_app.initialize() capabilities = response.capabilities logger.debug("Configuring proxied MCP server...") app: server.Server[object] = server.Server(name=response.serverInfo.name) if capabilities.prompts: logger.debug("Capabilities: adding Prompts...") async def _list_prompts(_: t.Any) -> types.ServerResult: # noqa: ANN401 result = await remote_app.list_prompts() return types.ServerResult(result) app.request_handlers[types.ListPromptsRequest] = _list_prompts async def _get_prompt(req: types.GetPromptRequest) -> types.ServerResult: result = await remote_app.get_prompt(req.params.name, req.params.arguments) return types.ServerResult(result) app.request_handlers[types.GetPromptRequest] = _get_prompt if capabilities.resources: logger.debug("Capabilities: adding Resources...") async def _list_resources(_: t.Any) -> types.ServerResult: # noqa: ANN401 result = await remote_app.list_resources() return types.ServerResult(result) app.request_handlers[types.ListResourcesRequest] = _list_resources async def _list_resource_templates(_: t.Any) -> types.ServerResult: # noqa: ANN401 result = await remote_app.list_resource_templates() return types.ServerResult(result) app.request_handlers[types.ListResourceTemplatesRequest] = _list_resource_templates async def _read_resource(req: types.ReadResourceRequest) -> types.ServerResult: result = await remote_app.read_resource(req.params.uri) return types.ServerResult(result) app.request_handlers[types.ReadResourceRequest] = _read_resource if capabilities.logging: logger.debug("Capabilities: adding Logging...") async def _set_logging_level(req: types.SetLevelRequest) -> types.ServerResult: await remote_app.set_logging_level(req.params.level) return types.ServerResult(types.EmptyResult()) app.request_handlers[types.SetLevelRequest] = _set_logging_level if capabilities.resources: logger.debug("Capabilities: adding Resources...") async def _subscribe_resource(req: types.SubscribeRequest) -> types.ServerResult: await remote_app.subscribe_resource(req.params.uri) return types.ServerResult(types.EmptyResult()) app.request_handlers[types.SubscribeRequest] = _subscribe_resource async def _unsubscribe_resource(req: types.UnsubscribeRequest) -> types.ServerResult: await remote_app.unsubscribe_resource(req.params.uri) return types.ServerResult(types.EmptyResult()) app.request_handlers[types.UnsubscribeRequest] = _unsubscribe_resource if capabilities.tools: logger.debug("Capabilities: adding Tools...") async def _list_tools(_: t.Any) -> types.ServerResult: # noqa: ANN401 tools = await remote_app.list_tools() return types.ServerResult(tools) app.request_handlers[types.ListToolsRequest] = _list_tools async def _call_tool(req: types.CallToolRequest) -> types.ServerResult: try: # Convert meta to dict if present (required for TypedDict compatibility) meta_dict = dict(req.params.meta) if req.params.meta else None # Only set up progress forwarding if progressToken is present progress_token = meta_dict.get("progressToken") if meta_dict else None progress_callback = None if progress_token is not None: # Get request context to access server session for progress forwarding ctx = request_ctx.get() # Create progress forwarder callback # Note: The callback receives individual parameters, not a # ProgressNotificationParams object async def progress_forwarder( progress: float, total: float | None, message: str | None, ) -> None: # Forward progress notification back to parent via server session await ctx.session.send_progress_notification( progress_token=progress_token, progress=progress, total=total, message=message, related_request_id=str(ctx.request_id), ) progress_callback = progress_forwarder result = await remote_app.call_tool( req.params.name, (req.params.arguments or {}), meta=meta_dict, progress_callback=progress_callback, ) return types.ServerResult(result) except Exception as e: # noqa: BLE001 return types.ServerResult( types.CallToolResult( content=[types.TextContent(type="text", text=str(e))], isError=True, ), ) app.request_handlers[types.CallToolRequest] = _call_tool async def _send_progress_notification(req: types.ProgressNotification) -> None: await remote_app.send_progress_notification( req.params.progressToken, req.params.progress, req.params.total, ) app.notification_handlers[types.ProgressNotification] = _send_progress_notification async def _complete(req: types.CompleteRequest) -> types.ServerResult: result = await remote_app.complete( req.params.ref, req.params.argument.model_dump(), ) return types.ServerResult(result) app.request_handlers[types.CompleteRequest] = _complete return app