---
name: dbos-patterns
description: DBOS durable execution patterns and CRITICAL constraints for ChainGraph executor. Use when working on workflows, steps, execution, or any DBOS-related code. Contains MUST-FOLLOW constraints about what can be called from workflows vs steps. Triggers: dbos, workflow, step, durable, execution, startWorkflow, writeStream, recv, send, runStep, atomic, checkpoint, WorkflowQueue, queue, cancelWorkflow, Promise.allSettled. (project)
---

# DBOS Patterns for ChainGraph

This skill covers DBOS (Database-Oriented Operating System) patterns used in the ChainGraph executor. **CRITICAL**: Contains constraints that agents MUST follow to avoid runtime errors.

## CRITICAL Constraints

### The Most Important Rule

**DBOS context methods have strict calling restrictions based on WHERE you are:**

```typescript
// ============================================================
// WORKFLOW FUNCTIONS: All DBOS methods allowed
// ============================================================
async function myWorkflow(task: Task) {
  await DBOS.send(...)           // ✅ Allowed
  await DBOS.recv(...)           // ✅ Allowed
  await DBOS.startWorkflow(...)  // ✅ Allowed
  await DBOS.writeStream(...)    // ✅ Allowed
  await DBOS.setEvent(...)       // ✅ Allowed
  await DBOS.sleep(...)          // ✅ Allowed

  const result = await DBOS.runStep(() => myStep(task))  // ✅ Allowed
  return result
}

// ============================================================
// STEP FUNCTIONS: ONLY writeStream() allowed!
// ============================================================
async function myStep(task: Task) {
  await DBOS.writeStream(...)    // ✅ ONLY THIS ONE!

  // ❌ NOT ALLOWED - Will throw runtime error:
  // await DBOS.send(...)          // ❌ Error!
  // await DBOS.recv(...)          // ❌ Error!
  // await DBOS.startWorkflow(...) // ❌ Error!
  // await DBOS.setEvent(...)      // ❌ Error!
  // await DBOS.sleep(...)         // ❌ Error!

  return { data: ... }
}
```

### Constraint Reference Table

| DBOS Method | From Workflow | From Step |
|-------------|---------------|-----------|
| `DBOS.send()` | ✅ | ❌ |
| `DBOS.recv()` | ✅ | ❌ |
| `DBOS.startWorkflow()` | ✅ | ❌ |
| `DBOS.setEvent()` / `getEvent()` | ✅ | ❌ |
| `DBOS.sleep()` | ✅ | ❌ |
| `DBOS.cancelWorkflow()` | ✅ | ❌ |
| `DBOS.runStep()` | ✅ | ❌ |
| **`DBOS.writeStream()`** | ✅ | **✅** |
| `DBOS.readStream()` | ✅ | ❌ |

### Promise Handling

**NEVER use `Promise.all()`**: it rejects as soon as one promise rejects, while the others are still in flight, risking unhandled rejections.

```typescript
// ❌ BAD: Promise.all() fails fast, other promises left dangling
const results = await Promise.all([step1(), step2(), step3()])

// ✅ GOOD: Promise.allSettled() waits for all, reports outcomes
const results = await Promise.allSettled([step1(), step2(), step3()])
```

### Memory Isolation

Workflows and steps should **NOT** have side effects outside their own scope:

- ✅ Can READ global variables
- ❌ Must NOT create or update global variables
- ❌ Must NOT modify shared state outside return values

### Queue Initialization Order

**CRITICAL**: WorkflowQueue MUST be created **before** `DBOS.launch()` is called!

```typescript
// File: server/dbos/queue.ts:17-35
// Queue is created at module level BEFORE DBOS.launch()
export const executionQueue = new WorkflowQueue(QUEUE_NAME, {
  workerConcurrency: config.dbos.workerConcurrency ?? 5,
  concurrency: config.dbos.queueConcurrency ?? 100,
})

// If created AFTER DBOS.launch(), queue will NOT dequeue tasks!
```

---

## Design Patterns

### Pattern 1: Signal Pattern (Race Condition Fix)

**Problem**: Client subscribes to events before the stream exists.

**Solution**: Workflow writes an initialization event BEFORE waiting for the start signal.

**File**: `packages/chaingraph-executor/server/dbos/workflows/ExecutionWorkflows.ts`

```
Timeline:
1. create execution (tRPC)
   └─ Workflow starts → writes EXECUTION_CREATED → stream exists! ✅
   └─ Workflow waits for START_SIGNAL... ⏸️

2. subscribe events (tRPC)
   └─ Stream already exists → immediately receives EXECUTION_CREATED ✅

3. start execution (tRPC)
   └─ Sends START_SIGNAL → workflow continues ▶️
```

**Implementation Pattern**:

```typescript
async function executionWorkflow(task: ExecutionTask) {
  // Write event BEFORE waiting - stream now exists!
  await DBOS.writeStream('events', {
    executionId: task.executionId,
    event: 'EXECUTION_CREATED',
    timestamp: Date.now(),
  })

  // Now safe to wait - clients can subscribe (timeout is in seconds)
  const signal = await DBOS.recv('START_SIGNAL', 300)
  if (!signal) {
    throw new Error('Execution start timeout')
  }

  // Continue with execution...
}
```

---

### Pattern 2: Shared State Pattern (Command System)

**Problem**: Cannot call `DBOS.recv()` from steps, but the step needs to check for commands.

**Solution**: Workflow polls for messages and updates a shared state object that the step reads.

**Files**:
- Workflow: `server/dbos/workflows/ExecutionWorkflows.ts`
- Step: `server/dbos/steps/ExecuteFlowAtomicStep.ts`

```typescript
// Shared state object (passed from workflow to step)
interface CommandController {
  currentCommand: 'PAUSE' | 'RESUME' | 'STEP' | null
}

// WORKFLOW LEVEL: Poll DBOS.recv() every 500ms
async function executionWorkflow(task: ExecutionTask) {
  const commandController: CommandController = { currentCommand: null }
  const abortController = new AbortController()

  // Polling loop (runs concurrently with step)
  const pollCommands = async () => {
    while (!abortController.signal.aborted) {
      const cmd = await DBOS.recv<{ command: 'PAUSE' | 'RESUME' | 'STEP' | 'STOP' }>('COMMAND', 0.5)
      if (cmd) {
        if (cmd.command === 'STOP') {
          abortController.abort()
        } else {
          commandController.currentCommand = cmd.command
        }
      }
    }
  }
  const polling = pollCommands() // Start the loop; defining it alone polls nothing

  try {
    // Run step with shared state
    return await DBOS.runStep(() =>
      executeFlowAtomic(task, abortController, commandController)
    )
  } finally {
    abortController.abort() // Stop command polling once the step is done
    await polling
  }
}

// STEP LEVEL: Check shared state every 100ms (no DBOS calls!)
async function executeFlowAtomic(
  task: ExecutionTask,
  abortController: AbortController,
  commandController: CommandController
) {
  // flowDebugger: placeholder for the engine's debugger handle
  // (`debugger` itself is a reserved word and cannot be used as an identifier)
  const checkCommands = setInterval(() => {
    if (commandController.currentCommand === 'PAUSE') {
      flowDebugger.pause()
    } else if (commandController.currentCommand === 'RESUME') {
      flowDebugger.continue()
    }
    commandController.currentCommand = null
  }, 100)

  try {
    // Execute flow...
    // Step reads shared state, never calls DBOS.recv()
  } finally {
    clearInterval(checkCommands) // Stop checking once the flow finishes
  }
}
```

---

### Pattern 3: Collect & Spawn Pattern (Child Executions)

**Problem**: Cannot call `DBOS.startWorkflow()` from steps, but Event Emitter nodes need to spawn children.

**Solution**: Step collects child tasks and returns them; the workflow spawns them.

**Files**:
- Step: `server/dbos/steps/ExecuteFlowAtomicStep.ts:346-401`
- Workflow: `server/dbos/workflows/ExecutionWorkflows.ts`

```typescript
// STEP: Collect child tasks (don't spawn!)
async function executeFlowAtomic(task: ExecutionTask) {
  const collectedChildTasks: ExecutionTask[] = []

  // Execute flow, capture emitted events
  await engine.execute()

  // After execution, collect child tasks from emitted events
  for (const event of context.emittedEvents.filter(e => !e.processed)) {
    event.processed = true

    // Create child execution row in DB (allowed in step)
    const childTask = await createChildTask(instance, event, store)
    collectedChildTasks.push(childTask)
  }

  // Return child tasks for workflow-level spawning
  return {
    status: 'completed',
    childTasks: collectedChildTasks, // ← Workflow will spawn these
  }
}

// WORKFLOW: Spawn collected children (DBOS.startWorkflow allowed here!)
async function executionWorkflow(task: ExecutionTask) {
  const result = await DBOS.runStep(() => executeFlowAtomic(task))

  // Spawn children at workflow level
  if (result.childTasks?.length > 0) {
    for (const childTask of result.childTasks) {
      await DBOS.startWorkflow(executionWorkflow, {
        workflowID: childTask.executionId
      })(childTask)
    }
  }

  return result
}
```

---

### Pattern 4: Auto-Start Pattern (Child Execution Lifecycle)

**Problem**: Children would need a manual start call, which slows down the execution tree.

**Solution**: Children skip the signal wait entirely and start immediately.

**File**: `server/dbos/workflows/ExecutionWorkflows.ts:192-214`

```typescript
async function executionWorkflow(task: ExecutionTask) {
  const executionRow = await store.get(task.executionId)
  const isChildExecution = !!executionRow.parentExecutionId

  // Write EXECUTION_CREATED first (Signal Pattern)
  await DBOS.writeStream('events', { event: 'EXECUTION_CREATED', ... })

  // Auto-start for children!
  if (!isChildExecution) {
    // Parents: wait for signal from tRPC (timeout: 5 minutes)
    const startSignal = await DBOS.recv('START_SIGNAL', 300)
    if (!startSignal) {
      throw new Error('Execution start timeout')
    }
  } else {
    // Children: skip waiting, start immediately
    DBOS.logger.info(`Child execution auto-start, beginning execution`)
  }

  // Continue execution...
}
```

**Child Execution Lifecycle**:

```text
Parent spawns child via DBOS.startWorkflow()
  └─ Child workflow starts
      ├─ Writes EXECUTION_CREATED event
      ├─ Detects parentExecutionId
      ├─ Skips signal wait (auto-start)
      └─ Executes flow immediately
```

---

### Pattern 5: WorkflowQueue Pattern (Managed Concurrency)

**Problem**: Need to manage concurrency and ensure idempotent workflow spawning.

**Solution**: Use WorkflowQueue with concurrency limits and deduplication.

**File**: `server/dbos/queue.ts`

```typescript
import { WorkflowQueue } from '@dbos-inc/dbos-sdk'

// Create at module level BEFORE DBOS.launch()
export const executionQueue = new WorkflowQueue('chaingraph-executions', {
  workerConcurrency: 5,  // Max concurrent per worker process
  concurrency: 100,      // Max concurrent globally
})

// Use with deduplication to prevent duplicate workflows
await DBOS.startWorkflow(ExecutionWorkflows, {
  queueName: executionQueue.name,
  workflowID: childTask.executionId,          // Unique ID
  enqueueOptions: {
    deduplicationID: childTask.executionId,   // Idempotency key
  },
}).executeChainGraph(childTask)
```

---

### Pattern 6: Parent Monitoring Pattern (Child Stops if Parent Dies)

**Problem**: Child executions should stop if their parent completes or fails.

**Solution**: Background checker monitors parent workflow status.

**File**: `server/dbos/workflows/ExecutionWorkflows.ts`

```typescript
// Runs at workflow level: DBOS.sleep() is not allowed inside steps
async function monitorParentWorkflow(
  parentExecutionId: string,
  abortController: AbortController
) {
  while (!abortController.signal.aborted) {
    const parentStatus = await DBOS.getWorkflowStatus(parentExecutionId)

    // Terminal DBOS statuses: the parent has finished, failed, or been cancelled
    if (parentStatus?.status === 'SUCCESS' ||
        parentStatus?.status === 'ERROR' ||
        parentStatus?.status === 'CANCELLED') {
      abortController.abort('Parent workflow has ended')
      break
    }

    await DBOS.sleep(5000) // Check every 5 seconds (DBOS.sleep() takes milliseconds)
  }
}
```

---

## Three-Phase Workflow Structure

ChainGraph executions follow a three-phase structure:

```text
┌──────────────────────────────────────────────────────────────┐
│  PHASE 1: Stream Initialization (Lines 148-214)              │
│  ├─ Create CommandController                                 │
│  ├─ Write EXECUTION_CREATED event (stream exists!)           │
│  ├─ Children: skip signal wait (auto-start)                  │
│  └─ Parents: wait for START_SIGNAL                           │
├──────────────────────────────────────────────────────────────┤
│  PHASE 2: Execution (Lines 216-374)                          │
│  ├─ Step 1: updateToRunning()                                │
│  ├─ Step 2: executeFlowAtomic() ← Core execution             │
│  └─ Spawn children via DBOS.startWorkflow()                  │
├──────────────────────────────────────────────────────────────┤
│  PHASE 3: Cleanup (Lines 376-423)                            │
│  ├─ Step 3: updateToCompleted()                              │
│  ├─ Stop command polling                                     │
│  └─ DBOS auto-closes event stream                            │
└──────────────────────────────────────────────────────────────┘
```

---

## Key Files

| File | Purpose | Critical? |
|------|---------|-----------|
| `server/dbos/workflows/ExecutionWorkflows.ts` | Main orchestration workflow | ⭐⭐⭐ |
| `server/dbos/steps/ExecuteFlowAtomicStep.ts` | Core execution step | ⭐⭐⭐ |
| `server/dbos/queue.ts:17-35` | Queue initialization (MUST be before DBOS.launch) | ⭐⭐⭐ |
| `server/dbos/config.ts` | DBOS initialization | ⭐⭐ |
| `server/dbos/DBOSExecutionWorker.ts` | Worker lifecycle | ⭐⭐ |
| `server/dbos/steps/UpdateStatusStep.ts` | Status updates | ⭐ |
| `server/implementations/dbos/DBOSEventBus.ts` | Event streaming via DBOS.writeStream() | ⭐⭐ |
| `server/utils/config.ts:70-139` | Environment config | ⭐⭐ |

---

## Environment Variables

```bash
# Enable DBOS mode (default: false)
ENABLE_DBOS_EXECUTION=true

# DBOS Admin UI
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022  # Access at http://localhost:3022

# Concurrency Limits
DBOS_QUEUE_CONCURRENCY=100   # Global across all workers
DBOS_WORKER_CONCURRENCY=5    # Per worker process

# DBOS Conductor (optional, for production monitoring)
DBOS_CONDUCTOR_URL=https://conductor.dbos.dev
DBOS_APPLICATION_NAME=chaingraph-executor
DBOS_CONDUCTOR_KEY=your-api-key-here
```

---

## Anti-Patterns

### Anti-Pattern #1: Calling DBOS methods from steps

```typescript
// ❌ BAD: Will throw runtime error
async function myStep(data: string) {
  await DBOS.send('other-workflow', 'message', 'TOPIC') // ❌ Error!
}

// ✅ GOOD: Return data, let workflow send
async function myStep(data: string): Promise<{ toSend: Message }> {
  return { toSend: { target: 'other-workflow', message: 'hello' } }
}

async function myWorkflow(data: string) {
  const result = await DBOS.runStep(() => myStep(data))
  await DBOS.send(result.toSend.target, result.toSend.message, 'TOPIC') // ✅
}
```

### Anti-Pattern #2: Splitting atomic execution

```typescript
// ❌ BAD: State lost between steps
await DBOS.runStep(() => loadFlow())
await DBOS.runStep(() => executeFlow()) // ❌ Flow state lost!

// ✅ GOOD: Single atomic step
await DBOS.runStep(() => executeFlowAtomic(task)) // ✅ All in one step
```

### Anti-Pattern #3: Making children wait for START_SIGNAL

```typescript
// ❌ BAD: Children timeout waiting for signal that never comes
async function executionWorkflow(task: ExecutionTask) {
  const executionRow = await store.get(task.executionId)
  const isChild = !!executionRow.parentExecutionId

  // Always waiting - children have no one to send them the signal!
  await DBOS.recv('START_SIGNAL', 300) // ❌ Times out for children
}

// ✅ GOOD: Children skip signal wait
async function executionWorkflow(task: ExecutionTask) {
  const executionRow = await store.get(task.executionId)
  const isChild = !!executionRow.parentExecutionId

  if (!isChild) {
    // Only parents wait for signal (from tRPC start() call)
    await DBOS.recv('START_SIGNAL', 300)
  }
  // Children start immediately - no signal wait!
}
```

### Anti-Pattern #4: Using Promise.all() for parallel steps

```typescript
// ❌ BAD: Promise.all() fails fast, leaving other promises dangling
const results = await Promise.all([
  DBOS.runStep(() => step1()),
  DBOS.runStep(() => step2()),
  DBOS.runStep(() => step3()),
])

// ✅ GOOD: Promise.allSettled() waits for all, handles all outcomes
const results = await Promise.allSettled([
  DBOS.runStep(() => step1()),
  DBOS.runStep(() => step2()),
  DBOS.runStep(() => step3()),
])
```

### Anti-Pattern #5: Memory side effects in workflows/steps

```typescript
// ❌ BAD: Modifying global state
let globalCounter = 0
async function myWorkflow() {
  globalCounter++ // ❌ Side effect outside scope!
}

// ✅ GOOD: Return values instead of mutating globals
async function myWorkflow(): Promise<{ count: number }> {
  const count = calculateCount()
  return { count } // ✅ Pure function, no side effects
}
```

### Anti-Pattern #6: Creating queue after DBOS.launch()

```typescript
// ❌ BAD: Queue created after DBOS is initialized
await DBOS.launch()
const queue = new WorkflowQueue('my-queue') // ❌ Won't dequeue!

// ✅ GOOD: Queue created at module level BEFORE DBOS.launch()
const queue = new WorkflowQueue('my-queue') // ✅ Module level
// ... later in main()
await DBOS.launch()
```

---

## Quick Reference

| Need | Pattern | Where |
|------|---------|-------|
| Stream exists before subscribe | Signal Pattern | Write event before recv() |
| Commands during step execution | Shared State | Workflow polls, step reads object |
| Spawn child workflows | Collect & Spawn | Step collects, workflow spawns |
| Children start immediately | Auto-Start | Skip signal wait |
| Real-time events from step | `DBOS.writeStream()` | Only stream method allowed in steps |
| Managed concurrency | WorkflowQueue | Queue with workerConcurrency/concurrency |
| Child stops if parent dies | Parent Monitoring | Background status checker |
| Parallel steps safely | Promise.allSettled() | Never use Promise.all() |

---

## DBOS Workflow Architecture

```
┌─────────────────────────────────────────────────────────────┐
│  WORKFLOW (can call ALL DBOS methods)                       │
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │ DBOS.send() │  │ DBOS.recv() │  │startWorkflow│          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  DBOS.runStep(() => ...)                              │  │
│  │                                                       │  │
│  │  ┌─────────────────────────────────────────────────┐  │  │
│  │  │  STEP (ONLY writeStream allowed)                │  │  │
│  │  │                                                 │  │  │
│  │  │  ✅ DBOS.writeStream()                          │  │  │
│  │  │  ❌ DBOS.send/recv/startWorkflow/sleep/...      │  │  │
│  │  │                                                 │  │  │
│  │  │  return { childTasks: [...] }                   │  │  │
│  │  └─────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  // After step completes:                                   │
│  for (child of result.childTasks) {                         │
│    await DBOS.startWorkflow(...)(child)  // ✅ Allowed here │
│  }                                                          │
└─────────────────────────────────────────────────────────────┘
```

---

## Advanced DBOS Features

For advanced DBOS features not currently used in ChainGraph (Debouncer, forkWorkflow, versioning, rate limiting, partitioned queues), see `dbos-advanced.md` in this skill directory.
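
Returning to the `Promise.allSettled()` rule: unlike `Promise.all()`, it never rejects, so failed steps have to be picked out of the returned array explicitly. The sketch below shows one way to do that in plain TypeScript with no DBOS calls. `partitionSettled`, `stepOk`, `stepFail`, and `runParallel` are hypothetical names used only for illustration and are not part of the ChainGraph codebase.

```typescript
// Hypothetical helper: split Promise.allSettled() results into values and errors.
function partitionSettled<T>(
  results: PromiseSettledResult<T>[]
): { values: T[]; errors: unknown[] } {
  const values: T[] = []
  const errors: unknown[] = []
  for (const result of results) {
    if (result.status === 'fulfilled') {
      values.push(result.value)
    } else {
      errors.push(result.reason)
    }
  }
  return { values, errors }
}

// Stand-ins for real steps; in a workflow each call would be wrapped in DBOS.runStep().
const stepOk = async (n: number): Promise<number> => n * 2
const stepFail = async (): Promise<number> => {
  throw new Error('step failed')
}

async function runParallel(): Promise<{ values: number[]; errors: unknown[] }> {
  // allSettled never rejects: every outcome is reported, in input order.
  const results = await Promise.allSettled([stepOk(1), stepFail(), stepOk(3)])
  return partitionSettled(results)
}
```

With these sample steps, `runParallel()` resolves with `values` equal to `[2, 6]` and a single entry in `errors`. The workflow can then decide whether a partial failure should fail the whole execution or only be reported.
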

---

## Related Skills

- `executor-architecture` - Package overview
- `chaingraph-concepts` - Core domain concepts
- `subscription-sync` - Event streaming patterns
- `trpc-execution` - Execution tRPC procedures
- `trpc-patterns` - General tRPC framework patterns