--- name: cocoindex-v1 description: This skill should be used when building data processing pipelines with CocoIndex v1, a Python library for incremental data transformation. Use when the task involves processing files/data into databases, creating vector embeddings, building knowledge graphs, ETL workflows, or any data pipeline requiring automatic change detection and incremental updates. CocoIndex v1 is Python-native (supports any Python types), has no DSL, and is currently under pre-release (version 1.0.0a1 or later). --- # CocoIndex v1 CocoIndex v1 is a Python library for building incremental data processing pipelines with declarative target states. Think spreadsheets or React for data pipelines: declare what the output should look like based on current input, and CocoIndex automatically handles incremental updates, change detection, and syncing to external systems. ## Overview CocoIndex v1 enables building data pipelines that: - **Automatically handle incremental updates**: Only reprocess changed data - **Use declarative target states**: Declare what should exist, not how to update - **Support any Python types**: No custom DSL—use dataclasses, Pydantic, NamedTuple - **Provide function memoization**: Skip expensive operations when inputs/code unchanged - **Sync to multiple targets**: PostgreSQL, SQLite, LanceDB, Qdrant, file systems **Key principle**: `TargetState = Transform(SourceState)` ## When to Use This Skill Use this skill when building pipelines that involve: - **Document processing**: PDF/Markdown conversion, text extraction, chunking - **Vector embeddings**: Embedding documents/code for semantic search - **Database transformations**: ETL from source DB to target DB - **Knowledge graphs**: Extract entities and relationships from data - **LLM-based extraction**: Structured data extraction using LLMs - **File-based pipelines**: Transform files from one format to another - **Incremental indexing**: Keep search indexes up-to-date with source changes ## Quick Start: Creating a New Project ### Initialize Project Use the built-in CLI to create a new project: ```bash cocoindex init my-project cd my-project ``` This creates: - `main.py` - Main app definition - `pyproject.toml` - Dependencies with pre-release config - `.env` - Environment configuration - `README.md` - Quick start guide ### Add Dependencies for Specific Use Cases Add dependencies to `pyproject.toml` based on your needs: ```toml # For vector embeddings dependencies = ["cocoindex>=1.0.0a1", "sentence-transformers", "asyncpg"] # For PostgreSQL only dependencies = ["cocoindex>=1.0.0a1", "asyncpg"] # For LLM extraction dependencies = ["cocoindex>=1.0.0a1", "litellm", "instructor", "pydantic>=2.0"] ``` See [references/setup_project.md](references/setup_project.md) for complete examples. ### Set Up Database (if using Postgres/Qdrant) For PostgreSQL with Docker: ```bash # Create docker-compose.yml with pgvector image docker-compose up -d ``` For Qdrant with Docker: ```bash # Create docker-compose.yml with Qdrant image docker-compose up -d ``` See [references/setup_database.md](references/setup_database.md) for detailed setup instructions. ### Run the Pipeline ```bash pip install -e . cocoindex update main.py ``` ## Core Concepts ### 1. Apps An **app** is the top-level executable that binds a main function with parameters: ```python import cocoindex as coco @coco.function def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None: # Processing logic here ... app = coco.App( coco.AppConfig(name="MyApp"), app_main, sourcedir=pathlib.Path("./data"), outdir=pathlib.Path("./output"), ) if __name__ == "__main__": app.update(report_to_stdout=True) ``` ### 2. Processing Components A **processing component** groups an item's processing with its target states. Mount components using `coco.mount()`: ```python for file in files: coco.mount( coco.component_subpath("file", str(file.file_path.path)), process_file, file, outdir, ) ``` For operations that return values, use `coco.mount_run()` to wait for the result: ```python table = coco.mount_run( coco.component_subpath("setup", "table"), db.declare_table_target, table_name="my_table", table_schema=schema, ).result() ``` **Key points**: - Each component runs independently - Use `mount()` for parallel processing, `mount_run()` when you need the result immediately - Use stable paths for proper memoization - Component path determines target state ownership ### 3. Function Memoization Add `memo=True` to skip re-execution when inputs/code unchanged: ```python @coco.function(memo=True) def expensive_operation(data: str) -> Result: # LLM call, embedding generation, heavy computation result = expensive_transform(data) return result ``` ### 4. Target States **Declare** what should exist—CocoIndex handles creation/update/deletion: ```python # File target localfs.declare_file(outdir / "output.txt", content) # Database row target table.declare_row(row=MyRecord(id=1, name="example")) # Vector point target (Qdrant) collection.declare_point(point=PointStruct(id="1", vector=[...])) ``` ### 5. Context for Shared Resources Use `ContextKey` to share expensive resources across components: ```python EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder") @coco.lifespan def coco_lifespan(builder: coco.EnvironmentBuilder): embedder = SentenceTransformerEmbedder("all-MiniLM-L6-v2") builder.provide(EMBEDDER, embedder) yield ``` The `@coco.lifespan` decorator registers the function to the default CocoIndex environment, which is shared among all apps by default. ```python @coco.function def process_item(text: str) -> None: embedder = coco.use_context(EMBEDDER) embedding = embedder.embed(text) ``` ### 6. ID Generation Generate stable, unique identifiers that persist across incremental updates: ```python from cocoindex.resources.id import generate_id, IdGenerator # Deterministic: same dep → same ID chunk_id = generate_id(chunk.content) # Always distinct: each call → new ID, even with same dep id_gen = IdGenerator() for chunk in chunks: chunk_id = id_gen.next_id(chunk.content) table.declare_row(row=Row(id=chunk_id, content=chunk.content)) ``` Use `generate_id(dep)` when same content should yield same ID. Use `IdGenerator` when you need distinct IDs even for duplicate content. See [ID Generation docs](https://cocoindex.io/docs-v1/resource_types#id-generation) for details. ## Common Pipeline Patterns ### Pattern 1: File Transformation Transform files from input to output directory: ```python import cocoindex as coco from cocoindex.connectors import localfs from cocoindex.resources.file import PatternFilePathMatcher @coco.function(memo=True) def process_file(file, outdir): content = file.read_text() transformed = transform_content(content) # Your logic outname = file.file_path.path.stem + ".out" localfs.declare_file(outdir / outname, transformed, create_parent_dirs=True) @coco.function def app_main(sourcedir, outdir): files = localfs.walk_dir( sourcedir, recursive=True, path_matcher=PatternFilePathMatcher( included_patterns=["*.txt", "*.md"], excluded_patterns=[".*/**"], ), ) for f in files: coco.mount(coco.component_subpath("file", str(f.file_path.path)), process_file, f, outdir) app = coco.App(coco.AppConfig(name="Transform"), app_main, sourcedir=Path("./data"), outdir=Path("./out")) ``` ### Pattern 2: Vector Embedding Pipeline Chunk and embed documents for semantic search: ```python import asyncio from dataclasses import dataclass from typing import Annotated, AsyncIterator import cocoindex as coco import cocoindex.asyncio as coco_aio from cocoindex.connectors import localfs, postgres from cocoindex.ops.text import RecursiveSplitter from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder from cocoindex.resources.id import IdGenerator from numpy.typing import NDArray PG_DB = coco.ContextKey[postgres.PgDatabase]("pg_db") _embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2") _splitter = RecursiveSplitter() @dataclass class Embedding: id: int # Generated stable ID filename: str text: str embedding: Annotated[NDArray, _embedder] # Auto-infer dimensions start_line: int end_line: int @coco_aio.lifespan async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]: async with await postgres.create_pool(DATABASE_URL) as pool: builder.provide(PG_DB, postgres.register_db("embedding_db", pool)) yield @coco.function(memo=True) async def process_chunk(chunk_id, filename, chunk, table): table.declare_row( row=Embedding( id=chunk_id, filename=str(filename), text=chunk.text, embedding=await _embedder.embed_async(chunk.text), start_line=chunk.start.line, end_line=chunk.end.line, ), ) @coco.function(memo=True) async def process_file(file, table): text = file.read_text() chunks = _splitter.split(text, chunk_size=1000, min_chunk_size=300, chunk_overlap=200) id_gen = IdGenerator() # Generate stable IDs for each chunk await asyncio.gather(*( process_chunk(id_gen.next_id(chunk.text), file.file_path.path, chunk, table) for chunk in chunks )) @coco.function def app_main(sourcedir): target_db = coco.use_context(PG_DB) target_table = coco.mount_run( coco.component_subpath("setup", "table"), target_db.declare_table_target, table_name="embeddings", table_schema=postgres.TableSchema(Embedding, primary_key=["id"]), ).result() files = localfs.walk_dir(sourcedir, recursive=True) for file in files: coco.mount(coco.component_subpath("file", str(file.file_path.path)), process_file, file, target_table) app = coco_aio.App(coco_aio.AppConfig(name="Embedding"), app_main, sourcedir=Path("./data")) ``` ### Pattern 3: LLM-Based Extraction Extract structured data using LLMs: ```python import instructor from pydantic import BaseModel from litellm import acompletion _instructor_client = instructor.from_litellm(acompletion, mode=instructor.Mode.JSON) class ExtractionResult(BaseModel): title: str topics: list[str] @coco.function(memo=True) # Memo avoids re-calling LLM async def extract_and_store(content, message_id, table): result = await _instructor_client.chat.completions.create( model="gpt-4", response_model=ExtractionResult, messages=[{"role": "user", "content": f"Extract topics: {content}"}], ) table.declare_row(row=Message(id=message_id, title=result.title, content=content)) ``` ## Connectors and Operations CocoIndex v1 provides connectors for reading from and writing to various external systems including databases (SQL and vector), file systems, and more. **For detailed connector documentation**, see: - [references/connectors.md](references/connectors.md) - Complete connector reference with examples - [Pattern examples](#common-pipeline-patterns) - Real-world usage in pipelines - [AI-optimized docs](https://cocoindex.io/docs-v1/llms.txt) - Comprehensive online documentation ## Text and Embedding Operations ### Text Splitting ```python from cocoindex.ops.text import RecursiveSplitter, detect_code_language splitter = RecursiveSplitter() language = detect_code_language(filename="example.py") chunks = splitter.split( text, chunk_size=1000, min_chunk_size=300, chunk_overlap=200, language=language, # Syntax-aware splitting ) ``` ### Embeddings ```python from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2") # Sync embedding = embedder.embed(text) # Async embedding = await embedder.embed_async(text) ``` ## CLI Commands ### Run Pipeline ```bash cocoindex update main.py # Run app in main.py cocoindex update main.py:my_app # Run specific app cocoindex update my_module:my_app # Run from module ``` ### Drop All State ```bash cocoindex drop main.py [-f] # Drop and reset ``` ### List Apps ```bash cocoindex ls main.py # List apps in file cocoindex ls --db ./cocoindex.db # List apps in DB ``` ### Show Component Paths ```bash cocoindex show main.py # Show component tree ``` ## Best Practices ### 1. Use Stable Component Paths ```python # ✅ Good: Stable identifiers coco.component_subpath("file", str(file.file_path.path)) coco.component_subpath("record", record.id) # ❌ Bad: Unstable identifiers coco.component_subpath("file", file) # Object reference coco.component_subpath("idx", idx) # Index changes ``` ### 2. Add Memoization for Expensive Operations ```python # ✅ Good: Memoize expensive ops @coco.function(memo=True) async def process_chunk(chunk, table): embedding = await embedder.embed_async(chunk.text) # Expensive! table.declare_row(...) # ❌ Bad: No memoization @coco.function # Re-embeds every run async def process_chunk(chunk, table): embedding = await embedder.embed_async(chunk.text) ``` ### 3. Use Context for Shared Resources ```python # ✅ Good: Load model once @coco.lifespan def coco_lifespan(builder): model = load_expensive_model() builder.provide(MODEL_KEY, model) yield # ❌ Bad: Load model every time @coco.function def process(data): model = load_expensive_model() # Loaded repeatedly! ``` ### 4. Use Type Annotations ```python # ✅ Good: Type-safe from dataclasses import dataclass from typing import Annotated from numpy.typing import NDArray @dataclass class Record: id: int name: str vector: Annotated[NDArray, embedder] # Auto-infer dimensions # ❌ Bad: No type safety record = {"id": 1, "name": "example", "vector": [...]} ``` ### 5. Organize with Setup Phase ```python with coco.component_subpath("setup"): table = coco.mount_run(...).result() with coco.component_subpath("processing"): for item in items: coco.mount(coco.component_subpath(item.id), ...) ``` ## Troubleshooting ### "Module not found" Error Ensure pyproject.toml has pre-release config: ```toml [tool.uv] prerelease = "explicit" ``` ### PostgreSQL pgvector Not Found Enable the pgvector extension: ```bash # Connect to your database and enable the extension psql "postgres://localhost/db" -c "CREATE EXTENSION IF NOT EXISTS vector;" ``` See [references/setup_database.md](references/setup_database.md) for detailed setup instructions. ### Memoization Not Working Check component paths are stable: ```python # Use stable IDs, not object references coco.component_subpath(file.stable_key) # ✅ coco.component_subpath(file) # ❌ ``` ### Everything Reprocessing Add `memo=True` to expensive functions: ```python @coco.function(memo=True) # Add this async def process_item(item): ... ``` ## Resources ### references/ - **[setup_project.md](references/setup_project.md)**: Project setup guide with dependency examples for different use cases - **[setup_database.md](references/setup_database.md)**: Database setup guide (PostgreSQL, SQLite, LanceDB, Qdrant) - **[connectors.md](references/connectors.md)**: Complete connector reference with usage examples - **[patterns.md](references/patterns.md)**: Detailed pipeline patterns with full working code - **[api_reference.md](references/api_reference.md)**: Quick API reference for common functions ### assets/ - **simple-template/**: Minimal project template structure ## Additional Resources **For AI Agents:** - [AI-Optimized Documentation](https://cocoindex.io/docs-v1/llms.txt) - Comprehensive documentation optimized for LLM consumption **For Humans:** - [CocoIndex Documentation](https://docs.cocoindex.dev/docs-v1/) - Full documentation site - [Programming Guide](https://docs.cocoindex.dev/docs-v1/programming_guide/core_concepts) - Core concepts and patterns - [GitHub Examples](https://github.com/cocoindex-io/cocoindex/tree/v1/examples) - Real-world example projects - [CocoIndex on PyPI](https://pypi.org/project/cocoindex/) - Package repository (pre-release) ## Version Note This skill is for **CocoIndex v1** (pre-release: `>=1.0.0a1`). It uses a completely different API from v0. Key differences: - Python-native (no DSL) - Any Python types supported - No flow definitions required - More flexible and seamless experience