In [None]:
from typing import Any, Dict, List, Optional, Type, TypeVar, Generic
import json
import asyncio
from datetime import datetime
from dataclasses import dataclass
from abc import ABC, abstractmethod
import modelcontextprotocol as mcp
from pydantic import BaseModel, Field

T = TypeVar('T')

# Custom serializer
class JSONSerializer:
 """Custom JSON serializer with advanced type handling."""
 
 @staticmethod
 def serialize(obj: Any) -> str:
 """Serialize an object to JSON string."""
 if isinstance(obj, datetime):
 return obj.isoformat()
 elif isinstance(obj, BaseModel):
 return obj.json()
 return json.dumps(obj)
 
 @staticmethod
 def deserialize(data: str, cls: Type[T]) -> T:
 """Deserialize JSON string to object."""
 raw = json.loads(data)
 if issubclass(cls, BaseModel):
 return cls.parse_obj(raw)
 elif cls == datetime:
 return datetime.fromisoformat(raw)
 return cls(raw)

# Middleware base class
class Middleware(ABC):
 """Base class for middleware components."""
 
 @abstractmethod
 async def process_request(self, request: Any) -> Any:
 """Process incoming request."""
 pass
 
 @abstractmethod
 async def process_response(self, response: Any) -> Any:
 """Process outgoing response."""
 pass

# Authentication middleware
class AuthMiddleware(Middleware):
 """Middleware for authentication."""
 
 def __init__(self, api_key: str):
 self.api_key = api_key
 
 async def process_request(self, request: Any) -> Any:
 """Validate API key."""
 if not hasattr(request, "api_key") or request.api_key != self.api_key:
 raise ValueError("Invalid API key")
 return request
 
 async def process_response(self, response: Any) -> Any:
 """Pass through response."""
 return response

# Caching middleware
class CacheMiddleware(Middleware):
 """Middleware for response caching."""
 
 def __init__(self, ttl: int = 300):
 self.cache: Dict[str, Any] = {}
 self.ttl = ttl
 self.timestamps: Dict[str, datetime] = {}
 
 def _is_cached(self, key: str) -> bool:
 """Check if key is cached and not expired."""
 if key not in self.cache:
 return False
 if (datetime.now() - self.timestamps[key]).total_seconds() > self.ttl:
 del self.cache[key]
 del self.timestamps[key]
 return False
 return True
 
 async def process_request(self, request: Any) -> Any:
 """Check cache for request."""
 cache_key = JSONSerializer.serialize(request)
 if self._is_cached(cache_key):
 return self.cache[cache_key]
 return request
 
 async def process_response(self, response: Any) -> Any:
 """Cache response."""
 cache_key = JSONSerializer.serialize(response)
 self.cache[cache_key] = response
 self.timestamps[cache_key] = datetime.now()
 return response

# Logging middleware
class LoggingMiddleware(Middleware):
 """Middleware for request/response logging."""
 
 async def process_request(self, request: Any) -> Any:
 """Log request."""
 print(f"Request: {request}")
 return request
 
 async def process_response(self, response: Any) -> Any:
 """Log response."""
 print(f"Response: {response}")
 return response

# Middleware chain
class MiddlewareChain:
 """Chain of middleware components."""
 
 def __init__(self):
 self.middlewares: List[Middleware] = []
 
 def add(self, middleware: Middleware) -> None:
 """Add middleware to chain."""
 self.middlewares.append(middleware)
 
 async def process_request(self, request: Any) -> Any:
 """Process request through middleware chain."""
 for middleware in self.middlewares:
 request = await middleware.process_request(request)
 return request
 
 async def process_response(self, response: Any) -> Any:
 """Process response through middleware chain."""
 for middleware in reversed(self.middlewares):
 response = await middleware.process_response(response)
 return response


In [None]:
# Example models
class WeatherRequest(BaseModel):
 """Weather request with authentication."""
 api_key: str = Field(..., description="API key for authentication")
 location: str = Field(..., description="Location to get weather for")
 units: str = Field(default="metric", description="Units (metric/imperial)")

class WeatherResponse(BaseModel):
 """Weather response with timestamp."""
 location: str = Field(..., description="Location")
 temperature: float = Field(..., description="Temperature")
 humidity: float = Field(..., description="Humidity percentage")
 timestamp: datetime = Field(default_factory=datetime.now, description="Response timestamp")

# Example tool
class WeatherTool:
 """Weather service with middleware support."""
 
 def __init__(self, api_key: str):
 self.middleware = MiddlewareChain()
 
 # Add middleware components
 self.middleware.add(LoggingMiddleware())
 self.middleware.add(AuthMiddleware(api_key))
 self.middleware.add(CacheMiddleware(ttl=60)) # Cache for 1 minute
 
 async def get_weather(self, request: WeatherRequest) -> WeatherResponse:
 """Get weather for location."""
 # Process request through middleware
 processed_request = await self.middleware.process_request(request)
 
 # Simulate API call
 await asyncio.sleep(1)
 response = WeatherResponse(
 location=processed_request.location,
 temperature=20.5,
 humidity=65.0
 )
 
 # Process response through middleware
 processed_response = await self.middleware.process_response(response)
 return processed_response

# Create MCP server with weather tool
weather_tool = WeatherTool(api_key="secret-key")
server = mcp.Server()
server.add_tool("weather", weather_tool.get_weather, WeatherRequest, WeatherResponse)

# Test the weather tool
async def test_weather_tool():
 # Test with valid API key
 print("Testing with valid API key...")
 request = WeatherRequest(
 api_key="secret-key",
 location="London",
 units="metric"
 )
 response = await weather_tool.get_weather(request)
 print(f"Weather: {response}\n")
 
 # Test caching (should be instant)
 print("Testing cache...")
 start = datetime.now()
 response = await weather_tool.get_weather(request)
 duration = (datetime.now() - start).total_seconds()
 print(f"Cached response time: {duration:.3f} seconds")
 print(f"Weather: {response}\n")
 
 # Test with invalid API key
 print("Testing with invalid API key...")
 try:
 request = WeatherRequest(
 api_key="wrong-key",
 location="London",
 units="metric"
 )
 await weather_tool.get_weather(request)
 except ValueError as e:
 print(f"Caught expected error: {e}")

# Run tests
await test_weather_tool()


In [None]:
# Stream models
class DataStreamRequest(BaseModel):
 """Request for streaming data."""
 stream_id: str = Field(..., description="Stream identifier")
 batch_size: int = Field(default=10, description="Number of items per batch")

class DataStreamResponse(BaseModel):
 """Response with streaming data."""
 stream_id: str = Field(..., description="Stream identifier")
 data: List[float] = Field(..., description="Batch of data")
 is_last: bool = Field(default=False, description="Whether this is the last batch")

# Pub-sub models
class PublishRequest(BaseModel):
 """Request to publish data."""
 topic: str = Field(..., description="Topic to publish to")
 message: Any = Field(..., description="Message to publish")

class SubscribeRequest(BaseModel):
 """Request to subscribe to topic."""
 topic: str = Field(..., description="Topic to subscribe to")

class Message(BaseModel):
 """Message received from subscription."""
 topic: str = Field(..., description="Topic of the message")
 data: Any = Field(..., description="Message data")
 timestamp: datetime = Field(default_factory=datetime.now, description="Message timestamp")

# Stream handler
class StreamHandler:
 """Handler for data streaming."""
 
 def __init__(self):
 self.streams: Dict[str, asyncio.Queue] = {}
 
 async def create_stream(self, stream_id: str) -> None:
 """Create a new data stream."""
 if stream_id not in self.streams:
 self.streams[stream_id] = asyncio.Queue()
 
 async def write_data(self, stream_id: str, data: List[float]) -> None:
 """Write data to stream."""
 if stream_id in self.streams:
 await self.streams[stream_id].put(data)
 
 async def read_data(self, stream_id: str, batch_size: int) -> List[float]:
 """Read data from stream."""
 if stream_id not in self.streams:
 raise ValueError(f"Stream {stream_id} not found")
 
 queue = self.streams[stream_id]
 if queue.empty():
 return []
 
 data = []
 while len(data) < batch_size and not queue.empty():
 batch = await queue.get()
 data.extend(batch)
 return data[:batch_size]

# Pub-sub handler
class PubSubHandler:
 """Handler for publish-subscribe pattern."""
 
 def __init__(self):
 self.topics: Dict[str, List[asyncio.Queue]] = {}
 
 async def publish(self, topic: str, message: Any) -> None:
 """Publish message to topic."""
 if topic not in self.topics:
 self.topics[topic] = []
 
 msg = Message(topic=topic, data=message)
 for queue in self.topics[topic]:
 await queue.put(msg)
 
 async def subscribe(self, topic: str) -> asyncio.Queue:
 """Subscribe to topic."""
 if topic not in self.topics:
 self.topics[topic] = []
 
 queue = asyncio.Queue()
 self.topics[topic].append(queue)
 return queue
 
 async def unsubscribe(self, topic: str, queue: asyncio.Queue) -> None:
 """Unsubscribe from topic."""
 if topic in self.topics and queue in self.topics[topic]:
 self.topics[topic].remove(queue)

# Example usage
async def test_streaming():
 print("Testing data streaming...")
 handler = StreamHandler()
 
 # Create stream
 stream_id = "test-stream"
 await handler.create_stream(stream_id)
 
 # Write data
 for i in range(3):
 data = [float(x) for x in range(i*5, (i+1)*5)]
 await handler.write_data(stream_id, data)
 print(f"Wrote batch {i}: {data}")
 
 # Read data in batches
 batch_size = 7
 while True:
 data = await handler.read_data(stream_id, batch_size)
 if not data:
 break
 print(f"Read batch (size={batch_size}): {data}")

async def test_pubsub():
 print("\nTesting pub-sub...")
 handler = PubSubHandler()
 
 # Create subscribers
 sub1 = await handler.subscribe("weather")
 sub2 = await handler.subscribe("weather")
 
 # Publish messages
 messages = [
 {"location": "London", "temp": 20},
 {"location": "Paris", "temp": 25},
 {"location": "Tokyo", "temp": 30}
 ]
 
 for msg in messages:
 await handler.publish("weather", msg)
 print(f"Published: {msg}")
 
 # Read messages from subscribers
 print("\nSubscriber 1 messages:")
 while not sub1.empty():
 msg = await sub1.get()
 print(f"Received: {msg}")
 
 print("\nSubscriber 2 messages:")
 while not sub2.empty():
 msg = await sub2.get()
 print(f"Received: {msg}")

# Run tests
print("Testing advanced communication patterns...\n")
await test_streaming()
await test_pubsub()
