# Workflows

Workflows are multi-step, resumable processes that maintain state across executions. They suit complex operations, scheduled tasks, and long-running processes that may need to pause for user input or external events.

## Table of Contents

- [Basic Concepts](#basic-concepts)
- [Creating Workflows](#creating-workflows)
- [Managing Instances](#managing-instances)
- [State, Input, and Output](#state-input-and-output)
- [Step Mechanics](#step-mechanics)
- [Communication Patterns](#communication-patterns)
- [Inter-Workflow Communication](#inter-workflow-communication)
- [AI/LLM Integration](#aillm-integration)
- [Exposing Workflows as Tools (Non-Blocking Pattern)](#exposing-workflows-as-tools-non-blocking-pattern)
- [Scheduled Workflows](#scheduled-workflows)
- [Accessing Global Bot State](#accessing-global-bot-state)
- [Best Practices](#best-practices)
- [Complete Example: Research Workflow](#complete-example-research-workflow)
- [Workflow Lifecycle](#workflow-lifecycle)

## Basic Concepts

### What are Workflows?

- **Resumable**: Can pause and resume from where they left off
- **Stateful**: Maintain state across steps and restarts
- **Scheduled**: Can run on cron schedules
- **Interactive**: Can request data from conversations

### File Location

- **Location**: `src/workflows/*.ts`
- **Auto-registration**: Files automatically become available as workflows

## Creating Workflows

### Basic Structure

> ⚠️ **CRITICAL: State Access Pattern**
>
> Workflow state is passed as a **parameter** to the handler, not accessed via `this.state`!
> The handler receives `{ input, state, step, client, execute }` where `state` is a mutable parameter that's automatically tracked and persisted.

```typescript
import { Workflow, z } from "@botpress/runtime";

export const MyWorkflow = new Workflow({
  name: "myWorkflow",
  description: "Description of what this workflow does",

  // Optional: Set workflow timeout (default: 5 minutes)
  timeout: "6h", // Can be: "30m", "2h", "1d", etc.
  // Optional: Schedule for automatic execution
  schedule: "0 9 * * *", // Cron format (daily at 9 AM)

  // Input schema - data required to start the workflow
  input: z.object({
    userId: z.string(),
    data: z.string()
  }),

  // State schema - mutable data that persists across steps
  state: z.object({
    currentStep: z.number().default(0),
    processedItems: z.array(z.string()).default([])
  }),

  // Output schema - what the workflow returns when complete
  output: z.object({
    result: z.boolean(),
    itemsProcessed: z.number()
  }),

  // Handler receives state as a parameter
  async handler({ input, state, step, client, execute }) {
    // Access and modify state directly (it's automatically tracked)
    state.currentStep = 1;

    // Execute steps (fetchSomething is a placeholder for your own function)
    const data = await step("fetch-data", async () => {
      return await fetchSomething(input.userId);
    });

    state.processedItems.push(data);

    // Return output
    return {
      result: true,
      itemsProcessed: state.processedItems.length
    };
  }
});
```

### Handler Parameters

The handler receives these parameters:

```typescript
async handler({ input, state, step, client, execute }) {
  // input:   z.infer of the input schema - read-only input data
  // state:   z.infer of the state schema - mutable workflow state (automatically tracked and persisted)
  // step:    TypedWorkflowStep - step execution functions
  // client:  BotClient - Botpress API client
  // execute: ExecuteFn - AI execution function for autonomous operations
}
```

#### Using AI Execute in Workflows

The `execute` function enables AI operations within workflows:

```typescript
export const AnalyzeWorkflow = new Workflow({
  name: "analyze",

  async handler({ input, step, execute }) {
    const result = await step("ai-analysis", async () => {
      return await execute({
        instructions: "Analyze the provided data",
        model: "openai:gpt-4o",
        temperature: 0,
        input: { data: input.data }
      });
    });

    return { analysis: result };
  }
});
```

## Managing Instances

### Creating and Retrieving Workflows

```typescript
// 1. Start a new workflow (always creates a new instance)
const started = await MyWorkflow.start({
  userId: user.id,
  data: "some data"
});
console.log(started.id); // "wflw_abc123"

// 2. Get or create (idempotent - uses key for deduplication)
const existing = await MyWorkflow.getOrCreate({
  key: user.id, // Unique key prevents duplicates
  input: { userId: user.id, data: "x" }
});

// 3. Check status via getOrCreate (no standalone get/list methods)
const current = await MyWorkflow.getOrCreate({
  key: user.id,
  input: { userId: user.id, data: "x" }
});
console.log(current.status); // "in_progress" | "completed" | "failed" | "cancelled"
```

### Important Notes

- There is NO `create()`, `get()`, or `list()` method; use `start()` or `getOrCreate()`
- `start()`: Always creates a new instance
- `getOrCreate()`: Creates only if no instance exists with the given key

### Instance Properties

```typescript
const instance = await MyWorkflow.getOrCreate({
  key: userId,
  input: { /* ... */ }
});

// Available properties
console.log(instance.id);     // "wflw_abc123"
console.log(instance.status); // "in_progress" | "completed" | "failed" | "cancelled"
console.log(instance.input);  // Input data
console.log(instance.state);  // Current state
console.log(instance.output); // Output (undefined if not complete)
```

### Workflow Control Methods

#### Cancelling Workflows

Cancel a running workflow from outside:

```typescript
const instance = await MyWorkflow.getOrCreate({
  key: userId,
  input: { /* ... */ }
});

if (instance && instance.status === "in_progress") {
  await instance.cancel();
  console.log(instance.status); // "cancelled"
}
```

Cancellation is also available via the ADK UI (localhost:3001) during development.
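
The difference between `start()` and `getOrCreate()` described above can be pictured with a small in-memory model. This is only a sketch of the documented semantics, not the ADK's implementation: `InstanceRegistry` and its fields are hypothetical names, and the model ignores options such as `statuses` that appear elsewhere in this guide.

```typescript
// Illustrative model only; InstanceRegistry is a hypothetical name, not an ADK class.
type ModelStatus = "in_progress" | "completed" | "failed" | "cancelled";

interface ModelInstance<I> {
  id: string;
  key?: string;
  status: ModelStatus;
  input: I;
}

class InstanceRegistry<I> {
  private nextId = 1;
  private readonly byKey = new Map<string, ModelInstance<I>>();
  private readonly all: ModelInstance<I>[] = [];

  // start(): always creates a new instance, even for identical input.
  start(input: I): ModelInstance<I> {
    const instance: ModelInstance<I> = {
      id: `wflw_${this.nextId++}`,
      status: "in_progress",
      input,
    };
    this.all.push(instance);
    return instance;
  }

  // getOrCreate(): returns the instance already registered under `key`,
  // and only creates one when the key has not been seen before.
  getOrCreate(args: { key: string; input: I }): ModelInstance<I> {
    const existing = this.byKey.get(args.key);
    if (existing) return existing;
    const created = this.start(args.input);
    created.key = args.key;
    this.byKey.set(args.key, created);
    return created;
  }

  count(): number {
    return this.all.length;
  }
}

const registry = new InstanceRegistry<{ userId: string }>();
const firstStart = registry.start({ userId: "u1" });
const secondStart = registry.start({ userId: "u1" });
const firstKeyed = registry.getOrCreate({ key: "u1", input: { userId: "u1" } });
const secondKeyed = registry.getOrCreate({ key: "u1", input: { userId: "u1" } });
```

In this model the two `start()` calls yield two instances, while the two `getOrCreate()` calls share one, which is why a stable key such as the user ID is a natural choice for deduplication.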
#### Completing Workflows Early

Complete a workflow immediately with output (from within the handler):

```typescript
export const MyWorkflow = new Workflow({
  name: "myWorkflow",
  output: z.object({
    result: z.string(),
    skipped: z.boolean()
  }),

  async handler({ input, workflow, step }) {
    // Check condition and complete early
    if (input.skipProcessing) {
      workflow.complete({ result: "Skipped", skipped: true });
      // Code after complete() is never reached
    }

    // Normal processing continues if not completed early
    const data = await step("process", async () => {
      return await processData(input);
    });

    return { result: data, skipped: false };
  }
});
```

**Note:** `workflow.complete()` can only be called from within the workflow handler. It immediately interrupts execution.

#### Failing Workflows

Fail a workflow with an error reason (from within the handler):

```typescript
export const MyWorkflow = new Workflow({
  name: "myWorkflow",

  async handler({ input, workflow, step }) {
    // Validate input
    if (!input.data || input.data.length === 0) {
      workflow.fail("Invalid input: data cannot be empty");
      // Code after fail() is never reached
    }

    // Check external conditions
    const isAvailable = await step("check-service", async () => {
      return await checkServiceAvailability();
    });

    if (!isAvailable) {
      workflow.fail("External service unavailable");
    }

    // Continue normal processing
    return { success: true };
  }
});
```

**Note:** `workflow.fail()` can only be called from within the workflow handler. It immediately interrupts execution and marks the workflow as failed.
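
To make "immediately interrupts execution" concrete, the sketch below models `complete()` and `fail()` as functions that throw a signal which the runner catches. This is one plausible mechanism, assumed for illustration; the ADK's actual internals are not described in this guide, and `runWithControls`, `CompleteSignal`, and `FailSignal` are hypothetical names.

```typescript
// Hypothetical sketch: thrown signals model "code after complete()/fail() is never reached".
class CompleteSignal<O> {
  constructor(readonly output: O) {}
}

class FailSignal {
  constructor(readonly reason: string) {}
}

interface WorkflowControls<O> {
  complete(output: O): never;
  fail(reason: string): never;
}

type RunResult<O> =
  | { status: "completed"; output: O }
  | { status: "failed"; error: string };

function runWithControls<O>(handler: (workflow: WorkflowControls<O>) => O): RunResult<O> {
  const workflow: WorkflowControls<O> = {
    complete(output: O): never {
      throw new CompleteSignal(output);
    },
    fail(reason: string): never {
      throw new FailSignal(reason);
    },
  };

  try {
    // A normal return also completes the run.
    return { status: "completed", output: handler(workflow) };
  } catch (signal) {
    if (signal instanceof CompleteSignal) {
      return { status: "completed", output: signal.output as O };
    }
    if (signal instanceof FailSignal) {
      return { status: "failed", error: signal.reason };
    }
    throw signal; // Unrelated errors propagate unchanged.
  }
}

function runExample(skip: boolean): RunResult<string> {
  return runWithControls<string>((workflow) => {
    if (skip) workflow.complete("Skipped");
    return "Processed";
  });
}

const skippedRun = runExample(true);
const processedRun = runExample(false);
const failedRun = runWithControls<string>((workflow) => workflow.fail("Invalid input"));
```

Under this model, a `try`/`catch` inside the handler that swallows every thrown value would also swallow the signal, so it is safer to call these methods outside broad `catch` blocks.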
#### Extending Workflow Timeouts

Extend or set a new timeout for long-running workflows:

```typescript
export const LongRunningWorkflow = new Workflow({
  name: "longRunning",
  timeout: "1h", // Initial timeout

  async handler({ input, workflow, step }) {
    // Extend timeout before a long operation
    workflow.setTimeout({ in: "6h" }); // Relative: 6 hours from now

    // Or set an absolute deadline
    workflow.setTimeout({ at: "2025-12-31T23:59:59Z" });

    // Long-running operation (`batches` is assumed to come from input or an earlier step)
    for (const batch of batches) {
      await step(`process-batch-${batch.id}`, async () => {
        // Extend timeout for each batch if needed
        workflow.setTimeout({ in: "30m" });
        return await processBatch(batch);
      });
    }

    return { completed: true };
  }
});
```

**Duration formats:** `"30m"`, `"1h"`, `"6 hours"`, `"1d"`, etc.

#### Control Methods Summary

| Method | Location | Purpose |
|--------|----------|---------|
| `instance.cancel()` | Outside handler | Cancel a workflow externally |
| `workflow.complete(output)` | Inside handler | Complete early with output |
| `workflow.fail(reason)` | Inside handler | Fail with error message |
| `workflow.setTimeout({ in })` | Inside handler | Extend timeout (relative) |
| `workflow.setTimeout({ at })` | Inside handler | Set absolute deadline |

## State, Input, and Output

### Understanding the Three Data Types

```typescript
export const DataWorkflow = new Workflow({
  name: "dataWorkflow",

  // INPUT: Read-only data passed at creation
  input: z.object({
    userId: z.string(),
    config: z.object({ mode: z.string() })
  }),

  // STATE: Mutable data persisted across steps
  state: z.object({
    currentStep: z.number().default(0),
    items: z.array(z.string()).default([]),
    error: z.string().optional()
  }),

  // OUTPUT: Final result returned on completion
  output: z.object({
    success: z.boolean(),
    totalProcessed: z.number()
  }),

  async handler({ input, state, step }) {
    // INPUT is read-only
    console.log(input.userId); // ✅ Read
    // input.userId = "new";   // ❌ TypeScript error

    // STATE is mutable via the state parameter
    state.currentStep = 1; // ✅ Automatically persisted
    state.items.push("item1");

    // Steps automatically persist state
    await step("process", async () => {
      state.currentStep = 2; // Saved even if workflow crashes
    });

    // OUTPUT is returned at the end
    return {
      success: true,
      totalProcessed: state.items.length
    };
  }
});
```

## Step Mechanics

### Basic Steps

Steps are the fundamental unit of workflow execution with automatic persistence:

```typescript
async handler({ input, state, step }) {
  // Basic step - returns and caches the value
  const userData = await step("fetch-user", async () => {
    const data = await api.getUser(input.userId);
    return data; // This value is cached and returned
  });

  // Use the returned value
  console.log(userData.name);

  // Nested steps with return values
  const processedData = await step("process-data", async () => {
    // Steps can contain other steps
    const enriched = await step("enrich", async () => {
      return await enrichData(userData);
    });

    const validated = await step("validate", async () => {
      return await validateData(enriched);
    });

    return { enriched, validated }; // Return multiple values
  });

  // Step with retry logic
  const result = await step("risky-op", async ({ attempt }) => {
    console.log(`Attempt ${attempt}`);
    if (attempt < 3) {
      throw new Error("Retry me");
    }
    return "success";
  }, {
    maxAttempts: 5, // Retry up to 5 times
    timeout: 10000  // 10 second timeout
  });

  // Accessing state inside steps
  await step("update-state", async () => {
    state.userData = userData; // State is accessible in the closure
    state.processedData = processedData;
    // No need to return if just updating state
  });

  // Progress logging in steps (production pattern)
  await step("final-processing", async () => {
    console.log(`✓ Processing user ${userData.id}`);
    const result = await finalProcess(processedData);
    console.log(`✓ Completed: ${result.status}`);
    return result;
  });
}
```

### Available Step Methods

```typescript
// Sleep for specified milliseconds
await step.sleep("wait-5s", 5000);

// Sleep until specific time
await step.sleepUntil("wait-until", new Date("2025-12-31"));

// Update progress message
await step.progress("Processing items...");

// Execute child workflow
const result = await step.executeWorkflow(
  "child",
  ChildWorkflow,
  { input: data }
);

// Wait for workflow completion
await step.waitForWorkflow("wait-child", childWorkflowId);

// Request data from conversation (pauses workflow)
const answer = await step.request(
  "confirmation",
  "Do you want to proceed?"
);

// Process array in batches
await step.batch("batch-urls", urls, async (batch) => {
  // Process batch
}, { batchSize: 20 });

// Map over array with parallel processing and retry logic
const results = await step.map(
  "map-items",
  items,
  async (item) => {
    return processItem(item);
  },
  {
    concurrency: 10, // Process 10 items at a time
    maxAttempts: 3   // Retry failed items up to 3 times
  }
);

// ForEach with concurrency
await step.forEach("foreach-items", items, async (item) => {
  await processItem(item);
}, { concurrency: 5 });

// Listen for events (pauses until event)
await step.listen("wait-for-event");

// Fail workflow with message
await step.fail("Workflow failed due to X");

// Abort workflow (no args, synchronous, does not mark as failed)
step.abort();
```

### Step Rules

1. **Step names must be unique** within a workflow
2. **Step names must be stable** (don't use dynamic names)
3. **Steps execute sequentially**
4. **Completed steps are cached** and skipped on resume
5. **Step results are persisted**

```typescript
// ❌ BAD: Dynamic step names break resume
for (let i = 0; i < items.length; i++) {
  await step(`process-${i}`, async () => { /* ... */ });
}

// ✅ GOOD: Single step for batch operations
await step("process-all-items", async () => {
  for (const item of items) {
    await processItem(item);
  }
});
```

## Communication Patterns

### 1. Requesting User Input (Blocking)

Workflows can pause and request data from conversations:

```typescript
// In Workflow
export const InteractiveWorkflow = new Workflow({
  name: "interactive",

  // Define expected request types
  requests: {
    email: z.object({ email: z.string().email() }),
    confirmation: z.object({ confirmed: z.boolean() })
  },

  async handler({ state, step }) {
    // Request email (workflow pauses here)
    const { email } = await step.request(
      "email",
      "Please enter your email address:"
    );

    // Request confirmation
    const { confirmed } = await step.request(
      "confirmation",
      "Do you want to proceed?"
    );

    if (confirmed) {
      // Continue processing
    }
  }
});

// In Conversation - handle the request
export const Chat = new Conversation({
  channel: "chat.channel",

  async handler({ type, request, conversation }) {
    if (type === "workflow_request") {
      // Send prompt to user
      await conversation.send({
        type: "text",
        payload: { text: request.workflow.prompt }
      });

      // Later, provide data back (userInput is a placeholder for the user's reply)
      await request.workflow.provide("email", { email: userInput });
    }
  }
});
```

### 2. Sending Progress Updates (Non-blocking)

Send messages without pausing execution:

```typescript
export const NotificationWorkflow = new Workflow({
  name: "notification",

  input: z.object({
    conversationId: z.string(), // CRITICAL: Required for messaging
    userId: z.string()
  }),

  async handler({ input, state, client, step }) {
    // Send progress update
    await step("notify-start", async () => {
      await client.createMessage({
        conversationId: input.conversationId,
        type: "text",
        payload: { text: "🚀 Processing started..." }
      });
    });

    // Do work
    const result = await step("process", async () => {
      return await processData();
    });

    // Send completion
    await step("notify-complete", async () => {
      await client.createMessage({
        conversationId: input.conversationId,
        type: "text",
        payload: { text: `✅ Complete: ${result}` }
      });
    });
  }
});
```

### 3. Starting Workflows from Conversations

**CRITICAL**: Always pass `conversationId` for communication:

```typescript
// In Conversation handler (`user` is assumed to be in scope here)
async handler({ state, conversation, message }) {
  const instance = await MyWorkflow.start({
    conversationId: conversation.id, // ← Essential!
    userId: user.id,
    data: message.payload.text
  });

  // Store workflow ID for tracking
  state.activeWorkflowId = instance.id;
}
```

### 4. Handling Workflow Completion in Conversations

When a workflow completes, fails, is canceled, or times out, the associated conversation handler receives a `workflow_callback` event with a typed `completion` object:

```typescript
// In Conversation - handle workflow completion
export const Chat = new Conversation({
  channel: "chat.channel",

  async handler({ type, completion, conversation }) {
    if (type === "workflow_callback") {
      // completion.type     - workflow name (e.g., "processOrder")
      // completion.workflow - workflow instance
      // completion.status   - "completed" | "failed" | "canceled" | "timed_out"
      // completion.output   - workflow output (when completed)
      // completion.error    - error message (when failed)

      if (completion.status === "completed") {
        await conversation.send({
          type: "text",
          payload: { text: `Processing finished! Result: ${JSON.stringify(completion.output)}` }
        });
      } else if (completion.status === "failed") {
        await conversation.send({
          type: "text",
          payload: { text: `Processing failed: ${completion.error}` }
        });
      }
    }
  }
});
```

This replaces the need to poll workflow status or use raw event type guards. See **[Conversations: Handling Workflow Callbacks](./conversations.md#handling-workflow-callbacks-completion-events)** for full examples.

### Communication Best Practices

1. **Always pass conversationId** for workflows that need to communicate
2. **Wrap client calls in steps** for persistence and retry
3. **Choose the right pattern**:
   - `step.request()` for required user input
   - `type === "workflow_callback"` for reacting to workflow completion/failure
   - `client.createMessage()` for status updates
   - Events for decoupled notifications

## Inter-Workflow Communication

### Calling and Waiting for Other Workflows

Workflows can start and wait for other workflows to complete:

```typescript
import { Workflow, WorkflowDefinitions, z } from "@botpress/runtime";

// ChildWorkflow is assumed to be defined elsewhere with the name "child"
export const ParentWorkflow = new Workflow({
  name: "parent",

  input: z.object({
    conversationId: z.string(),
    uniqueKey: z.string()
  }),

  output: z.object({
    childResult: z.any()
  }),

  async handler({ step, input }) {
    // Start another workflow and wait for it
    const childOutput = await step("call-child", async () => {
      const { id } = await ChildWorkflow.getOrCreate({
        input: { data: "some data" },
        statuses: ["pending", "in_progress", "completed"],
        key: `child_${input.uniqueKey}`
      });

      // Wait for the workflow to complete
      const result = await step.waitForWorkflow("ChildWorkflow", id);

      // The step returns the child's output directly
      return result.output as WorkflowDefinitions["child"]["output"];
    });

    return { childResult: childOutput };
  }
});
```

### Workflow Coordination Pattern

Use `getOrCreate` with unique keys to prevent duplicate workflows:

```typescript
export const ExtractTopicsWorkflow = new Workflow({
  name: "topics",

  input: z.object({
    conversationId: z.string()
  }),

  async handler({ step, input }) {
    // Get or create ensures only one workflow per conversation
    const { id } = await InsightsWorkflow.getOrCreate({
      input: { conversationId: input.conversationId },
      statuses: ["pending", "in_progress", "completed"],
      key: `insights_conversation_${input.conversationId}` // Unique key
    });

    // If a workflow already exists with this key, the existing instance is returned
    const { output } = await step.waitForWorkflow("InsightsWorkflow", id);

    return output;
  }
});
```

### Workflow Synchronization Pattern

When you need to ensure certain operations complete before others:

```typescript
export const SyncWorkflow =
  new Workflow({
    name: "sync",

    async handler({ step, input }) {
      // Step 1: Get or create dependent workflow
      const topics = await step("find topics", async () => {
        const { id } = await InsightsTopicsWorkflow.getOrCreate({
          input: { conversationId: input.conversationId },
          statuses: ["pending", "in_progress", "completed"],
          key: `topics_conversation_${input.conversationId}`,
        });

        const { output } = await step.waitForWorkflow("InsightsTopicsWorkflow", id);

        return output as WorkflowDefinitions["insights_topics"]["output"];
      });

      // Step 2: Use the topics in the main processing
      const insights = await step("process insights", async () => {
        // Use topics data from the dependent workflow
        return processWithTopics(topics);
      });

      return { insights };
    }
  });
```

## AI/LLM Integration

Workflows have full access to AI capabilities through multiple approaches:

### 1. Using execute() Function (Autonomous Agent)

```typescript
import { Workflow, z, Autonomous, actions } from "@botpress/runtime";

export const AIWorkflow = new Workflow({
  name: "aiWorkflow",
  input: z.object({ question: z.string() }),
  output: z.object({ answer: z.string() }),

  handler: async ({ input, state, execute, step }) => {
    // Define exits for structured responses
    const AnswerExit = new Autonomous.Exit({
      name: "Answer",
      description: "Provide answer",
      schema: z.object({
        answer: z.string(),
        confidence: z.number()
      })
    });

    // Define tools for AI
    const SearchTool = new Autonomous.Tool({
      name: "Search",
      description: "Search for info",
      input: z.object({ query: z.string() }),
      output: z.string(),
      handler: async ({ query }) => {
        return await actions.browser.webSearch({ query });
      }
    });

    // Execute AI with tools
    const result = await execute({
      instructions: `Answer: "${input.question}"`,
      tools: [SearchTool],
      exits: [AnswerExit],
      model: "openai:gpt-4o",
      temperature: 0.7,
      iterations: 10
    });

    if (result.is(AnswerExit)) {
      return { answer: result.output.answer };
    }

    return { answer: "No answer found" };
  }
});
```

### 2. Using execute() with Knowledge Bases

Provide knowledge bases directly to the AI for RAG-powered responses:

```typescript
import { Workflow, z, Autonomous, context } from "@botpress/runtime";
import { KnowledgeDocs } from "../knowledge/docs"; // Your knowledge base

export const KnowledgeWorkflow = new Workflow({
  name: "answerFromKnowledge",
  input: z.object({ question: z.string() }),

  handler: async ({ input, execute }) => {
    const AnswerExit = new Autonomous.Exit({
      name: "Answer",
      description: "Found answer in knowledge base",
      schema: z.object({
        answer: z.string(),
        sources: z.array(z.string())
      })
    });

    const NoAnswerExit = new Autonomous.Exit({
      name: "NoAnswer",
      description: "No relevant information found"
    });

    const result = await execute({
      instructions: `Find answer to: "${input.question}"
Search the knowledge base and provide accurate information.
If no relevant information exists, use NoAnswer exit.`,
      knowledge: [KnowledgeDocs], // Pass knowledge bases directly
      exits: [AnswerExit, NoAnswerExit],
      model: "openai:gpt-4o",
      temperature: 0,
      iterations: 5
    });

    if (result.is(AnswerExit)) {
      // Access citations if needed
      const citations = context.get("citations");
      const [cleaned, found] = citations.removeCitationsFromObject(result.output.answer);

      return { answer: cleaned, hasSources: found.length > 0 };
    }

    return { answer: "No information found", hasSources: false };
  }
});
```

### 3. Using Cognitive Client (Raw LLM)

```typescript
import { Workflow, z, context } from "@botpress/runtime";

export const DirectLLMWorkflow = new Workflow({
  name: "directLLM",

  handler: async ({ input, state, step }) => {
    const cognitive = context.get("cognitive");

    const result = await step("generate", async () => {
      const response = await cognitive.generateContent({
        model: "openai:gpt-4o",
        systemPrompt: "You are helpful",
        messages: [
          { role: "user", content: input.prompt }
        ],
        temperature: 0.7,
        maxTokens: 1000
      });

      return response.choices[0].message.content;
    });

    return { response: result };
  }
});
```

### 4. Using Zai (Structured Operations)

The ADK provides `adk.zai` for structured AI operations directly in workflows:

```typescript
import { Workflow, z, adk } from "@botpress/runtime";

export const ZaiWorkflow = new Workflow({
  name: "zaiWorkflow",

  handler: async ({ input, state, step }) => {
    // Extract structured data using adk.zai
    const insights = await step("extract insights", async () =>
      adk.zai.extract(
        input.text,
        z.array(z.object({
          type: z.enum(["bug", "feature_request", "question"]),
          description: z.string(),
          priority: z.enum(["low", "medium", "high"])
        })),
        { instructions: "Extract customer insights from the conversation" }
      )
    );

    // Check conditions
    const hasBugs = await step("check for bugs", async () =>
      adk.zai.check(
        insights,
        "Are there any bug reports?",
        { returnBoolean: true }
      )
    );

    // Summarize content
    const summary = await step("summarize", async () =>
      adk.zai.summarize(input.text, {
        maxLength: 100,
        style: "bullet_points"
      })
    );

    return { insights, hasBugs, summary };
  }
});
```

### AI Approach Comparison

| Approach | Best For | Example Use Case |
|----------|----------|------------------|
| **execute()** | Multi-step AI tasks with tools | Research, support agents |
| **Cognitive Client** | Simple text generation | Content generation |
| **Zai** | Structured data extraction | Entity extraction |
| **Integration Actions** | Provider-specific features | Vision models |

### Important AI Notes

1. **Always wrap AI calls in step()** for persistence
2. **Model format**: Use `"provider:model"` (e.g., `"openai:gpt-4o"`)
3. **Cost awareness**: AI calls consume tokens
4. **Error handling**: AI calls can fail, so configure retries

## Exposing Workflows as Tools (Non-Blocking Pattern)

Workflows can be converted to tools using `.asTool()`. This is useful when you want AI to start a long-running workflow without blocking on its completion.

### Direct Workflow Tool Pattern

```typescript
// Starts the workflow and returns immediately
await execute({
  tools: [
    MyWorkflow.asTool()
  ]
});
```

When a workflow is exposed this way, the tool starts the workflow and returns a lightweight result such as the workflow ID and current status.

### When to Still Use an Action Wrapper

An Action wrapper is still useful when you want to rename the tool, constrain inputs, add business rules, or hide workflow details from the AI:

```typescript
// File: /actions/start-processing.ts
import { Action, z } from "@botpress/runtime";
import { DataProcessingWorkflow } from "../workflows/data-processing";

export default new Action({
  name: "startDataProcessing",
  description: "Start data processing workflow",

  input: z.object({
    datasetId: z.string(),
    options: z.object({
      fullScan: z.boolean().default(false)
    }).optional()
  }),

  output: z.object({
    workflowId: z.string()
  }),

  async handler({ input }) {
    // Calls workflow.start() - returns immediately
    const instance = await DataProcessingWorkflow.start({
      datasetId: input.datasetId,
      options: input.options ?? { fullScan: false }
    });

    return { workflowId: instance.id };
  }
});
```

Use the Action wrapper as a tool:

```typescript
import { Conversation, actions } from "@botpress/runtime";

export default new Conversation({
  channel: "*",

  handler: async ({ execute }) => {
    await execute({
      instructions: "Help the user",
      tools: [
        actions.startDataProcessing.asTool()
      ]
    });
  }
});
```

### Tracking Workflow Status

#### 1. Status Check Action

```typescript
import { Action, z, context } from "@botpress/runtime";

export default new Action({
  name: "checkProcessingStatus",
  description: "Check workflow status",

  input: z.object({
    workflowId: z.string()
  }),

  output: z.object({
    status: z.string(),
    progress: z.number()
  }),

  async handler({ input }) {
    const client = context.get("client");
    const { workflow } = await client.getWorkflow({ id: input.workflowId });

    // Fall back to an empty step map if no step state has been stored yet
    const { state } = await client.getState({
      name: "workflowSteps",
      id: input.workflowId,
      type: "workflow"
    }).catch(() => ({ state: { payload: { steps: {} } } }));

    const steps = state.payload.steps || {};
    const stepArray = Object.values(steps);
    const completed = stepArray.filter((s: any) => s.finishedAt).length;
    const progress = stepArray.length > 0
      ? Math.round((completed / stepArray.length) * 100)
      : 0;

    return { status: workflow.status, progress };
  }
});
```

#### 2. Progress Events

Send progress events from the workflow:

```typescript
import { Workflow, context } from "@botpress/runtime";

export const DataProcessingWorkflow = new Workflow({
  name: "dataProcessing",

  async handler({ input, state, step, client }) {
    const workflow = context.get("workflow");
    const conversationId = workflow.conversationId;

    if (conversationId) {
      await client.createEvent({
        type: "workflowProgress",
        conversationId,
        payload: {
          workflowId: workflow.id,
          stage: "processing",
          progress: 50
        }
      });
    }

    return { itemsProcessed: 0 };
  }
});
```

### Key Differences

| Aspect | Action | Workflow Wrapper |
|--------|--------|------------------|
| **Returns** | Final result | Workflow ID |
| **Execution** | Synchronous | Asynchronous |
| **Example** | `fetchUser()` | `startIndexing()` |

> **See Also:** [Converting Actions to Tools](./actions.md#converting-actions-to-tools) for more details on using `.asTool()` with different action patterns.
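
The percentage calculation inside the status check action can be pulled out into a pure function, which makes it easy to test without a client. The sketch below is a standalone illustration: `StepRecord` and `computeProgress` are hypothetical names, and the only field assumed on a step record is the `finishedAt` marker used in the action above.

```typescript
// Hypothetical helper; mirrors the progress arithmetic from the status check action.
interface StepRecord {
  finishedAt?: string;
}

function computeProgress(steps: Record<string, StepRecord>): number {
  const all = Object.values(steps);
  if (all.length === 0) return 0; // No recorded steps yet

  // A step counts as finished once it carries a finishedAt value.
  const finished = all.filter((s) => Boolean(s.finishedAt)).length;
  return Math.round((finished / all.length) * 100);
}

const noSteps = computeProgress({});
const halfDone = computeProgress({
  fetch: { finishedAt: "2025-01-01T00:00:00Z" },
  process: {},
});
const oneOfThree = computeProgress({
  a: { finishedAt: "2025-01-01T00:00:00Z" },
  b: {},
  c: {},
});
```

Note that the denominator is the number of recorded steps. If the stored state only lists steps that have already started, the result describes progress through known steps rather than through the whole workflow.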
## Scheduled Workflows

Run workflows automatically on a schedule:

```typescript
import { bot, Workflow, z } from "@botpress/runtime";

export const DailySync = new Workflow({
  name: "dailySync",
  description: "Syncs data daily with incremental updates",

  // Cron schedule (runs at 8 AM every day)
  schedule: "0 8 * * *",

  // Long timeout for large sync operations
  timeout: "6h",

  // Scheduled workflows can have optional input for manual triggering
  input: z.object({
    fullSync: z.boolean().default(false).describe("Force full sync")
  }),

  // Track sync progress
  state: z.object({
    lastSyncDate: z.string().optional(),
    itemsProcessed: z.number().default(0)
  }),

  async handler({ input, state, step }) {
    // Access global bot state to check if initial sync is done
    bot.state.syncStatus ??= { initialized: false };
    const isInitialSync = input.fullSync || !bot.state.syncStatus.initialized;

    const report = await step("generate", async () => {
      return await generateDailyReport(isInitialSync);
    });

    await step("send", async () => {
      await sendReportToTeam(report);
    });

    // Mark initial sync as complete in global state
    if (isInitialSync) {
      bot.state.syncStatus.initialized = true;
    }

    return { sent: true };
  }
});
```

### Cron Syntax Reference

```
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
```

### Common Patterns

```typescript
const schedulePatterns = {
  everyMinute: "* * * * *",
  every5Minutes: "*/5 * * * *",
  everyHour: "0 * * * *",
  daily9AM: "0 9 * * *",
  weekdaysMorning: "0 9 * * 1-5", // Mon-Fri at 9am
  firstOfMonth: "0 0 1 * *"
};
```

## Accessing Global Bot State

Workflows can also access and modify global bot state (not just workflow-specific state):

```typescript
import { bot, Workflow, z } from "@botpress/runtime";

export const AnalysisWorkflow = new Workflow({
  name: "analysis",
  schedule: "0 8 * * *", // Daily at 8 AM

  async handler({ state, step }) {
    // Perform analysis
    const results = await step("analyze", async () => {
      return await performAnalysis();
    });

    // Read existing bot state before overwriting it
    console.log(`Previous analysis: ${bot.state.lastAnalysisDate}`);

    // Store results in global bot state
    bot.state.analysisResults = results;
    bot.state.lastAnalysisDate = new Date().toISOString();

    return { completed: true };
  }
});
```

This is useful for:

- Storing cross-workflow shared data
- Maintaining global configuration
- Tracking system-wide metrics
- Sharing state between workflows and conversations

## Best Practices

### 1. Use Meaningful Keys

```typescript
// Prevent duplicate workflows for same user
await MyWorkflow.getOrCreate({
  key: userId, // Or composite: `${userId}-${date}`
  input: { userId }
});
```

### 2. Keep Step Names Stable

```typescript
// ✅ GOOD: Stable step names
await step("process-batch", async () => {
  for (const item of items) {
    await processItem(item);
  }
});

// ❌ BAD: Dynamic step names
for (let i = 0; i < items.length; i++) {
  await step(`process-${i}`, async () => {});
}
```

### 3. Store Progress in State

```typescript
// Access state via the state parameter in workflow handlers
state.progress = "Processing item 5 of 10";
state.percentComplete = 50;

// External systems can poll this state via the workflow instance
```

### 4. Pass State Values, Not References

```typescript
// ❌ BAD - Passing state reference
async handler({ state, step }) {
  const result = await processItems(state.items);
  // If state.items changes, processItems might see it!
}

// ✅ GOOD - Pass a copy
async handler({ state, step }) {
  const itemsCopy = [...state.items];
  const result = await processItems(itemsCopy);
  // Safe - function has its own copy
}

// ✅ GOOD - Extract primitives directly
async handler({ state, step }) {
  const count = state.count;           // Primitives are safe
  const items = [...state.items];      // Arrays need copying
  const config = { ...state.config };  // Objects need copying

  await step("process", async () => {
    return await processData(count, items, config);
  });
}
```

### 5. Handle Errors Gracefully

```typescript
await step("risky-op", async () => {
  try {
    return await riskyOperation();
  } catch (error) {
    // Store the error in state (caught values are `unknown` in strict TypeScript)
    state.lastError = error instanceof Error ? error.message : String(error);
    throw error; // Re-throw to mark step as failed
  }
}, { maxAttempts: 3 });
```

### 6. Use Appropriate Timeouts

```typescript
export const MyWorkflow = new Workflow({
  timeout: "5m", // Balance completion time and resources
  // ...
});
```

## Complete Example: Research Workflow

```typescript
import { Workflow, z, Autonomous, actions } from "@botpress/runtime";

export const ResearchWorkflow = new Workflow({
  name: "research",
  description: "Research a topic using web searches",

  input: z.object({
    topic: z.string(),
    conversationId: z.string()
  }),

  state: z.object({
    sources: z.array(z.string()).default([]),
    progress: z.string().default("Starting...")
  }),

  output: z.object({
    report: z.string()
  }),

  async handler({ input, state, execute, step, client }) {
    // Notify start
    await step("notify-start", async () => {
      await client.createMessage({
        conversationId: input.conversationId,
        type: "text",
        payload: { text: `🔍 Researching "${input.topic}"...` }
      });
    });

    // Define search tool
    const SearchTool = new Autonomous.Tool({
      name: "webSearch",
      description: "Search the web",
      input: z.object({ query: z.string() }),
      output: z.string(),
      async handler({ query }) {
        const results = await actions.browser.webSearch({ query, count: 5 });
        state.sources.push(...results.urls); // State accessible in closure
        throw new Autonomous.ThinkSignal("Found sources", results);
      }
    });

    // Define report exit
    const ReportExit = new Autonomous.Exit({
      name: "Report",
      description: "Final research report",
      schema: z.string()
    });

    // Execute research
    const report = await step("research", async () => {
      state.progress = "Researching..."; // Update state

      const result = await execute({
        instructions: `Research "${input.topic}".
Use web searches to gather information.
Compile findings into a comprehensive report.`,
        tools: [SearchTool],
        exits: [ReportExit],
        iterations: 15
      });

      if (result.is(ReportExit)) {
        return result.output;
      }

      return "Research incomplete";
    });

    // Send report
    await step("send-report", async () => {
      await client.createMessage({
        conversationId: input.conversationId,
        type: "text",
        payload: { text: `📄 Research Report:\n\n${report}` }
      });
    });

    return { report };
  }
});
```

## Workflow Lifecycle

### Status Values

- `pending`: Created but not started
- `in_progress`: Currently executing
- `completed`: Finished successfully
- `failed`: Terminated due to error
- `cancelled`: Manually cancelled
- `timedout`: Exceeded timeout limit

When a workflow reaches a terminal status (`completed`, `failed`, `cancelled`, `timedout`), the associated conversation handler receives a `workflow_callback` event. Use `type === "workflow_callback"` in the conversation handler to react to these transitions; see [Handling Workflow Completion in Conversations](#4-handling-workflow-completion-in-conversations).

### Managing Lifecycle

```typescript
// Check status
const instance = await MyWorkflow.getOrCreate({
  key: userId,
  input: { /* ... */ }
});

if (instance?.status === "failed") {
  // Retry by starting new instance
  await MyWorkflow.start(instance.input);
}

// Handle cancellation in handler
async handler({ state, step }) {
  if (state.shouldCancel) {
    return { cancelled: true };
  }
}
```
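
The status list above maps naturally onto a small set of helpers for deciding what to do with an instance. The following is a sketch under stated assumptions: the status strings are taken from the list in this section, while `WorkflowStatus`, `isTerminal`, and `nextAction` are hypothetical names, and the retry rule simply mirrors the "retry on `failed`" example above.

```typescript
// Hypothetical helpers built from the status values listed in this section.
type WorkflowStatus =
  | "pending"
  | "in_progress"
  | "completed"
  | "failed"
  | "cancelled"
  | "timedout";

const TERMINAL_STATUSES: ReadonlyArray<WorkflowStatus> = [
  "completed",
  "failed",
  "cancelled",
  "timedout",
];

// A terminal status means the instance will not make further progress.
function isTerminal(status: WorkflowStatus): boolean {
  return TERMINAL_STATUSES.indexOf(status) !== -1;
}

type LifecycleAction = "wait" | "retry" | "done";

// Assumption: only failed instances are retried, as in the example above.
function nextAction(status: WorkflowStatus): LifecycleAction {
  if (!isTerminal(status)) return "wait";
  return status === "failed" ? "retry" : "done";
}

const runningAction = nextAction("in_progress");
const failedAction = nextAction("failed");
const cancelledAction = nextAction("cancelled");
```

Keeping the decision in one function makes it straightforward to change the policy later, for example to also retry timed-out instances.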