--- name: orchestrating-agents description: Orchestrates parallel API instances, delegated sub-tasks, and multi-agent workflows with streaming and tool-enabled delegation patterns. Use for parallel analysis, multi-perspective reviews, or complex task decomposition. metadata: version: 0.4.0 --- # Orchestrating Agents This skill enables programmatic API invocations for advanced workflows including parallel processing, task delegation, and multi-agent analysis using the Anthropic API. ## When to Use This Skill **Primary use cases:** - **Parallel sub-tasks**: Break complex analysis into simultaneous independent streams - **Multi-perspective analysis**: Get 3-5 different expert viewpoints concurrently - **Delegation**: Offload specific subtasks to specialized API instances - **Recursive workflows**: Orchestrator coordinating multiple API instances - **High-volume processing**: Batch process multiple items concurrently **Trigger patterns:** - "Parallel analysis", "multi-perspective review", "concurrent processing" - "Delegate subtasks", "coordinate multiple agents" - "Run analyses from different perspectives" - "Get expert opinions from multiple angles" ## Quick Start ### Single Invocation ```python import sys sys.path.append('/home/user/claude-skills/orchestrating-agents/scripts') from claude_client import invoke_claude response = invoke_claude( prompt="Analyze this code for security vulnerabilities: ...", model="claude-sonnet-4-6" ) print(response) ``` ### Parallel Multi-Perspective Analysis ```python from claude_client import invoke_parallel prompts = [ { "prompt": "Analyze from security perspective: ...", "system": "You are a security expert" }, { "prompt": "Analyze from performance perspective: ...", "system": "You are a performance optimization expert" }, { "prompt": "Analyze from maintainability perspective: ...", "system": "You are a software architecture expert" } ] results = invoke_parallel(prompts, model="claude-sonnet-4-6") for i, result in enumerate(results): print(f"\n=== Perspective {i+1} ===") print(result) ``` ### Parallel with Shared Cached Context (Recommended) For parallel operations with shared base context, use caching to reduce costs by up to 90%: ```python from claude_client import invoke_parallel # Large context shared across all sub-agents (e.g., codebase, documentation) base_context = """ ...large codebase or documentation (1000+ tokens)... """ prompts = [ {"prompt": "Find security vulnerabilities in the authentication module"}, {"prompt": "Identify performance bottlenecks in the API layer"}, {"prompt": "Suggest refactoring opportunities in the database layer"} ] # First sub-agent creates cache, subsequent ones reuse it results = invoke_parallel( prompts, shared_system=base_context, cache_shared_system=True # 90% cost reduction for cached content ) ``` ### Multi-Turn Conversation with Auto-Caching For sub-agents that need multiple rounds of conversation: ```python from claude_client import ConversationThread # Create a conversation thread (auto-caches history) agent = ConversationThread( system="You are a code refactoring expert with access to the codebase", cache_system=True ) # Turn 1: Initial analysis response1 = agent.send("Analyze the UserAuth class for issues") print(response1) # Turn 2: Follow-up (reuses cached system + turn 1) response2 = agent.send("How would you refactor the login method?") print(response2) # Turn 3: Implementation (reuses all previous context) response3 = agent.send("Show me the refactored code") print(response3) ``` ### Streaming Responses For real-time feedback from sub-agents: ```python from claude_client import invoke_claude_streaming def show_progress(chunk): print(chunk, end='', flush=True) response = invoke_claude_streaming( "Write a comprehensive security analysis...", callback=show_progress ) ``` ### Parallel Streaming Monitor multiple sub-agents simultaneously: ```python from claude_client import invoke_parallel_streaming def agent1_callback(chunk): print(f"[Security] {chunk}", end='', flush=True) def agent2_callback(chunk): print(f"[Performance] {chunk}", end='', flush=True) results = invoke_parallel_streaming( [ {"prompt": "Security review: ..."}, {"prompt": "Performance review: ..."} ], callbacks=[agent1_callback, agent2_callback] ) ``` ### Interruptible Operations Cancel long-running parallel operations: ```python from claude_client import invoke_parallel_interruptible, InterruptToken import threading import time token = InterruptToken() # Run in background def run_analysis(): results = invoke_parallel_interruptible( prompts=[...], interrupt_token=token ) return results thread = threading.Thread(target=run_analysis) thread.start() # Interrupt after 5 seconds time.sleep(5) token.interrupt() ``` ## Core Functions ### `invoke_claude()` Single synchronous invocation with full control: ```python invoke_claude( prompt: str | list[dict], model: str = "claude-sonnet-4-6", system: str | list[dict] | None = None, max_tokens: int = 4096, temperature: float = 1.0, streaming: bool = False, cache_system: bool = False, cache_prompt: bool = False, messages: list[dict] | None = None, **kwargs ) -> str ``` **Parameters:** - `prompt`: The user message (string or list of content blocks) - `model`: Claude model to use (default: claude-sonnet-4-6) - `system`: Optional system prompt (string or list of content blocks) - `max_tokens`: Maximum tokens in response (default: 4096) - `temperature`: Randomness 0-1 (default: 1.0) - `streaming`: Enable streaming response (default: False) - `cache_system`: Add cache_control to system prompt (requires 1024+ tokens, default: False) - `cache_prompt`: Add cache_control to user prompt (requires 1024+ tokens, default: False) - `messages`: Pre-built messages list for multi-turn (overrides prompt) - `**kwargs`: Additional API parameters (top_p, top_k, etc.) **Returns:** Response text as string **Note:** Caching requires minimum 1,024 tokens per cache breakpoint. Cache lifetime is 5 minutes (refreshed on use). ### `invoke_parallel()` Concurrent invocations using lightweight workflow pattern: ```python invoke_parallel( prompts: list[dict], model: str = "claude-sonnet-4-6", max_tokens: int = 4096, max_workers: int = 5, shared_system: str | list[dict] | None = None, cache_shared_system: bool = False ) -> list[str] ``` **Parameters:** - `prompts`: List of dicts with 'prompt' (required) and optional 'system', 'temperature', 'cache_system', 'cache_prompt', etc. - `model`: Claude model for all invocations - `max_tokens`: Max tokens per response - `max_workers`: Max concurrent API calls (default: 5, max: 10) - `shared_system`: System context shared across ALL invocations (for cache efficiency) - `cache_shared_system`: Add cache_control to shared_system (default: False) **Returns:** List of response strings in same order as prompts **Note:** For optimal cost savings, put large common context (1024+ tokens) in `shared_system` with `cache_shared_system=True`. First invocation creates cache, subsequent ones reuse it (90% cost reduction). ### `invoke_claude_streaming()` Stream responses in real-time with optional callbacks: ```python invoke_claude_streaming( prompt: str | list[dict], callback: callable = None, model: str = "claude-sonnet-4-6", system: str | list[dict] | None = None, max_tokens: int = 4096, temperature: float = 1.0, cache_system: bool = False, cache_prompt: bool = False, **kwargs ) -> str ``` **Parameters:** - `callback`: Optional function called with each text chunk (str) as it arrives - (other parameters same as invoke_claude) **Returns:** Complete accumulated response text ### `invoke_parallel_streaming()` Parallel invocations with per-agent streaming callbacks: ```python invoke_parallel_streaming( prompts: list[dict], callbacks: list[callable] = None, model: str = "claude-sonnet-4-6", max_tokens: int = 4096, max_workers: int = 5, shared_system: str | list[dict] | None = None, cache_shared_system: bool = False ) -> list[str] ``` **Parameters:** - `callbacks`: Optional list of callback functions, one per prompt - (other parameters same as invoke_parallel) ### `invoke_parallel_interruptible()` Parallel invocations with cancellation support: ```python invoke_parallel_interruptible( prompts: list[dict], interrupt_token: InterruptToken = None, # ... same other parameters as invoke_parallel ) -> list[str] ``` **Parameters:** - `interrupt_token`: Optional InterruptToken to signal cancellation - (other parameters same as invoke_parallel) **Returns:** List of response strings (None for interrupted tasks) ### `ConversationThread` Manages multi-turn conversations with automatic caching: ```python thread = ConversationThread( system: str | list[dict] | None = None, model: str = "claude-sonnet-4-6", max_tokens: int = 4096, temperature: float = 1.0, cache_system: bool = True ) response = thread.send( user_message: str | list[dict], cache_history: bool = True ) -> str ``` **Methods:** - `send(message, cache_history=True)`: Send message and get response - `get_messages()`: Get conversation history - `clear()`: Clear conversation history - `__len__()`: Get number of turns **New in 0.3.0:** - `turn_count` property: Number of completed turn pairs - `send_continuation(guidance, cache_history)`: Lightweight continuation turn (requires prior `send()`) - `max_turns` constructor parameter: Optional turn limit - `continuation_prompt` constructor parameter: Default continuation guidance ### `StallDetector` Monitors activity timestamps and detects unresponsive operations: ```python from claude_client import StallDetector def handle_stall(task_id, idle_seconds): print(f"Task {task_id} stalled for {idle_seconds:.1f}s") detector = StallDetector(timeout=60.0, on_stall=handle_stall) detector.register("task-1") detector.start_monitoring(poll_interval=5.0) # Call heartbeat() during streaming/progress detector.heartbeat("task-1") # When done detector.unregister("task-1") detector.stop_monitoring() ``` ### `TaskTracker` (task_state module) Formal task lifecycle state machine with enforced transitions: ```python from task_state import TaskTracker, TaskState tracker = TaskTracker(max_retries=3) tracker.add("task-1", category="security") tracker.claim("task-1") # UNCLAIMED → CLAIMED tracker.start("task-1") # CLAIMED → RUNNING (increments attempt) tracker.complete("task-1") # RUNNING → COMPLETED # On failure with retry: tracker.fail("task-2", error="timeout") tracker.retry("task-2") # FAILED → RETRY_QUEUED (if under max_retries) tracker.claim("task-2") # RETRY_QUEUED → CLAIMED # Query state tracker.active_count(category="security") tracker.get_by_state(TaskState.RUNNING) tracker.summary() # {"completed": 1, "running": 1, ...} ``` ### `invoke_with_retry()` (orchestration module) Single invocation with exponential backoff: ```python from orchestration import invoke_with_retry response = invoke_with_retry( "Analyze this code...", max_retries=3, base_delay_ms=1000, # 1s, 2s, 4s backoff max_delay_ms=10000, # capped at 10s ) ``` ### `invoke_parallel_managed()` (orchestration module) Full-featured parallel invocations with all Symphony patterns: ```python from orchestration import invoke_parallel_managed, ConcurrencyLimiter limiter = ConcurrencyLimiter( global_limit=10, category_limits={"security": 3, "perf": 3} ) def reconcile(prompts, tracker): # Filter out invalid/duplicate work before dispatch return [p for p in prompts if should_run(p)] results = invoke_parallel_managed( prompts=[ {"prompt": "Security review...", "task_id": "sec-1", "category": "security"}, {"prompt": "Perf review...", "task_id": "perf-1", "category": "perf"}, ], reconcile=reconcile, concurrency_limiter=limiter, max_retries=3, stall_timeout=60.0, on_stall=lambda tid, idle: print(f"{tid} stalled"), ) ``` ## Example Workflows See [references/workflows.md](references/workflows.md) for detailed examples including: - Multi-expert code review - Parallel document analysis - Recursive task delegation - Advanced Agent SDK delegation patterns - Prompt caching workflows ## Execute Mode (Default Sub-Agent Prompt) For autonomous sub-agents that should execute without asking questions: ```python from claude_client import invoke_claude, EXECUTE_MODE response = invoke_claude( prompt="Review auth.py for SQL injection vulnerabilities", system=f"You are a security expert.\n\n{EXECUTE_MODE}" ) ``` `EXECUTE_MODE` encodes these principles (adapted from OpenAI Codex): - Make assumptions instead of asking questions; state them briefly - Think ahead: what else might be needed? - Report failures with what you tried and what you'll do next - Summarize deliverables and how to validate them ## Agent Pool (Named Agents with Messaging) For workflows where multiple agents need to communicate: ```python from agent_pool import AgentPool pool = AgentPool( shared_system="You are reviewing the auth module of a web app.", max_depth=3, # prevent recursive spawn explosion max_agents=10, ) # Spawn named agents with roles pool.spawn("security", system=f"Focus on vulnerabilities.\n\n{pool.EXECUTE_MODE}") pool.spawn("perf", system=f"Focus on performance.\n\n{pool.EXECUTE_MODE}") # Run turns (pending inter-agent messages auto-injected) sec_result = pool.run("security", "Review the login flow") # Agent-to-agent messaging pool.send("security", to="perf", content="Auth does N+1 queries in the session check loop", trigger_turn=True) # auto-runs perf with this context # Broadcast to all agents pool.broadcast("security", "Auth uses bcrypt cost=12, 200ms per hash") # Query pool state pool.agents() # ["security", "perf"] pool.agent_info("perf") # {name, depth, children, pending_messages, turns} ``` ### Spawn Reservation (Atomic Agent Creation) For complex workflows where agent creation might fail: ```python from agent_pool import AgentPool pool = AgentPool(shared_system="Code review team") # Reservation pattern: name is reserved, rolled back on exception with pool.reserve("analyst", parent="lead") as res: res.configure(system="You analyze code complexity.", model="claude-opus-4-6") # If configure or any other work raises, the name is released # Agent "analyst" is now live # Depth limits prevent unbounded recursion pool.spawn("sub-analyst", parent="analyst") # depth=2, OK pool.spawn("sub-sub", parent="sub-analyst") # depth=3, raises ValueError ``` ### When to Use AgentPool vs invoke_parallel | Pattern | Use When | |---------|----------| | `invoke_parallel()` | Independent tasks, no inter-agent communication needed | | `AgentPool` | Agents need to share findings, build on each other's work, or have parent/child relationships | | `invoke_parallel_managed()` | Independent tasks with retry, stall detection, concurrency limits | ## Setup **Prerequisites:** 1. Install anthropic library: ```bash uv pip install anthropic ``` 2. Configure API key via project knowledge file: **Option 1 (recommended): Individual file** - Create document: `ANTHROPIC_API_KEY.txt` - Content: Your API key (e.g., `sk-ant-api03-...`) **Option 2: Combined file** - Create document: `API_CREDENTIALS.json` - Content: ```json { "anthropic_api_key": "sk-ant-api03-..." } ``` Get your API key: https://console.anthropic.com/settings/keys Installation check: ```bash python3 -c "import anthropic; print(f'✓ anthropic {anthropic.__version__}')" ``` ## Error Handling The module provides comprehensive error handling: ```python from claude_client import invoke_claude, ClaudeInvocationError try: response = invoke_claude("Your prompt here") except ClaudeInvocationError as e: print(f"API Error: {e}") print(f"Status: {e.status_code}") print(f"Details: {e.details}") except ValueError as e: print(f"Configuration Error: {e}") ``` Common errors: - **API key missing**: Add ANTHROPIC_API_KEY.txt to project knowledge (see Setup above) - **Rate limits**: Reduce max_workers or add delays - **Token limits**: Reduce prompt size or max_tokens - **Network errors**: Automatic retry with exponential backoff ## Prompt Caching For detailed caching workflows and best practices, see [references/workflows.md](references/workflows.md#prompt-caching-workflows). ## Performance Considerations **Token efficiency:** - Parallel calls use more tokens but save wall-clock time - Use prompt caching for shared context (90% cost reduction) - Use concise system prompts to reduce overhead - Consider token budgets when setting max_tokens **Rate limits:** - Anthropic API has per-minute rate limits - Default max_workers=5 is safe for most tiers - Adjust based on your API tier and rate limits **Cost management:** - Each invocation consumes API credits - Monitor usage in Anthropic Console - Use smaller models (haiku) for simple tasks - Use prompt caching for repeated context (90% savings) - Cache lifetime: 5 minutes, refreshed on each use ## Best Practices 1. **Use parallel invocations for independent tasks only** - Don't parallelize sequential dependencies - Each parallel task should be self-contained 2. **Set appropriate system prompts** - Define clear roles/expertise for each instance - Keeps responses focused and relevant 3. **Handle errors gracefully** - Always wrap invocations in try-except - Provide fallback behavior for failures 4. **Test with small batches first** - Verify prompts work before scaling - Check token usage and costs 5. **Consider alternatives** - Not all tasks benefit from multiple instances - Sometimes sequential with context is better ## Token Efficiency This skill uses ~800 tokens when loaded but enables powerful multi-agent patterns that can dramatically improve complex analysis quality and speed. ## See Also - [references/api-reference.md](references/api-reference.md) - Detailed API documentation - [Anthropic API Docs](https://docs.anthropic.com/claude/reference) - Official documentation