# Task Graph DAG Engine
## Overview
The Workglow Task Graph engine is the foundational execution layer of the framework. It models computation as a **directed acyclic graph (DAG)** where nodes are tasks and edges are dataflows that carry typed data between output ports and input ports. The engine is responsible for resolving execution order via topological sorting, propagating data through the graph along dataflow edges, and managing the lifecycle of every task from `PENDING` through `COMPLETED` or `FAILED`.
The engine lives in the `@workglow/task-graph` package and provides two primary abstractions:
- **TaskGraph** -- a low-level DAG container with explicit `addTask`, `addDataflow`, and `run` methods.
- **Workflow** -- a high-level builder that chains tasks with `pipe()`, `parallel()`, and control-flow helpers, then compiles down to a TaskGraph for execution.
Both abstractions share the same runtime: `TaskGraphRunner` executes the graph, `TaskRunner` executes individual tasks, and `Dataflow` objects shuttle data between them.
---
## Core Concepts
### Task
A **Task** is the atomic unit of computation. Every task is an instance of a class that extends the base `Task` class. Tasks declare their shape through static properties and JSON Schema definitions, and provide execution logic via the `execute()` method.
Key characteristics of a task:
- **Statically typed ports**: Input and output ports are defined by `inputSchema()` and `outputSchema()` static methods that return JSON Schema (`DataPortSchema`) objects.
- **Lifecycle-managed**: Each task transitions through well-defined statuses (`PENDING`, `PROCESSING`, `STREAMING`, `COMPLETED`, `FAILED`, `ABORTING`, `DISABLED`).
- **Independently runnable**: A task can be executed standalone via `task.run()` or as part of a graph.
- **Event-driven**: Tasks emit events (`start`, `complete`, `error`, `progress`, `status`, `stream_start`, `stream_chunk`, `stream_end`) that allow external code to observe execution.
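The status list and event model can be made concrete with a minimal standalone sketch. `MiniTask`, its `onStatus` method, and the `PROCESSING` to `COMPLETED`/`FAILED` transitions shown here are assumptions for illustration only. The real `Task` class exposes more statuses and events than this.

```typescript
// Hypothetical sketch of an observable task lifecycle. The status names come
// from the list above; MiniTask and its transition rules are assumptions,
// not the @workglow/task-graph API.
type TaskStatus =
  | "PENDING"
  | "PROCESSING"
  | "STREAMING"
  | "COMPLETED"
  | "FAILED"
  | "ABORTING"
  | "DISABLED";

type StatusListener = (status: TaskStatus) => void;

class MiniTask {
  status: TaskStatus = "PENDING";
  private listeners: StatusListener[] = [];

  // Register a callback that fires on every status change.
  onStatus(fn: StatusListener): void {
    this.listeners.push(fn);
  }

  private setStatus(next: TaskStatus): void {
    this.status = next;
    this.listeners.forEach((fn) => fn(next));
  }

  // Run a synchronous body: PROCESSING, then COMPLETED on success or FAILED on error.
  run<T>(body: () => T): T {
    this.setStatus("PROCESSING");
    try {
      const result = body();
      this.setStatus("COMPLETED");
      return result;
    } catch (err) {
      this.setStatus("FAILED");
      throw err;
    }
  }
}
```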
### TaskGraph
A **TaskGraph** wraps a `DirectedAcyclicGraph` data structure specialized for tasks and dataflows. It enforces the acyclic invariant at the structural level -- you cannot add an edge that would create a cycle.
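A common way to enforce the acyclic invariant is a reachability check before each insertion: an edge `source -> target` closes a cycle exactly when `target` can already reach `source`. The sketch below assumes that approach. `MiniDag` is hypothetical and is not necessarily how `DirectedAcyclicGraph` implements the check.

```typescript
// Hypothetical sketch of the acyclic invariant; MiniDag is illustrative and
// not the DirectedAcyclicGraph class that TaskGraph wraps.
class MiniDag {
  private edges = new Map<string, Set<string>>();

  addNode(id: string): void {
    if (!this.edges.has(id)) this.edges.set(id, new Set<string>());
  }

  // Iterative depth-first search: is `to` reachable from `from`?
  private reachable(from: string, to: string): boolean {
    const stack: string[] = [from];
    const seen = new Set<string>();
    while (stack.length > 0) {
      const node = stack.pop()!;
      if (node === to) return true;
      if (seen.has(node)) continue;
      seen.add(node);
      this.edges.get(node)?.forEach((next) => stack.push(next));
    }
    return false;
  }

  // Reject source -> target when target already reaches source
  // (this also rejects self-loops, since every node reaches itself).
  addEdge(source: string, target: string): void {
    this.addNode(source);
    this.addNode(target);
    if (this.reachable(target, source)) {
      throw new Error(`Edge ${source} -> ${target} would create a cycle`);
    }
    this.edges.get(source)!.add(target);
  }
}
```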
The TaskGraph provides:
- Node management (`addTask`, `removeTask`, `getTask`, `getTasks`)
- Edge management (`addDataflow`, `removeDataflow`, `getDataflow`, `getDataflows`)
- Topological ordering (`topologicallySortedNodes`)
- Execution (`run`, `runPreview`)
- Serialization (`toJSON`, `toDependencyJSON`)
- Event subscription (`subscribe`, `subscribeToTaskStatus`, `subscribeToTaskProgress`, `subscribeToDataflowStatus`, `subscribeToTaskStreaming`)
### Dataflow
A **Dataflow** is a directed edge that connects one task's output port to another task's input port. It is identified by four components:
```
sourceTaskId[sourceTaskPortId] ==> targetTaskId[targetTaskPortId]
```
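Because the identifier is just the four components in a fixed layout, it can be built and parsed mechanically. `formatDataflowId` and `parseDataflowId` are hypothetical helpers, and the parser assumes that task and port ids never contain square brackets.

```typescript
// Hypothetical helpers for the id layout shown above; these names are
// illustrative and not part of @workglow/task-graph.
interface DataflowEndpoints {
  sourceTaskId: string;
  sourceTaskPortId: string;
  targetTaskId: string;
  targetTaskPortId: string;
}

function formatDataflowId(d: DataflowEndpoints): string {
  return `${d.sourceTaskId}[${d.sourceTaskPortId}] ==> ${d.targetTaskId}[${d.targetTaskPortId}]`;
}

function parseDataflowId(id: string): DataflowEndpoints {
  // Each side is "taskId[portId]"; the sides are joined by " ==> ".
  // Assumes ids contain no "[" or "]" characters.
  const match = /^([^\[\]]+)\[([^\[\]]+)\] ==> ([^\[\]]+)\[([^\[\]]+)\]$/.exec(id);
  if (!match) throw new Error(`Malformed dataflow id: ${id}`);
  const [, sourceTaskId, sourceTaskPortId, targetTaskId, targetTaskPortId] = match;
  return { sourceTaskId, sourceTaskPortId, targetTaskId, targetTaskPortId };
}
```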
For example, a dataflow from task A's `result` port to task B's `value` port:
```typescript
new Dataflow("taskA", "result", "taskB", "value");
```
Dataflows carry a `value` property that is populated during execution and can also carry streaming data via a `ReadableStream`.
### Topological Execution
When a TaskGraph runs, the `TaskGraphRunner` retrieves all tasks in topological order -- an ordering that guarantees every task executes only after all of its upstream dependencies have completed. For each task in order, the runner:
1. Copies output data from incoming dataflows into the task's `runInputData`.
2. Executes the task.
3. Pushes the task's output data onto all outgoing dataflows.
This yields deterministic, dependency-respecting execution without the caller having to order tasks manually.
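The three runner steps can be sketched in standalone TypeScript. This sketch assumes synchronous `execute` functions and uses Kahn's algorithm for the ordering. `MiniNode`, `MiniEdge`, `topologicalOrder`, and `runGraph` are hypothetical names. The real `TaskGraphRunner` is asynchronous and also tracks status, streaming, and errors.

```typescript
// Hypothetical sketch of topological execution; not the TaskGraphRunner API.
type Ports = Record<string, unknown>;

interface MiniNode {
  id: string;
  execute(input: Ports): Ports;
}

interface MiniEdge {
  sourceTaskId: string;
  sourceTaskPortId: string;
  targetTaskId: string;
  targetTaskPortId: string;
  value?: unknown; // populated once the source task has run
}

// Kahn's algorithm: repeatedly emit nodes whose incoming edges are all satisfied.
function topologicalOrder(nodes: MiniNode[], edges: MiniEdge[]): MiniNode[] {
  const indegree = new Map<string, number>();
  nodes.forEach((n) => indegree.set(n.id, 0));
  edges.forEach((e) => indegree.set(e.targetTaskId, (indegree.get(e.targetTaskId) ?? 0) + 1));
  const ready = nodes.filter((n) => indegree.get(n.id) === 0);
  const order: MiniNode[] = [];
  while (ready.length > 0) {
    const node = ready.shift()!;
    order.push(node);
    edges
      .filter((e) => e.sourceTaskId === node.id)
      .forEach((e) => {
        const left = indegree.get(e.targetTaskId)! - 1;
        indegree.set(e.targetTaskId, left);
        if (left === 0) ready.push(nodes.find((n) => n.id === e.targetTaskId)!);
      });
  }
  if (order.length !== nodes.length) throw new Error("Graph contains a cycle");
  return order;
}

function runGraph(nodes: MiniNode[], edges: MiniEdge[]): Map<string, Ports> {
  const outputs = new Map<string, Ports>();
  for (const node of topologicalOrder(nodes, edges)) {
    // 1. Copy values from incoming dataflows into this task's input.
    const input: Ports = {};
    edges
      .filter((e) => e.targetTaskId === node.id)
      .forEach((e) => {
        input[e.targetTaskPortId] = e.value;
      });
    // 2. Execute the task.
    const output = node.execute(input);
    outputs.set(node.id, output);
    // 3. Push the task's output ports onto all outgoing dataflows.
    edges
      .filter((e) => e.sourceTaskId === node.id)
      .forEach((e) => {
        e.value = output[e.sourceTaskPortId];
      });
  }
  return outputs;
}
```

For example, with nodes listed as `double`, `source`, `format` and edges `source[result] ==> double[value]` and `double[result] ==> format[value]`, `runGraph` still executes `source` first, so `format` receives `10` when `source` emits `5` and `double` doubles it.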
---
## Task Definition
Every task class must declare several static properties and two static schema methods. Here is the minimal structure:
```typescript
import { Task } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";

interface MyInput {
  text: string;
}

interface MyOutput {
  wordCount: number;
}

class WordCountTask extends Task {
  static readonly type = "WordCountTask";
  static readonly category = "Text";
  static readonly title = "Word Count";
  static readonly description = "Counts words in a string";
  static readonly cachePolicy = { kind: "deterministic" } as const;

  // Input port: a single required string.
  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        text: { type: "string", title: "Input Text" },
      },
      required: ["text"],
    } as const satisfies DataPortSchema;
  }

  // Output port: the number of whitespace-separated words.
  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        wordCount: { type: "integer", title: "Word Count" },
      },
    } as const satisfies DataPortSchema;
  }

  async execute(input: MyInput): Promise<MyOutput> {
    const trimmed = input.text.trim();
    // "".split(/\s+/) yields [""], so treat empty input as zero words.
    return { wordCount: trimmed === "" ? 0 : trimmed.split(/\s+/).length };
  }
}
```
### Required Static Properties
| Property | Type | Description |
| ------------- | -------- | ----------------------------------------------------- |
| `type` | `string` | Unique identifier for this task class in the registry |
| `category` | `string` | Grouping label for UI organization |
| `title` | `string` | Human-readable name |
| `description` | `string` | Brief description of the task's purpose |
| `cachePolicy` | `object` | Whether and where results can be cached across runs |
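The `type` string is what lets a serialized graph be turned back into task instances. A minimal registry keyed by `type` might look like the sketch below, assuming zero-argument constructors and using `category` for grouping. `MiniTaskRegistry` and `TaskClass` are hypothetical and are not the package's own registry.

```typescript
// Hypothetical registry keyed by the static `type` property; illustrative only.
interface TaskClass {
  readonly type: string;
  readonly category: string;
  readonly title: string;
  readonly description: string;
  new (): unknown; // assumes a zero-argument constructor
}

class MiniTaskRegistry {
  private classes = new Map<string, TaskClass>();

  // `type` must be unique across all registered task classes.
  register(cls: TaskClass): void {
    if (this.classes.has(cls.type)) {
      throw new Error(`Task type "${cls.type}" is already registered`);
    }
    this.classes.set(cls.type, cls);
  }

  // Instantiate a task from its `type` string, e.g. when restoring a saved graph.
  create(type: string): unknown {
    const cls = this.classes.get(type);
    if (!cls) throw new Error(`Unknown task type "${type}"`);
    return new cls();
  }

  // Group registered types by `category`, as a UI palette might.
  byCategory(): Record<string, string[]> {
    const groups: Record<string, string[]> = {};
    this.classes.forEach((cls) => {
      (groups[cls.category] = groups[cls.category] || []).push(cls.type);
    });
    return groups;
  }
}
```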
### Optional Static Properties
| Property | Type | Default | Description |
| ---------------------------- | --------- | ------- | --------------------------------------------- |
| `hasDynamicSchemas` | `boolean` | `false` | Set `true` if schemas change at runtime |
| `passthroughInputsToOutputs` | `boolean` | `false` | Mirror dynamic input ports to output |
| `customizable` | `boolean` | `false` | Allow saving as a preset in the builder |
| `isGraphOutput` | `boolean` | `false` | Mark as the definitive output node of a graph |
| `hasDynamicEntitlements` | `boolean` | `false` | Entitlements depend on runtime state |
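The `passthroughInputsToOutputs` flag is described only briefly in the table. One plausible reading, sketched here purely as an assumption, is that input ports are copied into the output schema and input values are carried through to the result unless the task's own output overrides them. `SimpleSchema`, `mirrorInputsToOutputs`, and `mirrorInputValues` are hypothetical, and `SimpleSchema` is a trimmed stand-in for `DataPortSchema`.

```typescript
// Hypothetical sketch of input-to-output mirroring; not the library's behavior.
interface SimpleSchema {
  type: "object";
  properties: Record<string, { type: string; title?: string }>;
  required?: string[];
}

// Schema level: declared output ports win; other input ports are copied over.
function mirrorInputsToOutputs(input: SimpleSchema, output: SimpleSchema): SimpleSchema {
  return {
    type: "object",
    properties: { ...input.properties, ...output.properties },
    required: output.required,
  };
}

// Value level: the task's result overrides any input value with the same name.
function mirrorInputValues(
  input: Record<string, unknown>,
  result: Record<string, unknown>,
): Record<string, unknown> {
  return { ...input, ...result };
}
```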
### The execute() Method
The `execute()` method receives the validated input and an `IExecuteContext` object:
```typescript
async execute(input: Input, context: IExecuteContext): Promise<Output>