# /// script # requires-python = ">=3.10" # dependencies = [ # "requests>=2,<3", # "mcp>=1.2.0,<2", # ] # /// import sys import requests import argparse import logging from urllib.parse import urljoin from mcp.server.fastmcp import FastMCP DEFAULT_GHIDRA_SERVER = "http://127.0.0.1:8080/" logger = logging.getLogger(__name__) mcp = FastMCP("ghidra-mcp") # Initialize ghidra_server_url with default value ghidra_server_url = DEFAULT_GHIDRA_SERVER def safe_get(endpoint: str, params: dict = None) -> list: """ 执行带有可选查询参数的GET请求。 """ if params is None: params = {} url = urljoin(ghidra_server_url, endpoint) try: response = requests.get(url, params=params, timeout=5) response.encoding = 'utf-8' if response.ok: return response.text.splitlines() else: return [f"Error {response.status_code}: {response.text.strip()}"] except Exception as e: return [f"Request failed: {str(e)}"] def safe_post(endpoint: str, data: dict | str) -> str: try: url = urljoin(ghidra_server_url, endpoint) if isinstance(data, dict): response = requests.post(url, data=data, timeout=5) else: response = requests.post(url, data=data.encode("utf-8"), timeout=5) response.encoding = 'utf-8' if response.ok: return response.text.strip() else: return f"Error {response.status_code}: {response.text.strip()}" except Exception as e: return f"Request failed: {str(e)}" @mcp.tool() def list_methods(offset: int = 0, limit: int = 100) -> list: """ 列出程序中的所有函数名称,支持分页。 """ return safe_get("methods", {"offset": offset, "limit": limit}) @mcp.tool() def list_classes(offset: int = 0, limit: int = 100) -> list: """ 列出程序中的所有命名空间/类名称,支持分页。 """ return safe_get("classes", {"offset": offset, "limit": limit}) @mcp.tool() def decompile_function(name: str) -> str: """ 通过名称反编译特定函数并返回反编译后的C代码。 """ return safe_post("decompile", name) @mcp.tool() def rename_function(old_name: str, new_name: str) -> str: """ 通过当前名称将函数重命名为用户定义的新名称。 """ return safe_post("renameFunction", {"oldName": old_name, "newName": new_name}) @mcp.tool() def rename_data(address: str, new_name: str) -> str: """ 重命名指定地址的数据标签。 """ return safe_post("renameData", {"address": address, "newName": new_name}) @mcp.tool() def list_segments(offset: int = 0, limit: int = 100) -> list: """ 列出程序中的所有内存段,支持分页。 """ return safe_get("segments", {"offset": offset, "limit": limit}) @mcp.tool() def list_imports(offset: int = 0, limit: int = 100) -> list: """ 列出程序中的导入符号,支持分页。 """ return safe_get("imports", {"offset": offset, "limit": limit}) @mcp.tool() def list_exports(offset: int = 0, limit: int = 100) -> list: """ 列出导出的函数/符号,支持分页。 """ return safe_get("exports", {"offset": offset, "limit": limit}) @mcp.tool() def list_namespaces(offset: int = 0, limit: int = 100) -> list: """ 列出程序中的所有非全局命名空间,支持分页。 """ return safe_get("namespaces", {"offset": offset, "limit": limit}) @mcp.tool() def list_data_items(offset: int = 0, limit: int = 100) -> list: """ 列出已定义的数据标签及其值,支持分页。 """ return safe_get("data", {"offset": offset, "limit": limit}) @mcp.tool() def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list: """ 搜索名称包含给定子字符串的函数。 """ if not query: return ["Error: query string is required"] return safe_get("searchFunctions", {"query": query, "offset": offset, "limit": limit}) @mcp.tool() def rename_variable(function_name: str, old_name: str, new_name: str) -> str: """ 重命名函数内的局部变量。 """ return safe_post("renameVariable", { "functionName": function_name, "oldName": old_name, "newName": new_name }) @mcp.tool() def get_function_by_address(address: str) -> str: """ 通过地址获取函数。 """ return "\n".join(safe_get("get_function_by_address", {"address": address})) @mcp.tool() def get_current_address() -> str: """ 获取用户当前选择的地址。 """ return "\n".join(safe_get("get_current_address")) @mcp.tool() def get_current_function() -> str: """ 获取用户当前选择的函数。 """ return "\n".join(safe_get("get_current_function")) @mcp.tool() def list_functions() -> list: """ 列出数据库中的所有函数。 """ return safe_get("list_functions") @mcp.tool() def decompile_function_by_address(address: str) -> str: """ 反编译给定地址的函数。 """ return "\n".join(safe_get("decompile_function", {"address": address})) @mcp.tool() def disassemble_function(address: str) -> list: """ 获取函数的汇编代码(地址:指令;注释)。 """ return safe_get("disassemble_function", {"address": address}) @mcp.tool() def set_decompiler_comment(address: str, comment: str) -> str: """ 在函数伪代码中为给定地址设置注释。 """ return safe_post("set_decompiler_comment", {"address": address, "comment": comment}) @mcp.tool() def set_disassembly_comment(address: str, comment: str) -> str: """ 在函数反汇编中为给定地址设置注释。 """ return safe_post("set_disassembly_comment", {"address": address, "comment": comment}) @mcp.tool() def rename_function_by_address(function_address: str, new_name: str) -> str: """ 通过地址重命名函数。 """ return safe_post("rename_function_by_address", {"function_address": function_address, "new_name": new_name}) @mcp.tool() def set_function_prototype(function_address: str, prototype: str) -> str: """ 设置函数的原型。 """ return safe_post("set_function_prototype", {"function_address": function_address, "prototype": prototype}) @mcp.tool() def set_local_variable_type(function_address: str, variable_name: str, new_type: str) -> str: """ 设置局部变量的类型。 """ return safe_post("set_local_variable_type", {"function_address": function_address, "variable_name": variable_name, "new_type": new_type}) @mcp.tool() def get_xrefs_to(address: str, offset: int = 0, limit: int = 100) -> list: """ 获取指定地址的所有引用(被引用)。 参数: address: 十六进制格式的目标地址(例如:"0x1400010a0") offset: 分页偏移量(默认:0) limit: 返回的最大引用数量(默认:100) 返回: 指定地址的引用列表 """ return safe_get("xrefs_to", {"address": address, "offset": offset, "limit": limit}) @mcp.tool() def get_xrefs_from(address: str, offset: int = 0, limit: int = 100) -> list: """ 获取从指定地址出发的所有引用(引用)。 参数: address: 十六进制格式的源地址(例如:"0x1400010a0") offset: 分页偏移量(默认:0) limit: 返回的最大引用数量(默认:100) 返回: 从指定地址出发的引用列表 """ return safe_get("xrefs_from", {"address": address, "offset": offset, "limit": limit}) @mcp.tool() def get_function_xrefs(name: str, offset: int = 0, limit: int = 100) -> list: """ 获取指定函数名称的所有引用。 参数: name: 要搜索的函数名称 offset: 分页偏移量(默认:0) limit: 返回的最大引用数量(默认:100) 返回: 指定函数的引用列表 """ return safe_get("function_xrefs", {"name": name, "offset": offset, "limit": limit}) @mcp.tool() def list_strings(offset: int = 0, limit: int = 2000, filter: str = None) -> list: """ 列出程序中所有已定义的字符串及其地址。 参数: offset: 分页偏移量(默认:0) limit: 返回的最大字符串数量(默认:2000) filter: 用于匹配字符串内容的可选过滤器 返回: 带有地址的字符串列表 """ params = {"offset": offset, "limit": limit} if filter: params["filter"] = filter return safe_get("strings", params) def main(): parser = argparse.ArgumentParser(description="MCP server for Ghidra") parser.add_argument("--ghidra-server", type=str, default=DEFAULT_GHIDRA_SERVER, help=f"Ghidra server URL, default: {DEFAULT_GHIDRA_SERVER}") parser.add_argument("--mcp-host", type=str, default="127.0.0.1", help="Host to run MCP server on (only used for sse), default: 127.0.0.1") parser.add_argument("--mcp-port", type=int, help="Port to run MCP server on (only used for sse), default: 8081") parser.add_argument("--transport", type=str, default="stdio", choices=["stdio", "sse"], help="Transport protocol for MCP, default: stdio") args = parser.parse_args() # Use the global variable to ensure it's properly updated global ghidra_server_url if args.ghidra_server: ghidra_server_url = args.ghidra_server if args.transport == "sse": try: # Set up logging log_level = logging.INFO logging.basicConfig(level=log_level) logging.getLogger().setLevel(log_level) # Configure MCP settings mcp.settings.log_level = "INFO" if args.mcp_host: mcp.settings.host = args.mcp_host else: mcp.settings.host = "127.0.0.1" if args.mcp_port: mcp.settings.port = args.mcp_port else: mcp.settings.port = 8081 logger.info(f"Connecting to Ghidra server at {ghidra_server_url}") logger.info(f"Starting MCP server on http://{mcp.settings.host}:{mcp.settings.port}/sse") logger.info(f"Using transport: {args.transport}") mcp.run(transport="sse") except KeyboardInterrupt: logger.info("Server stopped by user") else: mcp.run() if __name__ == "__main__": main()