{ "cells": [ { "cell_type": "raw", "metadata": { "vscode": { "languageId": "raw" } }, "source": [ "# 🔄 Advanced State Management in MCP\n", "\n", "Learn how to implement robust state management in MCP tools and applications. This notebook covers state persistence, concurrency control, state synchronization, and advanced state patterns.\n", "\n", "## 🎯 Learning Objectives\n", "\n", "By the end of this notebook, you will:\n", "- Design state management systems\n", "- Handle concurrent state access\n", "- Implement state persistence\n", "- Create state synchronization\n", "- Build state recovery mechanisms\n", "\n", "## 📋 Prerequisites\n", "\n", "- Completed notebooks 01-16\n", "- Understanding of concurrency\n", "- Knowledge of persistence\n", "- Familiarity with state patterns\n", "\n", "## 🔑 Key Concepts\n", "\n", "1. **State Types**\n", " - Ephemeral state\n", " - Persistent state\n", " - Shared state\n", " - Distributed state\n", "\n", "2. **State Management**\n", " - State persistence\n", " - State recovery\n", " - State validation\n", " - State migration\n", "\n", "3. **Concurrency Control**\n", " - Locks and mutexes\n", " - Transactions\n", " - Optimistic locking\n", " - Event sourcing\n", "\n", "## 📚 Table of Contents\n", "\n", "1. [State Design](#design)\n", "2. [State Persistence](#persistence)\n", "3. [Concurrency Control](#concurrency)\n", "4. [Best Practices](#practices)\n", "5. [Exercises](#exercises)\n", "\n", "---\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from typing import Dict, Any, Optional, List, Type, TypeVar, Generic\n", "import json\n", "import asyncio\n", "from datetime import datetime\n", "from dataclasses import dataclass, asdict\n", "from abc import ABC, abstractmethod\n", "import sqlite3\n", "import modelcontextprotocol as mcp\n", "from pydantic import BaseModel, Field\n", "\n", "T = TypeVar('T')\n", "\n", "# State interface\n", "class State(BaseModel):\n", " \"\"\"Base class for state objects.\"\"\"\n", " version: int = Field(default=1, description=\"State version\")\n", " created_at: datetime = Field(default_factory=datetime.now, description=\"Creation timestamp\")\n", " updated_at: datetime = Field(default_factory=datetime.now, description=\"Last update timestamp\")\n", " \n", " def update(self):\n", " \"\"\"Update the timestamp.\"\"\"\n", " self.updated_at = datetime.now()\n", "\n", "# State store interface\n", "class StateStore(ABC, Generic[T]):\n", " \"\"\"Abstract base class for state stores.\"\"\"\n", " \n", " @abstractmethod\n", " async def get(self, key: str) -> Optional[T]:\n", " \"\"\"Get state by key.\"\"\"\n", " pass\n", " \n", " @abstractmethod\n", " async def set(self, key: str, state: T) -> None:\n", " \"\"\"Set state for key.\"\"\"\n", " pass\n", " \n", " @abstractmethod\n", " async def delete(self, key: str) -> None:\n", " \"\"\"Delete state for key.\"\"\"\n", " pass\n", " \n", " @abstractmethod\n", " async def list(self) -> List[str]:\n", " \"\"\"List all state keys.\"\"\"\n", " pass\n", "\n", "# Memory state store\n", "class MemoryStateStore(StateStore[T]):\n", " \"\"\"In-memory state store implementation.\"\"\"\n", " \n", " def __init__(self):\n", " self.store: Dict[str, T] = {}\n", " self.locks: Dict[str, asyncio.Lock] = {}\n", " \n", " def _get_lock(self, key: str) -> asyncio.Lock:\n", " \"\"\"Get or create lock for key.\"\"\"\n", " if key not in self.locks:\n", " self.locks[key] = asyncio.Lock()\n", " return self.locks[key]\n", " \n", " async def get(self, key: str) -> Optional[T]:\n", " \"\"\"Get state by key.\"\"\"\n", " async with self._get_lock(key):\n", " return self.store.get(key)\n", " \n", " async def set(self, key: str, state: T) -> None:\n", " \"\"\"Set state for key.\"\"\"\n", " async with self._get_lock(key):\n", " self.store[key] = state\n", " \n", " async def delete(self, key: str) -> None:\n", " \"\"\"Delete state for key.\"\"\"\n", " async with self._get_lock(key):\n", " if key in self.store:\n", " del self.store[key]\n", " \n", " async def list(self) -> List[str]:\n", " \"\"\"List all state keys.\"\"\"\n", " return list(self.store.keys())\n", "\n", "# SQLite state store\n", "class SQLiteStateStore(StateStore[T]):\n", " \"\"\"SQLite-based state store implementation.\"\"\"\n", " \n", " def __init__(self, db_path: str, state_type: Type[T]):\n", " self.db_path = db_path\n", " self.state_type = state_type\n", " self.locks: Dict[str, asyncio.Lock] = {}\n", " self._init_db()\n", " \n", " def _init_db(self):\n", " \"\"\"Initialize database.\"\"\"\n", " conn = sqlite3.connect(self.db_path)\n", " try:\n", " conn.execute(\"\"\"\n", " CREATE TABLE IF NOT EXISTS states (\n", " key TEXT PRIMARY KEY,\n", " data TEXT NOT NULL,\n", " created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n", " updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n", " )\n", " \"\"\")\n", " conn.commit()\n", " finally:\n", " conn.close()\n", " \n", " def _get_lock(self, key: str) -> asyncio.Lock:\n", " \"\"\"Get or create lock for key.\"\"\"\n", " if key not in self.locks:\n", " self.locks[key] = asyncio.Lock()\n", " return self.locks[key]\n", " \n", " async def get(self, key: str) -> Optional[T]:\n", " \"\"\"Get state by key.\"\"\"\n", " async with self._get_lock(key):\n", " conn = sqlite3.connect(self.db_path)\n", " try:\n", " cursor = conn.execute(\n", " \"SELECT data FROM states WHERE key = ?\",\n", " (key,)\n", " )\n", " row = cursor.fetchone()\n", " if row:\n", " data = json.loads(row[0])\n", " return self.state_type.parse_obj(data)\n", " return None\n", " finally:\n", " conn.close()\n", " \n", " async def set(self, key: str, state: T) -> None:\n", " \"\"\"Set state for key.\"\"\"\n", " async with self._get_lock(key):\n", " conn = sqlite3.connect(self.db_path)\n", " try:\n", " data = json.dumps(state.dict())\n", " conn.execute(\n", " \"\"\"\n", " INSERT OR REPLACE INTO states (key, data, updated_at)\n", " VALUES (?, ?, CURRENT_TIMESTAMP)\n", " \"\"\",\n", " (key, data)\n", " )\n", " conn.commit()\n", " finally:\n", " conn.close()\n", " \n", " async def delete(self, key: str) -> None:\n", " \"\"\"Delete state for key.\"\"\"\n", " async with self._get_lock(key):\n", " conn = sqlite3.connect(self.db_path)\n", " try:\n", " conn.execute(\"DELETE FROM states WHERE key = ?\", (key,))\n", " conn.commit()\n", " finally:\n", " conn.close()\n", " \n", " async def list(self) -> List[str]:\n", " \"\"\"List all state keys.\"\"\"\n", " conn = sqlite3.connect(self.db_path)\n", " try:\n", " cursor = conn.execute(\"SELECT key FROM states\")\n", " return [row[0] for row in cursor.fetchall()]\n", " finally:\n", " conn.close()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Example state models\n", "class UserState(State):\n", " \"\"\"User state model.\"\"\"\n", " name: str = Field(..., description=\"User name\")\n", " email: str = Field(..., description=\"User email\")\n", " preferences: Dict[str, Any] = Field(default_factory=dict, description=\"User preferences\")\n", " last_login: Optional[datetime] = Field(default=None, description=\"Last login timestamp\")\n", "\n", "class SessionState(State):\n", " \"\"\"Session state model.\"\"\"\n", " user_id: str = Field(..., description=\"User ID\")\n", " token: str = Field(..., description=\"Session token\")\n", " expires_at: datetime = Field(..., description=\"Expiration timestamp\")\n", " data: Dict[str, Any] = Field(default_factory=dict, description=\"Session data\")\n", "\n", "# State manager\n", "class StateManager:\n", " \"\"\"Manager for handling multiple state stores.\"\"\"\n", " \n", " def __init__(self):\n", " # Create state stores\n", " self.user_store = SQLiteStateStore(\"users.db\", UserState)\n", " self.session_store = MemoryStateStore[SessionState]()\n", " \n", " async def create_user(self, user_id: str, name: str, email: str) -> UserState:\n", " \"\"\"Create a new user state.\"\"\"\n", " user = UserState(\n", " name=name,\n", " email=email\n", " )\n", " await self.user_store.set(user_id, user)\n", " return user\n", " \n", " async def get_user(self, user_id: str) -> Optional[UserState]:\n", " \"\"\"Get user state.\"\"\"\n", " return await self.user_store.get(user_id)\n", " \n", " async def update_user(self, user_id: str, **updates) -> Optional[UserState]:\n", " \"\"\"Update user state.\"\"\"\n", " user = await self.get_user(user_id)\n", " if user:\n", " for key, value in updates.items():\n", " setattr(user, key, value)\n", " user.update()\n", " await self.user_store.set(user_id, user)\n", " return user\n", " \n", " async def create_session(self, user_id: str) -> SessionState:\n", " \"\"\"Create a new session state.\"\"\"\n", " import uuid\n", " from datetime import timedelta\n", " \n", " session = SessionState(\n", " user_id=user_id,\n", " token=str(uuid.uuid4()),\n", " expires_at=datetime.now() + timedelta(hours=1)\n", " )\n", " await self.session_store.set(session.token, session)\n", " return session\n", " \n", " async def get_session(self, token: str) -> Optional[SessionState]:\n", " \"\"\"Get session state.\"\"\"\n", " session = await self.session_store.get(token)\n", " if session and session.expires_at > datetime.now():\n", " return session\n", " return None\n", " \n", " async def update_session(self, token: str, data: Dict[str, Any]) -> Optional[SessionState]:\n", " \"\"\"Update session data.\"\"\"\n", " session = await self.get_session(token)\n", " if session:\n", " session.data.update(data)\n", " session.update()\n", " await self.session_store.set(token, session)\n", " return session\n", "\n", "# Test state management\n", "async def test_state_management():\n", " print(\"Testing state management...\")\n", " manager = StateManager()\n", " \n", " # Create user\n", " print(\"\\nCreating user...\")\n", " user = await manager.create_user(\n", " \"user-1\",\n", " \"John Doe\",\n", " \"john@example.com\"\n", " )\n", " print(f\"Created user: {user}\")\n", " \n", " # Update user\n", " print(\"\\nUpdating user...\")\n", " user = await manager.update_user(\n", " \"user-1\",\n", " preferences={\"theme\": \"dark\", \"notifications\": True}\n", " )\n", " print(f\"Updated user: {user}\")\n", " \n", " # Create session\n", " print(\"\\nCreating session...\")\n", " session = await manager.create_session(\"user-1\")\n", " print(f\"Created session: {session}\")\n", " \n", " # Update session\n", " print(\"\\nUpdating session...\")\n", " session = await manager.update_session(\n", " session.token,\n", " {\"last_page\": \"/dashboard\"}\n", " )\n", " print(f\"Updated session: {session}\")\n", " \n", " # Get user and session\n", " print(\"\\nRetrieving states...\")\n", " user = await manager.get_user(\"user-1\")\n", " print(f\"Retrieved user: {user}\")\n", " \n", " session = await manager.get_session(session.token)\n", " print(f\"Retrieved session: {session}\")\n", "\n", "# Run test\n", "await test_state_management()\n" ] } ], "metadata": { "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 2 }