import os import tempfile import base64 from pathlib import Path from datetime import datetime from fuzzywuzzy import fuzz from typing import Union from mcp.types import ( EmbeddedResource, TextResourceContents, BlobResourceContents, TextContent, ) class ElevenLabsMcpError(Exception): pass def make_error(error_text: str): raise ElevenLabsMcpError(error_text) def is_file_writeable(path: Path) -> bool: if path.exists(): return os.access(path, os.W_OK) parent_dir = path.parent return os.access(parent_dir, os.W_OK) def make_output_file( tool: str, text: str, extension: str, full_id: bool = False ) -> Path: id = text if full_id else text[:5] output_file_name = f"{tool}_{id.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{extension}" return Path(output_file_name) def make_output_path( output_directory: str | None, base_path: str | None = None ) -> Path: # Use None as the default, and provide a default when an empty string is passed if not base_path: base_path = str(Path.home() / "Desktop") if output_directory is None: output_path = Path(os.path.expanduser(base_path)) elif os.path.isabs(output_directory): output_path = Path(os.path.expanduser(output_directory)) else: output_path = Path(os.path.expanduser(base_path)) / Path(output_directory) if not is_file_writeable(output_path): make_error(f"Directory ({output_path}) is not writeable") output_path.mkdir(parents=True, exist_ok=True) return output_path def resolve_resource_path(filename: str, base_dir: Path) -> Path: """Resolve a resource filename against base_dir, rejecting paths outside it.""" candidate = Path(filename) base_dir_resolved = base_dir.resolve() if candidate.is_absolute(): resolved_file = candidate.resolve() else: resolved_file = (base_dir_resolved / candidate).resolve() try: resolved_file.relative_to(base_dir_resolved) except ValueError: make_error( f"Resource path ({resolved_file}) is outside of allowed directory {base_dir_resolved}" ) return resolved_file def find_similar_filenames( target_file: str, directory: Path, threshold: int = 70 ) -> list[tuple[str, int]]: """ Find files with names similar to the target file using fuzzy matching. Args: target_file (str): The reference filename to compare against directory (str): Directory to search in (defaults to current directory) threshold (int): Similarity threshold (0 to 100, where 100 is identical) Returns: list: List of similar filenames with their similarity scores """ target_filename = os.path.basename(target_file) similar_files = [] for root, _, files in os.walk(directory): for filename in files: if ( filename == target_filename and os.path.join(root, filename) == target_file ): continue similarity = fuzz.token_sort_ratio(target_filename, filename) if similarity >= threshold: file_path = Path(root) / filename similar_files.append((file_path, similarity)) similar_files.sort(key=lambda x: x[1], reverse=True) return similar_files def try_find_similar_files( filename: str, directory: Path, take_n: int = 5 ) -> list[Path]: similar_files = find_similar_filenames(filename, directory) if not similar_files: return [] filtered_files = [] for path, _ in similar_files[:take_n]: if check_audio_file(path): filtered_files.append(path) return filtered_files def check_audio_file(path: Path) -> bool: audio_extensions = { ".wav", ".mp3", ".m4a", ".aac", ".ogg", ".flac", ".mp4", ".avi", ".mov", ".wmv", } return path.suffix.lower() in audio_extensions def handle_input_file(file_path: str, audio_content_check: bool = True) -> Path: if not os.path.isabs(file_path) and not os.environ.get("ELEVENLABS_MCP_BASE_PATH"): make_error( "File path must be an absolute path if ELEVENLABS_MCP_BASE_PATH is not set" ) path = Path(file_path) if not path.exists() and path.parent.exists(): parent_directory = path.parent similar_files = try_find_similar_files(path.name, parent_directory) similar_files_formatted = ",".join([str(file) for file in similar_files]) if similar_files: make_error( f"File ({path}) does not exist. Did you mean any of these files: {similar_files_formatted}?" ) make_error(f"File ({path}) does not exist") elif not path.exists(): make_error(f"File ({path}) does not exist") elif not path.is_file(): make_error(f"File ({path}) is not a file") if audio_content_check and not check_audio_file(path): make_error(f"File ({path}) is not an audio or video file") return path def handle_large_text( text: str, max_length: int = 10000, content_type: str = "content" ): """ Handle large text content by saving to temporary file if it exceeds max_length. Args: text: The text content to handle max_length: Maximum character length before saving to temp file content_type: Description of the content type for user messages Returns: str: Either the original text or a message with temp file path """ if len(text) > max_length: with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False, encoding="utf-8" ) as temp_file: temp_file.write(text) temp_path = temp_file.name return f"{content_type.capitalize()} saved to temporary file: {temp_path}\nUse the Read tool to access the full {content_type}." return text def parse_conversation_transcript(transcript_entries, max_length: int = 50000): """ Parse conversation transcript entries into a formatted string. If transcript is too long, save to temporary file and return file path. Args: transcript_entries: List of transcript entries from conversation response max_length: Maximum character length before saving to temp file Returns: tuple: (transcript_text_or_path, is_temp_file) """ transcript_lines = [] for entry in transcript_entries: speaker = getattr(entry, "role", "Unknown") text = getattr(entry, "message", getattr(entry, "text", "")) timestamp = getattr(entry, "timestamp", None) if timestamp: transcript_lines.append(f"[{timestamp}] {speaker}: {text}") else: transcript_lines.append(f"{speaker}: {text}") transcript = ( "\n".join(transcript_lines) if transcript_lines else "No transcript available" ) # Check if transcript is too long for LLM context window if len(transcript) > max_length: # Create temporary file temp_file = tempfile.SpooledTemporaryFile( mode="w+", max_size=0, encoding="utf-8" ) temp_file.write(transcript) temp_file.seek(0) # Get a persistent temporary file path with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False, encoding="utf-8" ) as persistent_temp: persistent_temp.write(transcript) temp_path = persistent_temp.name return ( f"Transcript saved to temporary file: {temp_path}\nUse the Read tool to access the full transcript.", True, ) return transcript, False def parse_location(api_residency: str | None) -> str: """ Parse the API residency and return the corresponding origin URL. """ origin_map = { "us": "https://api.elevenlabs.io", "eu": "https://api.eu.residency.elevenlabs.io", "eu-residency": "https://api.eu.residency.elevenlabs.io", "in": "https://api.in.residency.elevenlabs.io", "in-residency": "https://api.in.residency.elevenlabs.io", "global": "https://api.elevenlabs.io", } if not api_residency or not api_residency.strip(): return origin_map["us"] api_residency = api_residency.strip().lower() if api_residency not in origin_map: valid_options = ", ".join(f"'{k}'" for k in origin_map.keys()) raise ValueError(f"ELEVENLABS_API_RESIDENCY must be one of {valid_options}") return origin_map[api_residency] def get_mime_type(file_extension: str) -> str: """ Get MIME type for a given file extension. Args: file_extension: File extension (with or without dot) Returns: str: MIME type string """ # Remove leading dot if present ext = file_extension.lstrip(".") mime_types = { "mp3": "audio/mpeg", "wav": "audio/wav", "ogg": "audio/ogg", "flac": "audio/flac", "m4a": "audio/mp4", "aac": "audio/aac", "opus": "audio/opus", "txt": "text/plain", "json": "application/json", "xml": "application/xml", "html": "text/html", "csv": "text/csv", "mp4": "video/mp4", "avi": "video/x-msvideo", "mov": "video/quicktime", "wmv": "video/x-ms-wmv", } return mime_types.get(ext.lower(), "application/octet-stream") def generate_resource_uri(filename: str) -> str: """ Generate a resource URI for a given filename. Args: filename: The filename to generate URI for Returns: str: Resource URI in format elevenlabs://filename """ return f"elevenlabs://{filename}" def create_resource_response( file_data: bytes, filename: str, file_extension: str, directory: Path | None = None ) -> EmbeddedResource: """ Create a proper MCP EmbeddedResource response. Args: file_data: Raw file data as bytes filename: Name of the file file_extension: File extension for MIME type detection directory: Optional directory where the file is or would be saved; used to embed path in URI Returns: EmbeddedResource: Proper MCP resource object """ mime_type = get_mime_type(file_extension) if directory is not None: full_path = (directory / filename) resource_uri = f"elevenlabs://{full_path.as_posix()}" else: resource_uri = generate_resource_uri(filename) # For text files, use TextResourceContents if mime_type.startswith("text/"): try: text_content = file_data.decode("utf-8") return EmbeddedResource( type="resource", resource=TextResourceContents( uri=resource_uri, mimeType=mime_type, text=text_content ), ) except UnicodeDecodeError: # Fall back to binary if decode fails pass # For binary files (audio, etc.), use BlobResourceContents base64_data = base64.b64encode(file_data).decode("utf-8") return EmbeddedResource( type="resource", resource=BlobResourceContents( uri=resource_uri, mimeType=mime_type, blob=base64_data ), ) def handle_output_mode( file_data: bytes, output_path: Path, filename: str, output_mode: str, success_message: str = None, ) -> Union[TextContent, EmbeddedResource]: """ Handle different output modes for file generation. Args: file_data: Raw file data as bytes output_path: Path where file should be saved filename: Name of the file output_mode: Output mode ('files', 'resources', or 'both') success_message: Custom success message for files mode (optional) Returns: Union[TextContent, EmbeddedResource]: TextContent for 'files' mode, EmbeddedResource for 'resources' and 'both' modes """ file_extension = Path(filename).suffix.lstrip(".") full_file_path = output_path / filename if output_mode == "files": # Save to disk and return TextContent with success message output_path.mkdir(parents=True, exist_ok=True) with open(full_file_path, "wb") as f: f.write(file_data) if success_message and "{file_path}" in success_message: message = success_message.replace("{file_path}", str(full_file_path)) else: message = success_message or f"Success. File saved as: {full_file_path}" return TextContent(type="text", text=message) elif output_mode == "resources": # Return as EmbeddedResource without saving to disk return create_resource_response(file_data, filename, file_extension, directory=output_path) elif output_mode == "both": # Save to disk AND return as EmbeddedResource output_path.mkdir(parents=True, exist_ok=True) with open(full_file_path, "wb") as f: f.write(file_data) return create_resource_response(file_data, filename, file_extension, directory=output_path) else: raise ValueError( f"Invalid output mode: {output_mode}. Must be 'files', 'resources', or 'both'" ) def handle_multiple_files_output_mode( results: list[Union[TextContent, EmbeddedResource]], output_mode: str, additional_info: str = None, ) -> Union[TextContent, list[EmbeddedResource]]: """ Handle different output modes for multiple file generation. Args: results: List of results from handle_output_mode calls output_mode: Output mode ('files', 'resources', or 'both') additional_info: Additional information to include in files mode message Returns: Union[TextContent, list[EmbeddedResource]]: TextContent for 'files' mode, list of EmbeddedResource for 'resources' and 'both' modes """ if output_mode == "files": # Extract file paths from TextContent objects and create combined message file_paths = [] for result in results: if isinstance(result, TextContent): # Extract file path from the success message text = result.text if "File saved as: " in text: path = ( text.split("File saved as: ")[1].split(".")[0] + "." + text.split(".")[-1].split(" ")[0] ) file_paths.append(path) message = f"Success. Files saved at: {', '.join(file_paths)}" if additional_info: message += f". {additional_info}" return TextContent(type="text", text=message) elif output_mode in ["resources", "both"]: # Return list of EmbeddedResource objects embedded_resources = [] for result in results: if isinstance(result, EmbeddedResource): embedded_resources.append(result) if not embedded_resources: return TextContent(type="text", text="No files generated") return embedded_resources else: raise ValueError( f"Invalid output mode: {output_mode}. Must be 'files', 'resources', or 'both'" ) def get_output_mode_description(output_mode: str) -> str: """ Generate a dynamic description for the current output mode. Args: output_mode: The current output mode ('files', 'resources', or 'both') Returns: str: Description of how the tool will behave based on the output mode """ if output_mode == "files": return "Saves output file to directory (default: $HOME/Desktop)" elif output_mode == "resources": return "Returns output as base64-encoded MCP resource" elif output_mode == "both": return "Saves file to directory (default: $HOME/Desktop) AND returns as base64-encoded MCP resource" else: return "Output behavior depends on ELEVENLABS_MCP_OUTPUT_MODE setting"