# gnougo-flow-core — YAML Workflow DSL Engine (Python)
Python 3.10+ implementation of `GnOuGo.Flow.Core`, the declarative YAML workflow DSL engine.
Write YAML workflows that orchestrate LLMs, MCP servers, templates, loops, human input, and dynamic code generation — all from a single file.
---
## Package Status and Parity
The .NET library at [`src/GnOuGo.Flow.Core/`](../../../src/GnOuGo.Flow.Core/) is the **source of truth**.
This Python package mirrors its public surface as closely as Python idioms allow. See [`PORTING_TODO.md`](PORTING_TODO.md) for the detailed parity log and remaining work items.
| Area | Status |
|---|---|
| YAML DSL parser (`version:`) | Yes |
| Validation + compilation pipeline | Yes |
| Expression interpolation `${...}` + built-in functions | Yes (AST-based JS-subset interpreter) |
| Mustache `template.render` engine | Yes |
| WFScript (`functions:` block) | Yes (multi-statement: `var`/`let`/`const`, `if`/`else`, `return`) |
| Runtime engine + step registry | Yes |
| Step types: `set`, `emit`, `sequence`, `parallel`, `loop.sequential`, `loop.parallel`, `switch`, `template.render`, `llm.call`, `mcp.list`, `mcp.call`, `human.input`, `workflow.call`, `workflow.plan`, `workflow.execute` | Yes |
| MCP integrations (`InMemoryMcpClientFactory`, `ConfiguredMcpClientFactory`, cache helper) | Yes |
| `LLMRequest.reasoning` field | Yes |
| Model metadata catalog (pricing, token limits, capabilities, overrides) | Yes |
| `workflow.plan` defaults `reasoning="high"` | Yes |
| Workflow source telemetry (`source_text` / `source_format`) | Yes |
| `JsonSchemaConverter` (inputs/outputs to JSON Schema) | Yes |
| `WorkflowCheckpointer` + `WorkflowEngine.resume_async` | Yes |
| CLI: `validate` / `inspect` / `run` subcommands | Yes |
---
## Table of Contents
- [Package Status and Parity](#package-status-and-parity)
- [Architecture](#architecture)
- [Get Started — One-file with mocks](#get-started--one-file-with-mocks)
- [Quick Start](#quick-start)
- [Document Structure](#document-structure)
- [Step Types Reference](#step-types-reference)
  - [template.render](#templaterender--mustache-templating)
  - [llm.call](#llmcall--call-a-language-model)
  - [mcp.list](#mcplist--discover-mcp-server-capabilities)
  - [mcp.call](#mcpcall--call-mcp-tools-or-prompts)
  - [set](#set--initialize-or-modify-variables)
  - [emit](#emit--send-progress-messages-to-the-ui)
  - [human.input](#humaninput--pause-and-wait-for-user-input)
  - [sequence](#sequence--run-steps-sequentially)
  - [parallel](#parallel--run-branches-in-parallel)
  - [loop.sequential](#loopsequential--iterate-sequentially)
  - [loop.parallel](#loopparallel--iterate-in-parallel)
  - [switch](#switch--conditional-branching)
  - [workflow.call](#workflowcall--call-a-sub-workflow)
  - [workflow.plan](#workflowplan--generate-a-workflow-dynamically-via-llm)
  - [workflow.execute](#workflowexecute--execute-a-planned-workflow)
- [Typed Inputs](#typed-inputs)
- [Typed Outputs](#typed-outputs)
- [Expressions `${...}`](#expressions-)
- [WFScript - Custom JavaScript Functions](#wfscript--custom-javascript-functions)
- [Error Handling](#error-handling)
- [Model Metadata Catalog](#model-metadata-catalog)
- [CLI](#cli)
- [Python Runtime Notes](#python-runtime-notes)
---
## Architecture
```text
librairies/python/gnougo-flow-core/
  pyproject.toml             # Python package metadata and dependencies
  src/gnougo_flow_core/      # Publishable Python library
    models.py                # DSL model (Document, Workflow, Step, etc.)
    parsing.py               # Parse YAML to model (PyYAML)
    expressions.py           # Expression interpolation `${...}`
    _jsmini.py               # In-tree JS-subset interpreter for expressions and WFScript
    templating.py            # Minimal Mustache-compatible renderer
    scripting.py             # WFScript helpers
    compilation.py           # Document validation + compilation
    runtime.py               # Execution engine + executor registry
    runtime_contracts.py     # Protocols for LLM, MCP, HITL, workflow fetching, telemetry
    checkpointing.py         # Workflow checkpoint contracts and in-memory implementation
    integrations/            # MCP and LLM adapter helpers
    runtime_steps/           # Executor re-export modules for step families
  tests/                     # Dedicated Python unit tests
```
The package is intentionally independent from the .NET assembly at runtime. It keeps the same DSL concepts and stable contracts so workflows can be shared across Python and .NET hosts.
---
## Get Started — One-file with mocks
This example is a complete Python script that runs fully locally: the LLM client and MCP server are mocked in memory, so no API key, network call, or external MCP process is required.
Install the package:
```bash
python -m pip install gnougo-flow-core
```
Create `one_file_flow.py`:
```python
import asyncio
import json
from gnougo_flow_core.compilation import WorkflowCompiler
from gnougo_flow_core.integrations import InMemoryMcpClientFactory, MockMcpServerConfig
from gnougo_flow_core.models import LLMResponse, McpCallResult, McpToolInfo
from gnougo_flow_core.parsing import WorkflowParser
from gnougo_flow_core.runtime import WorkflowEngine, apply_workflow_input_defaults
WORKFLOW_YAML = """
version: 1
name: one-file-mocked-flow
workflows:
  main:
    inputs:
      topic: { type: string, required: true }
    steps:
      - id: discover
        type: mcp.list
        input:
          servers: [demo]
          include: ["tools"]
      - id: facts
        type: mcp.call
        input:
          server: demo
          kind: tool
          method: get_facts
          request:
            topic: "${data.inputs.topic}"
      - id: summarize
        type: llm.call
        input:
          model: mock-gpt
          prompt: "Summarize these facts as one sentence: ${json(data.steps.facts.response)}"
      - id: final
        type: template.render
        input:
          engine: mustache
          template: "{{summary}}"
          data:
            summary: "${data.steps.summarize.text}"
          mode: text
    outputs:
      answer: "${data.steps.final.text}"
      tools_seen: "${len(data.steps.discover.tools)}"
      facts: "${data.steps.facts.response}"
"""


class MockLLMClient:
    async def call_async(self, request):
        return LLMResponse(
            text=f"[Mock {request.model}] Summary generated from MCP facts.",
            usage={"prompt_tokens": 12, "completion_tokens": 18, "total_tokens": 30},
        )


def build_mcp_factory() -> InMemoryMcpClientFactory:
    factory = InMemoryMcpClientFactory()

    def get_facts(arguments):
        topic = (arguments or {}).get("topic", "unknown")
        return McpCallResult(
            is_error=False,
            content={
                "topic": topic,
                "facts": [
                    f"{topic} is handled by a mocked MCP tool.",
                    "No network or external service is required.",
                ],
            },
        )

    factory.register_server(
        "demo",
        MockMcpServerConfig(
            description="A mock knowledge server",
            tools=[
                McpToolInfo(
                    name="get_facts",
                    description="Returns deterministic facts for a topic",
                    input_schema={
                        "type": "object",
                        "properties": {"topic": {"type": "string"}},
                        "required": ["topic"],
                    },
                )
            ],
            tool_handlers={"get_facts": get_facts},
        ),
    )
    return factory


async def main() -> None:
    document = WorkflowParser.parse(WORKFLOW_YAML)
    compiled = WorkflowCompiler().compile(document)
    workflow = compiled.workflows[compiled.entrypoint]
    engine = WorkflowEngine()
    engine.llm_client = MockLLMClient()
    engine.mcp_client_factory = build_mcp_factory()
    inputs = apply_workflow_input_defaults(workflow.source, {"topic": "GnOuGo.Flow"})
    result = await engine.execute_async(workflow, inputs)
    if not result.success:
        message = result.error.message if result.error else "unknown error"
        raise RuntimeError(f"Workflow failed: {message}")
    print(json.dumps(result.outputs, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())
```
Run it:
```bash
python one_file_flow.py
```
Expected output shape:
```json
{
"answer": "[Mock mock-gpt] Summary generated from MCP facts.",
"tools_seen": 1,
"facts": {
"topic": "GnOuGo.Flow",
"facts": [
"GnOuGo.Flow is handled by a mocked MCP tool.",
"No network or external service is required."
]
}
}
```
When developing inside this repository, you can run against the local source tree instead of the published package:
```powershell
$env:PYTHONPATH = "C:\github\GnouGo\librairies\python\gnougo-flow-core\src"
python one_file_flow.py
```
---
## Quick Start
Install the published Python package:
```bash
python -m pip install gnougo-flow-core
```
Or add it to a local `uv` project:
```bash
uv add gnougo-flow-core
```
For repository development, install the package with its development extras from this directory:
```bash
uv sync --extra dev
```
Create `hello.yaml`:
```yaml
version: 1
name: hello-world
workflows:
  main:
    inputs:
      name: { type: string, required: true }
    steps:
      - id: greet
        type: template.render
        input:
          engine: mustache
          template: "Hello {{name}}! Welcome to GnOuGo.Flow."
          data: { name: "${data.inputs.name}" }
          mode: text
    outputs:
      greeting: "${data.steps.greet.text}"
```
Validate it:
```bash
gnougo-flow validate hello.yaml
```
Inspect it:
```bash
gnougo-flow inspect hello.yaml
```
Run it from the CLI:
```bash
gnougo-flow run hello.yaml -i name=World
```
Run it from Python:
```python
import asyncio
from gnougo_flow_core.compilation import WorkflowCompiler
from gnougo_flow_core.parsing import WorkflowParser
from gnougo_flow_core.runtime import WorkflowEngine, apply_workflow_input_defaults
async def main() -> None:
    yaml_text = open("hello.yaml", encoding="utf-8").read()
    document = WorkflowParser.parse(yaml_text)
    compiled = WorkflowCompiler().compile(document)
    workflow = compiled.workflows[compiled.entrypoint]
    inputs = apply_workflow_input_defaults(workflow.source, {"name": "World"})
    result = await WorkflowEngine().execute_async(workflow, inputs)
    if not result.success:
        raise RuntimeError(result.error.message if result.error else "Workflow failed")
    print(result.outputs)

asyncio.run(main())
```
Runtime integrations such as LLM clients, MCP clients, human input providers, workflow fetchers, telemetry, and checkpointing are injected through Python protocols in `gnougo_flow_core.runtime_contracts`.
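For example, a production LLM client only needs to satisfy the same `call_async` protocol as the `MockLLMClient` in the one-file example above. A minimal sketch against the official OpenAI SDK (the `request` attribute names `model`, `system`, and `prompt` follow the `llm.call` inputs and are assumptions here; see `runtime_contracts` for the exact protocol):
```python
from openai import AsyncOpenAI
from gnougo_flow_core.models import LLMResponse
from gnougo_flow_core.runtime import WorkflowEngine

class OpenAILLMClient:
    def __init__(self) -> None:
        self._client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def call_async(self, request):
        # Attribute names on `request` are assumptions based on llm.call inputs.
        messages = []
        if getattr(request, "system", None):
            messages.append({"role": "system", "content": request.system})
        messages.append({"role": "user", "content": request.prompt})
        completion = await self._client.chat.completions.create(
            model=request.model, messages=messages
        )
        usage = completion.usage
        return LLMResponse(
            text=completion.choices[0].message.content or "",
            usage={
                "prompt_tokens": usage.prompt_tokens,
                "completion_tokens": usage.completion_tokens,
                "total_tokens": usage.total_tokens,
            },
        )

engine = WorkflowEngine()
engine.llm_client = OpenAILLMClient()
```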
---
## Document Structure
Every workflow file starts with:
```yaml
version: 1                     # DSL version (required, always 1)
name: my-workflow              # Document name (optional)
functions: |                   # Global WFScript functions (optional)
  function myHelper(x) { return x * 2; }
workflows:
  main:                        # Entrypoint workflow (by convention)
    inputs:                    # Input parameters with types (optional)
      message: { type: string, required: true }
    steps:                     # Ordered list of steps (required)
      - id: step1
        type: template.render
        input: { ... }
    outputs:                   # Output expressions (optional)
      result: "${data.steps.step1.text}"
```
You can define **multiple workflows** in the same document and call them via `workflow.call`.
### Step Common Fields
Every step supports:
```yaml
- id: unique_step_id           # Required — unique within the workflow
  type: step_type              # Required — one of the step types below
  if: "${expression}"          # Optional — guard; the step is skipped when false
  input: { ... }               # Step-specific input (supports ${...} at any depth)
  output: alias_name           # Optional — also expose the step's output under this alias
  retry:                       # Optional — automatic retry for retryable errors
    max: 3
    backoff_ms: 1000
    backoff_mult: 2.0
    jitter_ms: 100
  on_error:                    # Optional — error handler (see Error Handling)
    cases:
      - if: "${error.code == \"LLM_TIMEOUT\"}"
        action: continue
        set_output: "fallback value"
      - action: stop
```
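To make the retry policy concrete: the block above waits roughly 1000 ms before the second attempt and 2000 ms before the third, plus jitter. A quick sanity-check sketch (the engine's exact jitter distribution is an implementation detail; uniform jitter is assumed here):
```python
import random

def retry_delays(max_attempts=3, backoff_ms=1000, backoff_mult=2.0, jitter_ms=100):
    """Yield the wait (ms) before each retry; the first attempt runs immediately."""
    delay = float(backoff_ms)
    for _ in range(max_attempts - 1):
        yield delay + random.uniform(0, jitter_ms)
        delay *= backoff_mult

print([round(d) for d in retry_delays()])  # e.g. [1046, 2031]
```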
### Data Access
All expressions read from a shared `data` context:
| Path | Content |
|------|---------|
| `data.inputs.*` | Workflow input parameters |
| `data.steps.<id>.*` | Output of a previously executed step |
| `data.env.*` | Environment variables |
---
## Step Types Reference
### `template.render` — Mustache Templating
Renders a Mustache template with data from the workflow context.
```yaml
- id: greet
  type: template.render
  input:
    engine: mustache
    template: "Hello {{name}}, you have {{count}} items."
    data:
      name: "${data.inputs.name}"
      count: "${len(data.inputs.items)}"
    mode: text                 # "text" (default) or "json"
```
**Output:** `{ text: "Hello World, you have 3 items." }`
---
### `llm.call` — Call a Language Model
Sends a prompt to an LLM and returns the response. Supports structured JSON output.
#### Basic call
```yaml
- id: summarize
  type: llm.call
  input:
    model: gpt-4o-mini                                # Required
    prompt: "Summarize this: ${data.inputs.text}"     # Required
    system: "You are a concise summarizer."           # Optional
    provider: openai                                  # Optional (default: auto-routed)
    temperature: 0.7                                  # Optional override; omit by default
    max_tokens: 2048                                  # Optional
    reasoning: auto                                   # Optional — auto|minimal|low|medium|high|max
                                                      # Default: omitted (provider decides).
                                                      # Unsupported optional fields are stripped
                                                      # based on the runtime model metadata catalog.
```
`temperature`, `reasoning`, `structured_output`, and tool-calling support are checked against the runtime model metadata catalog before the configured LLM client is called. For example, a request to `o4-mini` with `temperature: 0.7` is automatically sent without `temperature`.
**Output:** `{ text: "...", usage: { prompt_tokens, completion_tokens, total_tokens }, meta: { model } }`
#### Structured output (JSON mode)
```yaml
- id: classify
  type: llm.call
  input:
    model: gpt-4o
    prompt: "Classify this ticket and return JSON: ${data.inputs.ticket}"
    structured_output:
      schema_inline:
        type: object
        properties:
          category: { type: string }
          priority: { type: string, enum: [low, medium, high, critical] }
          confidence: { type: number }
        required: [category, priority]
      strict: true
```
**Output:** `{ text: "...", json: { category: "bug", priority: "high", confidence: 0.92 }, usage: {...} }`
Access: `data.steps.classify.json.category`, `data.steps.classify.json.priority`
---
### `mcp.list` — Discover MCP Server Capabilities
Lists tools, resources, and/or prompts exposed by one or more MCP servers.
Use a one-item array for a single server, or `servers: ["*"]` to discover all configured MCP servers.
```yaml
- id: discover
  type: mcp.list
  input:
    servers: [github, docs]          # Required — configured MCP server names
    include: ["tools", "prompts"]    # Optional — default: ["tools"]
- id: discover_all
  type: mcp.list
  input:
    servers: ["*"]
    include: ["tools"]
```
**Output:** `{ status, text, servers: [...], tools: [...], resources: [...], prompts: [...] }`
Flattened `tools`, `resources`, and `prompts` entries each include a `server` field so downstream steps can keep the server affinity when multiple MCP servers are discovered at once.
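For instance, given the flattened shape above, a downstream consumer can regroup tools by server; a small illustration with hypothetical data:
```python
# Illustration only: a flattened mcp.list output as described above,
# where each entry carries its originating server.
discover_output = {
    "tools": [
        {"server": "github", "name": "list_repos"},
        {"server": "docs", "name": "search_docs"},
    ]
}
github_tools = [t for t in discover_output["tools"] if t["server"] == "github"]
print([t["name"] for t in github_tools])  # ['list_repos']
```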
---
### `mcp.call` — Call MCP Tools or Prompts
Calls one or more capabilities on an MCP server. Three modes are available:
#### Direct tool call (preferred when tool names are known)
```yaml
- id: weather
  type: mcp.call
  input:
    server: weather-server
    kind: tool
    method: get_weather
    request: { location: "Paris", units: "celsius" }
    timeout_ms: 30000
```
**Output:** `{ status: "ok", response: { temperature: 22, ... } }`
#### Direct prompt call
```yaml
- id: summarize_prompt
  type: mcp.call
  input:
    server: my-server
    kind: prompt
    method: summarize_document
    request: { text: "${data.inputs.document}" }
```
**Output:** `{ status: "ok", text: "...", messages: [...] }`
#### LLM-assisted call (auto-selects the right tool)
Combine `mcp.list` → `mcp.call` with a prompt to let an LLM choose the best tool:
```yaml
- id: discover
  type: mcp.list
  input:
    servers: [github]
- id: smart_call
  type: mcp.call
  input:
    server: github
    model: gpt-4o-mini
    temperature: 0.2
    prompt: "Find and call the right tool to list my repositories"
    tools: "${data.steps.discover.tools}"
    prompts: "${data.steps.discover.prompts}"
    structured_output:
      schema_inline:
        type: object
        properties:
          repos:
            type: array
            items:
              type: object
              properties:
                name: { type: string }
                url: { type: string }
              required: [name, url]
        required: [repos]
      strict: true
```
**Output (LLM-assisted):** `{ status: "ok", selection_mode: "llm", text: "...", tool_calls: [...], results: [...], json: {...} }`
#### Output access patterns
| Mode | Access |
|------|--------|
| Single tool | `data.steps.<id>.status`, `data.steps.<id>.response` |
| Single prompt | `data.steps.<id>.status`, `data.steps.<id>.text` |
| Batch/auto | `data.steps.<id>.results` (array) |
| LLM-assisted | `data.steps.<id>.text`, `data.steps.<id>.json` |
> **Important:** The `response` object is tool-specific. Do not assume field names unless documented by the tool. Use `json(data.steps.<id>.response)` to serialize it.
---
### `set` — Initialize or Modify Variables
Sets variables in the workflow data context using expressions.
```yaml
- id: init_vars
  type: set
  input:
    total: 0
    prefix: "report_"
    full_name: "${data.inputs.first_name + ' ' + data.inputs.last_name}"
    items_count: "${len(data.inputs.items)}"
```
**Output:** `{ total: 0, prefix: "report_", full_name: "...", items_count: 5 }`
---
### `emit` — Send Progress Messages to the UI
Pushes real-time feedback to the user interface during long-running workflows.
```yaml
- id: notify_progress
  type: emit
  input:
    message: "Processing item ${data.steps.loop.index} of ${data.steps.loop.count}..."
    level: progress              # "thinking" | "info" | "progress" | "response"
```
| Level | Visual |
|-------|--------|
| `thinking` | Subtle animated (default) |
| `info` | Blue informational |
| `progress` | Green progress indicator |
| `response` | Highlighted, monospace — appears as assistant content |
---
### `human.input` — Pause and Wait for User Input
Pauses the workflow and prompts the user for input. The workflow resumes when the user submits a response.
#### Quick choices
```yaml
- id: approve
  type: human.input
  input:
    prompt: "The agent wants to call API X. Approve?"
    context: "${json(data.steps.plan)}"
    choices:
      - approve
      - reject
      - modify
    timeout_ms: 300000           # 5 minutes (default)
```
#### Structured form fields
```yaml
- id: user_config
  type: human.input
  input:
    prompt: "Please configure the following settings:"
    fields:
      - name: api_key
        type: string
        required: true
        description: Your API key
      - name: region
        type: select
        options: [us-east, eu-west, ap-south]
        default: us-east
      - name: max_retries
        type: string
        required: false
        default: "3"
```
**Output:** The user's response as a JSON object (e.g., `{ "response": "approve" }` or `{ "api_key": "...", "region": "eu-west", "max_retries": "3" }`).
> **Timeout:** If the user doesn't respond within `timeout_ms`, the step fails with error code `HUMAN_INPUT_TIMEOUT`.
---
### `sequence` — Run Steps Sequentially
Groups sub-steps that execute one after another.
```yaml
- id: pipeline
  type: sequence
  steps:
    - id: step_a
      type: llm.call
      input: { model: gpt-4o-mini, prompt: "Step A" }
    - id: step_b
      type: llm.call
      input: { model: gpt-4o-mini, prompt: "Continue from: ${data.steps.step_a.text}" }
```
---
### `parallel` — Run Branches in Parallel
Executes independent branches concurrently.
```yaml
- id: gather
  type: parallel
  branches:
    - steps:
        - id: fetch_weather
          type: mcp.call
          input: { server: weather, kind: tool, method: get_weather, request: { location: "Paris" } }
    - steps:
        - id: fetch_news
          type: mcp.call
          input: { server: news, kind: tool, method: get_headlines, request: { topic: "tech" } }
```
---
### `loop.sequential` — Iterate Sequentially
Loops with `while` condition or fixed `times` count.
```yaml
# Fixed count
- id: retry_loop
  type: loop.sequential
  input:
    times: 5
  steps:
    - id: attempt
      type: llm.call
      input: { model: gpt-4o-mini, prompt: "Attempt ${data.steps.retry_loop.index}" }

# While condition
- id: poll
  type: loop.sequential
  input:
    while: "${data.steps.check.status != 'ready'}"
    max_iterations: 20
  steps:
    - id: check
      type: mcp.call
      input: { server: my-server, kind: tool, method: check_status, request: {} }
```
**Loop context:** `data.steps.<loop_id>.index` (current iteration, 0-based), `data.steps.<loop_id>.count` (total completed).
---
### `loop.parallel` — Iterate in Parallel
Loops over an array of items, executing iterations concurrently.
```yaml
- id: process_all
  type: loop.parallel
  input:
    items: "${data.inputs.urls}"
    max_concurrency: 5
  steps:
    - id: fetch
      type: mcp.call
      input:
        server: http-client
        kind: tool
        method: fetch_url
        request: { url: "${data.steps.process_all.item}" }
```
**Loop context:** `data.steps.<loop_id>.item` (current item), `data.steps.<loop_id>.index`, `data.steps.<loop_id>.results` (collected results).
---
### `switch` — Conditional Branching
Two forms: expression-based and when-based.
#### Form A — Expression/value matching
```yaml
- id: route
  type: switch
  input:
    expr: "${data.steps.classify.json.category}"
    cases:
      - value: bug
        steps:
          - id: handle_bug
            type: llm.call
            input: { model: gpt-4o-mini, prompt: "Triage this bug..." }
      - value: feature
        steps:
          - id: handle_feature
            type: llm.call
            input: { model: gpt-4o-mini, prompt: "Plan this feature..." }
    default:
      - id: handle_other
        type: emit
        input: { message: "Unknown category, routing to human.", level: info }
```
#### Form B — When conditions
```yaml
- id: priority_route
  type: switch
  cases:
    - when: "${data.inputs.priority == 'critical'}"
      steps:
        - id: escalate
          type: human.input
          input: { prompt: "Critical issue! Immediate action required." }
    - when: "${data.inputs.priority == 'high'}"
      steps:
        - id: auto_handle
          type: llm.call
          input: { model: gpt-4o, prompt: "Handle high-priority: ${data.inputs.message}" }
  default:
    - id: queue
      type: emit
      input: { message: "Queued for later processing.", level: info }
```
---
### `workflow.call` — Call a Sub-Workflow
Calls another workflow defined in the same document or fetched from an external source.
#### Local call
```yaml
- id: run_analysis
  type: workflow.call
  input:
    ref:
      kind: local
      workflow: analysis           # Name of a workflow in the same document
    inputs:
      data: "${data.inputs.raw_data}"
```
#### Remote call (URL)
```yaml
- id: run_remote
  type: workflow.call
  input:
    ref:
      kind: url
      url: "https://example.com/workflows/analysis.yaml"
      workflow: main
    inputs:
      data: "${data.inputs.raw_data}"
```
---
### `workflow.plan` — Generate a Workflow Dynamically via LLM
The most powerful step type: asks an LLM to **generate a complete YAML workflow** from a natural-language instruction, then validates and compiles it before execution.
#### Basic usage
```yaml
- id: plan
  type: workflow.plan
  input:
    generator:
      model: gpt-4o
      instruction: "Build a workflow that fetches weather for Paris and summarizes it."
      context: "Available tools include weather and summarization APIs."
```
#### Full configuration
```yaml
- id: plan
  type: workflow.plan
  input:
    generator:
      model: gpt-4o                 # LLM model for planning
      provider: openai              # Optional — LLM provider
      instruction: "Analyze the user's request and build a workflow."
      context: "${json(data.inputs)}"
      # Reasoning effort for the planning LLM call (and the MCP pre-filter).
      # Defaults to "high" (max) because planning is heavy reasoning work.
      # Set to "auto" to let the provider decide, or any of:
      # "minimal" | "low" | "medium" | "high" | "max" | "auto".
      # Models without thinking support ignore this field.
      reasoning: high
    # MCP pre-filter: uses an LLM to select only relevant MCP servers/tools
    # before injecting them into the planning prompt (reduces prompt size)
    prefilter: true                 # true (default) | false | { model, provider }
    # Policy constraints — restrict what the LLM can generate
    policy:
      allowed_step_types:           # Whitelist of step types
        - llm.call
        - mcp.call
        - mcp.list
        - template.render
        - set
        - emit
        - sequence
      denied_step_types:            # Blacklist (takes precedence)
        - workflow.plan             # Prevent recursive planning
      allow_remote_workflow_refs: false
    # Limits
    limits:
      max_steps_total: 20           # Maximum number of steps in the generated workflow
    # Validation
    validate:
      compile: true                 # Parse + compile the generated YAML (default: true)
    # Self-correction on failure
    on_invalid:
      action: reprompt              # "reprompt" (re-send error to LLM) | "fail"
      max_attempts: 3               # Number of attempts before giving up
```
**Output:** `{ workflow: { dsl, name, workflows: [...] }, yaml: "...", meta: { model, attempt } }`
**Features:**
- **Automatic MCP discovery**: Connects to all configured MCP servers, lists their tools/prompts, and injects them into the planning prompt so the LLM knows what's available.
- **MCP pre-filter**: Uses a lightweight LLM call to select only the MCP servers/tools relevant to the task instruction — reduces prompt size and cost.
- **Full DSL reference injection**: The LLM receives the complete DSL documentation (step types, expressions, error handling) so it can generate valid workflows.
- **Policy enforcement**: Generated workflows are validated against allowed/denied step types and max step limits.
- **Self-correction**: If the generated YAML is invalid (parse error, policy violation, compilation error), the error is sent back to the LLM for automatic correction.
- **OpenTelemetry tracing**: Full GenAI convention traces for the planning LLM call, MCP discovery, and pre-filter phases.
---
### `workflow.execute` — Execute a Planned Workflow
Executes a workflow that was dynamically generated by `workflow.plan`.
```yaml
- id: plan
  type: workflow.plan
  input:
    generator:
      model: gpt-4o
      instruction: "${data.inputs.task}"
- id: execute
  type: workflow.execute
  input:
    from_step: plan                # References the workflow.plan step that produced the YAML
```
The plan + execute pattern is the foundation of **agentic workflows**: the user describes a goal in natural language, the LLM plans the steps, and the engine executes them.
---
## Typed Inputs
Workflow inputs support rich type declarations with validation at runtime.
**Supported types:** `string`, `number`, `boolean`, `array`, `object`, `dictionary`, `any`
```yaml
workflows:
  main:
    inputs:
      # Simple scalar
      name:
        type: string
        required: true
        description: The user's name
      # With default value
      mode:
        type: string
        required: false
        default: standard
      # Array with typed items
      tags:
        type: array
        items: { type: string }
        required: false
        default: []
      # Nested object
      config:
        type: object
        properties:
          timeout: { type: number }
          retries: { type: number }
        required: false
      # Dictionary (string keys, typed values)
      headers:
        type: dictionary
        additionalProperties: { type: string }
```
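Callers that omit optional inputs get the declared defaults merged in via `apply_workflow_input_defaults`, the same helper used in the Quick Start. A small sketch against the inputs declared above:
```python
from gnougo_flow_core.compilation import WorkflowCompiler
from gnougo_flow_core.parsing import WorkflowParser
from gnougo_flow_core.runtime import apply_workflow_input_defaults

# `yaml_text` is assumed to hold a document whose `main` workflow
# declares the typed inputs shown above.
document = WorkflowParser.parse(yaml_text)
compiled = WorkflowCompiler().compile(document)
workflow = compiled.workflows[compiled.entrypoint]

# The caller only provides `name`; `mode` and `tags` pick up their defaults.
inputs = apply_workflow_input_defaults(workflow.source, {"name": "Ada"})
# Expected shape (assuming defaults merge as in the Quick Start):
# {"name": "Ada", "mode": "standard", "tags": [], ...}
```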
---
## Typed Outputs
Workflow outputs support type annotations and descriptions. This enables:
- Self-documenting workflow contracts
- Automatic JSON Schema generation (for MCP tool exposure)
- Nested type descriptors for arrays, objects, and dictionaries
### Short form (expression only)
```yaml
outputs:
  result: "${data.steps.step1.text}"
```
### Long form (with type and description)
```yaml
outputs:
  summary:
    expr: "${data.steps.llm_summary.text}"
    type: string
    description: LLM-generated summary text
  items_processed:
    expr: "${data.steps.process.count}"
    type: number
    description: Number of items processed
  success:
    expr: "${data.steps.result.ok}"
    type: boolean
    description: Whether the workflow succeeded
```
### Complex types
```yaml
outputs:
  # Array of strings
  tags:
    expr: "${data.steps.extract.tags}"
    type: array
    items: { type: string }
    description: Extracted tags
  # Typed object
  report:
    expr: "${data.steps.build.report}"
    type: object
    properties:
      title: { type: string }
      score: { type: number }
    description: Structured report
  # Dictionary
  metrics:
    expr: "${data.steps.collect.metrics}"
    type: dictionary
    additionalProperties: { type: number }
    description: Named metrics map
```
### JSON Schema generation
`OutputDef` types are convertible to JSON Schema via the `JsonSchemaConverter` helper (the Python counterpart of .NET's `OutputsToJsonSchema`), used for MCP tool exposure and API documentation.
---
## Expressions `${...}`
Expressions are embedded in strings using `${...}` syntax. They are JavaScript-style expressions evaluated by the in-tree JS-subset interpreter in `gnougo_flow_core._jsmini`.
### Data access
- `data.inputs.*` — workflow input parameters
- `data.steps.<id>.*` — output of a previously executed step
- `data.env.*` — environment variables
- Optional chaining: `data.steps.maybe_skipped?.value`
### Operators
`&& || ! == != < <= > >= + - * / % ??`
### Built-in functions
| Function | Description |
|----------|-------------|
| `exists(val)` | `true` if val is non-null |
| `coalesce(a, b, ...)` | Returns first non-null argument |
| `len(val)` | Length of string or array (0 for null) |
| `length(val)` | Alias for `len(val)` |
| `lower(s)` | Lowercase string |
| `upper(s)` | Uppercase string |
| `trim(s)` | Trims whitespace |
| `contains(s, sub)` | `true` if string `s` contains `sub` |
| `startsWith(s, prefix)` | `true` if `s` starts with prefix |
| `endsWith(s, suffix)` | `true` if `s` ends with suffix |
| `replace(s, old, new)` | Replaces all occurrences |
| `substring(s, start)` | Characters from position `start` to end |
| `substring(s, start, len)` | `len` characters starting at `start` |
| `toNumber(val)` | Converts to number |
| `json(val)` | Serializes value to JSON string |
| `fromJson(s)` | Parses a JSON string into a node |
| `now()` | Returns the current local date/time as an ISO-8601 string |
| `base64(val)` | Encodes the UTF-8 string value as Base64 |
| `formatDate(dateStr, fmt)` | Formats a date string (default: `yyyy-MM-dd`) |
### JavaScript-style expression support
- Ternary: `${data.inputs.mode == "fast" ? 0.0 : 0.7}`
- Template literals: `` ${`Hello ${data.inputs.name}`} ``
- Array methods: `${data.inputs.items.filter(i => i.active).length}`
---
## WFScript — Custom JavaScript Functions
Define reusable functions in the `functions:` block (document-level or workflow-level):
```yaml
version: 1
name: smart-triage
functions: |
  function classify(text) {
    if (contains(lower(text), "urgent")) return "critical";
    if (contains(lower(text), "bug")) return "bug";
    return "general";
  }
  function truncate(text, maxLen) {
    if (len(text) <= maxLen) return text;
    return text.substring(0, maxLen) + "...";
  }
workflows:
  main:
    inputs:
      message: { type: string, required: true }
    steps:
      - id: route
        type: switch
        input:
          expr: "${functions.classify(data.inputs.message)}"
          cases:
            - value: critical
              steps:
                - id: escalate
                  type: human.input
                  input:
                    prompt: "URGENT: ${functions.truncate(data.inputs.message, 100)}"
            - value: bug
              steps:
                - id: triage_bug
                  type: llm.call
                  input:
                    model: gpt-4o-mini
                    prompt: "Triage this bug report: ${data.inputs.message}"
```
---
## Error Handling
### Retry
Automatically retries a step on transient (retryable) errors:
```yaml
retry:
  max: 3              # Maximum attempts
  backoff_ms: 1000    # Initial delay between retries
  backoff_mult: 2.0   # Multiplier for exponential backoff
  jitter_ms: 100      # Random jitter added to each delay
```
### on_error
Evaluated **after retries are exhausted** (or immediately for non-retryable errors):
```yaml
on_error:
  cases:
    - if: "${error.code == \"LLM_TIMEOUT\" || error.code == \"LLM_NETWORK\"}"
      action: continue
      set_output:
        text: "Temporary LLM issue — using fallback"
    - if: "${error.code == \"INPUT_VALIDATION\"}"
      action: stop          # Stop the workflow immediately
    - action: stop          # Default: stop on unknown errors
```
**Error context variables:** `error.code`, `error.message`, `error.retryable`, `step.id`, `step.type`
**Actions:** `continue` (skip the step, optionally set a fallback output) | `stop` (abort the workflow)
### Common error codes
| Code | Retryable | Description |
|------|-----------|-------------|
| `INPUT_VALIDATION` | No | Missing or malformed input |
| `LLM_TIMEOUT` | Yes | LLM request timed out |
| `LLM_NETWORK` | Yes | Network error reaching the LLM |
| `MCP_CONNECTION_ERROR` | Yes | Cannot connect to MCP server |
| `MCP_TOOL_ERROR` | No | MCP tool returned an error |
| `TEMPLATE_PLAN` | No | `workflow.plan` failed to generate valid YAML |
| `TEMPLATE_POLICY` | No | Generated workflow violates policy constraints |
| `HUMAN_INPUT_TIMEOUT` | No | User didn't respond within `timeout_ms` |
| `NO_HITL_PROVIDER` | No | No human input provider configured |
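Host code can branch on the same codes after a run; a sketch, assuming the object returned on `result.error` exposes a `code` field mirroring the DSL-side `error.code`:
```python
def describe_failure(result) -> str:
    """Classify a finished run using the error-code table above."""
    if result.success:
        return "ok"
    code = getattr(result.error, "code", None)  # assumption: mirrors DSL error.code
    if code in ("LLM_TIMEOUT", "LLM_NETWORK", "MCP_CONNECTION_ERROR"):
        return f"transient failure ({code}); a retry may succeed"
    message = result.error.message if result.error else "unknown error"
    return f"fatal failure ({code}): {message}"
```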
### Full example — resilient LLM call with fallback
```yaml
- id: summarize
  type: llm.call
  input:
    model: gpt-4o-mini
    prompt: "Summarize: ${json(data.inputs)}"
  retry:
    max: 3
    backoff_ms: 1000
    backoff_mult: 2
    jitter_ms: 100
  on_error:
    cases:
      - if: "${error.code == \"LLM_TIMEOUT\" || error.code == \"LLM_NETWORK\"}"
        action: continue
        set_output:
          text: "Summary temporarily unavailable."
      - action: stop
```
---
## Model Metadata Catalog
The Python runtime includes a model metadata catalog aligned with the .NET implementation. It centralizes:
- token limits: `context_window_tokens`, `max_input_tokens`, `max_output_tokens`
- pricing: `input_per_1m_tokens`, `output_per_1m_tokens`
- capabilities: temperature, reasoning effort, structured output, tools, JSON mode, vision, embeddings
- aliases and user-provided extensions
`WorkflowEngine.sanitize_llm_request()` removes unsupported optional request fields before calling the configured LLM client. This prevents provider crashes such as sending `temperature` to reasoning models that reject it.
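A direct sanitization sketch (the `LLMRequest` keyword arguments and the exact `sanitize_llm_request` signature are assumptions; check `models.py` and `runtime.py` for the real shapes):
```python
from gnougo_flow_core.models import LLMRequest
from gnougo_flow_core.runtime import WorkflowEngine

engine = WorkflowEngine()
# Hypothetical construction: o4-mini rejects `temperature`, so the
# catalog-driven sanitization is expected to drop it before the client call.
request = LLMRequest(model="o4-mini", prompt="Summarize this.", temperature=0.7)
sanitized = engine.sanitize_llm_request(request)
```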
Pricing uses the same metadata resolver. `try_get_pricing()` and `estimate_cost()` read built-in pricing by default and can also use `LLMOptions.model_metadata_files` / `LLMOptions.model_overrides` when those are passed explicitly.
```python
from gnougo_flow_core import WorkflowEngine, LLMOptions, LLMModelMetadata, ModelCapabilityMetadata
engine = WorkflowEngine()
engine.llm_options = LLMOptions(
    model_metadata_files=["config/my-models.json"],
    model_overrides={
        "my-local-model:latest": LLMModelMetadata(
            provider_type="ollama",
            context_window_tokens=32768,
            max_output_tokens=8192,
            capabilities=ModelCapabilityMetadata(
                supports_temperature=True,
                supports_reasoning_effort=False,
                supports_structured_output=False,
                supports_tools=False,
            ),
        )
    },
)
```
External metadata files can also use .NET-style camelCase field names:
```jsonc
{
"models": {
"model-id": {
"providerType": "openai",
"contextWindowTokens": 128000,
"maxOutputTokens": 16384,
"pricing": { "inputPer1MTokens": 0.15, "outputPer1MTokens": 0.60 },
"capabilities": {
"supportsTemperature": true,
"supportsReasoningEffort": false,
"supportsStructuredOutput": true,
"supportsTools": true
}
}
},
"aliases": { "short-name": "model-id" }
}
```
Metadata precedence is:
```text
builtin catalog < model_metadata_files < model_overrides < heuristics for missing fields
```
---
## CLI
The published package exposes the `gnougo-flow` command.
```bash
# Validate a workflow (check syntax, types, compilation)
gnougo-flow validate examples/triage.yaml
# Inspect the structure (workflows, steps, inputs, outputs)
gnougo-flow inspect examples/triage.yaml
# Execute with key=value inputs
gnougo-flow run examples/triage.yaml -i message=hello -i priority=normal
# Execute with full JSON input
gnougo-flow run examples/triage.yaml -j '{"message":"hello","priority":"normal"}'
# Execute with full JSON input loaded from a file
gnougo-flow run examples/triage.yaml -j @inputs.json
```
When running directly from the repository with `uv`, prefix commands with `uv run`:
```bash
uv run gnougo-flow validate examples/triage.yaml
uv run gnougo-flow inspect examples/triage.yaml
uv run gnougo-flow run examples/triage.yaml -i message=hello
```
---
## Python Runtime Notes
The Python package is not a NativeAOT binary; it is a Python 3.10+ library and CLI. It still follows the same design goals as `GnOuGo.Flow.Core`:
- YAML parsing uses PyYAML and typed Python models.
- JSON-like workflow data stays in Python dictionaries/lists/scalars.
- Templating is implemented in-tree with a minimal Mustache-compatible renderer.
- Expression interpolation and WFScript use `gnougo_flow_core._jsmini`, an in-tree JavaScript-subset interpreter with execution limits.
- Runtime services are injected through protocols instead of concrete infrastructure dependencies.
- MCP helpers live in `gnougo_flow_core.integrations`:
  - `InMemoryMcpClientFactory` and `MockMcpServerConfig` for tests and demos.
  - `ConfiguredMcpClientFactory` and `McpSessionAdapter` for injected MCP sessions.
  - `RoutingLLMClientAdapter` for adapting a routing LLM client.
- `WorkflowEngine.mcp_cache` defaults to `McpCacheHelper`, a 5-minute sliding TTL cache for MCP tools/resources/prompts per server. Set it to `None` to disable capability caching.
- `WorkflowEngine.resume_async`, `WorkflowCheckpointer`, and `limits.run_id` support resumable workflow execution.
Development commands:
```bash
uv sync --extra dev
uv run --extra dev pytest
uv run --extra dev ruff check .
python -m pip install --upgrade build
python -m build
```
The release pipeline injects the generated repository version into `pyproject.toml` before building and publishing the package to PyPI.