In [None]:
from typing import Dict, Any, Optional, List, Type
from abc import ABC, abstractmethod
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
import modelcontextprotocol as mcp
from pydantic import BaseModel, Field

# Base resource class
class Resource(ABC):
 """Abstract base class for all resources."""
 
 def __init__(self, resource_id: str, config: Dict[str, Any]):
 self.resource_id = resource_id
 self.config = config
 self.created_at = datetime.now()
 self.last_accessed = datetime.now()
 self._is_initialized = False
 
 @abstractmethod
 async def initialize(self) -> None:
 """Initialize the resource."""
 pass
 
 @abstractmethod
 async def cleanup(self) -> None:
 """Clean up the resource."""
 pass
 
 def update_access_time(self):
 """Update the last access time."""
 self.last_accessed = datetime.now()
 
 @property
 def is_initialized(self) -> bool:
 return self._is_initialized
 
 def __repr__(self) -> str:
 return f"{self.__class__.__name__}(id={self.resource_id})"

# Resource provider
class ResourceProvider(ABC):
 """Abstract base class for resource providers."""
 
 def __init__(self):
 self.resources: Dict[str, Resource] = {}
 
 @abstractmethod
 async def create_resource(self, resource_id: str, config: Dict[str, Any]) -> Resource:
 """Create a new resource."""
 pass
 
 @abstractmethod
 async def get_resource(self, resource_id: str) -> Resource:
 """Get an existing resource."""
 pass
 
 async def delete_resource(self, resource_id: str) -> None:
 """Delete a resource."""
 if resource_id in self.resources:
 resource = self.resources[resource_id]
 await resource.cleanup()
 del self.resources[resource_id]
 
 async def cleanup_all(self) -> None:
 """Clean up all resources."""
 for resource_id in list(self.resources.keys()):
 await self.delete_resource(resource_id)

# Resource pool
class ResourcePool:
 """Manages a pool of resources."""
 
 def __init__(self, provider: ResourceProvider, max_size: int = 10):
 self.provider = provider
 self.max_size = max_size
 self.available: List[Resource] = []
 self.in_use: Dict[str, Resource] = {}
 
 async def acquire(self, config: Dict[str, Any]) -> Resource:
 """Acquire a resource from the pool."""
 if self.available:
 resource = self.available.pop()
 elif len(self.in_use) < self.max_size:
 resource = await self.provider.create_resource(
 f"pool-resource-{len(self.in_use)}", 
 config
 )
 else:
 raise RuntimeError("Resource pool exhausted")
 
 self.in_use[resource.resource_id] = resource
 return resource
 
 async def release(self, resource: Resource) -> None:
 """Release a resource back to the pool."""
 if resource.resource_id in self.in_use:
 del self.in_use[resource.resource_id]
 self.available.append(resource)
 
 async def cleanup(self) -> None:
 """Clean up all resources in the pool."""
 await self.provider.cleanup_all()


In [None]:
import sqlite3
from contextlib import asynccontextmanager

class DatabaseResource(Resource):
 """A database connection resource."""
 
 def __init__(self, resource_id: str, config: Dict[str, Any]):
 super().__init__(resource_id, config)
 self.connection: Optional[sqlite3.Connection] = None
 
 async def initialize(self) -> None:
 """Initialize the database connection."""
 if not self._is_initialized:
 self.connection = sqlite3.connect(self.config["database"])
 self._is_initialized = True
 
 async def cleanup(self) -> None:
 """Close the database connection."""
 if self.connection:
 self.connection.close()
 self.connection = None
 self._is_initialized = False
 
 def execute_query(self, query: str, parameters: Optional[tuple] = None) -> List[tuple]:
 """Execute a query on the database."""
 if not self.connection:
 raise RuntimeError("Database not initialized")
 
 self.update_access_time()
 cursor = self.connection.cursor()
 try:
 if parameters:
 cursor.execute(query, parameters)
 else:
 cursor.execute(query)
 return cursor.fetchall()
 finally:
 cursor.close()

class DatabaseProvider(ResourceProvider):
 """Provider for database connections."""
 
 async def create_resource(self, resource_id: str, config: Dict[str, Any]) -> Resource:
 """Create a new database connection."""
 resource = DatabaseResource(resource_id, config)
 await resource.initialize()
 self.resources[resource_id] = resource
 return resource
 
 async def get_resource(self, resource_id: str) -> Resource:
 """Get an existing database connection."""
 if resource_id not in self.resources:
 raise KeyError(f"Resource {resource_id} not found")
 return self.resources[resource_id]

# MCP models
class DatabaseConfig(BaseModel):
 database: str = Field(..., description="Database file path")
 pool_size: int = Field(default=5, description="Maximum number of connections")
 
class QueryRequest(BaseModel):
 query: str = Field(..., description="SQL query to execute")
 parameters: Optional[tuple] = Field(default=None, description="Query parameters")
 
class QueryResult(BaseModel):
 results: List[tuple] = Field(..., description="Query results")
 resource_id: str = Field(..., description="ID of the resource used")

# MCP tool
class DatabasePoolTool:
 """MCP tool for managing a pool of database connections."""
 
 def __init__(self, config: DatabaseConfig):
 self.provider = DatabaseProvider()
 self.pool = ResourcePool(self.provider, config.pool_size)
 self.config = {"database": config.database}
 
 async def execute_query(self, request: QueryRequest) -> QueryResult:
 """Execute a query using a connection from the pool."""
 resource = await self.pool.acquire(self.config)
 try:
 results = resource.execute_query(request.query, request.parameters)
 return QueryResult(
 results=results,
 resource_id=resource.resource_id
 )
 finally:
 await self.pool.release(resource)
 
 async def cleanup(self):
 """Clean up all resources."""
 await self.pool.cleanup()

# Create MCP server
config = DatabaseConfig(database=":memory:", pool_size=3)
db_tool = DatabasePoolTool(config)
server = mcp.Server()
server.add_tool("database", db_tool.execute_query, QueryRequest, QueryResult)


In [None]:
async def test_database_pool():
 # Create test table
 create_table = QueryRequest(
 query="""
 CREATE TABLE IF NOT EXISTS users (
 id INTEGER PRIMARY KEY,
 name TEXT NOT NULL,
 email TEXT UNIQUE NOT NULL
 )
 """
 )
 result = await db_tool.execute_query(create_table)
 print("Created table using resource:", result.resource_id)
 
 # Insert test data
 insert_data = QueryRequest(
 query="INSERT INTO users (name, email) VALUES (?, ?)",
 parameters=("John Doe", "john@example.com")
 )
 result = await db_tool.execute_query(insert_data)
 print("\nInserted data using resource:", result.resource_id)
 
 # Query data
 select_data = QueryRequest(
 query="SELECT * FROM users"
 )
 result = await db_tool.execute_query(select_data)
 print("\nQueried data using resource:", result.resource_id)
 print("Results:", result.results)
 
 # Test pool exhaustion
 print("\nTesting pool exhaustion...")
 resources = []
 try:
 for i in range(5): # Try to get more than pool_size connections
 result = await db_tool.execute_query(select_data)
 print(f"Got resource: {result.resource_id}")
 except RuntimeError as e:
 print(f"Pool exhausted as expected: {e}")
 
 # Clean up
 await db_tool.cleanup()
 print("\nCleaned up all resources")

# Run the test
await test_database_pool()
