Highway DSL Farshid A. Specification Rodmena Ltd. Version: 2.0.1 November 2025 Category: Standards Track Highway Workflow DSL Specification Status of This Memo This document specifies the Highway Workflow Domain-Specific Language (DSL), a declarative language for defining complex, production-grade workflow orchestration. This specification is intended to serve as the authoritative reference for implementers and users of the Highway workflow engine and compatible runtime systems. Abstract This document describes Highway DSL version 2.0.0 (LTS Stable), a Python-based domain-specific language for defining Directed Acyclic Graph (DAG) workflows with advanced features including conditional branching, parallel execution, iterative loops, event-driven coordination, scheduled execution, and durable error handling. Highway DSL provides Airflow-parity scheduling capabilities while maintaining a clean, type-safe, fluent API backed by Pydantic validation. The language is designed for integration with durable execution runtimes such as Absurd, enabling crash-resistant, long-running workflows with exactly- once semantics. Table of Contents 1. Introduction ....................................................4 1.1. Purpose and Scope ..........................................4 1.2. Architectural Philosophy ...................................4 1.3. Relationship to Absurd Runtime .............................5 1.4. Version History ............................................5 2. Terminology and Conventions .....................................6 2.1. Key Terms ..................................................6 2.2. Notational Conventions .....................................7 3. Architectural Overview ..........................................8 3.1. Design Principles ..........................................8 3.2. Execution Model ............................................8 3.3. Layered Architecture .......................................9 4. Core Data Models ...............................................10 4.1. Workflow Object ...........................................10 4.2. Operator Type Enumeration .................................11 4.3. Base Operator Contract ....................................12 4.4. Retry Policy ..............................................13 4.5. Timeout Policy ............................................14 5. Operator Specifications ........................................15 5.1. TaskOperator ..............................................15 5.2. ConditionOperator .........................................17 5.3. WaitOperator ..............................................19 5.4. ParallelOperator ..........................................21 5.5. ForEachOperator ...........................................23 5.6. WhileOperator .............................................25 5.7. EmitEventOperator .........................................27 5.8. WaitForEventOperator ......................................29 5.9. JoinOperator ..............................................30 5.10. SwitchOperator ...........................................32 6. Scheduling Specification .......................................33 6.1. Cron-Based Scheduling .....................................33 6.2. Start Date and Catchup ....................................34 6.3. Pause State and Active Runs ...............................35 6.4. Workflow Tags .............................................36 7. Event-Driven Coordination ......................................37 7.1. 
Event Emission ............................................37 7.2. Event Waiting .............................................38 7.3. Cross-Workflow Communication ..............................39 8. Error Handling and Resilience ..................................40 8.1. Retry Policies ............................................40 8.2. Timeout Policies ..........................................42 8.3. Callback Hooks ............................................43 8.4. Error Propagation .........................................45 9. WorkflowBuilder API ............................................46 9.1. Fluent Interface Design ...................................46 9.2. Task Definition Methods ...................................47 9.3. Control Flow Methods ......................................49 9.4. Scheduling Methods ........................................51 9.5. Event Methods .............................................52 9.6. Error Handling Methods ....................................53 9.7. Coordination Methods ......................................54 9.8. Build and Finalization ....................................55 10. Variable Resolution and Templating .............................56 10.1. Template Syntax ..........................................55 10.2. Context Variables ........................................56 10.3. Runtime Resolution .......................................57 11. Serialization Formats ..........................................58 11.1. YAML Serialization .......................................58 11.2. JSON Serialization .......................................60 11.3. Deserialization and Validation ...........................62 12. Dependency Management ..........................................63 12.1. Explicit Dependencies ....................................63 12.2. Implicit Chain Dependencies ..............................64 12.3. Loop Isolation ...........................................65 12.4. Parallel Branch Dependencies .............................66 13. Integration with Absurd Runtime ................................67 13.1. Mapping Operators to Absurd Primitives ...................67 13.2. Checkpoint Management ....................................69 13.3. Durable State ............................................70 14. Security Considerations ........................................71 14.1. Code Injection Risks .....................................71 14.2. Variable Validation ......................................72 14.3. Secret Management ........................................72 15. Examples .......................................................73 15.1. Simple Linear Workflow ...................................73 15.2. Conditional Branching ....................................75 15.3. Parallel Execution .......................................77 15.4. ForEach Loops ............................................79 15.5. While Loops ..............................................81 15.6. Scheduled Workflows ......................................83 15.7. Event-Driven Workflows ...................................85 15.8. Callback Hooks ...........................................87 15.9. Switch/Case Logic ........................................89 15.10. Join Coordination .......................................91 16. IANA Considerations ............................................93 17. References .....................................................92 17.1. Normative References .....................................92 17.2. 
Informative References ...................................92

1. Introduction

1.1. Purpose and Scope

Highway DSL (Domain-Specific Language) is a declarative, Python-based language for defining complex workflow orchestration logic. It serves as the "blueprint layer" in a multi-tiered workflow architecture, separating workflow definition from execution runtime.

The primary goals of Highway DSL are:

1. To provide a clear, concise, and type-safe API for defining Directed Acyclic Graph (DAG) workflows.
2. To achieve feature parity with Apache Airflow's scheduling and coordination capabilities while maintaining superior type safety and composability.
3. To enable seamless integration with durable execution runtimes (such as Absurd) that provide crash-resistance and exactly-once execution semantics.
4. To support advanced workflow patterns including conditional branching, parallel execution, iterative loops, event-driven coordination, and callback-based error handling.

This specification describes version 2.0.0 (LTS Stable) of the Highway DSL, which consolidates the Airflow-parity scheduling features, event-based operators for cross-workflow coordination, durable callback hooks, and enhanced control flow operators introduced over the 1.x series.

1.2. Architectural Philosophy

Highway DSL adheres to the following architectural principles:

Separation of Concerns: Workflow definition (DSL) is strictly separated from workflow execution (runtime). The DSL produces a pure data structure (the Workflow object) with no side effects.

Declarative Over Imperative: Workflows are declared as immutable specifications, not executed as procedural code. This enables static analysis, visualization, and runtime optimization.

Type Safety Through Validation: All workflow objects are backed by Pydantic models, providing runtime validation, serialization, and comprehensive type hints.

Fluent API Design: The WorkflowBuilder class implements a fluent interface, allowing workflows to be defined in a natural, chainable syntax.

Runtime Agnostic: While designed for integration with Absurd, the DSL produces runtime-agnostic workflow specifications that can be interpreted by any compatible executor.

1.3. Relationship to Absurd Runtime

Highway DSL is designed to integrate with Absurd, a PostgreSQL-based durable execution runtime. The integration model is as follows:

1. A workflow is defined using Highway DSL (the blueprint).
2. The workflow is serialized to YAML or JSON and stored in a database table (WorkflowDefinition).
3. A scheduler service (using APScheduler) reads the workflow definition and, based on the cron schedule, spawns a new Absurd task (highway.workflow.run).
4. This Absurd task is a generic workflow interpreter (RefactoredInterpreter) that walks the Highway DSL graph node by node.
5. Each node in the Highway DSL graph is mapped to corresponding Absurd SDK calls (ctx.step, ctx.waitForEvent, absurd.spawn, etc.) managed by a DurableContext.

This layered architecture allows Highway DSL to remain a pure specification language while Absurd provides the durable execution guarantees.
1.4. Version History

Version 1.0.0 (Initial Release):
- Basic task operators (Task, Condition, Wait, Parallel)
- Loop operators (ForEach, While)
- Retry and timeout policies
- YAML/JSON serialization
- WorkflowBuilder fluent API

Version 1.0.2 (Bug Fix Release):
- Fixed ForEach operator dependency management
- Improved loop isolation from parallel operators
- Enhanced error handling

Version 1.0.3 (Stable Release):
- Critical bug fixes for dependency injection
- Comprehensive test coverage
- Documentation improvements

Version 1.1.0 (Feature Release):
- Airflow-parity scheduling metadata (cron, start_date, catchup)
- Event-based operators (EmitEvent, WaitForEvent)
- Durable callback hooks (on_success, on_failure)
- Switch/case operator
- Task descriptions
- Workflow-level default retry policy
- Tags and metadata enhancements

Version 1.2.0 (Stability Release):
- Fixed dependency management for parallel and loop operators
- Enhanced loop body isolation with is_internal_loop_task flag
- Improved parallel branch handling with is_internal_parallel_task
- Bug fixes for conditional operator dependency injection
- Validation improvements for callback references

Version 1.3.0 (Join Operator Release):
- JoinOperator for Temporal-style explicit coordination
- JoinMode enumeration (ALL_OF, ANY_OF, ALL_SUCCESS, ONE_SUCCESS)
- Replaces brittle TriggerRule-based joins with explicit operators
- TriggerRule deprecated in favor of JoinOperator
- Idempotency key support on all operators (idempotency_key field)
- Duration helper class for cleaner time specifications
- Parallel execution mode for ForEachOperator (dynamic task mapping)

Version 2.0.0 (LTS Stable Release):
- LONG-TERM SUPPORT (LTS) release - stable production-ready API
- No breaking changes planned for 2.x series
- Recommended for all production deployments
- Consolidates all features from 1.x series:
  * JoinOperator for Temporal-style explicit coordination
  * Event-driven patterns (EmitEvent, WaitForEvent)
  * SwitchOperator for multi-branch routing
  * Scheduling metadata (cron, start_date, catchup)
  * Idempotency support across all operators
  * Duration helper class
  * Callback hooks (on_success, on_failure)
- Future 2.x releases will only include backward-compatible enhancements

2. Terminology and Conventions

2.1. Key Terms

The following terms are used throughout this specification:

Workflow: A complete, self-contained definition of a multi-step process, represented as a Directed Acyclic Graph (DAG) of operators.

Operator: A single node in the workflow graph representing a discrete unit of work or control flow logic. All operators inherit from BaseOperator.

Task: A specific type of operator (TaskOperator) that executes a function with specified arguments.

Dependency: A directed edge in the workflow graph indicating that one operator must complete before another can begin.

Builder: An instance of the WorkflowBuilder class used to construct a workflow using a fluent API.

Runtime: The execution environment responsible for interpreting and executing workflows (e.g., Absurd).

Durable Execution: An execution model where workflow state is persisted to a database at each step, enabling crash-resistance and exactly-once semantics.

Checkpoint: A persisted snapshot of workflow state at a specific operator, stored by the runtime for resumption after failures.

Event: A named, typed message emitted by one workflow and consumed by another, enabling cross-workflow coordination.

Schedule: A cron expression defining when a workflow should be automatically executed.
Callback: A task that is automatically executed when another task succeeds or fails (on_success_task_id, on_failure_task_id).

2.2. Notational Conventions

The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in RFC 2119.

Code examples are presented in Python 3.10+ syntax. YAML examples use standard YAML 1.2 syntax.

Field types follow Pydantic/Python type annotation conventions:
- str: string
- int: integer
- bool: boolean
- float: floating point number
- Dict[K, V]: dictionary with key type K and value type V
- List[T]: list of elements of type T
- Optional[T]: value of type T or None
- Union[A, B]: value of type A or type B

3. Architectural Overview

3.1. Design Principles

Highway DSL is built on the following core design principles:

Immutability: Once built, a Workflow object is treated as an immutable specification. Builder methods return self to enable chaining; mutation is confined to the construction phase and managed carefully to maintain consistency.

Validation-First: All data structures use Pydantic models with comprehensive validation rules. Invalid workflows cannot be constructed.

No Side Effects: The DSL itself performs no I/O, makes no network calls, and executes no user code. It is a pure specification language.

Runtime Agnostic: The DSL makes no assumptions about the execution runtime beyond the ability to interpret operator types and respect dependencies.

Human and Machine Readable: Workflows can be defined in Python (human-friendly) and serialized to YAML/JSON (machine-friendly) with full round-trip fidelity.

3.2. Execution Model

Highway DSL workflows follow a dependency-driven execution model:

1. The runtime identifies all operators with no unmet dependencies (the "ready set").
2. Operators in the ready set are executed in parallel (subject to runtime resource constraints).
3. When an operator completes, its dependent operators are checked. If all dependencies are met, they are added to the ready set.
4. This process continues until all operators have completed or a failure occurs.

(A minimal sketch of this scheduling loop appears at the end of this section.)

Special execution rules:
- ConditionOperator: Only one branch (if_true or if_false) is executed, based on condition evaluation.
- ParallelOperator: All branches are executed concurrently.
- ForEachOperator: The loop body is executed once for each item in the items collection, with each iteration seeing the current item in the {{item}} variable.
- WhileOperator: The loop body is executed repeatedly while the condition evaluates to true.
- WaitOperator: Execution pauses for the specified duration or until the specified datetime.
- WaitForEventOperator: Execution pauses until the named event is emitted by another workflow or the timeout is reached.

3.3. Layered Architecture

The Highway ecosystem consists of three primary layers:

Layer 1: Definition Layer (Highway DSL)
- Pure Python classes (Pydantic models)
- WorkflowBuilder fluent API
- YAML/JSON serialization
- No runtime dependencies

Layer 2: Storage Layer (Database)
- WorkflowDefinition table (stores serialized workflows)
- WorkflowRun table (tracks execution instances)
- Event table (stores emitted events)
- Managed by Alembic migrations

Layer 3: Execution Layer (Absurd Runtime)
- RefactoredInterpreter (workflow walker)
- DurableContext (Absurd SDK wrapper)
- Tool/Executor model (task execution)
- Scheduler service (APScheduler integration)

This specification focuses on Layer 1 (Definition Layer). Layer 2 and Layer 3 are described in separate documents.
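The dependency-driven model of Section 3.2 can be summarized with the following sketch. It is illustrative only: it ignores control-flow operators, retries, and durability, executes the ready set sequentially rather than in parallel, and takes an execute callable that a real runtime would supply. The Workflow and BaseOperator attribute names follow Section 4.

    # Sketch: dependency-driven execution of a Workflow (illustrative only).
    def run_workflow(workflow, execute):
        """execute(operator) is a runtime-supplied callable that runs one operator."""
        done: set[str] = set()
        pending = dict(workflow.tasks)  # task_id -> operator

        while pending:
            # Ready set: operators whose dependencies have all completed.
            ready = [op for op in pending.values()
                     if all(dep in done for dep in op.dependencies)]
            if not ready:
                raise RuntimeError("cycle or unsatisfiable dependency detected")
            for op in ready:            # a real runtime would execute these in parallel
                execute(op)
                done.add(op.task_id)
                del pending[op.task_id]

4.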
Core Data Models 4.1. Workflow Object The Workflow class is the root container for all workflow definitions. Class: Workflow Module: highway_dsl.workflow_dsl Inherits: pydantic.BaseModel Fields: name: str (REQUIRED) Unique identifier for the workflow. MUST be a valid Python identifier (alphanumeric plus underscore, not starting with a digit). version: str (DEFAULT: "1.3.0") Semantic version string for the workflow specification. description: str (DEFAULT: "") Human-readable description of the workflow's purpose. tasks: Dict[str, Union[TaskOperator, ConditionOperator, ...]] Dictionary mapping task IDs to operator instances. Task IDs MUST be unique within the workflow. variables: Dict[str, Any] (DEFAULT: {}) Workflow-scoped variables available to all operators via template resolution. start_task: Optional[str] (DEFAULT: None) Task ID of the first operator to execute. If None, the first task added to the workflow is used. schedule: Optional[str] (DEFAULT: None) Cron expression for scheduled execution (e.g., "0 2 * * *" for daily at 2 AM). See Section 6.1. start_date: Optional[datetime] (DEFAULT: None) Timestamp when the schedule becomes active. Runs are not created before this date. catchup: bool (DEFAULT: False) If True, missed runs between start_date and now are backfilled. If False, only future runs are scheduled. is_paused: bool (DEFAULT: False) If True, scheduled execution is suspended. Manual runs are still permitted. tags: List[str] (DEFAULT: []) List of tags for categorization and filtering. max_active_runs: int (DEFAULT: 1) Maximum number of concurrent executions of this workflow. default_retry_policy: Optional[RetryPolicy] (DEFAULT: None) Default retry policy applied to all tasks that do not specify their own retry policy. Methods: add_task(task: BaseOperator) -> Workflow Add an operator to the workflow. Returns self for chaining. set_variables(variables: Dict[str, Any]) -> Workflow Merge the provided variables into the workflow variables. set_start_task(task_id: str) -> Workflow Set the starting operator. to_yaml() -> str Serialize the workflow to YAML format. to_json() -> str Serialize the workflow to JSON format. from_yaml(yaml_str: str) -> Workflow (class method) Deserialize a workflow from YAML. from_json(json_str: str) -> Workflow (class method) Deserialize a workflow from JSON. 4.2. Operator Type Enumeration The OperatorType enumeration defines all valid operator types. Enum: OperatorType Module: highway_dsl.workflow_dsl Inherits: enum.Enum Values: TASK = "task" Executes a function with specified arguments. CONDITION = "condition" Evaluates a boolean expression and routes to one of two branches. WAIT = "wait" Pauses execution for a specified duration or until a datetime. PARALLEL = "parallel" Executes multiple branches concurrently. FOREACH = "foreach" Iterates over a collection, executing the loop body for each item. WHILE = "while" Repeatedly executes the loop body while a condition is true. EMIT_EVENT = "emit_event" Emits a named event with an optional payload for cross-workflow coordination. WAIT_FOR_EVENT = "wait_for_event" Pauses execution until a named event is received or a timeout occurs. SWITCH = "switch" Multi-branch routing based on the value of an expression (switch/case logic). JOIN = "join" Coordinates multiple tasks/branches with explicit join modes (Temporal-style join logic). 4.3. Base Operator Contract All operators inherit from BaseOperator, which defines the common interface. 
Class: BaseOperator
Module: highway_dsl.workflow_dsl
Inherits: pydantic.BaseModel, abc.ABC

Fields:

task_id: str (REQUIRED)
    Unique identifier for this operator within the workflow. MUST be unique across all tasks in the workflow.

operator_type: OperatorType (REQUIRED)
    The type of this operator. Set automatically by subclasses.

dependencies: List[str] (DEFAULT: [])
    List of task IDs that must complete before this operator can execute.

retry_policy: Optional[RetryPolicy] (DEFAULT: None)
    Retry policy for this operator. If None, uses the workflow's default_retry_policy.

timeout_policy: Optional[TimeoutPolicy] (DEFAULT: None)
    Timeout policy for this operator. If None, no timeout is enforced.

metadata: Dict[str, Any] (DEFAULT: {})
    Arbitrary key-value pairs for custom metadata.

description: str (DEFAULT: "")
    Human-readable description of what this operator does.

on_success_task_id: Optional[str] (DEFAULT: None)
    Task ID of an operator to execute if this operator succeeds.

on_failure_task_id: Optional[str] (DEFAULT: None)
    Task ID of an operator to execute if this operator fails.

is_internal_loop_task: bool (DEFAULT: False, exclude=True)
    Internal flag to mark operators that are part of a loop body. Excluded from serialization.

Validation Rules:
- task_id MUST NOT contain whitespace or special characters except underscore.
- dependencies MUST reference valid task IDs in the same workflow.
- on_success_task_id and on_failure_task_id MUST reference valid task IDs if specified.

4.4. Retry Policy

The RetryPolicy class defines how operators should be retried on failure.

Class: RetryPolicy
Module: highway_dsl.workflow_dsl
Inherits: pydantic.BaseModel

Fields:

max_retries: int (DEFAULT: 3)
    Maximum number of retry attempts. A value of 0 disables retries.

delay: timedelta (DEFAULT: timedelta(seconds=5))
    Initial delay between retry attempts.

backoff_factor: float (DEFAULT: 2.0)
    Multiplicative factor applied to the delay after each retry. The delay before retry attempt N (counting from 1) is: delay * (backoff_factor ^ (N - 1)).

Behavior:

If an operator with a retry policy fails:
1. The runtime waits for the specified delay.
2. The operator is re-executed.
3. If it fails again, the delay is multiplied by backoff_factor.
4. This continues until max_retries is reached or the operator succeeds.

Example:

    RetryPolicy(max_retries=3, delay=timedelta(seconds=10), backoff_factor=2.0)

This results in retry delays of: 10s, 20s, 40s.

Serialization:
When serialized to YAML/JSON, timedelta is represented in ISO 8601 duration format:
    delay: PT10S (10 seconds)
    delay: PT5M (5 minutes)
    delay: PT1H (1 hour)

4.5. Timeout Policy

The TimeoutPolicy class defines execution time limits for operators.

Class: TimeoutPolicy
Module: highway_dsl.workflow_dsl
Inherits: pydantic.BaseModel

Fields:

timeout: timedelta (REQUIRED)
    Maximum execution time for the operator.

kill_on_timeout: bool (DEFAULT: True)
    If True, the runtime SHOULD terminate the operator's execution when the timeout is reached. If False, the runtime MAY allow the operator to complete but mark it as failed.

Behavior:

If an operator with a timeout policy exceeds its timeout:
1. If kill_on_timeout is True, the runtime terminates execution.
2. The operator is marked as failed.
3. If a retry policy is also specified, the retry logic is triggered.
4. If an on_failure callback is specified, it is executed.

Example:

    TimeoutPolicy(timeout=timedelta(hours=2), kill_on_timeout=True)

Serialization:
When serialized to YAML/JSON, timedelta is represented in ISO 8601 duration format:
    timeout: PT2H (2 hours)
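To make the backoff arithmetic of Section 4.4 concrete, the following sketch computes the delay schedule implied by a RetryPolicy. It is illustrative only and reads just the three fields defined above.

    # Sketch: compute the delay before each retry attempt (illustrative only).
    from datetime import timedelta

    def retry_schedule(max_retries: int, delay: timedelta,
                       backoff_factor: float) -> list[timedelta]:
        # Delay before retry N (1-indexed) is delay * backoff_factor ** (N - 1).
        return [delay * (backoff_factor ** (n - 1)) for n in range(1, max_retries + 1)]

    # retry_schedule(3, timedelta(seconds=10), 2.0)
    # -> delays of 10s, 20s, 40s, matching the RetryPolicy example above.

5.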
Operator Specifications 5.1. TaskOperator The TaskOperator executes a function with specified arguments. Class: TaskOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: function: str (REQUIRED) Fully-qualified function name in dot notation (e.g., "module.submodule.function_name"). The runtime is responsible for resolving and executing this function. args: List[Any] (DEFAULT: []) Positional arguments to pass to the function. Values may contain template variables (e.g., "{{variable_name}}"). kwargs: Dict[str, Any] (DEFAULT: {}) Keyword arguments to pass to the function. Values may contain template variables. result_key: Optional[str] (DEFAULT: None) If specified, the return value of the function is stored in the workflow context under this key, making it available to subsequent operators via template resolution (e.g., "{{result_key}}"). Operator Type: OperatorType.TASK Execution Semantics: 1. The runtime resolves all template variables in args and kwargs using the current workflow context. 2. The runtime imports and calls the specified function with the resolved arguments. 3. If the function raises an exception: a. If a retry_policy is specified, retry logic is triggered. b. If retries are exhausted, the operator is marked as failed. c. If an on_failure_task_id is specified, that task is executed. 4. If the function completes successfully: a. If result_key is specified, the return value is stored in the workflow context. b. If an on_success_task_id is specified, that task is executed. c. All dependent tasks are evaluated for readiness. Example (Python): task = TaskOperator( task_id="extract_data", function="etl.extract.fetch_from_api", args=["{{api_url}}", "{{api_key}}"], kwargs={"timeout": 30}, result_key="raw_data", retry_policy=RetryPolicy(max_retries=3, delay=timedelta(seconds=10)), timeout_policy=TimeoutPolicy(timeout=timedelta(minutes=5)), description="Fetch raw data from external API" ) Example (YAML): extract_data: operator_type: task task_id: extract_data function: etl.extract.fetch_from_api args: - "{{api_url}}" - "{{api_key}}" kwargs: timeout: 30 result_key: raw_data retry_policy: max_retries: 3 delay: PT10S backoff_factor: 2.0 timeout_policy: timeout: PT5M kill_on_timeout: true description: Fetch raw data from external API dependencies: [] 5.2. ConditionOperator The ConditionOperator evaluates a boolean expression and routes execution to one of two branches. Class: ConditionOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: condition: str (REQUIRED) A boolean expression to evaluate. The expression MAY contain template variables. The runtime MUST evaluate this to a boolean value. if_true: Optional[str] (DEFAULT: None) Task ID of the operator to execute if the condition evaluates to True. if_false: Optional[str] (DEFAULT: None) Task ID of the operator to execute if the condition evaluates to False. Operator Type: OperatorType.CONDITION Execution Semantics: 1. The runtime resolves all template variables in the condition string. 2. The runtime evaluates the condition to a boolean value. 3. If the condition is True: a. If if_true is specified, that task becomes ready for execution. b. The if_false branch is NOT executed. 4. If the condition is False: a. If if_false is specified, that task becomes ready for execution. b. The if_true branch is NOT executed. 5. Tasks that depend on the ConditionOperator itself (not on the branches) wait for the selected branch to complete. 
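How a runtime evaluates the resolved condition string is implementation-defined. The sketch below shows one minimal approach, where resolve_templates is a hypothetical runtime helper (template resolution is specified in Section 10) and eval is restricted to the literal expression; a production runtime would likely use a hardened expression evaluator instead (see the code-injection considerations in Section 14.1).

    # Sketch: evaluate a ConditionOperator expression (illustrative only).
    def evaluate_condition(condition: str, context: dict, resolve_templates) -> bool:
        # resolve_templates is a hypothetical helper that substitutes {{...}}
        # variables, e.g. "{{data.quality_score}} > 0.8" -> "0.92 > 0.8".
        resolved = resolve_templates(condition, context)
        # Restricted eval: no builtins and no names, only the literal expression.
        return bool(eval(resolved, {"__builtins__": {}}, {}))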
Example (Python):

    condition = ConditionOperator(
        task_id="check_quality",
        condition="{{data.quality_score}} > 0.8",
        if_true="high_quality_processing",
        if_false="standard_processing",
        dependencies=["validate_data"]
    )

Example (YAML):

    check_quality:
      operator_type: condition
      task_id: check_quality
      condition: "{{data.quality_score}} > 0.8"
      if_true: high_quality_processing
      if_false: standard_processing
      dependencies:
        - validate_data

5.3. WaitOperator

The WaitOperator pauses workflow execution for a specified duration or until a specified datetime.

Class: WaitOperator
Module: highway_dsl.workflow_dsl
Inherits: BaseOperator

Additional Fields:

wait_for: Union[timedelta, datetime, str] (REQUIRED)
    The wait specification. Can be:
    - timedelta: Duration to wait (e.g., timedelta(hours=1))
    - datetime: Absolute datetime to wait until
    - str: Serialized form (see Serialization below)

Operator Type: OperatorType.WAIT

Execution Semantics:
1. If wait_for is a timedelta:
   a. The runtime records the current time T.
   b. The operator completes at time T + wait_for.
2. If wait_for is a datetime:
   a. The operator completes when the system clock reaches wait_for.
3. The runtime SHOULD use a durable sleep mechanism (e.g., Absurd's ctx.sleep) to ensure the wait survives crashes.
4. Template variables in wait_for are NOT supported in this version of the DSL.

Serialization:
When serialized, timedelta and datetime are converted to prefixed strings:
- timedelta: "duration:<seconds>" (e.g., "duration:3600" for 1 hour)
- datetime: "datetime:<ISO 8601 timestamp>" (e.g., "datetime:2025-01-01T00:00:00")
When deserialized, these strings are parsed back to the appropriate Python types.

ISO 8601 duration format is also supported in YAML:
    wait_for: PT1H (parsed as timedelta(hours=1))
    wait_for: P1D (parsed as timedelta(days=1))

Example (Python):

    wait = WaitOperator(
        task_id="wait_for_batch",
        wait_for=timedelta(hours=24),
        dependencies=["process_data"]
    )

Example (YAML):

    wait_for_batch:
      operator_type: wait
      task_id: wait_for_batch
      wait_for: P1D
      dependencies:
        - process_data

5.4. ParallelOperator

The ParallelOperator executes multiple branches concurrently.

Class: ParallelOperator
Module: highway_dsl.workflow_dsl
Inherits: BaseOperator

Additional Fields:

branches: Dict[str, List[str]] (REQUIRED)
    Dictionary mapping branch names to lists of task IDs. Each branch is executed concurrently. Branch names MUST be unique within the operator.

timeout: Optional[int] (DEFAULT: None)
    Optional timeout in seconds for all branches to complete. If any branch exceeds this timeout, the entire operator fails.

Operator Type: OperatorType.PARALLEL

Execution Semantics:
1. All branches are started concurrently (subject to runtime resource limits).
2. Each branch executes its task list in dependency order.
3. The ParallelOperator completes when ALL branches have completed successfully.
4. If ANY branch fails:
   a. Other branches MAY continue or be cancelled (runtime-dependent).
   b. The ParallelOperator is marked as failed.
   c. Retry and callback logic is triggered.
5. Tasks that depend on the ParallelOperator wait for all branches to complete.
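How branches are scheduled is runtime-dependent. The sketch below shows one possible fail-fast interpretation using asyncio; run_branch is a hypothetical coroutine supplied by the runtime, and the sketch is not part of the DSL.

    # Sketch: fail-fast concurrent branch execution (illustrative only).
    import asyncio

    async def run_parallel(op, run_branch):
        # run_branch(name, task_ids) is a hypothetical coroutine that executes
        # one branch's task list in dependency order.
        coros = [run_branch(name, ids) for name, ids in op.branches.items()]
        try:
            # gather() raises on the first branch failure; wait_for enforces
            # the optional operator-level timeout (in seconds, None = no limit).
            await asyncio.wait_for(asyncio.gather(*coros), timeout=op.timeout)
        except Exception as exc:
            raise RuntimeError(f"parallel operator {op.task_id} failed") from exc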
Example (Python): parallel = ParallelOperator( task_id="parallel_processing", branches={ "branch_a": ["transform_a", "enrich_a"], "branch_b": ["transform_b", "enrich_b"], "branch_c": ["transform_c"] }, timeout=3600, dependencies=["fetch_data"] ) Example (YAML): parallel_processing: operator_type: parallel task_id: parallel_processing branches: branch_a: - transform_a - enrich_a branch_b: - transform_b - enrich_b branch_c: - transform_c timeout: 3600 dependencies: - fetch_data 5.5. ForEachOperator The ForEachOperator iterates over a collection, executing a loop body for each item. Class: ForEachOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: items: str (REQUIRED) A template expression that evaluates to an iterable collection (e.g., "{{data.records}}", "{{range(10)}}"). loop_body: List[Union[TaskOperator, ConditionOperator, ...]] List of operators to execute for each item. These operators are serialized inline within the ForEachOperator. Operator Type: OperatorType.FOREACH Execution Semantics: 1. The runtime evaluates the items expression to obtain a collection. 2. For each item in the collection: a. The runtime sets the special variable {{item}} to the current item. b. The loop body is executed with access to {{item}}. c. All template variables in the loop body are resolved with {{item}} in scope. 3. Loop iterations MAY execute in parallel (runtime-dependent). 4. The ForEachOperator completes when all iterations have completed successfully. 5. If any iteration fails: a. The behavior is runtime-dependent (fail-fast or continue). b. Retry and callback logic applies to the entire operator, not individual iterations. 6. Loop body tasks are marked with is_internal_loop_task=True to isolate them from external dependency injection. Example (Python): foreach = ForEachOperator( task_id="process_records", items="{{data.records}}", loop_body=[ TaskOperator( task_id="validate_record", function="validators.validate", args=["{{item}}"] ), TaskOperator( task_id="save_record", function="db.save", args=["{{item}}"], dependencies=["validate_record"] ) ], dependencies=["fetch_data"] ) Example (YAML): process_records: operator_type: foreach task_id: process_records items: "{{data.records}}" loop_body: - operator_type: task task_id: validate_record function: validators.validate args: - "{{item}}" dependencies: - process_records - operator_type: task task_id: save_record function: db.save args: - "{{item}}" dependencies: - validate_record dependencies: - fetch_data 5.6. WhileOperator The WhileOperator repeatedly executes a loop body while a condition remains true. Class: WhileOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: condition: str (REQUIRED) A boolean expression evaluated before each iteration. The loop continues while this evaluates to True. loop_body: List[Union[TaskOperator, ConditionOperator, ...]] List of operators to execute in each iteration. Operator Type: OperatorType.WHILE Execution Semantics: 1. The runtime evaluates the condition expression. 2. If the condition is False: a. The loop body is NOT executed. b. The WhileOperator completes immediately. 3. If the condition is True: a. The loop body is executed. b. After the loop body completes, the condition is re-evaluated. c. Steps 2-3 repeat. 4. If the loop body fails: a. The WhileOperator is marked as failed. b. Retry and callback logic is triggered. 5. The runtime SHOULD enforce a maximum iteration limit to prevent infinite loops (implementation-specific). 
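A runtime's interpretation of these semantics might look like the sketch below, where evaluate_condition and run_loop_body are hypothetical runtime helpers and MAX_ITERATIONS is an implementation-chosen safety limit; none of these names are defined by the DSL.

    # Sketch: WhileOperator interpretation with an iteration guard (illustrative only).
    MAX_ITERATIONS = 1000  # implementation-specific guard against infinite loops

    def run_while(op, context, evaluate_condition, run_loop_body):
        iterations = 0
        while evaluate_condition(op.condition, context):
            if iterations >= MAX_ITERATIONS:
                raise RuntimeError(f"{op.task_id}: exceeded {MAX_ITERATIONS} iterations")
            run_loop_body(op.loop_body, context)  # may update context via result_key
            iterations += 1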
Example (Python): while_loop = WhileOperator( task_id="retry_until_success", condition="{{qa_results.status}} == 'failed'", loop_body=[ TaskOperator( task_id="perform_rework", function="rework.fix_issues", args=["{{qa_results.issues}}"] ), TaskOperator( task_id="rerun_qa", function="qa.inspect", result_key="qa_results", dependencies=["perform_rework"] ) ], dependencies=["initial_qa"] ) Example (YAML): retry_until_success: operator_type: while task_id: retry_until_success condition: "{{qa_results.status}} == 'failed'" loop_body: - operator_type: task task_id: perform_rework function: rework.fix_issues args: - "{{qa_results.issues}}" dependencies: - retry_until_success - operator_type: task task_id: rerun_qa function: qa.inspect result_key: qa_results dependencies: - perform_rework dependencies: - initial_qa 5.7. EmitEventOperator The EmitEventOperator emits a named event with an optional payload, enabling cross-workflow coordination. Class: EmitEventOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: event_name: str (REQUIRED) Globally unique name for the event. Other workflows wait for events by this name. payload: Dict[str, Any] (DEFAULT: {}) Arbitrary data to include with the event. Waiting workflows can access this payload. Operator Type: OperatorType.EMIT_EVENT Execution Semantics: 1. The runtime resolves all template variables in event_name and payload. 2. The runtime persists the event to durable storage (e.g., the Event table in the database). 3. The runtime notifies any workflows waiting for this event_name. 4. The operator completes immediately after emitting the event. 5. Events are typically scoped to a data_interval_start or other unique identifier to ensure the correct workflow run receives the event. Example (Python): emit = EmitEventOperator( task_id="notify_completion", event_name="pipeline_{{run.id}}_completed", payload={ "status": "success", "record_count": "{{result.count}}", "timestamp": "{{run.started_at}}" }, dependencies=["final_task"] ) Example (YAML): notify_completion: operator_type: emit_event task_id: notify_completion event_name: "pipeline_{{run.id}}_completed" payload: status: success record_count: "{{result.count}}" timestamp: "{{run.started_at}}" dependencies: - final_task 5.8. WaitForEventOperator The WaitForEventOperator pauses execution until a named event is received or a timeout occurs. Class: WaitForEventOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: event_name: str (REQUIRED) Name of the event to wait for. MUST match the event_name of an EmitEventOperator in another workflow. timeout_seconds: Optional[int] (DEFAULT: None) Maximum time to wait for the event, in seconds. If None, waits indefinitely. Operator Type: OperatorType.WAIT_FOR_EVENT Execution Semantics: 1. The runtime resolves all template variables in event_name. 2. The runtime checks if the event has already been emitted. If so, the operator completes immediately. 3. If the event has not been emitted: a. The runtime subscribes to the event. b. Execution is paused using a durable wait mechanism (e.g., Absurd's ctx.waitForEvent). 4. When the event is received: a. The event payload is stored in the workflow context (e.g., "{{wait_task.payload}}"). b. The operator completes successfully. 5. If timeout_seconds is specified and the timeout is reached: a. The operator fails. b. Retry and callback logic is triggered. 
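One possible non-durable interpretation of these semantics is a polling loop like the sketch below; query_event is a hypothetical helper that checks the Event table, and a production runtime would instead use a durable primitive such as Absurd's ctx.waitForEvent (see Section 7.2).

    # Sketch: polling interpretation of WaitForEventOperator (illustrative only).
    import time

    def wait_for_event(event_name: str, timeout_seconds: int | None,
                       query_event, poll_interval: float = 5.0) -> dict:
        """query_event(name) is a hypothetical helper returning the payload or None."""
        deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
        while True:
            payload = query_event(event_name)
            if payload is not None:
                return payload  # stored in the workflow context, e.g. {{wait_task.payload}}
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(f"event {event_name!r} not received before timeout")
            time.sleep(poll_interval)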
Example (Python): wait = WaitForEventOperator( task_id="wait_upstream", event_name="pipeline_{{upstream_run_id}}_completed", timeout_seconds=3600, description="Wait for upstream pipeline to complete" ) Example (YAML): wait_upstream: operator_type: wait_for_event task_id: wait_upstream event_name: "pipeline_{{upstream_run_id}}_completed" timeout_seconds: 3600 description: Wait for upstream pipeline to complete dependencies: [] 5.9. JoinOperator The JoinOperator coordinates multiple tasks or parallel branches using explicit join modes (Temporal-style coordination pattern). Class: JoinOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: join_tasks: List[str] (REQUIRED) List of task IDs to wait for and coordinate. These tasks are evaluated to determine when the join completes. join_mode: JoinMode (DEFAULT: JoinMode.ALL_OF) Coordination mode that determines the join completion criteria. Operator Type: OperatorType.JOIN Join Modes: JoinMode.ALL_OF: Wait for all tasks to reach a final state (success or failure). The join succeeds when all tasks complete, regardless of their individual status. This is useful for ensuring all parallel work has finished before proceeding. JoinMode.ANY_OF: Wait for any single task to reach a final state. The join completes as soon as the first task finishes. Useful for race conditions or timeout patterns. JoinMode.ALL_SUCCESS: Wait for all tasks to succeed. The join fails immediately if any task fails. This is the strictest join mode and ensures all work completed successfully. JoinMode.ONE_SUCCESS: Wait for at least one task to succeed. The join succeeds when the first task succeeds. If all tasks fail, the join fails. Useful for fallback patterns. Execution Semantics: 1. The runtime evaluates the status of all tasks in join_tasks. 2. Based on join_mode: a. ALL_OF: Waits until all tasks have completed (success or failure). Join succeeds when all are done. b. ANY_OF: Completes when the first task finishes (success or failure). Join inherits that task's status. c. ALL_SUCCESS: Waits for all tasks to succeed. Fails immediately if any task fails. d. ONE_SUCCESS: Completes when one task succeeds. Fails if all tasks fail. 3. The JoinOperator provides explicit coordination that replaces brittle dependency-based joins and TriggerRule patterns. 4. Recommended over TriggerRule for all parallel workflow coordination. 5. Tasks that depend on the JoinOperator wait for the join to complete before executing. Example (Python): from highway_dsl import WorkflowBuilder, JoinMode builder = WorkflowBuilder("parallel_pipeline") builder.task("start", "setup.initialize") # Create parallel branches builder.task("branch_a", "process.method_a", dependencies=["start"]) builder.task("branch_b", "process.method_b", dependencies=["start"]) builder.task("branch_c", "process.method_c", dependencies=["start"]) # Join with ALL_SUCCESS - fail if any branch fails builder.join( task_id="sync_gate", join_tasks=["branch_a", "branch_b", "branch_c"], join_mode=JoinMode.ALL_SUCCESS, description="Wait for all branches to succeed" ) builder.task("finalize", "cleanup.finish", dependencies=["sync_gate"]) Example (YAML): sync_gate: operator_type: join task_id: sync_gate join_tasks: - branch_a - branch_b - branch_c join_mode: all_success description: Wait for all branches to succeed dependencies: [] 5.10. SwitchOperator The SwitchOperator provides multi-branch routing based on the value of an expression (similar to switch/case in programming languages). 
Class: SwitchOperator Module: highway_dsl.workflow_dsl Inherits: BaseOperator Additional Fields: switch_on: str (REQUIRED) Expression to evaluate. The result is matched against the keys in cases. cases: Dict[str, str] (REQUIRED) Dictionary mapping case values (as strings) to task IDs. When switch_on matches a key, the corresponding task is executed. default: Optional[str] (DEFAULT: None) Task ID to execute if switch_on does not match any case. If None and no match is found, the operator completes without executing any branch. Operator Type: OperatorType.SWITCH Execution Semantics: 1. The runtime resolves all template variables in switch_on. 2. The runtime evaluates switch_on to a value. 3. The value is converted to a string and compared against the keys in cases. 4. If a match is found: a. The corresponding task becomes ready for execution. b. All other branches are NOT executed. 5. If no match is found: a. If default is specified, that task becomes ready. b. If default is None, the operator completes without executing any branch. Example (Python): switch = SwitchOperator( task_id="route_by_status", switch_on="{{data.status}}", cases={ "approved": "approve_task", "rejected": "reject_task", "pending": "review_task" }, default="unknown_handler", dependencies=["fetch_status"] ) Example (YAML): route_by_status: operator_type: switch task_id: route_by_status switch_on: "{{data.status}}" cases: approved: approve_task rejected: reject_task pending: review_task default: unknown_handler dependencies: - fetch_status 6. Scheduling Specification 6.1. Cron-Based Scheduling Highway DSL supports Airflow-parity cron-based scheduling via the schedule field on the Workflow object. Field: schedule Type: Optional[str] Default: None Format: Standard 5-field cron expression (minute, hour, day, month, weekday). Examples: "0 2 * * *" - Daily at 2:00 AM "*/15 * * * *" - Every 15 minutes "0 0 * * 1" - Weekly on Monday at midnight "0 9-17 * * 1-5" - Hourly from 9 AM to 5 PM, Monday-Friday Execution Semantics: 1. The scheduler service (using APScheduler) reads workflows with a non-None schedule. 2. For each scheduled workflow, the scheduler: a. Creates an APScheduler job with the cron schedule. b. Configures the job to call a spawn function (e.g., _spawn_workflow_job). 3. When the cron schedule triggers: a. The spawn function calculates the data_interval_start (previous trigger time) and data_interval_end (current trigger time). b. A new WorkflowRun is created with these intervals as inputs. c. The workflow is submitted to the runtime for execution. 4. If is_paused is True, the scheduler does NOT create runs, but the APScheduler job remains active for re-enabling. 5. If max_active_runs is exceeded, the scheduler queues the run or skips it (implementation-specific). 6.2. Start Date and Catchup Field: start_date Type: Optional[datetime] Default: None Purpose: Defines when the schedule becomes active. No runs are created for times before start_date. Field: catchup Type: bool Default: False Purpose: Controls whether missed runs between start_date and the current time are backfilled. Execution Semantics: 1. If catchup is True: a. When the scheduler first processes the workflow, it calculates all missed trigger times between start_date and now. b. A WorkflowRun is created for each missed trigger. c. These runs are executed in chronological order. 2. If catchup is False: a. Only future trigger times (after now) result in runs. b. Past triggers are ignored. 
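The backfill calculation in step 1 can be sketched as follows. This assumes a cron iterator such as the third-party croniter package, which the specification does not mandate; any equivalent cron library would do.

    # Sketch: compute missed trigger times for catchup=True (illustrative only).
    from datetime import datetime
    from croniter import croniter  # assumption: not required by the specification

    def missed_runs(schedule: str, start_date: datetime, now: datetime) -> list[datetime]:
        itr = croniter(schedule, start_date)
        runs = []
        trigger = itr.get_next(datetime)
        while trigger <= now:
            runs.append(trigger)  # one WorkflowRun per missed trigger
            trigger = itr.get_next(datetime)
        return runs

With catchup=False, the same iterator would simply be advanced past now, so only future triggers produce runs.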
Example: workflow = ( WorkflowBuilder("daily_report") .set_schedule("0 2 * * *") .set_start_date(datetime(2025, 1, 1)) .set_catchup(False) .build() ) If this workflow is deployed on January 15, 2025: - With catchup=False: Only runs starting January 16 at 2 AM are created. - With catchup=True: Runs for January 2-15 are created and executed. 6.3. Pause State and Active Runs Field: is_paused Type: bool Default: False Purpose: Allows temporary suspension of scheduled execution without deleting the schedule. Execution Semantics: 1. If is_paused is True: a. The scheduler does NOT create new runs. b. Manual runs (via API or CLI) are still permitted. 2. If is_paused is False: a. Normal scheduled execution resumes. Field: max_active_runs Type: int Default: 1 Purpose: Limits the number of concurrent executions of the workflow. Execution Semantics: 1. Before creating a new run, the scheduler counts the number of WorkflowRun records with status IN (PENDING, RUNNING). 2. If the count is >= max_active_runs: a. The scheduler may queue the run or skip it (implementation- specific). b. When a running workflow completes, the scheduler checks the queue. 3. This prevents resource exhaustion from overlapping runs. 6.4. Workflow Tags Field: tags Type: List[str] Default: [] Purpose: Provides metadata for categorization, filtering, and search. Common Use Cases: - Environment tags: "production", "staging", "development" - Team tags: "data-eng", "ml-ops", "analytics" - Category tags: "etl", "reporting", "monitoring" Example: workflow = ( WorkflowBuilder("customer_pipeline") .add_tags("production", "data-eng", "daily") .build() ) Tags are stored in the database and exposed via APIs for filtering: GET /workflows?tags=production,data-eng 7. Event-Driven Coordination 7.1. Event Emission Events are emitted using the EmitEventOperator (see Section 5.7). Event Model: An event consists of: - event_name: Globally unique identifier - payload: Dictionary of arbitrary data - emitted_at: Timestamp when the event was emitted - workflow_run_id: ID of the workflow run that emitted the event Storage: Events are persisted to the Event table in the database: CREATE TABLE events ( id SERIAL PRIMARY KEY, event_name VARCHAR NOT NULL, payload JSONB, emitted_at TIMESTAMP WITH TIME ZONE NOT NULL, workflow_run_id INTEGER REFERENCES workflow_runs(id) ); Uniqueness: Events are typically scoped to a specific workflow run to avoid cross-contamination: event_name: "pipeline_{{run.id}}_completed" This ensures that waiting workflows receive events from the correct upstream run. 7.2. Event Waiting Workflows wait for events using the WaitForEventOperator (see Section 5.8). Durable Waiting: The runtime MUST use a durable wait mechanism (e.g., Absurd's ctx.waitForEvent) to ensure: 1. The wait survives runtime crashes. 2. The workflow resumes exactly once when the event is received. Polling vs. Notification: Implementations may use: - Polling: Periodically query the Event table for matching events. - Notification: Use database triggers (e.g., PostgreSQL LISTEN/ NOTIFY) to receive immediate notification when events are emitted. Timeout Behavior: If timeout_seconds is specified: 1. The runtime sets a timer for timeout_seconds. 2. If the event is not received before the timeout: a. The operator fails. b. If an on_failure callback is specified, it is executed. 7.3. Cross-Workflow Communication Events enable workflows to coordinate without tight coupling. Use Case 1: Sequential Pipelines Workflow A emits an event when it completes. 
Workflow B waits for that event before starting. Workflow A: builder.emit_event( "notify_completion", event_name="pipeline_a_{{ds}}_done", payload={"status": "success"} ) Workflow B: builder.wait_for_event( "wait_for_a", event_name="pipeline_a_{{ds}}_done", timeout_seconds=3600 ) Use Case 2: Fan-Out/Fan-In Workflow A emits an event. Multiple workflows (B, C, D) wait for that event and execute in parallel. Workflow E waits for events from B, C, and D before starting. Use Case 3: Dynamic Dependencies A monitoring workflow emits an event when it detects an anomaly. Multiple remediation workflows wait for this event and execute different corrective actions. 8. Error Handling and Resilience 8.1. Retry Policies Retry policies are defined using the RetryPolicy class (see Section 4.4). Application Hierarchy: 1. Operator-Level Retry Policy: If an operator specifies retry_policy, it is used. 2. Workflow-Level Default Retry Policy: If an operator does not specify retry_policy, the workflow's default_retry_policy is used (if specified). 3. No Retry: If neither is specified, the operator is not retried on failure. Retry Logic: When an operator fails: 1. If retry_policy.max_retries > 0: a. The runtime waits for retry_policy.delay. b. The operator is re-executed. c. If it fails again, the delay is multiplied by retry_policy.backoff_factor. d. Steps 1-3 repeat until max_retries is exhausted or the operator succeeds. 2. If retry_policy.max_retries == 0: a. The operator is marked as failed immediately. b. Callback logic (on_failure) is triggered. Retry Counter: The runtime MUST track the retry count for each operator execution. This count is reset when the operator succeeds. Example: workflow = ( WorkflowBuilder("resilient_pipeline") .set_default_retry_policy(RetryPolicy( max_retries=2, delay=timedelta(seconds=30), backoff_factor=2.0 )) .task("flaky_api_call", "api.fetch_data") .task("critical_operation", "db.write", retry_policy=RetryPolicy( max_retries=5, delay=timedelta(seconds=60), backoff_factor=1.5 )) .build() ) In this example: - flaky_api_call uses the default retry policy (2 retries, 30s, 60s delays). - critical_operation uses a custom retry policy (5 retries, 60s, 90s, 135s, 202s, 303s delays). 8.2. Timeout Policies Timeout policies are defined using the TimeoutPolicy class (see Section 4.5). Timeout Enforcement: When an operator has a timeout_policy: 1. The runtime starts a timer when the operator begins execution. 2. If the operator completes before the timeout: a. The timer is cancelled. b. Normal success/failure logic applies. 3. If the timeout is reached before the operator completes: a. If kill_on_timeout is True: i. The runtime terminates the operator's execution. ii. This may involve killing a subprocess, cancelling a database transaction, or sending a SIGTERM. b. The operator is marked as failed. c. Retry and callback logic is triggered. Interaction with Retries: Timeouts and retries interact as follows: 1. Each retry attempt has its own independent timeout. 2. If an attempt times out, it counts as a failure for retry purposes. Example: Retry Policy: max_retries=3, delay=10s Timeout Policy: timeout=1 minute Execution Timeline: - Attempt 1: Runs for 1 minute, times out, fails. - Wait 10 seconds. - Attempt 2: Runs for 1 minute, times out, fails. - Wait 20 seconds (backoff_factor=2.0). - Attempt 3: Runs for 1 minute, times out, fails. - Wait 40 seconds. - Attempt 4 (final): Runs for 1 minute, times out, fails. - Operator is permanently failed. 8.3. 
Callback Hooks Callback hooks are defined using the on_success_task_id and on_failure_task_id fields on BaseOperator. Success Callbacks: When an operator completes successfully: 1. If on_success_task_id is specified: a. The runtime executes the specified task. b. This task is treated as a normal operator with its own retry/timeout policies. c. The callback task's success or failure does NOT affect the original operator's status. 2. If the callback task fails: a. The original operator remains marked as successful. b. The callback task's on_failure callback (if any) is triggered. Failure Callbacks: When an operator fails (after retries are exhausted): 1. If on_failure_task_id is specified: a. The runtime executes the specified task. b. The callback task receives context about the failure (e.g., {{failed_task_id}}, {{error_message}}). 2. If the callback task succeeds: a. The original operator remains marked as failed. b. The workflow may continue or stop depending on the dependency graph. 3. If the callback task fails: a. Both the original operator and the callback are marked as failed. Durable Callbacks: Unlike Airflow's fire-and-forget callbacks, Highway callbacks are first-class workflow operators. This means: 1. Callbacks are durable: If the runtime crashes during callback execution, it resumes after restart. 2. Callbacks are retryable: Callbacks can have their own retry policies. 3. Callbacks are visible: Callback execution is tracked in the database and visible in UIs. Example: builder = WorkflowBuilder("monitored_pipeline") builder.task("risky_operation", "api.call_external_service") builder.task("alert_on_failure", "notifications.send_slack", args=["Operation failed: {{error_message}}"]) builder.task("cleanup_on_success", "db.cleanup_temp_tables") # Attach callbacks to the risky operation builder.workflow.tasks["risky_operation"].on_failure_task_id = ( "alert_on_failure" ) builder.workflow.tasks["risky_operation"].on_success_task_id = ( "cleanup_on_success" ) 8.4. Error Propagation Error propagation follows these rules: 1. If an operator fails and has no retry policy: a. The operator is marked as failed immediately. 2. If an operator fails and retries are exhausted: a. The operator is marked as failed. b. If an on_failure callback is specified, it is executed. 3. If an operator fails and dependent operators exist: a. Dependent operators are NOT executed. b. The workflow status is set to FAILED. 4. In a ParallelOperator: a. If any branch fails, the ParallelOperator fails. b. Other branches may continue (runtime-dependent). 5. In a ForEachOperator: a. If any iteration fails, the ForEachOperator fails. b. Other iterations may continue (runtime-dependent). 6. In a WhileOperator: a. If the loop body fails, the WhileOperator fails. b. The loop is NOT retried (unless a retry policy is on the WhileOperator itself). 9. WorkflowBuilder API 9.1. Fluent Interface Design The WorkflowBuilder class provides a fluent API for constructing workflows programmatically. Class: WorkflowBuilder Module: highway_dsl.workflow_dsl Constructor: def __init__( self, name: str, existing_workflow: Optional[Workflow] = None, parent: Optional["WorkflowBuilder"] = None ) Parameters: - name: Workflow name - existing_workflow: If provided, the builder wraps this existing workflow instead of creating a new one. - parent: If provided, this builder is a sub-builder (used for condition branches and loop bodies). Internal State: - workflow: The Workflow object being constructed. 
- _current_task: The task ID of the most recently added operator (used for automatic dependency chaining). - parent: Reference to parent builder (for nested contexts). Fluent Pattern: All builder methods return self, enabling method chaining: workflow = ( WorkflowBuilder("example") .task("a", "func_a") .task("b", "func_b") .task("c", "func_c") .build() ) This creates a linear chain: a -> b -> c. 9.2. Task Definition Methods def task( self, task_id: str, function: str, **kwargs ) -> "WorkflowBuilder" Create a TaskOperator. Parameters: - task_id: Unique identifier - function: Fully-qualified function name - **kwargs: Additional fields (args, kwargs, result_key, dependencies, retry_policy, timeout_policy, description, etc.) Returns: self Behavior: - Creates a TaskOperator with the specified parameters. - If dependencies is not specified and _current_task is set, adds _current_task as a dependency (automatic chaining). - Sets _current_task to task_id. Example: builder.task( "fetch_data", "api.fetch", args=["{{url}}"], result_key="raw_data", retry_policy=RetryPolicy(max_retries=3) ) 9.3. Control Flow Methods def condition( self, task_id: str, condition: str, if_true: Callable[["WorkflowBuilder"], "WorkflowBuilder"], if_false: Callable[["WorkflowBuilder"], "WorkflowBuilder"], **kwargs ) -> "WorkflowBuilder" Create a ConditionOperator with two branches. Parameters: - task_id: Unique identifier - condition: Boolean expression - if_true: Lambda that receives a sub-builder and returns it after adding tasks to the true branch - if_false: Lambda that receives a sub-builder and returns it after adding tasks to the false branch - **kwargs: Additional fields Returns: self Example: builder.condition( "check_quality", condition="{{data.quality}} > 0.8", if_true=lambda b: b.task("high_quality", "process.advanced"), if_false=lambda b: b.task("low_quality", "process.basic") ) def parallel( self, task_id: str, branches: Dict[str, Callable[["WorkflowBuilder"], "WorkflowBuilder"]], **kwargs ) -> "WorkflowBuilder" Create a ParallelOperator with multiple branches. Parameters: - task_id: Unique identifier - branches: Dictionary mapping branch names to lambdas - **kwargs: Additional fields Returns: self Example: builder.parallel( "parallel_transform", branches={ "transform_a": lambda b: b.task("t_a", "transform.a"), "transform_b": lambda b: b.task("t_b", "transform.b"), "transform_c": lambda b: b.task("t_c", "transform.c") } ) def foreach( self, task_id: str, items: str, loop_body: Callable[["WorkflowBuilder"], "WorkflowBuilder"], **kwargs ) -> "WorkflowBuilder" Create a ForEachOperator. Parameters: - task_id: Unique identifier - items: Template expression evaluating to an iterable - loop_body: Lambda that receives a sub-builder and returns it after adding loop body tasks - **kwargs: Additional fields Returns: self Example: builder.foreach( "process_items", items="{{data.records}}", loop_body=lambda b: b.task("process", "handler.process", args=["{{item}}"]) ) def while_loop( self, task_id: str, condition: str, loop_body: Callable[["WorkflowBuilder"], "WorkflowBuilder"], **kwargs ) -> "WorkflowBuilder" Create a WhileOperator. 
Parameters: - task_id: Unique identifier - condition: Boolean expression - loop_body: Lambda that receives a sub-builder and returns it after adding loop body tasks - **kwargs: Additional fields Returns: self Example: builder.while_loop( "retry_loop", condition="{{status}} == 'pending'", loop_body=lambda b: b.task("check_status", "check.status", result_key="status") ) def wait( self, task_id: str, wait_for: Union[timedelta, datetime, str], **kwargs ) -> "WorkflowBuilder" Create a WaitOperator. Parameters: - task_id: Unique identifier - wait_for: Duration or datetime to wait for - **kwargs: Additional fields Returns: self Example: builder.wait("daily_wait", wait_for=timedelta(days=1)) 9.4. Scheduling Methods def set_schedule(self, cron: str) -> "WorkflowBuilder" Set the cron schedule for the workflow. Example: builder.set_schedule("0 2 * * *") def set_start_date(self, start_date: datetime) -> "WorkflowBuilder" Set when the schedule becomes active. Example: builder.set_start_date(datetime(2025, 1, 1)) def set_catchup(self, enabled: bool) -> "WorkflowBuilder" Set whether to backfill missed runs. Example: builder.set_catchup(False) def set_paused(self, paused: bool) -> "WorkflowBuilder" Set whether the workflow is paused. Example: builder.set_paused(False) def add_tags(self, *tags: str) -> "WorkflowBuilder" Add tags to the workflow. Example: builder.add_tags("production", "daily", "critical") def set_max_active_runs(self, count: int) -> "WorkflowBuilder" Set maximum concurrent runs. Example: builder.set_max_active_runs(3) def set_default_retry_policy( self, policy: RetryPolicy ) -> "WorkflowBuilder" Set default retry policy for all tasks. Example: builder.set_default_retry_policy( RetryPolicy(max_retries=3, delay=timedelta(seconds=60)) ) 9.5. Event Methods def emit_event( self, task_id: str, event_name: str, **kwargs ) -> "WorkflowBuilder" Create an EmitEventOperator. Parameters: - task_id: Unique identifier - event_name: Name of the event - **kwargs: Additional fields (payload, dependencies, etc.) Returns: self Example: builder.emit_event( "notify_downstream", event_name="pipeline_{{run.id}}_done", payload={"status": "success", "count": "{{record_count}}"} ) def wait_for_event( self, task_id: str, event_name: str, timeout_seconds: Optional[int] = None, **kwargs ) -> "WorkflowBuilder" Create a WaitForEventOperator. Parameters: - task_id: Unique identifier - event_name: Name of the event to wait for - timeout_seconds: Timeout in seconds - **kwargs: Additional fields Returns: self Example: builder.wait_for_event( "wait_upstream", event_name="pipeline_{{upstream_id}}_done", timeout_seconds=3600 ) 9.6. Error Handling Methods def retry( self, max_retries: int = 3, delay: timedelta = timedelta(seconds=5), backoff_factor: float = 2.0 ) -> "WorkflowBuilder" Set retry policy on the current task. Returns: self Behavior: - Applies to _current_task (the most recently added task). - Only works if _current_task is a TaskOperator. Example: builder.task("api_call", "api.fetch").retry(max_retries=5, delay=timedelta(seconds=30)) def timeout( self, timeout: timedelta, kill_on_timeout: bool = True ) -> "WorkflowBuilder" Set timeout policy on the current task. Returns: self Example: builder.task("slow_operation", "compute.heavy").timeout( timeout=timedelta(hours=2) ) def on_success(self, success_task_id: str) -> "WorkflowBuilder" Set success callback on the current task. 
Returns: self

Example:

    builder.task("risky_task", "api.call").on_success("cleanup_task")

def on_failure(self, failure_task_id: str) -> "WorkflowBuilder"

Set failure callback on the current task.

Returns: self

Example:

    builder.task("risky_task", "api.call").on_failure("alert_task")

9.7. Coordination Methods

def join(
    self,
    task_id: str,
    join_tasks: list[str],
    join_mode: JoinMode,
    **kwargs
) -> "WorkflowBuilder"

Create a JoinOperator to coordinate multiple parallel branches.

Parameters:
- task_id: Unique identifier
- join_tasks: List of task IDs to wait for and coordinate
- join_mode: JoinMode enumeration value (ALL_OF, ANY_OF, ALL_SUCCESS, ONE_SUCCESS)
- **kwargs: Additional fields (dependencies, description, etc.)

Returns: self

Behavior:
- Creates a JoinOperator that waits for the specified tasks based on the join_mode.
- Replaces brittle TriggerRule-based coordination with explicit join semantics.
- Recommended over dependency-based joins for all parallel work coordination.

Join Modes:
- JoinMode.ALL_OF: Wait for all tasks to finish (any final state)
- JoinMode.ANY_OF: Complete when the first task finishes
- JoinMode.ALL_SUCCESS: Wait for all tasks to succeed (fail fast)
- JoinMode.ONE_SUCCESS: Complete when the first task succeeds

Example:

    builder.task("start", "setup.init")
    builder.task("branch_a", "process.a", dependencies=["start"])
    builder.task("branch_b", "process.b", dependencies=["start"])
    builder.task("branch_c", "process.c", dependencies=["start"])
    builder.join(
        task_id="sync_gate",
        join_tasks=["branch_a", "branch_b", "branch_c"],
        join_mode=JoinMode.ALL_SUCCESS,
        description="Wait for all branches to succeed"
    )
    builder.task("finalize", "cleanup.finish", dependencies=["sync_gate"])

This creates a fan-out/fan-in pattern where all three branches must succeed before proceeding to finalize.

9.8. Build and Finalization

def build(self) -> Workflow

Finalize and return the constructed Workflow object.

Behavior:
- If start_task is not set, sets it to the first task in the workflow.
- Returns the workflow object.
- After calling build(), the builder should not be used further.

Example:

    workflow = (
        WorkflowBuilder("example")
        .task("a", "func_a")
        .task("b", "func_b")
        .build()
    )

10. Variable Resolution and Templating

10.1. Template Syntax

Highway DSL supports template variables using double-brace syntax:

    {{variable_name}}

Templates are resolved by the runtime at execution time, not during workflow construction.

Supported Locations:

Templates can be used in:
- TaskOperator: args, kwargs
- ConditionOperator: condition
- WhileOperator: condition
- ForEachOperator: items
- EmitEventOperator: event_name, payload
- WaitForEventOperator: event_name
- SwitchOperator: switch_on

Templates are NOT supported in:
- task_id fields
- function fields
- Workflow.name

10.2. Context Variables

The following variables are available in template expressions:

Workflow Context:

{{inputs}}
    The workflow_run.inputs dictionary. Typically includes data_interval_start, data_interval_end, and other scheduler-provided values.

{{ds}}
    The data interval start timestamp (alias for {{inputs.data_interval_start}}).

{{run}}
    The WorkflowRun object (includes id, status, started_at, etc.).

Task Context:

{{item}}
    In ForEachOperator loop bodies, the current iteration item.

{{result_key}}
    The result of a previous task, accessed via the result_key that task declared.

{{task_id.output}}
    Explicit access to a task's output by task ID.

Workflow Variables:

All keys in Workflow.variables are available as top-level template variables.

Example:

    variables = {"api_url": "https://api.example.com"}

    Template:  {{api_url}}/users
    Resolved:  https://api.example.com/users
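As a non-normative illustration of the flat {{variable}} substitution described in this section, the sketch below shows one way a resolver could apply a context dictionary built from Workflow.variables and task results. The function name resolve_templates and the strict flag are illustrative only and are not part of the DSL API; the normative resolution process is specified in Section 10.3.

    import re
    from typing import Any, Dict

    _TEMPLATE = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")

    def resolve_templates(value: str, context: Dict[str, Any], strict: bool = True) -> str:
        """Replace {{name}} and dotted {{name.field}} references using `context`."""

        def lookup(path: str) -> Any:
            obj: Any = context
            for part in path.split("."):
                obj = obj[part] if isinstance(obj, dict) else getattr(obj, part)
            return obj

        def substitute(match: re.Match) -> str:
            path = match.group(1)
            try:
                return str(lookup(path))
            except (KeyError, AttributeError, TypeError):
                if strict:
                    raise KeyError(f"unresolved template variable: {path}")
                return match.group(0)  # lenient mode: leave the template in place

        return _TEMPLATE.sub(substitute, value)

    # Example usage with a context built from Workflow.variables and task results:
    context = {"api_url": "https://api.example.com", "raw_data": {"rows": 10}}
    resolve_templates("{{api_url}}/users", context)         # "https://api.example.com/users"
    resolve_templates("rows = {{raw_data.rows}}", context)  # "rows = 10"

Dotted names such as {{raw_data.rows}} are looked up key by key, mirroring the context variables listed above.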
10.3. Runtime Resolution

Template resolution is performed by the runtime (not the DSL) during workflow execution.

Resolution Process:

1. The runtime builds a context dictionary containing all available variables.
2. For each operator, the runtime:
   a. Identifies all string fields that may contain templates.
   b. Replaces {{variable_name}} with the value from the context.
3. If a variable is not found:
   a. The runtime MAY raise an error (strict mode).
   b. The runtime MAY leave the template unresolved (lenient mode).

Future Enhancement (Mako Templating):

Version 1.1.0 uses simple string replacement. Future versions will use Mako templating for advanced features:
- Conditional logic: % if condition:
- Loops: % for item in items:
- Functions: ${ macros.ds_add(ds, 7) }

11. Serialization Formats

11.1. YAML Serialization

Highway DSL workflows can be serialized to YAML using the to_yaml() method.

Method: Workflow.to_yaml() -> str

Format: Standard YAML 1.2, generated using PyYAML's safe_dump.

Field Ordering: Fields are ordered alphabetically by PyYAML; a stable ordering is not guaranteed.

Exclusions:
- Fields with None values are excluded.
- Fields marked exclude=True are excluded (e.g., is_internal_loop_task).

Special Handling:
- timedelta: Serialized as an ISO 8601 duration (e.g., PT1H for 1 hour).
- datetime: Serialized as an ISO 8601 timestamp (e.g., 2025-01-01T00:00:00).
- Enum: Serialized as the string value (e.g., "task", not "OperatorType.TASK").

Example:

Python:

    workflow = (
        WorkflowBuilder("simple_etl")
        .task("extract", "etl.extract", result_key="raw_data")
        .task("transform", "etl.transform", args=["{{raw_data}}"])
        .build()
    )

YAML:

    name: simple_etl
    version: 1.3.0
    description: ''
    tasks:
      extract:
        task_id: extract
        operator_type: task
        function: etl.extract
        args: []
        kwargs: {}
        result_key: raw_data
        dependencies: []
        metadata: {}
        description: ''
      transform:
        task_id: transform
        operator_type: task
        function: etl.transform
        args:
        - "{{raw_data}}"
        kwargs: {}
        dependencies:
        - extract
        metadata: {}
        description: ''
    variables: {}
    start_task: extract
    catchup: false
    is_paused: false
    tags: []
    max_active_runs: 1

11.2. JSON Serialization

Highway DSL workflows can be serialized to JSON using the to_json() method.

Method: Workflow.to_json() -> str

Format: Standard JSON (RFC 8259), with 2-space indentation.

Field Ordering: Fields are ordered as defined in the Pydantic models.

Exclusions: Same as YAML (None values and exclude=True fields).

Special Handling: Same as YAML, but using JSON-compatible representations:
- timedelta: "PT1H" (string)
- datetime: "2025-01-01T00:00:00" (string, ISO 8601)

Example:

JSON:

    {
      "name": "simple_etl",
      "version": "1.3.0",
      "description": "",
      "tasks": {
        "extract": {
          "task_id": "extract",
          "operator_type": "task",
          "function": "etl.extract",
          "args": [],
          "kwargs": {},
          "result_key": "raw_data",
          "dependencies": [],
          "metadata": {},
          "description": ""
        },
        "transform": {
          "task_id": "transform",
          "operator_type": "task",
          "function": "etl.transform",
          "args": ["{{raw_data}}"],
          "kwargs": {},
          "dependencies": ["extract"],
          "metadata": {},
          "description": ""
        }
      },
      "variables": {},
      "start_task": "extract",
      "catchup": false,
      "is_paused": false,
      "tags": [],
      "max_active_runs": 1
    }
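Although deserialization is specified in Section 11.3, the two formats are designed to round-trip. The sketch below is a non-normative illustration; it assumes that Workflow is importable from the package root alongside WorkflowBuilder and that Pydantic's default value-based model equality applies.

    from highway_dsl import Workflow, WorkflowBuilder

    workflow = (
        WorkflowBuilder("simple_etl")
        .task("extract", "etl.extract", result_key="raw_data")
        .task("transform", "etl.transform", args=["{{raw_data}}"])
        .build()
    )

    # Both serializations should reconstruct an equivalent Workflow model.
    assert Workflow.from_yaml(workflow.to_yaml()) == workflow
    assert Workflow.from_json(workflow.to_json()) == workflow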
11.3. Deserialization and Validation

Workflows can be deserialized from YAML or JSON using class methods.

Method: Workflow.from_yaml(yaml_str: str) -> Workflow
Method: Workflow.from_json(json_str: str) -> Workflow

Validation:

Deserialization includes comprehensive Pydantic validation:
1. Type checking: All fields must match their declared types.
2. Required fields: Fields without defaults must be present.
3. Enum validation: Operator types must be valid OperatorType values.
4. Custom validators: Model validators are executed.

Error Handling:

If validation fails, a pydantic.ValidationError is raised with detailed error messages.

Example:

    yaml_str = """
    name: invalid_workflow
    version: 1.1.0
    tasks:
      bad_task:
        operator_type: INVALID_TYPE
        task_id: bad_task
    """

    try:
        workflow = Workflow.from_yaml(yaml_str)
    except ValidationError as e:
        print(e)
        # Output: Unknown operator type: INVALID_TYPE

12. Dependency Management

12.1. Explicit Dependencies

Dependencies can be specified explicitly using the dependencies field on any operator.

Example:

    builder.task("a", "func_a")
    builder.task("b", "func_b", dependencies=["a"])
    builder.task("c", "func_c", dependencies=["a", "b"])

Dependency Graph:

    a
    ├── b
    └── c   (c also depends on b)

Validation:

The runtime MUST validate that:
1. All task IDs in dependencies exist in the workflow.
2. No circular dependencies exist.

A non-normative sketch of these checks appears at the end of this section.

12.2. Implicit Chain Dependencies

The WorkflowBuilder automatically chains tasks if dependencies are not specified.

Example:

    builder.task("a", "func_a")
    builder.task("b", "func_b")
    builder.task("c", "func_c")

Equivalent to:

    builder.task("a", "func_a")
    builder.task("b", "func_b", dependencies=["a"])
    builder.task("c", "func_c", dependencies=["b"])

Dependency Graph: a -> b -> c

The builder tracks the most recently added task in _current_task and automatically adds it as a dependency to the next task.

12.3. Loop Isolation

Tasks inside loop bodies (ForEachOperator, WhileOperator) are marked with is_internal_loop_task=True.

Purpose: Prevent external operators (e.g., ParallelOperator) from incorrectly injecting dependencies into loop body tasks.

Behavior:
1. When a loop body task is added to the workflow, it is marked as internal.
2. Other operators (e.g., ParallelOperator) check this flag and do NOT add themselves as dependencies.
3. The first task in the loop body has the loop operator as a dependency.

Example:

    builder.foreach(
        "process_items",
        items="{{data.items}}",
        loop_body=lambda b: b.task("process", "handler.process", args=["{{item}}"])
    )

Internal representation:
- ForEachOperator(task_id="process_items")
- TaskOperator(task_id="process", dependencies=["process_items"], is_internal_loop_task=True)

12.4. Parallel Branch Dependencies

Tasks in ParallelOperator branches automatically receive the parallel operator as a dependency.

Example:

    builder.parallel(
        "parallel_transform",
        branches={
            "a": lambda b: b.task("t_a", "transform.a"),
            "b": lambda b: b.task("t_b", "transform.b")
        }
    )

Internal representation:
- ParallelOperator(task_id="parallel_transform", branches={"a": ["t_a"], "b": ["t_b"]})
- TaskOperator(task_id="t_a", dependencies=["parallel_transform"])
- TaskOperator(task_id="t_b", dependencies=["parallel_transform"])
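The following non-normative sketch implements the validation required by Section 12.1: every referenced task ID must exist, and the dependency graph must be acyclic. The helper name validate_dependencies is illustrative and is not part of the DSL or runtime API; it operates on a plain dictionary shaped like the serialized Workflow.tasks mapping.

    from typing import Dict, List

    def validate_dependencies(tasks: Dict[str, dict]) -> None:
        """Check that all dependencies exist and that the graph has no cycles."""

        # 1. Every referenced dependency must exist in the workflow.
        for task_id, task in tasks.items():
            for dep in task.get("dependencies", []):
                if dep not in tasks:
                    raise ValueError(f"Task {task_id!r} depends on unknown task {dep!r}")

        # 2. The dependency graph must be acyclic (depth-first search with colors).
        WHITE, GRAY, BLACK = 0, 1, 2
        color = {task_id: WHITE for task_id in tasks}

        def visit(task_id: str, path: List[str]) -> None:
            color[task_id] = GRAY
            for dep in tasks[task_id].get("dependencies", []):
                if color[dep] == GRAY:
                    raise ValueError(f"Circular dependency: {' -> '.join(path + [dep])}")
                if color[dep] == WHITE:
                    visit(dep, path + [dep])
            color[task_id] = BLACK

        for task_id in tasks:
            if color[task_id] == WHITE:
                visit(task_id, [task_id])

    # Example: the graph from Section 12.1 (b depends on a; c depends on a and b) passes.
    validate_dependencies({
        "a": {"dependencies": []},
        "b": {"dependencies": ["a"]},
        "c": {"dependencies": ["a", "b"]},
    })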
13. Integration with Absurd Runtime

13.1. Mapping Operators to Absurd Primitives

Each Highway DSL operator is mapped to Absurd SDK calls by the RefactoredInterpreter.

TaskOperator: Mapped to ctx.step()

Example:

    DSL:     TaskOperator(task_id="fetch", function="api.fetch", args=["{{url}}"])
    Runtime: result = ctx.step("fetch", tool="api.fetch", args=[resolved_url])

ConditionOperator: Mapped to Python if/else with recursive _walk_graph calls.

Example:

    DSL:     ConditionOperator(task_id="check", condition="{{score}} > 0.8",
                               if_true="high", if_false="low")
    Runtime: if eval(resolved_condition):
                 _walk_graph("high")
             else:
                 _walk_graph("low")

WaitOperator: Mapped to ctx.sleep()

Example:

    DSL:     WaitOperator(task_id="wait", wait_for=timedelta(hours=1))
    Runtime: ctx.sleep(3600)

ParallelOperator: Mapped to absurd.spawn() for each branch.

Example:

    DSL:     ParallelOperator(task_id="parallel", branches={"a": ["t_a"], "b": ["t_b"]})
    Runtime: handles = []
             for branch in branches.values():
                 handle = absurd.spawn("branch_executor", branch_tasks=branch)
                 handles.append(handle)
             for handle in handles:
                 handle.wait()

ForEachOperator: Mapped to a Python for loop with ctx.step() calls.

Example:

    DSL:     ForEachOperator(task_id="foreach", items="{{data.items}}", loop_body=[...])
    Runtime: items = eval(resolved_items)
             for item in items:
                 ctx.set_variable("item", item)
                 execute_loop_body()

WhileOperator: Mapped to a Python while loop with ctx.step() calls.

EmitEventOperator: Mapped to absurd.emitEvent()

Example:

    DSL:     EmitEventOperator(task_id="emit", event_name="done", payload={"status": "ok"})
    Runtime: absurd.emitEvent(event_name="done", payload={"status": "ok"})

WaitForEventOperator: Mapped to ctx.waitForEvent()

Example:

    DSL:     WaitForEventOperator(task_id="wait", event_name="upstream_done", timeout_seconds=3600)
    Runtime: result = ctx.waitForEvent(event_name="upstream_done", timeout=3600)

SwitchOperator: Mapped to Python match/case (or a dict lookup for older Python versions).

13.2. Checkpoint Management

Absurd provides checkpointing via ctx.step().

Checkpointing Strategy:
1. Each TaskOperator is wrapped in a ctx.step() call.
2. If the runtime crashes during task execution:
   a. Absurd records the checkpoint for the failed step.
   b. On restart, Absurd resumes from the last successful checkpoint.
   c. The failed task is re-executed (unless retries are exhausted).
3. Results from completed tasks are stored in Absurd's checkpoint storage.
4. Template variables like {{task_id}} retrieve results from checkpoint storage.

13.3. Durable State

Absurd ensures durable state by:
1. Storing all workflow state (variables, task results, checkpoint markers) in PostgreSQL.
2. Using serializable transactions to ensure consistency.
3. Persisting state before returning from each ctx.step() call.

This ensures:
- Exactly-once execution of each task.
- Crash-resistance.
- The ability to resume workflows after failures.

14. Security Considerations

14.1. Code Injection Risks

Template expressions are evaluated by the runtime, which may use Python's eval() or a similar mechanism.

Risk: If user-controlled data is used in template expressions without validation, code injection is possible.

Example:

    # User-controlled value that ends up in the template context:
    malicious_input = "__import__('os').system('rm -rf /')"
    context = {"data": {"user_input": malicious_input}}

    # A condition template that references the user input:
    condition = "{{data.user_input}} == 'approved'"

    # After substitution, the condition string becomes:
    #   __import__('os').system('rm -rf /') == 'approved'
    # If the runtime passes this string to eval(), the malicious code executes.

Mitigation:
1. Input Validation: Validate all user inputs before including them in template contexts.
2. Restricted Evaluation: Use a restricted evaluator (e.g., Python's ast.literal_eval or a sandboxed interpreter) instead of eval(), as illustrated in the sketch following this list.
3. Template Whitelisting: Only allow templates in specific, controlled fields.
4. Future Enhancement: Replace eval() with Mako templating, which provides better sandboxing.
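As a non-normative illustration of Mitigation 2, the sketch below evaluates already-resolved condition strings (e.g., "0.92 > 0.8" or "status == 'pending'") without handing arbitrary input to eval(). The helper name safe_eval_condition and the node whitelist are illustrative choices, not part of the Highway DSL or Absurd APIs.

    import ast
    from typing import Any, Dict

    # Allow only comparisons, boolean logic, literals, and bare names; reject
    # attribute access, calls, subscripts, and anything else outright.
    _ALLOWED_NODES = (
        ast.Expression, ast.Compare, ast.BoolOp, ast.UnaryOp,
        ast.Name, ast.Load, ast.Constant,
        ast.And, ast.Or, ast.Not, ast.USub,
        ast.Eq, ast.NotEq, ast.Gt, ast.GtE, ast.Lt, ast.LtE,
    )

    def safe_eval_condition(expression: str, names: Dict[str, Any]) -> bool:
        tree = ast.parse(expression, mode="eval")
        for node in ast.walk(tree):
            if not isinstance(node, _ALLOWED_NODES):
                raise ValueError(f"disallowed syntax in condition: {type(node).__name__}")
        # With the whitelist enforced, the compiled expression can only see
        # literals, comparisons, and the names passed in explicitly.
        return bool(eval(compile(tree, "<condition>", "eval"), {"__builtins__": {}}, dict(names)))

    safe_eval_condition("quality_score > 0.8", {"quality_score": 0.92})   # True

    try:
        safe_eval_condition("__import__('os').system('rm -rf /')", {})
    except ValueError as exc:
        print(exc)  # disallowed syntax in condition: Call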
14.2. Variable Validation

Workflows may reference variables that do not exist at runtime.

Risk: Missing variables can cause workflows to fail unexpectedly.

Mitigation:
1. Static Analysis: Analyze workflows during deployment to identify all referenced variables.
2. Required Variables: Require workflows to declare all expected variables.
3. Default Values: Allow workflows to specify default values for optional variables.

14.3. Secret Management

Workflows often require secrets (API keys, passwords, database credentials).

Anti-Pattern: Storing secrets in Workflow.variables or as literal strings in args/kwargs.

Best Practices:
1. Secret Store Integration: Integrate with a secret store (e.g., HashiCorp Vault, AWS Secrets Manager).
2. Secret References: Use template variables that reference secrets by name: {{secrets.api_key}}
3. Runtime Resolution: The runtime fetches secrets at execution time, not during workflow definition.
4. Audit Logging: Log all secret accesses for security auditing.

15. Examples

15.1. Simple Linear Workflow

A workflow with three sequential tasks.

Python:

    from highway_dsl import WorkflowBuilder
    from datetime import timedelta

    workflow = (
        WorkflowBuilder("simple_etl")
        .task("extract", "etl.extract_data", result_key="raw_data")
        .task("transform", "etl.transform_data", args=["{{raw_data}}"],
              result_key="clean_data")
        .retry(max_retries=3, delay=timedelta(seconds=10))
        .task("load", "etl.load_data", args=["{{clean_data}}"])
        .timeout(timeout=timedelta(minutes=30))
        .build()
    )

    print(workflow.to_yaml())

YAML Output:

    name: simple_etl
    version: 1.3.0
    tasks:
      extract:
        task_id: extract
        operator_type: task
        function: etl.extract_data
        result_key: raw_data
        dependencies: []
      transform:
        task_id: transform
        operator_type: task
        function: etl.transform_data
        args:
        - "{{raw_data}}"
        result_key: clean_data
        dependencies:
        - extract
        retry_policy:
          max_retries: 3
          delay: PT10S
          backoff_factor: 2.0
      load:
        task_id: load
        operator_type: task
        function: etl.load_data
        args:
        - "{{clean_data}}"
        dependencies:
        - transform
        timeout_policy:
          timeout: PT30M
          kill_on_timeout: true
    start_task: extract

15.2. Conditional Branching

A workflow that routes based on data quality.

Python:

    from highway_dsl import WorkflowBuilder

    workflow = (
        WorkflowBuilder("quality_check_pipeline")
        .task("fetch", "api.fetch_data", result_key="data")
        .task("validate", "validators.check_quality", args=["{{data}}"],
              result_key="quality_score")
        .condition(
            "check_quality",
            condition="{{quality_score}} > 0.8",
            if_true=lambda b: b.task("premium_processing", "process.premium",
                                     args=["{{data}}"]),
            if_false=lambda b: b.task("standard_processing", "process.standard",
                                      args=["{{data}}"])
        )
        .task("finalize", "api.publish_results", dependencies=["check_quality"])
        .build()
    )

15.3. Parallel Execution

A workflow with three parallel branches.

Python:

    from highway_dsl import WorkflowBuilder

    workflow = (
        WorkflowBuilder("parallel_pipeline")
        .task("fetch", "api.fetch_data", result_key="data")
        .parallel(
            "parallel_transform",
            branches={
                "transform_a": lambda b: b.task("t_a", "transform.method_a",
                                                args=["{{data}}"], result_key="result_a"),
                "transform_b": lambda b: b.task("t_b", "transform.method_b",
                                                args=["{{data}}"], result_key="result_b"),
                "transform_c": lambda b: b.task("t_c", "transform.method_c",
                                                args=["{{data}}"], result_key="result_c")
            }
        )
        .task("aggregate", "aggregate.combine",
              args=["{{result_a}}", "{{result_b}}", "{{result_c}}"])
        .build()
    )

15.4. ForEach Loops

A workflow that processes each record in a collection.
Python: from highway_dsl import WorkflowBuilder workflow = ( WorkflowBuilder("batch_processor") .task("fetch_records", "db.fetch_pending", result_key="records") .foreach( "process_records", items="{{records}}", loop_body=lambda b: ( b.task("validate", "validators.validate_record", args=["{{item}}"]) .task("enrich", "enrichment.add_metadata", args=["{{item}}"], result_key="enriched") .task("save", "db.save_record", args=["{{enriched}}"]) ) ) .task("summarize", "reporting.create_summary") .build() ) 15.5. While Loops A workflow that retries QA until it passes. Python: from highway_dsl import WorkflowBuilder workflow = ( WorkflowBuilder("qa_rework_pipeline") .task("initial_qa", "qa.inspect_product", result_key="qa_results") .while_loop( "rework_loop", condition="{{qa_results.status}} == 'failed'", loop_body=lambda b: ( b.task("perform_rework", "rework.fix_issues", args=["{{qa_results.issues}}"]) .task("rerun_qa", "qa.inspect_product", result_key="qa_results") ) ) .task("finalize", "production.mark_complete") .build() ) 15.6. Scheduled Workflows A workflow that runs daily at 2 AM. Python: from highway_dsl import WorkflowBuilder from datetime import datetime, timedelta workflow = ( WorkflowBuilder("daily_report") .set_schedule("0 2 * * *") .set_start_date(datetime(2025, 1, 1)) .set_catchup(False) .add_tags("production", "daily", "reporting") .set_max_active_runs(1) .task("extract", "reports.extract_daily_data", args=["{{ds}}"], result_key="data") .task("transform", "reports.transform_data", args=["{{data}}"], result_key="report") .task("send", "email.send_report", args=["{{report}}"]) .build() ) 15.7. Event-Driven Workflows Two workflows coordinating via events. Python (Upstream Workflow): from highway_dsl import WorkflowBuilder upstream = ( WorkflowBuilder("upstream_pipeline") .task("process_data", "etl.process", result_key="result") .emit_event( "notify_downstream", event_name="pipeline_{{ds}}_completed", payload={"status": "success", "count": "{{result.count}}"} ) .build() ) Python (Downstream Workflow): from highway_dsl import WorkflowBuilder downstream = ( WorkflowBuilder("downstream_pipeline") .wait_for_event( "wait_for_upstream", event_name="pipeline_{{ds}}_completed", timeout_seconds=3600 ) .task("consume_data", "consumer.process", args=["{{wait_for_upstream.payload}}"]) .build() ) 15.8. Callback Hooks A workflow with success and failure callbacks. Python: from highway_dsl import WorkflowBuilder workflow = ( WorkflowBuilder("monitored_pipeline") .task("risky_operation", "api.call_external_service", result_key="result") .task("success_handler", "notifications.send_success", args=["Operation succeeded: {{result}}"]) .task("failure_handler", "notifications.send_alert", args=["Operation failed"]) .build() ) # Attach callbacks workflow.tasks["risky_operation"].on_success_task_id = "success_handler" workflow.tasks["risky_operation"].on_failure_task_id = "failure_handler" 15.9. Switch/Case Logic A workflow that routes based on a status field. 
Python: from highway_dsl import WorkflowBuilder workflow = ( WorkflowBuilder("order_processor") .task("fetch_order", "orders.fetch", result_key="order") .switch( "route_by_status", switch_on="{{order.status}}", cases={ "pending": "process_pending", "approved": "process_approved", "rejected": "process_rejected", "cancelled": "process_cancelled" }, default="handle_unknown_status" ) .task("process_pending", "orders.process_pending", args=["{{order}}"], dependencies=["route_by_status"]) .task("process_approved", "orders.process_approved", args=["{{order}}"], dependencies=["route_by_status"]) .task("process_rejected", "orders.process_rejected", args=["{{order}}"], dependencies=["route_by_status"]) .task("process_cancelled", "orders.process_cancelled", args=["{{order}}"], dependencies=["route_by_status"]) .task("handle_unknown_status", "errors.log_unknown_status", args=["{{order}}"], dependencies=["route_by_status"]) .build() ) 15.10. Join Coordination A workflow demonstrating explicit join coordination for parallel branches. Python (ALL_SUCCESS Join): from highway_dsl import WorkflowBuilder, JoinMode workflow = ( WorkflowBuilder("data_pipeline_with_join") .task("start", "setup.initialize", result_key="config") # Create parallel branches .task("branch_a", "process.extract_users", args=["{{config}}"], result_key="users", dependencies=["start"]) .task("branch_b", "process.extract_products", args=["{{config}}"], result_key="products", dependencies=["start"]) .task("branch_c", "process.extract_orders", args=["{{config}}"], result_key="orders", dependencies=["start"]) # Explicit join - wait for all branches to succeed .join( task_id="sync_gate", join_tasks=["branch_a", "branch_b", "branch_c"], join_mode=JoinMode.ALL_SUCCESS, description="Wait for all extractions to succeed" ) # Continue after join .task("merge_data", "process.merge_all", args=["{{users}}", "{{products}}", "{{orders}}"], result_key="merged", dependencies=["sync_gate"]) .task("finalize", "db.save_results", args=["{{merged}}"]) .build() ) Python (ONE_SUCCESS Join - Fallback Pattern): from highway_dsl import WorkflowBuilder, JoinMode workflow = ( WorkflowBuilder("multi_source_fallback") .task("start", "setup.prepare") # Try multiple data sources in parallel .task("fetch_primary", "api.fetch_from_primary", result_key="data", dependencies=["start"]) .task("fetch_secondary", "api.fetch_from_secondary", result_key="data", dependencies=["start"]) .task("fetch_tertiary", "api.fetch_from_tertiary", result_key="data", dependencies=["start"]) # Join succeeds when first source succeeds .join( task_id="data_available", join_tasks=["fetch_primary", "fetch_secondary", "fetch_tertiary"], join_mode=JoinMode.ONE_SUCCESS, description="Proceed when any source succeeds" ) .task("process", "process.handle_data", args=["{{data}}"], dependencies=["data_available"]) .build() ) Python (ANY_OF Join - Race Condition): from highway_dsl import WorkflowBuilder, JoinMode workflow = ( WorkflowBuilder("parallel_race") .task("start", "setup.init") # Multiple approaches racing .task("approach_fast", "compute.fast_but_approximate", result_key="result", dependencies=["start"]) .task("approach_accurate", "compute.slow_but_accurate", result_key="result", dependencies=["start"]) # Complete when first approach finishes (any state) .join( task_id="race_complete", join_tasks=["approach_fast", "approach_accurate"], join_mode=JoinMode.ANY_OF, description="Use whichever completes first" ) .task("use_result", "process.handle", args=["{{result}}"], dependencies=["race_complete"]) 
.build() ) YAML Output (ALL_SUCCESS example): name: data_pipeline_with_join version: 1.3.0 tasks: start: task_id: start operator_type: task function: setup.initialize result_key: config dependencies: [] branch_a: task_id: branch_a operator_type: task function: process.extract_users args: ["{{config}}"] result_key: users dependencies: [start] branch_b: task_id: branch_b operator_type: task function: process.extract_products args: ["{{config}}"] result_key: products dependencies: [start] branch_c: task_id: branch_c operator_type: task function: process.extract_orders args: ["{{config}}"] result_key: orders dependencies: [start] sync_gate: task_id: sync_gate operator_type: join join_tasks: [branch_a, branch_b, branch_c] join_mode: all_success description: Wait for all extractions to succeed dependencies: [] merge_data: task_id: merge_data operator_type: task function: process.merge_all args: ["{{users}}", "{{products}}", "{{orders}}"] result_key: merged dependencies: [sync_gate] finalize: task_id: finalize operator_type: task function: db.save_results args: ["{{merged}}"] dependencies: [merge_data] start_task: start 16. IANA Considerations This document has no IANA actions. 17. References 17.1. Normative References [RFC2119] Bradner, S., "Key words for use in RFCs to Indicate Requirement Levels", BCP 14, RFC 2119, March 1997. [RFC8259] Bray, T., "The JavaScript Object Notation (JSON) Data Interchange Format", STD 90, RFC 8259, December 2017. [YAML1.2] YAML Ain't Markup Language Version 1.2, 3rd Edition, October 2021. [ISO8601] ISO 8601:2004, Data elements and interchange formats -- Information interchange -- Representation of dates and times. [Pydantic] Pydantic Documentation, https://docs.pydantic.dev/ 17.2. Informative References [Airflow] Apache Airflow Documentation, https://airflow.apache.org/docs/ [DBOS] DBOS Documentation, https://docs.dbos.dev/ [APScheduler] APScheduler Documentation, https://apscheduler.readthedocs.io/ [Absurd] Absurd: Durable Execution for PostgreSQL (internal documentation). [HighwayDSL] Highway DSL Repository, https://github.com/rodmena-limited/highway_dsl Authors' Addresses Farshid Rodmena Rodmena Limited Email: farshid.ashouri@gmail.com Full Copyright Statement Copyright (C) Rodmena Limited (2025). All Rights Reserved. This document is subject to the MIT License. See LICENSE file in the repository for details.