--- name: cva-patterns-workflows description: Multi-agent workflow orchestration patterns for Clojure+Vertex AI. Covers sequential/parallel/DAG topologies, state management (stateless/stateful/hybrid), error handling (retry/circuit breaker), and monitoring. Includes production metrics from healthcare pipeline (99.4% latency reduction, 92.4% cost reduction). Use when designing multi-agent systems, implementing complex pipelines, optimizing workflow performance, or building production orchestration. allowed-tools: Read,Bash,Edit,Write,Glob --- # Workflow Pipeline Orchestration > **Pattern Type:** Architectural > **Complexity:** High > **Best For:** Multi-agent systems with 3+ agents, production pipelines requiring reliability and observability ## 🎯 Overview Workflow pipeline patterns enable orchestration of multiple AI agents into cohesive, production-ready systems. This pattern addresses three critical challenges: 1. **Topology Design**: How to connect agents (sequential, parallel, DAG) 2. **State Management**: How to handle intermediate results and failures 3. **Reliability**: How to ensure robustness with retries, fallbacks, and monitoring **When to Use:** - Pipeline has 3+ agents with dependencies - Need retry/recovery capabilities - Production environment requiring observability - Cost optimization through parallelization **Trade-offs:** - **Complexity**: Higher code overhead (+40% for DAG vs sequential) - **Performance**: Parallel topologies reduce latency by 32% - **Reliability**: State management adds +250ms overhead but enables recovery **Production ROI:** Healthcare pipeline achieved 99.4% latency reduction (4h 15m → 1m 30s) and 92.4% cost reduction (R$ 192.50 → R$ 14.70). ## 📊 Pattern Explanation ### Core Concept Multi-agent pipelines follow three topology patterns: ``` 1. SEQUENTIAL (Linear) Input → A → B → C → D → Output - Simple, predictable - Full latency = sum(all systems) - Use: Strong dependencies between agents 2. PARALLEL (Fork-Join) ┌→ B ┐ Input → A┤ ├→ D → Output └→ C ┘ - 32% latency reduction - Same cost, better performance - Use: Independent agents after A 3. DAG (Directed Acyclic Graph) ┌→ B → D ┐ Input → A → ┌┤ ├→ E → Output │└→ C ────┘ └→ Validation (optional) - Maximum flexibility - Supports optional nodes, validations - Use: Complex dependencies, 5+ agents ``` ### Implementation Approach **Step 1**: Define agent dependencies - Map each agent's input requirements - Identify independent agents (parallelizable) - Determine optional vs critical nodes **Step 2**: Choose state management strategy - **Stateless**: In-memory only (dev/simple pipelines) - **Stateful**: Database-backed (production, long pipelines) - **Hybrid**: Checkpoints for expensive agents only **Step 3**: Add error handling - Per-system retry with exponential backoff - Circuit breaker for external APIs - Fallback strategies (cache, skip, human review) **Step 4**: Instrument for observability - Track latency, cost, tokens per agent - Export metrics to Prometheus/CloudWatch - Alert on budget thresholds ## 💻 Clojure Implementation ### Basic Example: Sequential Pipeline ```clojure (ns lab.workflows.sequential "Sequential pipeline with error propagation" (:require [lab.agents.data-extraction :as s11] [lab.agents.claims-identification :as s12] [lab.agents.reference-search :as s212])) (defn execute-sequential-pipeline "Execute 3-agent pipeline sequentially. Agent flow: 1. S.1.1 extracts structured data from input 2. S.1.2 identifies claims requiring evidence 3. S.2-1.2 searches scientific references for claims Args: agents-map - Map {:s11 agent1, :s12 agent2, :s212 agent3} db-spec - Database connection spec input - Map {:texto \"...\", :requisitos \"...\"} Returns: {:success? boolean :results {:s11 {...}, :s12 {...}, :s212 {...}} :metrics {:total-time-ms int, :total-cost float}}" [agents-map db-spec input] (let [start-time (System/currentTimeMillis) cost-tracker (atom 0.0)] (try ;; System 1.1: Data extraction (LGPD-compliant) (let [s11-result (s11/extract-data (:s11 agents-map) db-spec input)] (when-not (:success? s11-result) (throw (ex-info "S.1.1 failed" {:system :s11, :error (:errors s11-result)}))) (swap! cost-tracker + 0.045) ;; System 1.2: Claims identification (let [s12-result (s12/identify-claims (:s12 agents-map) (:data s11-result))] (when-not (:success? s12-result) (throw (ex-info "S.1.2 failed" {:system :s12, :error (:errors s12-result)}))) (swap! cost-tracker + 0.021) ;; System 2-1.2: Reference search (let [s212-result (s212/search-references (:s212 agents-map) (:claims s12-result))] (when-not (:success? s212-result) (throw (ex-info "S.2-1.2 failed" {:system :s212, :error (:errors s212-result)}))) (swap! cost-tracker + 0.067) ;; Return successful result {:success? true :results {:s11 (:data s11-result) :s12 (:claims s12-result) :s212 (:references s212-result)} :metrics {:total-time-ms (- (System/currentTimeMillis) start-time) :total-cost @cost-tracker :systems-count 3}}))) (catch Exception e ;; Centralized error handling {:success? false :error (.getMessage e) :error-data (ex-data e) :metrics {:total-time-ms (- (System/currentTimeMillis) start-time) :total-cost @cost-tracker :partial? true}})))) (comment ;; Usage example (def agents {:s11 (create-gemini-agent "gemini-2.0-flash") :s12 (create-gemini-agent "gemini-2.0-flash") :s212 (create-claude-agent "claude-3-haiku")}) (def result (execute-sequential-pipeline agents db-spec {:texto "Criar post sobre TCC para ansiedade" :requisitos "Tom profissional, 800 palavras"})) (:metrics result) ;; => {:total-time-ms 14300, :total-cost 0.133, :systems-count 3} ;; Production metrics (5-agent healthcare pipeline): ;; - Latency: 32.2s average ;; - Cost: $0.391 per execution ;; - Success rate: 87.8% ) ``` ### Production Example: Parallel Pipeline with State Management ```clojure (ns lab.workflows.parallel "Parallel pipeline with hybrid state management" (:require [clojure.core.async :as async] [lab.workflows.sequential :as seq-wf] [next.jdbc :as jdbc] [cheshire.core :as json])) (defn save-checkpoint! "Save expensive agent result to database for recovery. Checkpoints enable resume-from-failure without re-running expensive agents (e.g., external API calls, long-running LLMs). Args: db-spec - Database connection spec pipeline-id - UUID of pipeline execution agent-id - Keyword (:s212, :s4, etc.) result - Agent result map metadata - Map {:cost float, :latency-ms int, :model string} Returns: Database row ID" [db-spec pipeline-id agent-id result metadata] (jdbc/execute-one! (jdbc/get-datasource db-spec) ["INSERT INTO pipeline_checkpoints (pipeline_id, agent_id, result_json, metadata_json, created_at) VALUES (?::uuid, ?, ?::jsonb, ?::jsonb, NOW()) RETURNING id" (str pipeline-id) (name agent-id) (json/generate-string result) (json/generate-string metadata)])) (defn load-checkpoint "Load checkpoint from database if exists. Returns: {:result map, :metadata map} or nil if not found" [db-spec pipeline-id agent-id] (when-let [row (jdbc/execute-one! (jdbc/get-datasource db-spec) ["SELECT result_json, metadata_json FROM pipeline_checkpoints WHERE pipeline_id = ?::uuid AND agent_id = ? ORDER BY created_at DESC LIMIT 1" (str pipeline-id) (name agent-id)])] {:result (json/parse-string (:result_json row) true) :metadata (json/parse-string (:metadata_json row) true)})) (defn execute-parallel-subsystems "Execute independent agents in parallel using core.async. Parallelization strategy: - S.1.2 and S.3-2 are independent after S.1.1 - Execute concurrently to reduce total latency - Both must succeed for pipeline to continue Args: agents-map - Map of agent instances db-spec - Database spec prof-id - Professional UUID s11-output - Output from S.1.1 (already executed) Returns: {:s12-result map :s32-result map :parallel-time-ms int :time-saved-ms int}" [agents-map db-spec prof-id s11-output] (let [start-time (System/currentTimeMillis) ;; Create channels for results s12-chan (async/chan) s32-chan (async/chan) ;; Execute S.1.2 in separate thread _ (async/go (try (let [result (s12/identify-claims (:s12 agents-map) s11-output)] (async/>! s12-chan result)) (catch Exception e (async/>! s12-chan {:success? false, :error (.getMessage e)})))) ;; Execute S.3-2 in separate thread _ (async/go (try (let [result (s32/optimize-seo (:s32 agents-map) db-spec s11-output prof-id)] (async/>! s32-chan result)) (catch Exception e (async/>! s32-chan {:success? false, :error (.getMessage e)})))) ;; Wait for both results (blocking) s12-result (async/$0.05 or >5s). - **Example**: Checkpoint S.2-1.2 (external API, 8.4s, $0.067) but not S.1.2 (fast, 2.1s, $0.021). Saves 75% on recovery cost. 3. **Use Circuit Breakers for External APIs** - **Rationale**: Prevent cascading failures when external services are down. Circuit breaker fails fast, avoiding wasted API calls. - **Example**: PubMed API circuit breaker (3 failures → open for 60s) saved $0.30/min during outage by avoiding 5 retry attempts per request. 4. **Instrument Everything** - **Rationale**: Production issues are invisible without metrics. Track latency, cost, tokens per agent for optimization insights. - **Example**: Discovered S.4 consumed 46% of total cost ($0.18/$0.391). Optimized with multi-model routing, reduced to $0.14 (-22%). 5. **Define Fallback Strategies Per Agent** - **Rationale**: Not all agent failures should halt the pipeline. SEO optimization can be skipped, but LGPD extraction cannot. - **Example**: S.3-2 (SEO) fallback is "skip", allowing pipeline to complete. S.1.1 (LGPD) fallback is "human review" (critical compliance). 6. **Test Recovery Paths** - **Rationale**: Stateful pipelines have complex recovery logic. Untested recovery can corrupt data or lose intermediate results. - **Example**: Integration test suite simulates failures at each checkpoint, validates resume-from-checkpoint produces identical output. ## 🔗 Related Skills - [`cva-concepts-agent-types`](../cva-concepts-agent-types/SKILL.md) - Agent architecture (Types A/B/C/D) ⭐ - [`cva-healthcare-pipeline`](../cva-healthcare-pipeline/SKILL.md) - Real 5-agent production application ⭐ - [`cva-patterns-context`](../cva-patterns-context/SKILL.md) - Context management for pipeline state - [`cva-patterns-cost`](../cva-patterns-cost/SKILL.md) - Cost optimization strategies - [`cva-basics-agent-creation`](../cva-basics-agent-creation/SKILL.md) - Creating Gemini/Claude agents - [`cva-basics-prompts`](../cva-basics-prompts/SKILL.md) - Prompt engineering for agents ## 📘 Additional Resources ### Pattern Variations **Streaming Pipelines**: Use core.async channels for continuous data flow (e.g., real-time event processing). Not covered here as healthcare pipeline is batch-oriented. **Saga Pattern**: Distributed transactions with compensating actions. Useful for multi-database pipelines requiring ACID guarantees. **Event-Driven Orchestration**: Pub/sub messaging between agents (vs direct calls). Better for loosely-coupled systems, adds messaging infrastructure complexity. ### Advanced Topics **Dynamic DAG Construction**: Build pipeline topology at runtime based on input characteristics. Example: Skip S.3-2 (SEO) if content type is "internal documentation". **Multi-Tenant Isolation**: Ensure pipeline executions for different tenants don't interfere (separate database schemas, resource quotas). **A/B Testing Framework**: Run parallel pipeline variants to compare quality/cost. Example: Gemini Flash vs Haiku for S.2-1.2 (73% cost reduction with Haiku, 2% quality drop). ### Performance Benchmarks | Topology | Latency | Cost | Complexity | Recovery | Use Case | |----------|---------|------|------------|----------|----------| | Sequential | 32.2s | $0.391 | ⭐ | ❌ | MVP, prototype | | Parallel | 21.8s | $0.391 | ⭐⭐ | ❌ | Production med volume | | Hybrid | 21.8s | $0.391 | ⭐⭐⭐ | ✅ | **Recommended production** | | DAG | 22.5s | $0.391 | ⭐⭐⭐⭐ | ✅ | Complex (5+ agents) | **Recommendation**: Start with **Parallel + Hybrid state** for production. Migrate to DAG only if you have 5+ agents with complex dependencies.