""" Example Python client for testing the Hologres MCP Server in HTTP Stream mode. This script demonstrates how to connect to the server and use its features. """ import requests import json import time import os import sseclient # Configuration SERVER_URL = "http://localhost:8001" # Change to your server's URL HTTP_STREAM_ENDPOINT = f"{SERVER_URL}/mcp" def send_batch_request(requests_list): """Send a batch of JSON-RPC requests to the server.""" print(f"Sending batch request with {len(requests_list)} requests") response = requests.post(HTTP_STREAM_ENDPOINT, json=requests_list) if response.status_code != 200: raise Exception(f"Failed to send batch request: {response.status_code}") result = response.json() print(f"Received batch response with {len(result)} responses") return result def send_stream_request(requests_list): """Send a batch of JSON-RPC requests to the server in streaming mode.""" print(f"Sending stream request with {len(requests_list)} requests") headers = {"Accept": "text/event-stream"} response = requests.post(HTTP_STREAM_ENDPOINT, json=requests_list, headers=headers, stream=True) if response.status_code != 200: raise Exception(f"Failed to send stream request: {response.status_code}") client = sseclient.SSEClient(response) results = [] for event in client.events(): if event.event == "message": result = json.loads(event.data) results.append(result) print(f"Received stream response: {json.dumps(result, indent=2)}") return results def send_message(method, params=None, stream_mode=False): """Send a single JSON-RPC message to the server.""" message = { "jsonrpc": "2.0", "id": int(time.time() * 1000), "method": method } if params: message["params"] = params print(f"Sending message: {json.dumps(message, indent=2)}") if stream_mode: headers = {"Accept": "text/event-stream"} response = requests.post(HTTP_STREAM_ENDPOINT, json=message, headers=headers, stream=True) if response.status_code != 200: raise Exception(f"Failed to send message: {response.status_code}") client = sseclient.SSEClient(response) for event in client.events(): if event.event == "message": result = json.loads(event.data) print(f"Received stream response: {json.dumps(result, indent=2)}") return result else: response = requests.post(HTTP_STREAM_ENDPOINT, json=message) if response.status_code != 200: raise Exception(f"Failed to send message: {response.status_code}") result = response.json() print(f"Received response: {json.dumps(result, indent=2)}") return result def initialize(stream_mode=False): """Initialize the connection to the server.""" return send_message("initialize", { "clientInfo": { "name": "python-http-stream-test-client", "version": "1.0.0" } }, stream_mode) def list_tools(stream_mode=False): """List available tools.""" return send_message("listTools", None, stream_mode) def call_tool(name, arguments, stream_mode=False): """Call a tool with arguments.""" return send_message("callTool", { "name": name, "arguments": arguments }, stream_mode) def read_resource(uri, stream_mode=False): """Read a resource by URI.""" return send_message("readResource", { "uri": uri }, stream_mode) def test_batch_mode(): """Test HTTP Stream Transport in batch mode.""" print("\n=== Testing HTTP Stream Transport in Batch Mode ===") # Create a batch of requests batch_requests = [ { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "clientInfo": { "name": "python-http-stream-test-client", "version": "1.0.0" } } }, { "jsonrpc": "2.0", "id": 2, "method": "listTools" }, { "jsonrpc": "2.0", "id": 3, "method": "callTool", "params": { "name": "execute_sql", "arguments": { "query": "SELECT 'Hello, Hologres!' AS greeting" } } } ] # Send batch request batch_responses = send_batch_request(batch_requests) # Verify responses assert len(batch_responses) == 3, "Should receive 3 responses" assert batch_responses[0]["id"] == 1, "First response should have id 1" assert batch_responses[1]["id"] == 2, "Second response should have id 2" assert batch_responses[2]["id"] == 3, "Third response should have id 3" print("Batch mode test completed successfully!") def test_stream_mode(): """Test HTTP Stream Transport in streaming mode.""" print("\n=== Testing HTTP Stream Transport in Streaming Mode ===") # Create a batch of requests batch_requests = [ { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "clientInfo": { "name": "python-http-stream-test-client", "version": "1.0.0" } } }, { "jsonrpc": "2.0", "id": 2, "method": "listTools" }, { "jsonrpc": "2.0", "id": 3, "method": "callTool", "params": { "name": "execute_sql", "arguments": { "query": "SELECT 'Hello, Hologres!' AS greeting" } } } ] # Send stream request stream_responses = send_stream_request(batch_requests) # Verify responses assert len(stream_responses) == 3, "Should receive 3 responses" assert stream_responses[0]["id"] == 1, "First response should have id 1" assert stream_responses[1]["id"] == 2, "Second response should have id 2" assert stream_responses[2]["id"] == 3, "Third response should have id 3" print("Stream mode test completed successfully!") def test_single_requests(): """Test individual requests in both batch and stream modes.""" print("\n=== Testing Individual Requests ===") # Test initialize in batch mode print("\nTesting initialize in batch mode:") initialize_response = initialize(stream_mode=False) assert "result" in initialize_response, "Initialize response should contain 'result'" # Test initialize in stream mode print("\nTesting initialize in stream mode:") initialize_stream_response = initialize(stream_mode=True) assert "result" in initialize_stream_response, "Initialize stream response should contain 'result'" # Test listTools in batch mode print("\nTesting listTools in batch mode:") list_tools_response = list_tools(stream_mode=False) assert "result" in list_tools_response, "ListTools response should contain 'result'" assert "tools" in list_tools_response["result"], "Result should contain 'tools'" # Test callTool in batch mode print("\nTesting callTool in batch mode:") call_tool_response = call_tool("execute_sql", { "query": "SELECT 'Hello, Hologres!' AS greeting" }, stream_mode=False) assert "result" in call_tool_response, "CallTool response should contain 'result'" assert "output" in call_tool_response["result"], "Result should contain 'output'" # Test readResource in batch mode print("\nTesting readResource in batch mode:") read_resource_response = read_resource("hologres:///schemas", stream_mode=False) assert "result" in read_resource_response, "ReadResource response should contain 'result'" assert "content" in read_resource_response["result"], "Result should contain 'content'" print("Individual requests test completed successfully!") def main(): """Run the example client.""" try: # Test batch mode test_batch_mode() # Test stream mode test_stream_mode() # Test individual requests test_single_requests() print("\nAll tests completed successfully!") except Exception as e: print(f"Error: {str(e)}") finally: print("Exiting...") if __name__ == "__main__": main()