--- name: genie-integration description: Integrate Databricks Genie rooms as tools in agent workflows. Use when integrating Genie spaces with AI agents, querying Genie rooms programmatically via SDK or MCP, managing Genie conversations and polling, handling Genie API responses and errors, or building tool-calling agents that use Genie as a data source. Covers SDK patterns, MCP tool integration, conversation management, error handling, and performance optimization for Genie-based agent tools. --- # Genie Integration Patterns Integrate Databricks Genie rooms as powerful tools in your agent workflows using SDK or MCP approaches. ## Core Concepts ### What is Databricks Genie? Genie is a conversational BI interface that: - Translates natural language to SQL - Executes queries against your data - Returns formatted results and visualizations - Maintains conversation context **Key advantage:** Existing Genie rooms become instant agent tools without rebuilding data pipelines. ### Integration Approaches **SDK Integration:** - Direct control via Databricks Python SDK - Manual conversation management - Flexible polling strategies - Best for custom workflows **MCP Tools:** - Pre-configured tool interfaces - Simplified API surface - Built-in polling logic - Best for standard patterns ## Problem-Solution Patterns ### Problem 1: Genie Query Timeouts **Symptoms:** - Queries hang indefinitely - Agent gets stuck waiting for response - No error handling for slow queries **Root causes:** - Insufficient polling timeout - Complex SQL generated by Genie - Large dataset queries - No backoff strategy **Solution:** ```python @tool def query_genie_with_timeout(question: str, space_id: str, max_attempts: int = 30) -> str: """Query Genie with proper timeout handling""" import time from databricks.sdk import WorkspaceClient w = WorkspaceClient() try: # Start conversation response = w.genie.start_conversation( space_id=space_id, content=question ) conversation_id = response.conversation_id message_id = response.message_id # Poll with exponential backoff wait_time = 2 for attempt in range(max_attempts): message = w.genie.get_message( space_id=space_id, conversation_id=conversation_id, message_id=message_id ) if message.status == "COMPLETED": return extract_response(message) elif message.status in ["FAILED", "CANCELLED"]: return f"Query failed: {message.status}. Try simplifying your question." # Exponential backoff: 2s, 2s, 4s, 4s, 8s, 8s... time.sleep(wait_time) if attempt % 2 == 1: # Double wait time every 2 attempts wait_time = min(wait_time * 2, 10) # Cap at 10 seconds return "Query timeout. The query may be too complex or the dataset too large." except Exception as e: return f"Error: {str(e)}" def extract_response(message) -> str: """Extract formatted response from Genie message""" response_text = "" if message.attachments: for attachment in message.attachments: if hasattr(attachment, 'text') and attachment.text: response_text += attachment.text.content + "\n" elif hasattr(attachment, 'query') and attachment.query: if hasattr(attachment.query, 'description'): response_text += f"{attachment.query.description}\n" return response_text or message.content or "No response available" ``` ### Problem 2: Verbose Genie Responses Confuse Agent **Symptoms:** - Agent overwhelmed by SQL query text - Long table outputs cause token limits - Agent can't synthesize due to noise **Root causes:** - Not filtering Genie response attachments - Including raw SQL in tool output - No summarization of large results **Solution:** ```python @tool def query_genie_concise(question: str, space_id: str) -> str: """Query Genie and return concise, agent-friendly response""" from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Get raw response (using pattern from Problem 1) response = w.genie.start_conversation(space_id=space_id, content=question) message = poll_for_completion(w, space_id, response.conversation_id, response.message_id) # Extract ONLY the natural language summary if message.attachments: for attachment in message.attachments: # Prioritize text summaries over raw SQL if hasattr(attachment, 'text') and attachment.text: # Return first text attachment (usually the summary) return attachment.text.content # If query result, extract key insights only if hasattr(attachment, 'query') and attachment.query: if hasattr(attachment.query, 'description'): desc = attachment.query.description # Truncate if too long if len(desc) > 500: return desc[:500] + "... [truncated for brevity]" return desc return message.content or "No clear response from Genie" ``` **Best practice:** Return summaries, not raw data. Let Genie do the summarization. ### Problem 3: Losing Conversation Context **Symptoms:** - Each query starts fresh conversation - Agent can't do follow-up questions - "What about last month?" fails **Root causes:** - Not tracking conversation IDs - Creating new conversation for each query - No conversation state management **Solution:** ```python class GenieConversationManager: """Manage ongoing conversations with Genie rooms""" def __init__(self): self.workspace_client = WorkspaceClient() self.conversations = {} # space_id -> conversation_id @tool def query_genie_contextual(self, question: str, space_id: str) -> str: """ Query Genie while maintaining conversation context. Automatically continues existing conversations or starts new ones. """ # Check if we have an active conversation for this space conversation_id = self.conversations.get(space_id) if conversation_id: # Continue existing conversation response = self.workspace_client.genie.create_message( space_id=space_id, conversation_id=conversation_id, content=question ) else: # Start new conversation response = self.workspace_client.genie.start_conversation( space_id=space_id, content=question ) # Save conversation ID self.conversations[space_id] = response.conversation_id # Poll and return (using patterns from above) message = poll_for_completion( self.workspace_client, space_id, response.conversation_id, response.message_id ) return extract_response(message) def reset_conversation(self, space_id: str): """Start fresh conversation for a space""" if space_id in self.conversations: del self.conversations[space_id] ``` **Usage in agent:** ```python # Initialize once for agent lifecycle genie_manager = GenieConversationManager() @tool def query_customer_behavior(question: str) -> str: """Query customer behavior Genie room""" return genie_manager.query_genie_contextual( question=question, space_id="01f09cdbacf01b5fa7ff7c237365502c" ) ``` ### Problem 4: Genie Space ID Management **Symptoms:** - Hard-coded space IDs scattered in code - Errors when space IDs change - Difficulty managing multiple environments **Root causes:** - No centralized configuration - Space IDs embedded in tool definitions - No environment-aware setup **Solution:** ```python # config.py from dataclasses import dataclass from typing import Dict import os @dataclass class GenieSpaceConfig: space_id: str name: str description: str class GenieConfig: """Centralized Genie space configuration""" def __init__(self, environment: str = None): self.environment = environment or os.getenv("DATABRICKS_ENV", "production") self.spaces = self._load_spaces() def _load_spaces(self) -> Dict[str, GenieSpaceConfig]: """Load space configurations per environment""" # Production spaces if self.environment == "production": return { "customer_behavior": GenieSpaceConfig( space_id="01f09cdbacf01b5fa7ff7c237365502c", name="Customer Behavior Analysis", description="Customer trends and preferences" ), "inventory": GenieSpaceConfig( space_id="02a10defbcg02c6ga8gg8d348476613d", name="Real-Time Inventory", description="Stock levels and turnover" ) } # Development spaces elif self.environment == "development": return { "customer_behavior": GenieSpaceConfig( space_id="dev_customer_space_id", name="Customer Behavior (Dev)", description="Dev customer data" ), "inventory": GenieSpaceConfig( space_id="dev_inventory_space_id", name="Inventory (Dev)", description="Dev inventory data" ) } raise ValueError(f"Unknown environment: {self.environment}") def get_space(self, name: str) -> GenieSpaceConfig: """Get space configuration by name""" if name not in self.spaces: raise ValueError(f"Unknown Genie space: {name}") return self.spaces[name] # Usage in tools config = GenieConfig() @tool def query_customer_behavior(question: str) -> str: """Query customer behavior""" space = config.get_space("customer_behavior") return query_genie(question, space.space_id) ``` ## SDK Integration Pattern ### Complete SDK-Based Tool ```python from databricks.sdk import WorkspaceClient from langchain.tools import tool import time @tool def query_genie_sdk( question: str, space_id: str, conversation_id: str = None, max_attempts: int = 30 ) -> str: """ Query Databricks Genie room using SDK. Args: question: Natural language question space_id: Genie space ID conversation_id: Optional conversation ID to continue conversation max_attempts: Max polling attempts Returns: Genie's response as string """ w = WorkspaceClient() try: # Start or continue conversation if conversation_id: response = w.genie.create_message( space_id=space_id, conversation_id=conversation_id, content=question ) else: response = w.genie.start_conversation( space_id=space_id, content=question ) conv_id = response.conversation_id msg_id = response.message_id # Poll for completion with exponential backoff wait_time = 2 for attempt in range(max_attempts): message = w.genie.get_message( space_id=space_id, conversation_id=conv_id, message_id=msg_id ) # Check status if message.status == "COMPLETED": # Extract response result = "" if message.attachments: for attachment in message.attachments: if hasattr(attachment, 'text') and attachment.text: result += attachment.text.content + "\n" elif hasattr(attachment, 'query') and attachment.query: if hasattr(attachment.query, 'description'): result += attachment.query.description + "\n" return result.strip() or message.content or "No response" elif message.status in ["FAILED", "CANCELLED"]: return f"Query failed: {message.status}" # Wait with backoff time.sleep(wait_time) if attempt % 2 == 1: wait_time = min(wait_time * 2, 10) return "Query timeout after 60 seconds" except Exception as e: return f"Error querying Genie: {str(e)}" ``` ## MCP Tool Integration Pattern ### Understanding MCP Tools MCP (Model Context Protocol) tools provide a higher-level interface: ```python # MCP tools are pre-configured for specific Genie spaces # They handle polling and conversation management automatically # Available in environment as: # - query_space_ # - poll_response_ # Example: Using existing MCP tool from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Call Genie via MCP result = w.genie.query_space( space_id="01f09cdbacf01b5fa7ff7c237365502c", query="What products are trending?", conversation_id=None # Optional: continue conversation ) ``` ### When to Use MCP vs SDK **Use MCP tools when:** - Standard query/response pattern - Don't need custom polling logic - Want simpler code - MCP tools available for your spaces **Use SDK when:** - Need custom timeout handling - Complex conversation management - Custom response parsing - MCP tools not configured ## Performance Optimization ### Pattern 1: Caching Genie Responses ```python from functools import lru_cache from datetime import datetime, timedelta class CachedGenieQuery: """Cache Genie responses with TTL""" def __init__(self, ttl_minutes: int = 15): self.cache = {} self.ttl = timedelta(minutes=ttl_minutes) def query(self, question: str, space_id: str) -> str: """Query with caching""" cache_key = f"{space_id}:{question}" # Check cache if cache_key in self.cache: result, timestamp = self.cache[cache_key] if datetime.now() - timestamp < self.ttl: return result # Query Genie result = query_genie_sdk(question, space_id) # Cache result self.cache[cache_key] = (result, datetime.now()) return result ``` ### Pattern 2: Parallel Genie Queries ```python import concurrent.futures def query_multiple_genie_spaces(questions: list[tuple[str, str]]) -> list[str]: """ Query multiple Genie spaces in parallel. Args: questions: List of (question, space_id) tuples Returns: List of responses in same order """ def query_one(question_space): question, space_id = question_space return query_genie_sdk(question, space_id) with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: results = list(executor.map(query_one, questions)) return results # Usage in agent questions = [ ("What products are trending?", "customer_space_id"), ("Which locations have high turnover?", "inventory_space_id") ] results = query_multiple_genie_spaces(questions) ``` ## Error Handling Best Practices ### Comprehensive Error Strategy ```python @tool def query_genie_robust(question: str, space_id: str) -> str: """Query Genie with comprehensive error handling""" from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError w = WorkspaceClient() try: response = w.genie.start_conversation( space_id=space_id, content=question ) message = poll_for_completion( w, space_id, response.conversation_id, response.message_id ) if message.status == "COMPLETED": return extract_response(message) elif message.status == "FAILED": # Provide actionable error message return ("Query failed. This could be due to:\n" "- Invalid SQL generated\n" "- Data source unavailable\n" "- Permissions issue\n" "Try rephrasing your question or check Genie room configuration.") else: return f"Unexpected status: {message.status}" except DatabricksError as e: if "not found" in str(e).lower(): return "Genie space not found. Check space ID configuration." elif "permission" in str(e).lower(): return "Permission denied. Ensure agent has access to Genie space." else: return f"Databricks API error: {str(e)}" except TimeoutError: return "Query timeout. Try a simpler question or check data volume." except Exception as e: return f"Unexpected error: {str(e)}" ``` ## Testing Genie Integration ### Unit Test Pattern ```python def test_genie_tool(): """Test Genie tool in isolation""" # Test 1: Simple query result = query_genie_sdk( question="How many customers do we have?", space_id="01f09cdbacf01b5fa7ff7c237365502c" ) assert result, "Should return non-empty result" assert "error" not in result.lower(), "Should not contain error" # Test 2: Invalid space ID result = query_genie_sdk( question="test", space_id="invalid_id" ) assert "not found" in result.lower() or "error" in result.lower() # Test 3: Conversation continuity # (Test that conversation_id parameter works) ``` ### Integration Test Pattern ```python def test_genie_in_agent(): """Test Genie tool within agent workflow""" from your_agent import GenieAgent agent = GenieAgent() # Test single-tool query result = agent.query("What products are trending?") assert "customer_behavior" in result['intermediate_steps'][0][0].tool # Test multi-tool query result = agent.query("Trending products at risk of overstock?") tools_called = [step[0].tool for step in result['intermediate_steps']] assert "customer_behavior" in tools_called assert "inventory" in tools_called ``` ## Quick Reference ### Minimum Viable Genie Tool (SDK) ```python from databricks.sdk import WorkspaceClient from langchain.tools import tool import time @tool def query_genie(question: str, space_id: str) -> str: """Query Genie room""" w = WorkspaceClient() resp = w.genie.start_conversation(space_id=space_id, content=question) for _ in range(30): msg = w.genie.get_message(space_id, resp.conversation_id, resp.message_id) if msg.status == "COMPLETED": return msg.attachments[0].text.content if msg.attachments else msg.content elif msg.status in ["FAILED", "CANCELLED"]: return f"Failed: {msg.status}" time.sleep(2) return "Timeout" ``` ## Related Skills - **mosaic-ai-agent**: Design agents that use Genie tools - **agent-mlops**: Deploy Genie-powered agents to production