clearflow · PyPI

> Message-driven workflows with observable flows for emergent AI.

Compose type-safe flows for emergent AI

Project description

ClearFlow


Compose type-safe flows for emergent AI. 100% test coverage, zero dependencies.

Version Policy

Alpha Phase (0.x.y): ClearFlow is in active development. Breaking changes may occur in any release. Pin exact versions in production:

pip install clearflow==0.1.0  # Pin to exact version

After 1.0.0, we'll follow strict Semantic Versioning.

Why ClearFlow?

  • 100% test coverage – Every path proven to work
  • Type-safe transformations – Errors caught at development time, not runtime
  • Immutable state – No hidden mutations
  • Zero dependencies – No hidden failure modes
  • Single exit enforcement – No ambiguous endings
  • AI-Ready Documentation – llms.txt for optimal coding assistant integration

Quick Start

pip install clearflow

Upgrading from v0.x? See the Migration Guide for breaking changes.

AI Assistant Integration

ClearFlow provides comprehensive documentation in llms.txt format for optimal AI assistant support.

Claude Code Setup

Add ClearFlow documentation to Claude Code with one command:

claude mcp add-json clearflow-docs '{
    "type":"stdio",
    "command":"uvx",
    "args":["--from", "mcpdoc", "mcpdoc", "--urls", "ClearFlow:https://raw.githubusercontent.com/artificial-sapience/clearflow/main/llms.txt"]
}'

For IDEs (Cursor, Windsurf), see the mcpdoc documentation.

Direct URL Access

Use these URLs directly in any AI tool that supports llms.txt:

Examples

| Name | Description |
|------|-------------|
| Chat | Simple conversational flow with OpenAI |
| Portfolio Analysis | Multi-specialist workflow for financial analysis |
| RAG | Full retrieval-augmented generation with vector search |

Core Concepts

Node[TIn, TOut]

A unit that transforms state from TIn to TOut (or Node[T] when types are the same).

  • prep(state: TIn) -> TIn – optional pre-work/validation
  • exec(state: TIn) -> NodeResult[TOut] – required; return new state + outcome
  • post(result: NodeResult[TOut]) -> NodeResult[TOut] – optional cleanup/logging

Nodes are frozen dataclasses that execute async transformations without mutating input state.
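The three-hook lifecycle can be sketched in plain Python. This is a minimal, hypothetical stand-in that mirrors the documented `prep`/`exec`/`post` lifecycle and `NodeResult` shape, not the installed library:

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeResult:
    """Holds the new state plus an outcome string used for routing."""

    state: dict
    outcome: str


@dataclass(frozen=True)
class GreeterNode:
    """Illustrative node: prep validates, exec transforms, post cleans up."""

    name: str = "greeter"

    async def prep(self, state: dict) -> dict:
        # Optional pre-work/validation; pass through unchanged here.
        return state

    async def exec(self, state: dict) -> NodeResult:
        # Return *new* state rather than mutating the input.
        new_state = {**state, "greeting": f"Hello, {state['user']}!"}
        return NodeResult(new_state, outcome="greeted")

    async def post(self, result: NodeResult) -> NodeResult:
        # Optional cleanup/logging; pass through unchanged here.
        return result

    async def __call__(self, state: dict) -> NodeResult:
        state = await self.prep(state)
        result = await self.exec(state)
        return await self.post(result)


result = asyncio.run(GreeterNode()({"user": "Ada"}))
print(result.outcome, result.state["greeting"])  # prints: greeted Hello, Ada!
```

Because the node returns a fresh dict, the caller's original state object is untouched after the call.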

NodeResult[T]

Holds the new state and an outcome string used for routing.

flow()

A function that creates a flow with explicit routing:

flow("Name", start_node)
  .route(start_node, "outcome1", next_node)
  .route(next_node, "outcome2", final_node)
  .end(final_node, "done")  # exactly one termination

Routing: Routes are (node, outcome) pairs. Each outcome must have exactly one route.
Type inference: The flow infers types from start to end, supporting transformations.
Composability: A flow is itself a Node – compose flows within flows.
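Under the hood, routing of this kind amounts to a lookup table keyed by `(node name, outcome)`, with `None` marking the single termination. A minimal dict-driven sketch of that loop (illustrative names, not the library's builder API):

```python
import asyncio


async def upper(state: str) -> tuple[str, str]:
    """Node stand-in: returns (new state, outcome)."""
    return state.upper(), "upped"


async def exclaim(state: str) -> tuple[str, str]:
    return state + "!", "done"


# Routes map (node_name, outcome) -> next node, or None to terminate.
routes = {
    ("upper", "upped"): ("exclaim", exclaim),
    ("exclaim", "done"): None,  # exactly one termination
}


async def run_flow(state: str) -> str:
    name, node = "upper", upper
    while True:
        state, outcome = await node(state)
        key = (name, outcome)
        if key not in routes:
            # Every outcome must be explicitly routed.
            raise ValueError(f"No route for outcome {outcome!r} from {name!r}")
        nxt = routes[key]
        if nxt is None:
            return state
        name, node = nxt


print(asyncio.run(run_flow("hi")))  # prints: HI!
```

An unrouted outcome fails loudly instead of silently ending the flow, which is the behavior the "each outcome must have exactly one route" rule describes.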

ClearFlow vs PocketFlow

| Aspect | ClearFlow | PocketFlow |
|--------|-----------|------------|
| State | Immutable, passed via NodeResult | Mutable, passed via shared param |
| Routing | Outcome-based explicit routes | Action-based graph edges |
| Termination | Exactly one exit enforced | Multiple exits allowed |
| Type safety | Full Python 3.13+ generics | Dynamic (no annotations) |

ClearFlow emphasizes robust, type-safe orchestration with validation and guardrails. PocketFlow emphasizes brevity and flexibility with minimal overhead.
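The state-handling difference in the first table row is easy to see in miniature. A hedged sketch using a made-up `PortfolioState` type (not part of either library):

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class PortfolioState:
    """Hypothetical immutable state; frozen=True forbids mutation."""

    cash: float
    positions: tuple[str, ...] = ()


before = PortfolioState(cash=100.0)
# Immutable style: every "change" produces a new object; the original survives.
after = replace(before, cash=90.0, positions=("AAPL",))

print(before.cash, after.cash)  # prints: 100.0 90.0
```

With a mutable shared dict, the `before` snapshot would have been overwritten; with frozen dataclasses, earlier states remain available for auditing and testing.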

Development

Install uv

curl -LsSf https://astral.sh/uv/install.sh | sh

Clone and set up development environment

git clone https://github.com/artificial-sapience/clearflow.git
cd clearflow
uv sync --all-extras     # Creates venv and installs deps automatically
./quality-check.sh       # Run all checks

License

MIT

Acknowledgments

Inspired by PocketFlow's Node-Flow-State pattern.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

clearflow-0.1.0.tar.gz (55.2 kB)

Uploaded Source

Built Distribution

clearflow-0.1.0-py3-none-any.whl (7.7 kB)

Uploaded Python 3

File details

Details for the file clearflow-0.1.0.tar.gz.

File metadata

  • Download URL: clearflow-0.1.0.tar.gz
  • Upload date:
  • Size: 55.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for clearflow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 2a45d2f1e334f2765b092f18f5788a80e44a119beb5b25a6bd4d4676c445d5fe
MD5 148fee2cca5e0254e1b527b7d263fdfe
BLAKE2b-256 4001cba7dcfa5092d850bfaa5806e2cddbffb259631e3ef45fd55e949f82cc2d

See more details on using hashes here.

Provenance

The following attestation bundles were made for clearflow-0.1.0.tar.gz:

Publisher: release.yml on artificial-sapience/clearflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file clearflow-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: clearflow-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 7.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for clearflow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c489c5a4b5032ce8e4896f72685437c640f1d159f57744c0be4c82171a2a9884
MD5 6a660769e9cb1ce944d48869839ba859
BLAKE2b-256 0fce49be5bdb6e2e119c6f964e7584d224b10b7e5d761de167357f5326ec7ee1

See more details on using hashes here.

Provenance

The following attestation bundles were made for clearflow-0.1.0-py3-none-any.whl:

Publisher: release.yml on artificial-sapience/clearflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
# clearflow/__init__.py

```python
"""ClearFlow: Compose type-safe flows for emergent AI."""

from abc import ABC, abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass
from types import MappingProxyType
from typing import Protocol, cast, final, override

__all__ = [
    "Node",
    "NodeResult",
    "flow",
]

RouteKey = tuple[str, str]  # (node_name, outcome)


class NodeBase(Protocol):
    """Non-generic base protocol for all nodes.

    Provides the common interface without type parameters,
    allowing heterogeneous collections of nodes.
    """

    name: str

    async def __call__(
        self,
        state: object,  # clearflow: ignore[ARCH009] # Type erasure needed for heterogeneous collections
    ) -> "NodeResult[object]":  # clearflow: ignore[ARCH009] # Type erasure needed for heterogeneous collections
        """Execute the node with any state type."""
        ...


@final
@dataclass(frozen=True)
class NodeResult[T]:
    """Result of node execution.

    Attributes:
        state: The transformed state from node execution.
        outcome: The routing outcome determining next node.
    """

    state: T
    outcome: str


@dataclass(frozen=True, kw_only=True)
class Node[TIn, TOut = TIn](ABC, NodeBase):
    """Abstract base for workflow nodes.

    Subclass and implement async exec() to process state and return
    outcomes for routing. Supports optional prep() and post() hooks
    for setup and cleanup.

    Type parameters:
        TIn: Input state type
        TOut: Output state type (defaults to TIn for non-transforming nodes)
    """

    name: str

    def __post_init__(self) -> None:
        """Validate node configuration after initialization.

        Raises:
            ValueError: If node name is empty or contains only whitespace.
        """
        if not self.name or not self.name.strip():
            msg = f"Node name must be a non-empty string, got: {self.name!r}"
            raise ValueError(msg)

    @override
    async def __call__(self, state: TIn) -> NodeResult[TOut]:  # pyright: ignore[reportIncompatibleMethodOverride] # NodeBase uses object for type erasure but Node preserves type safety
        """Execute node lifecycle.

        Returns:
            NodeResult containing transformed state and routing outcome.
        """
        state = await self.prep(state)
        result = await self.exec(state)
        return await self.post(result)

    async def prep(self, state: TIn) -> TIn:  # noqa: PLR6301 # Template method hook for subclasses
        """Pre-execution hook.

        Returns:
            State passed through unchanged by default.
        """
        return state

    @abstractmethod
    async def exec(self, state: TIn) -> NodeResult[TOut]:
        """Execute main node logic - must be implemented by subclasses."""
        ...

    async def post(self, result: NodeResult[TOut]) -> NodeResult[TOut]:  # noqa: PLR6301 # Template method hook for subclasses
        """Post-execution hook.

        Returns:
            Result passed through unchanged by default.
        """
        return result


@final
@dataclass(frozen=True, kw_only=True)
class _Flow[TStartIn, TEndOut = TStartIn](Node[TStartIn, TEndOut]):
    """Internal flow implementation that transforms TStartIn to TEndOut.

    Implementation note: We use 'object' for node types internally because
    Python's type system cannot track types through runtime-determined paths.
    Type safety is maintained at the public API boundaries - exec() guarantees
    TStartIn→TEndOut transformation through construction-time validation.
    """

    start: NodeBase
    routes: Mapping[RouteKey, NodeBase | None]

    @override
    async def exec(self, state: TStartIn) -> NodeResult[TEndOut]:
        """Execute the flow by routing through nodes based on outcomes.

        Returns:
            Final node result containing transformed state and None outcome.

        Raises:
            ValueError: If no route is defined for an outcome from a node.
        """
        current_node = self.start
        current_state: object = state  # clearflow: ignore[ARCH009] # Type erasure needed for dynamic routing

        while True:
            # Execute node
            result = await current_node(current_state)
            key = (current_node.name, result.outcome)

            # Raise error if no route defined - all outcomes must be explicitly handled
            if key not in self.routes:
                msg = f"No route defined for outcome '{result.outcome}' from node '{current_node.name}'"
                raise ValueError(msg)

            # Check next node
            next_node = self.routes[key]
            if next_node is None:
                return cast("NodeResult[TEndOut]", result)

            # Continue
            current_node = next_node
            current_state = result.state


@final
@dataclass(frozen=True, kw_only=True)
class _FlowBuilder[TStartIn, TStartOut]:
    """Flow builder for composing node routes.

    Type parameters:
        TStartIn: The input type the flow accepts (from start node)
        TStartOut: The output type of the start node

    Call end() to specify where the flow ends and get the completed flow.
    """

    _name: str
    _start: Node[TStartIn, TStartOut]
    _routes: MappingProxyType[RouteKey, NodeBase | None]
    _reachable: frozenset[str]  # Node names that are reachable from start

    def _validate_and_create_route(
        self, from_node: NodeBase, outcome: str, *, is_termination: bool = False
    ) -> RouteKey:
        """Validate that a route can be added from the given node.

        Args:
            from_node: The node to route from
            outcome: The outcome that triggers this route
            is_termination: Whether this is a termination route

        Returns:
            The route key for this route

        Raises:
            ValueError: If from_node is not reachable or route already exists
        """
        # Check reachability
        if from_node.name not in self._reachable:
            action = "end at" if is_termination else "route from"
            msg = f"Cannot {action} '{from_node.name}' - not reachable from start"
            raise ValueError(msg)

        # Check for duplicate routes
        route_key: RouteKey = (from_node.name, outcome)
        if route_key in self._routes:
            msg = f"Route already defined for outcome '{outcome}' from node '{from_node.name}'"
            raise ValueError(msg)

        return route_key

    def route(
        self,
        from_node: NodeBase,
        outcome: str,
        to_node: NodeBase,
    ) -> "_FlowBuilder[TStartIn, TStartOut]":
        """Connect nodes: from_node --outcome--> to_node.

        Args:
            from_node: Source node
            outcome: Outcome that triggers this route
            to_node: Destination node (use end() for flow completion)

        Returns:
            Builder for continued route definition and flow completion

        Raises:
            ValueError: If from_node is not reachable or route already exists
        """
        route_key = self._validate_and_create_route(from_node, outcome)

        # Add route and mark to_node as reachable
        new_routes = {**self._routes, route_key: to_node}
        new_reachable = self._reachable | {to_node.name}

        return _FlowBuilder[TStartIn, TStartOut](
            _name=self._name,
            _start=self._start,
            _routes=MappingProxyType(new_routes),
            _reachable=new_reachable,
        )

    def end[TEndIn, TEndOut](
        self,
        end: Node[TEndIn, TEndOut],
        outcome: str,
    ) -> Node[TStartIn, TEndOut]:
        """End the flow at the specified node and outcome.

        This completes the flow definition by specifying where it ends.
        The flow's output type is determined by the final node's output type.

        Args:
            end: The node where the flow ends
            outcome: The outcome from this node that completes the flow

        Returns:
            A flow node that transforms TStartIn to TEndOut

        Raises:
            ValueError: If end node is not reachable or route already exists
        """
        route_key = self._validate_and_create_route(end, outcome, is_termination=True)
        new_routes = {**self._routes, route_key: None}

        return _Flow[TStartIn, TEndOut](
            name=self._name,
            start=self._start,
            routes=MappingProxyType(new_routes),
        )


def flow[TStartIn, TStartOut](
    name: str,
    start: Node[TStartIn, TStartOut],
) -> _FlowBuilder[TStartIn, TStartOut]:
    """Create a flow with the given name and starting node.

    Args:
        name: The name of the flow
        start: The starting node that accepts TStartIn and outputs TStartOut

    Returns:
        Builder for route definition and flow completion
    """
    return _FlowBuilder[TStartIn, TStartOut](
        _name=name,
        _start=start,
        _routes=MappingProxyType({}),
        _reachable=frozenset({start.name}),  # Start node is always reachable
    )
```

# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Core Philosophy

**Tagline**: "Compose type-safe flows for emergent AI"

ClearFlow provides mission-critical AI orchestration with verifiable correctness.
Built for Python engineers who demand:

- **Deep immutability** - All state transformations create new immutable data structures
- **Immutable transformations** - Nodes transform state without mutation (though they may perform I/O)
- **Type safety** - Full static typing with pyright strict mode
- **100% test coverage** - Every path tested, no exceptions
- **Explicit routing** - Given an outcome, the next step is always the same
- **Zero dependencies** - Stdlib only for maximum reliability

Target audience: Python engineers building mission-critical AI systems who require verifiable orchestration with explicit control flow.

## Development Commands

```bash
# Install dependencies
uv sync                  # Install runtime dependencies
uv sync --all-extras     # Install with dev dependencies

# Run quality checks (enforced before commits)
./quality-check.sh       # Runs all checks: custom linters, lint, format, type check, tests

# Custom linters (mission-critical compliance)
python linters/check-architecture-compliance.py  # Architecture violations
python linters/check-immutability.py             # Deep immutability enforcement
python linters/check-test-suite-compliance.py    # Test isolation and resource management

# Individual quality commands
uv run ruff check --fix clearflow tests       # Auto-fix linting (no unsafe fixes)
uv run ruff format clearflow tests            # Format code
uv run pyright clearflow tests                # Type check (pyright - only type checker)
uv run pytest -x -v tests                     # Run all tests
uv run pytest -x -v tests -k "specific_test"  # Run specific test

# Coverage requirements
uv run pytest --cov=clearflow --cov-report=term-missing --cov-fail-under=100
```

## Architecture Overview

ClearFlow is a minimal orchestration framework with functional patterns and **zero third-party dependencies**. It implements a **Node-Flow-State** pattern for managing workflows that include language model calls and other async operations.

### Core Concepts

1. **Nodes**: Async functions that transform state
   - Inherit from `Node[T]` or `Node[TIn, TOut]` and override `exec()` method
   - Nodes are frozen dataclasses with a `name` field
   - Input: state of type `T` (any type: dict, TypedDict, dataclass, primitives)
   - Output: `NodeResult[T](state, outcome)`
   - Designed for explicit transformations
   - Lifecycle hooks: `prep()`, `exec()`, `post()`
2. **State**: Unconstrained - use any type T
   - Type-safe with `T` where T is any Python type
   - Natural Python operations: `{**state, "key": "value"}`
   - Encourages immutable patterns
   - Works with dict, TypedDict, dataclass, primitives
3. **Flow**: Type-safe workflow builder
   - Create with `flow("name", start_node)` function
   - Chain with `.route(from_node, outcome, to_node)`
   - End with `.end(final_node, outcome)` for single termination
   - Single termination rule: exactly one route to `None`
   - Full generic support with type inference

### Key Facts

- **Code size**: ~250 lines total, ~185 non-comment lines
- **Test coverage**: 100% required
- **Type safety**: No unnecessary `Any` (required for metaclass patterns)
- **Immutability**: All dataclasses frozen
- **Routing**: Explicit (NOT deterministic execution)
- **Single termination**: Exactly one route to `None` per flow

### Common Patterns

```python
from dataclasses import dataclass
from typing import override

from clearflow import Node, NodeResult, flow

# Creating nodes - Node is a frozen dataclass
@dataclass(frozen=True)
class DocumentLoader(Node[DocumentState]):
    name: str = "loader"

    @override
    async def exec(self, state: DocumentState) -> NodeResult[DocumentState]:
        content = await load_document(state["path"])
        new_state: DocumentState = {**state, "content": content}
        return NodeResult(new_state, outcome="loaded")

# Building a flow with single termination
loader = DocumentLoader()
processor = ProcessorNode()
complete = CompleteNode()

flow_instance = (
    flow("Pipeline", loader)
    .route(loader, "loaded", processor)
    .route(loader, "error", complete)
    .route(processor, "processed", complete)
    .end(complete, "done")  # Single termination
)
```

### Testing Requirements

- **100% coverage**: No exceptions, ever
- **Deep immutability**: Use frozen dataclasses or tuples for all test state
- **Real AI scenarios**: Model actual AI orchestration patterns (RAG, agents, tool use)
- **Functional purity**: Test that transformations are pure with no side effects
- **Precise types**: Every test knows exact TIn and TOut types
- **Educational tests**: Tests should demonstrate best practices for mission-critical AI

### Code Quality Standards

**CRITICAL**: These standards maintain trust:

- All linting rules must pass without suppression
- Pyright must pass in strict mode (sole type checker)
- Minimal `# pyright: ignore` comments (only for documented limitations)
- No `Any` types except where required (e.g., metaclass patterns)
- Prefer boring, obvious code over clever solutions

**LINTER SUPPRESSION POLICY**:

- **NEVER add linter suppressions without explicit user approval**
- This includes: `# noqa`, `# pyright: ignore`, etc.
- All approved suppressions MUST include a justification comment
- Example: `# noqa: C901 # Display function complexity acceptable for UI`
- Always fix the root cause instead of suppressing when possible

#### Custom Linters

ClearFlow uses three custom linters to enforce mission-critical standards:

1. **Architecture Compliance** (`linters/check-architecture-compliance.py`)
   - No patching/mocking of internal components in tests
   - No imports from private modules (`_internal`)
   - No use of `TYPE_CHECKING` (indicates circular dependencies)
   - No `object` or `Any` types in parameters
2. **Immutability Compliance** (`linters/check-immutability.py`)
   - All dataclasses must have `frozen=True`
   - No `list` in type annotations (use `tuple[T, ...]`)
   - No mutable default arguments
   - No list building with `.append()` in production code
3. **Test Suite Compliance** (`linters/check-test-suite-compliance.py`)
   - No `asyncio.run()` in tests (use `@pytest.mark.asyncio`)
   - No manual event loop creation without cleanup
   - All async tests must have `@pytest.mark.asyncio`
   - All resources must use context managers

These linters run automatically as part of `./quality-check.sh` and enforce strict policies for violations.

### Contributing Guidelines

1. **PR Standards**
   - Must maintain 100% test coverage
   - Must pass all type checks
   - Must use frozen dataclasses
   - Must handle all outcomes explicitly
2. **Documentation**
   - Focus on guarantees and limitations
   - No marketing language
   - Examples must work exactly as shown
   - Be explicit about what we don't do
3. **Feature Requests**
   - Reject anything that compromises type safety
   - Reject anything that reduces testability
   - Reject anything that adds implicit behavior
   - "No" is a complete answer

### Documentation Style

- **Factual and concise** - No verbosity, no "we/our" language
- **Ego-free** - No defensiveness, no marketing speak
- **Direct responses** - State facts and limitations without explanation

### Red Flags to Avoid

1. **Never claim**:
   - "Deterministic orchestration" or "deterministic execution" (we only provide explicit routing)
   - "Exhaustive outcome handling" (we don't enforce this)
   - "Compile-time safety" (Python doesn't have this)
   - "Makes language model agents reliable" (we only provide orchestration structure)
   - "Production-ready agents" (we provide orchestration, not complete agents)
   - "You can't test LLM outputs" (you can test them, just not deterministically)
2. **Never add**:
   - Claims about execution order or timing guarantees
   - Methods that hide what happens between node calls
   - Optional parameters that change flow behavior
   - Dependencies that could introduce unpredictability
3. **Language to avoid**:
   - "Deterministic" when describing the framework
   - "Unreasonable AI" or other hyperbolic characterizations of LLMs
   - Absolute statements about what users can't do
   - Marketing language that can't be verified

### Before Any Change

Ask:

- Can this be tested completely?
- Does this make behavior more explicit?
- Is this simpler than the alternative?

If any answer is "no", don't do it.

### Git Workflow

1. **Branch Protection**: Main branch requires PR with passing checks
2. **Conventional Commits**: Use `fix:`, `feat:`, `docs:`, `ci:` prefixes
3. **Local Protection**: Pre-commit hook prevents direct commits to main
4. **PR Process**:

   ```bash
   git checkout -b type/description
   # Make changes
   ./quality-check.sh
   git commit -m "type: clear description"
   git push -u origin type/description
   gh pr create --title "type: description" --body "concise explanation"
   ```

## Remember

ClearFlow provides explicit routing with single termination enforcement. Keep the code minimal, the documentation concise, and the claims verifiable.

## Documentation Size Limits

- README.md: ~100 lines (proportional to 250-line codebase)
- Individual docs: <100 lines
- Total documentation: <500 lines

## Release Process

Automated via GitVersion and Release Drafter. Manual trigger of release.yml publishes to PyPI.

**PyPI Package**:

## Critical Technical Distinctions

**Explicit routing ≠ Deterministic execution** - ClearFlow provides explicit routing (given outcome X, next step is always Y) but NOT deterministic execution.

## Technical Notes

- **Pyright only** - Removed mypy, pyright supports PEP 695 defaults
- **Type stubs** - Stub only what you use (6 DSPy APIs, not 127 files)
- **Metaclass patterns** - Field descriptors must return `Any` (standard practice)

## llms.txt Implementation

ClearFlow includes comprehensive llms.txt support for optimal AI assistant integration:

1. **Files**:
   - `llms.txt` - Minimal index with documentation links (~2KB)
   - `llms-full.txt` - Auto-generated expanded content (~63KB)
   - Generated with: `uv run llms_txt2ctx --optional true llms.txt > llms-full.txt`
2. **Key Decisions**:
   - Single `clearflow/__init__.py` contains entire implementation (not split files)
   - Example links point to README.md files (context before code)
   - Include CLAUDE.md in llms.txt for AI context
   - Use GitHub raw URLs (we have no separate website)
3. **Integration**:
   - See README.md for Claude Code setup instructions
   - Direct URLs work with any llms.txt-compatible tool
4. **Maintenance**:
   - Manual generation: `uv run python scripts/generate_llms_txt_files.py`
   - Review changes before committing to maintain quality
   - Validate URLs: `cat llms.txt | grep -oE 'https://[^)]+' | xargs -I {} curl -I {}`

## Complexity Management

**Radical Simplification Strategy** for Grade A compliance:

- Replace complex content analysis with static descriptions
- Remove file system dependencies when possible
- Use simple dictionary lookups instead of conditional chains
- Question if dynamic behavior is truly necessary

**Common Over-engineering Patterns to Avoid**:

- Complex text processing for marginal metadata gains
- Multiple decision branches for utility scripts
- Dynamic file content analysis when static works
- Perfect descriptions when "good enough" suffices

**Example**: llms.txt generation - static descriptions work as well as complex extraction

## Security and Suppressions

**Subprocess Security Suppressions** (legitimate cases):

```python
import subprocess  # noqa: S404 # Required for running uv/mcpdoc commands in dev setup
["uv", "run", "cmd"],  # noqa: S607 # Safe: hardcoded command with literal args
```

**Pattern**: Development/configuration scripts with hardcoded commands are safe to suppress

## Messaging Principles

- **Avoid vague claims** - "Full transparency" misleads about features we don't have
- **Use active voice** - "Compose flows" not "Composing flows"
- **Acknowledge AI nature** - "emergent AI" not "unpredictable AI" (less adversarial)
- **Be specific** - "Type-safe", "Zero dependencies" are verifiable features

# Portfolio Analysis Example

Multi-specialist workflow for portfolio allocation decisions using simulated market data.

## Flow

```mermaid
graph LR
    Start([Market Data]) --> Q[QuantAnalyst]
    Q -->|analysis_complete| R[RiskAnalyst]
    Q -->|analysis_failed| E[ErrorHandler]
    R -->|risk_acceptable| P[PortfolioManager]
    R -->|risk_limits_exceeded| E
    P -->|recommendations_ready| C[ComplianceOfficer]
    P -->|analysis_failed| E
    C -->|compliance_approved| D[DecisionNode]
    C -->|compliance_failed| E
    E -->|error_handled| D
    D -->|decision_ready| End([Final Decision])
```

## Quick Start

```bash
# From project root directory

# 1. Set up your OpenAI API key
cp .env.example .env
# Edit .env and add your API key

# 2. Install dependencies
uv sync --all-extras

# 3. Run the example
cd examples/portfolio_analysis
python main.py              # If venv is activated
# Or: uv run python main.py
```

## How It Works

This example demonstrates a sequential workflow where each specialist node analyzes market data and passes enriched state to the next stage:

1. **QuantAnalyst** - Technical analysis and opportunity identification
2. **RiskAnalyst** - Risk assessment and limit checking
3. **PortfolioManager** - Allocation recommendations
4. **ComplianceOfficer** - Regulatory validation
5. **DecisionNode** - Final synthesis and execution plan
6. **ErrorHandler** - Converts errors to conservative decisions

Each node uses DSPy for structured LLM outputs with Pydantic validation.
## Key Features

- **Sequential processing** - Each specialist processes in order
- **Type-safe transformations** - `MarketData → QuantInsights → RiskAssessment → Decision`
- **Error recovery** - Failures route to ErrorHandler then continue
- **Structured outputs** - DSPy signatures ensure consistent responses
- **Audit trail** - Complete reasoning chain for compliance

## Files

- `main.py` - Entry point with scenario selection
- `portfolio_flow.py` - Flow definition and routing
- `market_data.py` - Simulated market data generation
- `specialists/` - Individual specialist node implementations
- `shared/` - Common models and configuration

# Flow tests (excerpt)

```python
"""Test Flow orchestration features of ClearFlow.

This module tests the Flow class functionality including linear flows,
branching, single termination enforcement, and flow composition.
"""

from dataclasses import dataclass, replace
from dataclasses import dataclass as dc
from typing import override

from clearflow import Node, NodeResult, flow
from tests.conftest import ValidationState


@dataclass(frozen=True)
class ChatState:
    """State for chat routing tests."""

    query: str = ""
    intent: str = ""
    agent: str = ""
    response_type: str = ""
    formatted: bool = False


@dataclass(frozen=True)
class DocState:
    """State for document processing tests."""

    source: str
    loaded: str = ""
    doc_count: str = ""
    embedded: str = ""
    embedding_dim: str = ""
    stored: str = ""


# Test nodes for chat routing - defined outside test to reduce complexity
@dc(frozen=True)
class IntentClassifier(Node[ChatState]):
    """Classifies user intent for appropriate AI response."""

    name: str = "classifier"

    @override
    async def exec(self, state: ChatState) -> NodeResult[ChatState]:
        query = state.query or ""
        intent = self._classify_intent(query)
        new_state = replace(state, intent=intent)
        return NodeResult(new_state, outcome=intent)

    @staticmethod
    def _classify_intent(query: str) -> str:
        """Classify query intent.

        Returns:
            Intent string: "technical", "question", or "general".
        """
        query_lower = str(query).lower()
        if "code" in query_lower or "bug" in query_lower:
            return "technical"
        if "?" in str(query):
            return "question"
        return "general"


@dc(frozen=True)
class TechnicalAgent(Node[ChatState]):
    """Handles technical queries with code examples."""

    name: str = "technical_agent"

    @override
    async def exec(self, state: ChatState) -> NodeResult[ChatState]:
        new_state = replace(
            state,
            agent="technical",
            response_type="code_example",
        )
        return NodeResult(new_state, outcome="responded")


@dc(frozen=True)
class QAAgent(Node[ChatState]):
    """Handles Q&A with retrieval augmented generation."""

    name: str = "qa_agent"

    @override
    async def exec(self, state: ChatState) -> NodeResult[ChatState]:
        new_state = replace(
            state,
            agent="qa",
            response_type="retrieved_answer",
        )
        return NodeResult(new_state, outcome="responded")


@dc(frozen=True)
class GeneralAgent(Node[ChatState]):
    """Handles general conversation."""

    name: str = "general_agent"

    @override
    async def exec(self, state: ChatState) -> NodeResult[ChatState]:
        new_state = replace(
            state,
            agent="general",
            response_type="chat",
        )
        return NodeResult(new_state, outcome="responded")


@dc(frozen=True)
class ResponseFormatter(Node[ChatState]):
    """Formats final response for user."""

    name: str = "formatter"

    @override
    async def exec(self, state: ChatState) -> NodeResult[ChatState]:
        new_state = replace(state, formatted=True)
        return NodeResult(new_state, outcome="complete")


# Test data classes and nodes for linear flow
@dc(frozen=True)
class RawText:
    """Raw text to be processed."""

    content: str
    source: str


@dc(frozen=True)
class TokenizedText:
    """Text split into tokens."""

    raw: RawText
    tokens: tuple[str, ...]


@dc(frozen=True)
class IndexedDocument:
    """Final indexed document."""

    tokenized: TokenizedText
    token_count: int
    indexed: bool = True


@dc(frozen=True)
class TokenizerNode(Node[RawText, TokenizedText]):
    """Tokenizes text for embedding generation."""

    name: str = "tokenizer"

    @override
    async def exec(self, state: RawText) -> NodeResult[TokenizedText]:
        tokens = tuple(state.content.split())
        tokenized = TokenizedText(raw=state, tokens=tokens)
        return NodeResult(tokenized, outcome="tokenized")


@dc(frozen=True)
class IndexerNode(Node[TokenizedText, IndexedDocument]):
    """Creates embeddings and indexes document."""

    name: str = "indexer"

    @override
    async def exec(self, state: TokenizedText) -> NodeResult[IndexedDocument]:
        indexed = IndexedDocument(tokenized=state, token_count=len(state.tokens))
        return NodeResult(indexed, outcome="indexed")


# Nodes for nested flow testing
@dc(frozen=True)
class DocumentLoader(Node[DocState]):
    """Loads documents for processing."""

    name: str = "loader"

    @override
    async def exec(self, state: DocState) -> NodeResult[DocState]:
        new_state = replace(state, loaded="true", doc_count="5")
        return NodeResult(new_state, outcome="loaded")


@dc(frozen=True)
class Embedder(Node[DocState]):
    """Creates embeddings from loaded documents."""

    name: str = "embedder"

    @override
    async def exec(self, state: DocState) -> NodeResult[DocState]:
        new_state = replace(
            state,
            embedded="true",
            embedding_dim="768",
        )
        return NodeResult(new_state, outcome="embedded")


@dc(frozen=True)
class VectorStore(Node[DocState]):
    """Stores embeddings in vector database."""

    name: str = "vector_store"

    @override
    async def exec(self, state: DocState) -> NodeResult[DocState]:
        new_state = replace(state, stored="true")
        return NodeResult(new_state, outcome="indexed")


class TestFlow:
    """Test the Flow orchestration."""

    @staticmethod
    async def test_linear_flow_build() -> None:
        """Test building a linear flow."""
        tokenizer = TokenizerNode()
        indexer = IndexerNode()

        flow_instance = flow("RAG",
```
tokenizer).route(tokenizer, "tokenized", indexer).end(indexer, "indexed") assert flow_instance.name == "RAG" @staticmethod async def test_linear_flow_execution() -> None: """Test executing a linear flow.""" tokenizer = TokenizerNode() indexer = IndexerNode() flow_instance = flow("RAG", tokenizer).route(tokenizer, "tokenized", indexer).end(indexer, "indexed") initial = RawText(content="Natural language processing", source="test.txt") result = await flow_instance(initial) assert result.outcome == "indexed" assert isinstance(result.state, IndexedDocument) assert result.state.token_count == 3 @staticmethod async def test_chat_routing_flow_setup() -> None: """Test building an AI chat routing flow.""" # Build AI chat routing flow using new API classifier = IntentClassifier() technical = TechnicalAgent() qa = QAAgent() general = GeneralAgent() formatter = ResponseFormatter() chat_flow = ( flow("ChatRouter", classifier) .route(classifier, "technical", technical) .route(classifier, "question", qa) .route(classifier, "general", general) .route(technical, "responded", formatter) .route(qa, "responded", formatter) .route(general, "responded", formatter) .end(formatter, "complete") ) # Just verify flow builds correctly assert chat_flow.name == "ChatRouter" @staticmethod async def test_chat_technical_path() -> None: """Test technical query routing in chat flow.""" classifier = IntentClassifier() technical = TechnicalAgent() formatter = ResponseFormatter() chat_flow = ( flow("TechRouter", classifier) .route(classifier, "technical", technical) .route(technical, "responded", formatter) .end(formatter, "complete") ) tech_input = ChatState(query="How do I fix this bug in my code?") result = await chat_flow(tech_input) assert result.state.intent == "technical" assert result.state.agent == "technical" assert result.outcome == "complete" @staticmethod async def test_chat_question_path() -> None: """Test question routing in chat flow.""" classifier = IntentClassifier() qa = QAAgent() 
formatter = ResponseFormatter() chat_flow = ( flow("QARouter", classifier) .route(classifier, "question", qa) .route(qa, "responded", formatter) .end(formatter, "complete") ) question_input = ChatState(query="What is RAG?") result = await chat_flow(question_input) assert result.state.intent == "question" assert result.state.agent == "qa" assert result.outcome == "complete" @staticmethod async def test_chat_general_path() -> None: """Test general conversation routing.""" classifier = IntentClassifier() general = GeneralAgent() formatter = ResponseFormatter() chat_flow = ( flow("GeneralRouter", classifier) .route(classifier, "general", general) .route(general, "responded", formatter) .end(formatter, "complete") ) input_state = ChatState(query="Hello there") result = await chat_flow(input_state) assert result.state.intent == "general" assert result.state.agent == "general" assert result.outcome == "complete" @staticmethod async def test_single_termination_enforcement() -> None: """Test that flows must have exactly one termination point.""" @dc(frozen=True) class DataValidator(Node[ValidationState]): """Validates incoming data for processing.""" name: str = "validator" @override async def exec(self, state: ValidationState) -> NodeResult[ValidationState]: return NodeResult(state, outcome="valid") @dc(frozen=True) class DataProcessor(Node[ValidationState]): """Processes validated data.""" name: str = "processor" @override async def exec(self, state: ValidationState) -> NodeResult[ValidationState]: return NodeResult(state, outcome="processed") validator = DataValidator() processor = DataProcessor() # This works - single termination point valid_flow = ( flow("ValidationPipeline", validator).route(validator, "valid", processor).end(processor, "processed") ) # Test that it runs successfully result = await valid_flow(ValidationState(input_text="test data")) assert result.outcome == "processed" @staticmethod async def test_flow_composition() -> None: """Test that flows can be 
composed as nodes.""" loader = DocumentLoader() embedder = Embedder() # Create inner flow inner_flow = flow("Inner", loader).route(loader, "loaded", embedder).end(embedder, "embedded") # Use inner flow as a node vector_store = VectorStore() outer_flow = flow("Outer", inner_flow).route(inner_flow, "embedded", vector_store).end(vector_store, "indexed") # Just verify it builds assert outer_flow.name == "Outer" @staticmethod async def test_nested_flow_execution() -> None: """Test execution of nested flows.""" loader = DocumentLoader() embedder = Embedder() doc_flow = flow("DocFlow", loader).route(loader, "loaded", embedder).end(embedder, "embedded") vector_store = VectorStore() pipeline = flow("Pipeline", doc_flow).route(doc_flow, "embedded", vector_store).end(vector_store, "indexed") doc_input = DocState( source="kb", loaded="", doc_count="", embedded="", embedding_dim="", stored="", ) result = await pipeline(doc_input) assert result.outcome == "indexed" assert result.state.stored == "true"404: Not Found"""Test Node abstraction features of ClearFlow. This module tests the Node class functionality including lifecycle hooks, lifecycle hooks, and routing patterns for mission-critical AI orchestration. 
""" from dataclasses import dataclass as dc from dataclasses import replace from typing import override from clearflow import Node, NodeResult from tests.conftest import AgentState, Message, ValidationState # Module-level test nodes to reduce complexity @dc(frozen=True) class TokenCountNode(Node[ValidationState]): """Node for token counting in LLM validation.""" name: str = "token_counter" @override async def exec(self, state: ValidationState) -> NodeResult[ValidationState]: # Transformation: count tokens (simplified) token_count = len(state.input_text.split()) if token_count > 100: errors = (*state.errors, f"Token count {token_count} exceeds limit") new_state = replace(state, errors=errors, validated=False) outcome = "too_long" else: new_state = replace(state, validated=True) outcome = "valid_length" return NodeResult(new_state, outcome=outcome) @dc(frozen=True) class PromptState: """Immutable state for prompt engineering pipeline.""" raw_prompt: str sanitized: bool = False validated: bool = False enhanced: bool = False @dc(frozen=True) class PromptEngineeringNode(Node[PromptState]): """Node demonstrating lifecycle hooks for prompt engineering.""" name: str = "prompt_engineer" @override async def prep(self, state: PromptState) -> PromptState: # Sanitize prompt in prep phase return replace(state, sanitized=True) @override async def exec(self, state: PromptState) -> NodeResult[PromptState]: # Validate prompt in main execution new_state = replace(state, validated=True) return NodeResult(new_state, outcome="validated") @override async def post( self, result: NodeResult[PromptState], ) -> NodeResult[PromptState]: # Enhance prompt in post phase new_state = replace(result.state, enhanced=True) return NodeResult(new_state, outcome=result.outcome) @dc(frozen=True) class LLMRouterNode(Node[AgentState]): """Routes to different paths based on LLM analysis.""" name: str = "llm_router" @override async def exec(self, state: AgentState) -> NodeResult[AgentState]: # Analyze last 
user message for intent (simulating LLM classification) last_msg = state.messages[-1] if state.messages else None # Determine outcome and response based on message content outcome, response = self._classify_intent(last_msg) # Immutable state update new_state = AgentState( messages=(*state.messages, response), context=state.context, temperature=0.3 if outcome == "code_generation" else state.temperature, ) return NodeResult(new_state, outcome=outcome) @staticmethod def _classify_intent(msg: Message | None) -> tuple[str, Message]: """Classify user intent from message. Returns: Tuple of (intent string, response message). """ if not msg or msg.role != "user": return "no_input", Message(role="assistant", content="Please provide input.") content_lower = msg.content.lower() if "weather" in content_lower: return "tool_required", Message( role="assistant", content="I'll check the weather for you.", ) if "code" in content_lower: return "code_generation", Message( role="assistant", content="I'll help you write code.", ) return "direct_response", Message( role="assistant", content="I understand your request.", ) class TestNode: """Test the Node abstraction.""" @staticmethod async def test_immutable_transformations() -> None: """Test that nodes perform immutable transformations - same input produces same output.""" node = TokenCountNode() initial = ValidationState(input_text="Short prompt for testing") # Multiple calls with same input produce same output (immutable transformations) result1 = await node(initial) result2 = await node(initial) assert result1.state == result2.state assert result1.outcome == result2.outcome @staticmethod async def test_validation_success() -> None: """Test successful validation for short text.""" node = TokenCountNode() initial = ValidationState(input_text="Short prompt for testing") result = await node(initial) assert result.state.validated is True assert result.outcome == "valid_length" # Verify immutability assert initial.validated is False 
@staticmethod async def test_validation_failure() -> None: """Test validation failure for long text.""" node = TokenCountNode() # Create text with more than 100 words long_text = " ".join(["word"] * 101) initial = ValidationState(input_text=long_text) result = await node(initial) assert result.state.validated is False assert result.outcome == "too_long" assert len(result.state.errors) == 1 assert "exceeds limit" in result.state.errors[0] @staticmethod async def test_lifecycle_hooks() -> None: """Test that prep and post hooks work correctly.""" node = PromptEngineeringNode() initial = PromptState(raw_prompt="Explain quantum computing") result = await node(initial) assert result.state.sanitized is True assert result.state.validated is True assert result.state.enhanced is True assert initial.sanitized is False # Original unchanged @staticmethod async def test_router_no_input() -> None: """Test router handles missing user input.""" node = LLMRouterNode() empty_state = AgentState(messages=(), context="assistant") result = await node(empty_state) assert result.outcome == "no_input" assert "Please provide input" in result.state.messages[0].content @staticmethod async def test_router_tool_required() -> None: """Test router identifies tool-required intent.""" node = LLMRouterNode() weather_state = AgentState( messages=(Message(role="user", content="What's the weather in NYC?"),), context="weather_assistant", ) result = await node(weather_state) assert result.outcome == "tool_required" assert len(result.state.messages) == 2 assert "check the weather" in result.state.messages[-1].content @staticmethod async def test_router_code_generation() -> None: """Test router identifies code generation intent and adjusts temperature.""" node = LLMRouterNode() code_state = AgentState( messages=(Message(role="user", content="Help me write Python code"),), context="coding_assistant", temperature=0.7, ) result = await node(code_state) assert result.outcome == "code_generation" assert 
result.state.temperature == 0.3 # Lowered for code generation assert "write code" in result.state.messages[-1].content @staticmethod async def test_router_direct_response() -> None: """Test router handles general queries.""" node = LLMRouterNode() general_state = AgentState( messages=(Message(role="user", content="Tell me about history"),), context="assistant", ) result = await node(general_state) assert result.outcome == "direct_response" assert "understand your request" in result.state.messages[-1].content404: Not FoundMIT License Copyright (c) 2025 ClearFlow Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.