In [None]:
# First, let's set up our FastMCP server with necessary imports
from mcp.server.fastmcp import FastMCP
import mcp.types as types
from pydantic import BaseModel
from typing import Dict, List, Optional
from pathlib import Path
import hashlib
import mimetypes
from datetime import datetime

# Initialize our FastMCP server. The instance is named "file_processor" and is
# the object whose .prompt(), .resource() and .tool() decorators are applied to
# the functions defined later in this file.
mcp = FastMCP("file_processor")

# Define our data models
class FileInfo(BaseModel):
    """File metadata model

    Attributes:
        path: File path as a string (get_file_info stores the resolved path).
        size: Size taken from ``stat().st_size``.
        modified: Modification time taken from ``stat().st_mtime``.
        mime_type: MIME type string.
        hash: Hex digest of the file contents, or ``None`` when no digest
            was computed.
    """

    path: str
    size: int
    modified: float
    mime_type: str
    # Explicit default: with pydantic v2 a bare ``Optional[str]`` annotation
    # only makes the field nullable, it is still *required*. Defaulting to
    # None makes the field genuinely optional, which is what the annotation
    # suggests; callers that already pass ``hash=`` are unaffected.
    hash: Optional[str] = None

class FileContent(BaseModel):
    """File content model"""

    # Field order is kept as-is because it determines the key order of the
    # serialized dict.
    content: str  # text that was read from the file
    encoding: str  # name of the codec used to decode ``content``
    lines: int  # line count of ``content``
    info: FileInfo  # metadata record for the same file

# System prompts
@mcp.prompt()
def system_prompt() -> str:
    """Define the file processor's role"""
    # NOTE(review): FastMCP appears to expose the docstring above as the
    # prompt's description, so its wording is intentionally left unchanged.
    # The text below is returned verbatim, including its leading newline and
    # per-line leading whitespace.
    role_text = """
 You are a file processing assistant that helps users work with files safely.
 Always validate paths and file operations before execution.
 Never perform dangerous operations without confirmation.
 """
    return role_text

@mcp.prompt()
def error_prompt() -> str:
    """Handle file operation errors"""
    # NOTE(review): FastMCP appears to expose the docstring above as the
    # prompt's description, so its wording is intentionally left unchanged.
    # read_file and write_file refer to this prompt by name ("error_prompt")
    # in their failure results. The text is returned verbatim.
    failure_text = """
 I encountered an error while processing the file.
 This could be due to:
 - Invalid file path
 - Permission issues
 - File not found
 - Unsupported file type
 Please check the file and try again.
 """
    return failure_text

# Status message; the leading emoji had been mis-decoded into mojibake.
print("✅ FastMCP server initialized with models and prompts!")


In [None]:
# Now let's implement our file resource provider
@mcp.resource("file://{path}")
async def get_file_info(path: str) -> Dict:
    """Get metadata about a file

    Resolves ``path``, stats the file, guesses its MIME type from the file
    name and computes a SHA-256 digest of its contents.

    Args:
        path: Path to the file; it is resolved to an absolute path first.

    Returns:
        On success, the ``FileInfo`` fields as a dict: ``path`` (resolved),
        ``size``, ``modified``, ``mime_type`` (``application/octet-stream``
        when the type cannot be guessed) and ``hash`` (SHA-256 hex digest).
        On failure, ``{"error": <message>}``. Errors are reported in the
        result rather than raised.
    """
    try:
        full_path = Path(path).resolve()

        if not full_path.exists():
            return {"error": "File not found"}

        # exists() is also true for directories, FIFOs and device nodes.
        # Hashing those would either fail with an opaque OS error or block /
        # never reach end-of-file, so only regular files are accepted.
        if not full_path.is_file():
            return {"error": "Not a regular file"}

        stat = full_path.stat()
        mime_type, _ = mimetypes.guess_type(str(full_path))

        # Hash in fixed-size chunks so large files are never held in memory.
        sha256_hash = hashlib.sha256()
        with open(full_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)

        info = FileInfo(
            path=str(full_path),
            size=stat.st_size,
            modified=stat.st_mtime,
            mime_type=mime_type or "application/octet-stream",
            hash=sha256_hash.hexdigest(),
        )
        # model_dump() is the pydantic v2 replacement for the deprecated
        # .dict(); the resulting dict is the same.
        return info.model_dump()

    except Exception as e:
        # Deliberate catch-all boundary: callers test for an "error" key
        # instead of handling exceptions.
        return {"error": str(e)}

# Create a test file
with open("test.txt", "w", encoding="utf-8") as f:
    f.write("Hello FastMCP!")

# Test our resource
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread (as in a Jupyter kernel); there, use
# `await get_file_info("test.txt")` instead.
import asyncio
result = asyncio.run(get_file_info("test.txt"))
print("📄 File Info:", result)


In [None]:
# Let's implement our file operation tools
@mcp.tool()
async def read_file(file_path: str, encoding: str = "utf-8") -> Dict:
    """
    Read a file safely

    Args:
        file_path: Path to the file
        encoding: File encoding (default: utf-8)

    Returns:
        Dictionary with file contents and metadata. On success:
        ``{"success": True, "data": {...}}`` where ``data`` holds the
        ``FileContent`` fields (``content``, ``encoding``, ``lines``,
        ``info``). On failure: ``{"success": False, "error": <message>,
        "prompt": "error_prompt"}``. Errors are reported in the result
        rather than raised.
    """
    try:
        # Get file info from resource
        file_info = await get_file_info(file_path)

        if "error" in file_info:
            raise ValueError(file_info["error"])

        # Validate file type. Only text/* and application/* MIME types are
        # accepted; note that get_file_info reports unknown types as
        # application/octet-stream, which therefore passes this check.
        if not file_info["mime_type"].startswith(("text/", "application/")):
            raise ValueError("Unsupported file type")

        # Read the resolved path that the metadata above was computed for,
        # not the raw argument, so content and info describe the same file.
        with open(file_info["path"], "r", encoding=encoding) as f:
            content = f.read()

        file_content = FileContent(
            content=content,
            encoding=encoding,
            lines=len(content.splitlines()),
            info=FileInfo(**file_info),
        )
        # model_dump() is the pydantic v2 replacement for the deprecated
        # .dict(); the resulting dict is the same.
        return {"success": True, "data": file_content.model_dump()}

    except Exception as e:
        # Deliberate catch-all boundary: every failure (missing file, decode
        # error, unsupported type, ...) is turned into an error result.
        return {
            "success": False,
            "error": str(e),
            "prompt": "error_prompt",
        }

@mcp.tool()
async def write_file(file_path: str, content: str, encoding: str = "utf-8") -> Dict:
    """
    Write content to a file safely

    Missing parent directories are created. An existing file at the target
    path is overwritten.

    Args:
        file_path: Path to write to
        content: Content to write
        encoding: File encoding (default: utf-8)

    Returns:
        Dictionary with operation results. On success:
        ``{"success": True, "data": {"path", "size", "lines", "encoding"}}``
        where ``path`` is the resolved path, ``size`` is the on-disk size
        after writing and ``lines`` is the line count of ``content``.
        On failure: ``{"success": False, "error": <message>,
        "prompt": "error_prompt"}``. Errors are reported in the result
        rather than raised.
    """
    try:
        # Resolve to an absolute path. No further validation is performed
        # here: any location the process can write to is accepted.
        full_path = Path(file_path).resolve()

        # Create parent directories
        full_path.parent.mkdir(parents=True, exist_ok=True)

        # Write file
        with open(full_path, "w", encoding=encoding) as f:
            f.write(content)

        # Only the size is needed, so stat the file directly instead of
        # going through get_file_info, which would re-read and hash the
        # whole file, and whose error result would surface here as a
        # KeyError that reports a completed write as failed.
        written_size = full_path.stat().st_size

        return {
            "success": True,
            "data": {
                "path": str(full_path),
                "size": written_size,
                "lines": len(content.splitlines()),
                "encoding": encoding,
            },
        }

    except Exception as e:
        # Deliberate catch-all boundary: failures are returned, not raised.
        return {
            "success": False,
            "error": str(e),
            "prompt": "error_prompt",
        }

# Test our tools
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread (as in a Jupyter kernel); there, `await` the tool
# coroutines directly instead.
result = asyncio.run(write_file("example.txt", "Testing FastMCP tools!\nThis is line 2."))
print("✍️ Write result:", result)

result = asyncio.run(read_file("example.txt"))
print("\n📖 Read result:", result)
