# Storage Abstraction Layer

## Overview

The Workglow storage layer (`@workglow/storage`) provides unified interfaces for key-value, tabular, and vector storage. Job queue persistence is owned by `@workglow/job-queue`, with backend implementations exported from each vendor package's `./job-queue` subpath. Application code programs against these interfaces, and the concrete backend is selected at configuration time. This design allows the same pipeline to run with in-memory storage during development, SQLite for single-machine deployments, PostgreSQL for production clusters, IndexedDB in the browser, and Supabase for managed cloud infrastructure.

Every storage interface is event-driven, schema-typed, and backend-agnostic. The storage layer also integrates with Workglow's service registry for dependency injection and with the task system's input resolution mechanism for runtime schema-based lookups.

## IKvStorage -- Key-Value Storage

The simplest storage interface. It maps typed keys to typed values, similar to a `Map` but with persistence, events, and bulk operations.

```typescript
interface IKvStorage<
  Key extends string | number = string,
  Value extends any = any,
  Combined = { key: Key; value: Value },
> {
  put(key: Key, value: Value): Promise;
  putBulk(items: Array<{ key: Key; value: Value }>): Promise;
  get(key: Key): Promise;
  delete(key: Key): Promise;
  getAll(): Promise;
  deleteAll(): Promise;
  size(): Promise;
  getObjectAsIdString(object: JSONValue): Promise;

  // Event methods
  on(name: Event, fn: KvEventListener<...>): void;
  off(name: Event, fn: KvEventListener<...>): void;
  emit(name: Event, ...args: ...): void;
  once(name: Event, fn: KvEventListener<...>): void;
  waitOn(name: Event): Promise<...>;
}
```

### Events

| Event       | Parameters                  | Description               |
| ----------- | --------------------------- | ------------------------- |
| `put`       | `(key, value)`              | A value was stored        |
| `get`       | `(key, value \| undefined)` | A value was retrieved     |
| `getAll`    | `(results \| undefined)`    | All values were retrieved |
| `delete`    | `(key)`                     | A value was deleted       |
| `deleteall` | `()`                        | All values were deleted   |

### Available Backends

| Backend           | Class                   | Runtime   | Notes                         |
| ----------------- | ----------------------- | --------- | ----------------------------- |
| In-Memory         | `InMemoryKvStorage`     | All       | Fast, non-persistent          |
| SQLite            | `SqliteKvStorage`       | Node, Bun | File-based persistence        |
| PostgreSQL        | `PostgresKvStorage`     | Node, Bun | Network-based                 |
| Supabase          | `SupabaseKvStorage`     | All       | Cloud PostgreSQL              |
| IndexedDB         | `IndexedDbKvStorage`    | Browser   | Browser-native                |
| Filesystem (JSON) | `FsFolderJsonKvStorage` | Node, Bun | One JSON file per key         |
| Filesystem        | `FsFolderKvStorage`     | Node, Bun | Raw file storage              |
| Via Tabular       | `KvViaTabularStorage`   | All       | Adapts tabular storage as KV  |
| Telemetry         | `TelemetryKvStorage`    | All       | Wraps another KV with tracing |

### Usage Example

```typescript
import { InMemoryKvStorage } from "@workglow/storage";

const cache = new InMemoryKvStorage();
await cache.put("user:123", { name: "Alice", role: "admin" });
const user = await cache.get("user:123");
// { name: "Alice", role: "admin" }

cache.on("put", (key, value) => {
  console.log(`Cached: ${key}`);
});
```

## ITabularStorage -- Tabular Storage

The workhorse interface for structured data with typed schemas, compound primary keys, querying, pagination, change subscriptions, and auto-generated keys. It is the foundation for document storage, knowledge base metadata, model registries, and more.

```typescript
interface ITabularStorage<
  Schema extends DataPortSchemaObject,
  PrimaryKeyNames extends ReadonlyArray,
  Entity = FromSchema,
  PrimaryKey = SimplifyPrimaryKey,
  InsertType = InsertEntity>,
> {
  put(value: InsertType): Promise;
  putBulk(values: InsertType[]): Promise;
  get(key: PrimaryKey): Promise;
  delete(key: PrimaryKey | Entity): Promise;
  getAll(options?: QueryOptions): Promise;
  deleteAll(): Promise;
  size(): Promise;
  deleteSearch(criteria: DeleteSearchCriteria): Promise;
  getBulk(offset: number, limit: number): Promise;
  records(pageSize?: number): AsyncGenerator;
  pages(pageSize?: number): AsyncGenerator;
  query(
    criteria: SearchCriteria,
    options?: QueryOptions
  ): Promise;
  subscribeToChanges(
    callback: (change: TabularChangePayload) => void,
    options?: TabularSubscribeOptions
  ): () => void;
  setupDatabase(): Promise;
  destroy(): void;
}
```

### Schema Integration

Schemas are JSON Schema objects (`DataPortSchemaObject`) that define both the shape of the data and metadata used by the storage layer:

```typescript
import type { DataPortSchemaObject } from "@workglow/util/schema";

const UserSchema = {
  type: "object",
  properties: {
    user_id: { type: "string", "x-auto-generated": true },
    name: { type: "string" },
    email: { type: "string" },
    created_at: { type: "string" },
  },
  required: ["user_id", "name", "email", "created_at"],
  additionalProperties: false,
} as const satisfies DataPortSchemaObject;

const UserPrimaryKey = ["user_id"] as const;
```

The `x-auto-generated: true` annotation marks fields whose values are assigned by
the storage backend. For integer types, values auto-increment. For string types, a UUID is generated. When inserting, these fields are optional in the `InsertType`.

### Querying

The `query` method supports equality and comparison operators:

```typescript
// Equality match
const results = await storage.query({ category: "electronics" });

// With comparison operator
const recent = await storage.query({
  created_at: { value: "2025-01-01", operator: ">=" },
});

// Multiple criteria (AND logic)
const filtered = await storage.query(
  {
    category: "electronics",
    price: { value: 100, operator: "<" },
  },
  {
    orderBy: [{ column: "price", direction: "ASC" }],
    limit: 10,
    offset: 0,
  }
);
```

### Pagination

Two async generator methods support efficient pagination over large datasets:

```typescript
// Iterate individual records
for await (const record of storage.records(100)) {
  process(record);
}

// Iterate pages
for await (const page of storage.pages(100)) {
  await processBatch(page);
}
```

### Change Subscriptions

Subscribe to INSERT, UPDATE, and DELETE events -- including changes from other processes:

```typescript
const unsubscribe = storage.subscribeToChanges(
  (change) => {
    console.log(change.type); // "INSERT" | "UPDATE" | "DELETE"
    console.log(change.old); // previous entity (for UPDATE/DELETE)
    console.log(change.new); // new entity (for INSERT/UPDATE)
  },
  { pollingIntervalMs: 1000 }
);

// Later: stop listening
unsubscribe();
```

### Events

| Event      | Parameters                          | Description                       |
| ---------- | ----------------------------------- | --------------------------------- |
| `put`      | `(entity)`                          | An entity was inserted or updated |
| `get`      | `(key, entity \| undefined)`        | An entity was retrieved           |
| `query`    | `(criteria, entities \| undefined)` | A query was executed              |
| `delete`   | `(key)`                             | An entity was deleted             |
| `clearall` | `()`                                | All entities were deleted         |

### Available Backends

| Backend          | Class                          | Runtime   | Notes                      |
| ---------------- | ------------------------------ | --------- | -------------------------- |
| In-Memory        | `InMemoryTabularStorage`       | All       | Fast, non-persistent       |
| Shared In-Memory | `SharedInMemoryTabularStorage` | All       | Shared between instances   |
| SQLite           | `SqliteTabularStorage`         | Node, Bun | File-based persistence     |
| PostgreSQL       | `PostgresTabularStorage`       | Node, Bun | Network-based              |
| Supabase         | `SupabaseTabularStorage`       | All       | Cloud PostgreSQL           |
| IndexedDB        | `IndexedDbTabularStorage`      | Browser   | Browser-native             |
| Filesystem       | `FsFolderTabularStorage`       | Node, Bun | Directory-based            |
| HuggingFace      | `HuggingFaceTabularStorage`    | All       | HuggingFace datasets       |
| Cached           | `CachedTabularStorage`         | All       | Read-through cache wrapper |
| Telemetry        | `TelemetryTabularStorage`      | All       | Wraps with tracing         |

## IQueueStorage -- Job Queue Storage

Specialized storage for job queue persistence is exported from `@workglow/job-queue`, not `@workglow/storage`. It extends beyond basic CRUD to support atomic job claiming, abort signaling, progress tracking, and TTL-based cleanup.

```typescript
interface IQueueStorage {
  add(job: JobStorageFormat): Promise;
  get(id: unknown): Promise | undefined>;
  next(workerId: string): Promise | undefined>;
  peek(status?: JobStatus, num?: number): Promise>>;
  size(status?: JobStatus): Promise;
  complete(job: JobStorageFormat): Promise;
  deleteAll(): Promise;
  outputForInput(input: Input): Promise;
  abort(id: unknown): Promise;
  getByRunId(runId: string): Promise>>;
  saveProgress(
    id: unknown,
    progress: number,
    message: string,
    details: Record | null
  ): Promise;
  delete(id: unknown): Promise;
  deleteJobsByStatusAndAge(status: JobStatus, olderThanMs: number): Promise;
  setupDatabase(): Promise;
  subscribeToChanges(
    callback: (change: QueueChangePayload) => void,
    options?: QueueSubscribeOptions
  ): () => void;
}
```

### Key Operations

**`next(workerId)`** is the critical method.
It atomically finds the next PENDING job whose `run_after` time has passed, sets its status to PROCESSING, increments `run_attempts`, records the `workerId`, and returns it. This atomic claim prevents two workers from processing the same job.

**`abort(id)`** sets a job's status to ABORTING. The worker detects this on its next `checkForAbortingJobs` pass and fires the abort controller.

**`saveProgress(id, progress, message, details)`** persists progress information without changing the job's status.

**`deleteJobsByStatusAndAge(status, olderThanMs)`** enables TTL-based cleanup of completed or failed jobs.

### JobStorageFormat

Jobs are stored in a flat format matching database column layouts:

```typescript
type JobStorageFormat = {
  id?: unknown;
  job_run_id?: string;
  queue?: string;
  input: Input;
  output?: Output | null;
  error?: string | null;
  error_code?: string | null;
  fingerprint?: string;
  max_retries?: number;
  status?: JobStatus;
  run_after: string | null;
  completed_at: string | null;
  worker_id?: string | null;
  progress?: number;
  progress_message?: string;
  // ...
};
```

### JobStatus

```typescript
const JobStatus = {
  PENDING: "PENDING",
  PROCESSING: "PROCESSING",
  COMPLETED: "COMPLETED",
  ABORTING: "ABORTING",
  FAILED: "FAILED",
  DISABLED: "DISABLED",
} as const;
```

### Prefix Filtering

Queue storage supports prefix columns for multi-tenant scenarios:

```typescript
const storage = new SqliteQueueStorage(db, "jobs", {
  prefixes: [
    { name: "user_id", type: "uuid" },
    { name: "project_id", type: "uuid" },
  ],
  prefixValues: {
    user_id: "abc-123",
    project_id: "proj-456",
  },
});
```

Subscriptions can filter by prefix:

```typescript
// Default: only this user's jobs in this project (the storage's own prefix values)
storage.subscribeToChanges(callback);

// All projects for this user
storage.subscribeToChanges(callback, {
  prefixFilter: { user_id: "abc-123" },
});

// All jobs (admin view)
storage.subscribeToChanges(callback, { prefixFilter: {} });
```

### Available Backends

| Backend    | Class                   | Import                          |
| ---------- | ----------------------- | ------------------------------- |
| In-Memory  | `InMemoryQueueStorage`  | `@workglow/job-queue`           |
| SQLite     | `SqliteQueueStorage`    | `@workglow/sqlite/job-queue`    |
| PostgreSQL | `PostgresQueueStorage`  | `@workglow/postgres/job-queue`  |
| Supabase   | `SupabaseQueueStorage`  | `@workglow/supabase/job-queue`  |
| IndexedDB  | `IndexedDbQueueStorage` | `@workglow/indexeddb/job-queue` |
| Telemetry  | `TelemetryQueueStorage` | `@workglow/job-queue`           |

## IVectorStorage -- Vector Storage

Extends `ITabularStorage` with vector similarity search capabilities. It stores vector embeddings alongside metadata and supports both pure vector search and hybrid (vector + full-text) search.

```typescript
interface IVectorStorage<
  Metadata extends Record | undefined,
  Schema extends DataPortSchemaObject,
  Entity,
  PrimaryKeyNames,
  PrimaryKey,
  InsertType,
> extends ITabularStorage {
  getVectorDimensions(): number;
  similaritySearch(
    query: TypedArray,
    options?: VectorSearchOptions
  ): Promise<(Entity & { score: number })[]>;
  hybridSearch?(
    query: TypedArray,
    options: HybridSearchOptions
  ): Promise<(Entity & { score: number })[]>;
}
```

### Similarity Search

```typescript
const results = await vectorStorage.similaritySearch(queryVector, {
  topK: 10,
  filter: { doc_id: "doc-123" },
  scoreThreshold: 0.7,
});

for (const result of results) {
  console.log(result.metadata.text, result.score);
}
```

### Hybrid Search

Combines vector similarity with full-text search for improved retrieval quality:

```typescript
const results = await vectorStorage.hybridSearch(queryVector, {
  textQuery: "machine learning transformers",
  topK: 10,
  vectorWeight: 0.7, // 70% vector, 30% text relevance
});
```

Not all backends support hybrid search. Check with `typeof storage.hybridSearch === "function"` before calling.

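
Because `hybridSearch` is optional, callers may want a fallback path. The following is a minimal sketch of that feature check, not library code: `VectorSearcher`, `SearchHit`, `searchWithFallback`, and the stub backend are hypothetical names that model only the two search methods, and `Float32Array` stands in for `TypedArray`.

```typescript
// Hypothetical, trimmed-down result and storage shapes for illustration.
interface SearchHit {
  id: string;
  score: number;
}

interface VectorSearcher {
  similaritySearch(query: Float32Array, options?: { topK?: number }): Promise<SearchHit[]>;
  hybridSearch?(
    query: Float32Array,
    options: { textQuery: string; topK?: number; vectorWeight?: number }
  ): Promise<SearchHit[]>;
}

// Use hybrid search when the backend provides it; otherwise fall back to
// pure vector search and ignore the text query.
async function searchWithFallback(
  storage: VectorSearcher,
  query: Float32Array,
  textQuery: string,
  topK = 10
): Promise<SearchHit[]> {
  if (typeof storage.hybridSearch === "function") {
    return storage.hybridSearch(query, { textQuery, topK, vectorWeight: 0.7 });
  }
  return storage.similaritySearch(query, { topK });
}

// Stub backend without hybrid support (assumption: stands in for a real backend).
const vectorOnlyBackend: VectorSearcher = {
  async similaritySearch() {
    return [{ id: "chunk-1", score: 0.9 }];
  },
};
```

With this shape, calling `searchWithFallback(vectorOnlyBackend, queryVector, "some text")` takes the `similaritySearch` branch, while a backend that defines `hybridSearch` takes the hybrid branch without any change at the call site.
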
### Search Options

```typescript
interface VectorSearchOptions {
  readonly topK?: number; // Max results (default varies by backend)
  readonly filter?: Partial; // Metadata filter
  readonly scoreThreshold?: number; // Minimum similarity score
}

interface HybridSearchOptions extends VectorSearchOptions {
  readonly textQuery: string; // Full-text search query
  readonly vectorWeight?: number; // Weight for vector vs text (0-1)
}
```

### Schema Helpers

The module provides helper functions for introspecting vector schemas:

```typescript
import { getVectorProperty, getMetadataProperty } from "@workglow/storage";

const vectorColumn = getVectorProperty(schema); // e.g., "vector"
const metadataColumn = getMetadataProperty(schema); // e.g., "metadata"
```

### Available Backends

| Backend    | Class                    | Runtime   | Notes                         |
| ---------- | ------------------------ | --------- | ----------------------------- |
| In-Memory  | `InMemoryVectorStorage`  | All       | Brute-force cosine similarity |
| SQLite     | `SqliteVectorStorage`    | Node, Bun | sqlite-vec extension          |
| SQLite AI  | `SqliteAiVectorStorage`  | Node, Bun | Optimized for AI workflows    |
| PostgreSQL | `PostgresVectorStorage`  | Node, Bun | pgvector extension            |
| IndexedDB  | `IndexedDbVectorStorage` | Browser   | Brute-force in browser        |

## Registry and Input Resolution

Storage instances can be registered in the global service registry and resolved at runtime by tasks through the input resolution system. Task schemas use `format` annotations to declare the kind of storage they need:

```typescript
static inputSchema(): DataPortSchema {
  return {
    type: "object",
    properties: {
      storage: {
        type: "string",
        format: "storage:tabular",
        title: "Storage",
      },
    },
  } as const satisfies DataPortSchema;
}
```

When a task runs, the framework's input resolver maps the string identifier to the actual storage instance from the registry. This decouples task definitions from concrete storage backends.

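
To make the resolution step more concrete, here is a rough sketch of how a string identifier could be swapped for a registered instance based on a `format` annotation. Everything in it is hypothetical and simplified (`registry`, `registerStorage`, `resolveInputs`, and the `storage:` prefix check); Workglow's actual registry and resolver APIs are not shown in this document and may work differently.

```typescript
// Hypothetical in-process registry: identifier -> storage instance.
const registry = new Map<string, unknown>();

function registerStorage(id: string, instance: unknown): void {
  registry.set(id, instance);
}

// Simplified schema shapes, modelling only what the sketch needs.
interface PropertySchema {
  type: string;
  format?: string;
}

interface ObjectSchema {
  type: "object";
  properties: Record<string, PropertySchema>;
}

// Replace every string input whose schema format starts with "storage:"
// by the instance registered under that string. Other inputs pass through.
function resolveInputs(
  schema: ObjectSchema,
  input: Record<string, unknown>
): Record<string, unknown> {
  const resolved: Record<string, unknown> = { ...input };
  for (const [name, prop] of Object.entries(schema.properties)) {
    const value = input[name];
    if (prop.format?.startsWith("storage:") && typeof value === "string") {
      if (!registry.has(value)) {
        throw new Error(`No storage registered under "${value}"`);
      }
      resolved[name] = registry.get(value);
    }
  }
  return resolved;
}

// Usage: the task input carries only the identifier "users".
const usersStorage = { kind: "tabular-stub" };
registerStorage("users", usersStorage);

const taskInput = resolveInputs(
  { type: "object", properties: { storage: { type: "string", format: "storage:tabular" } } },
  { storage: "users", limit: 5 }
);
```

After the call, `taskInput.storage` refers to the registered object rather than the string, which is the decoupling described above: the task definition names a kind of storage, and the concrete instance is chosen wherever registration happens.
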
### Service Tokens

| Token                  | Type                                             | Description                  |
| ---------------------- | ------------------------------------------------ | ---------------------------- |
| `QUEUE_STORAGE`        | `IQueueStorage` from `@workglow/job-queue`       | Default queue storage        |
| `RATE_LIMITER_STORAGE` | `IRateLimiterStorage` from `@workglow/job-queue` | Default rate limiter storage |

## Database Setup

All non-in-memory backends require a `setupDatabase()` call before use. This creates tables, indices, and any required schema. For production deployments, this setup should be done through proper database migrations rather than calling `setupDatabase()` at runtime.

```typescript
const storage = new SqliteTabularStorage(db, "users", UserSchema, UserPrimaryKey);
await storage.setupDatabase(); // Creates table and indices

// Now safe to use
await storage.put({
  name: "Alice",
  email: "alice@example.com",
  created_at: new Date().toISOString(),
});
```

## API Reference

### IKvStorage

- `put(key, value): Promise` -- Store a value.
- `putBulk(items): Promise` -- Store multiple values.
- `get(key): Promise` -- Retrieve a value.
- `delete(key): Promise` -- Delete a value.
- `getAll(): Promise` -- Retrieve all entries.
- `deleteAll(): Promise` -- Delete all entries.
- `size(): Promise` -- Count entries.

### ITabularStorage

- `put(value): Promise` -- Insert or update (returns entity with auto-generated fields).
- `putBulk(values): Promise` -- Bulk insert/update.
- `get(key): Promise` -- Get by primary key.
- `delete(key): Promise` -- Delete by primary key.
- `getAll(options?): Promise` -- Get all with optional ordering/limit.
- `deleteAll(): Promise` -- Delete all entities.
- `size(): Promise` -- Count entities.
- `deleteSearch(criteria): Promise` -- Delete by search criteria.
- `getBulk(offset, limit): Promise` -- Paginated fetch.
- `records(pageSize?): AsyncGenerator` -- Iterate individual records.
- `pages(pageSize?): AsyncGenerator` -- Iterate pages.
- `query(criteria, options?): Promise` -- Search with criteria and options.
- `subscribeToChanges(callback, options?): () => void` -- Subscribe to change notifications.
- `setupDatabase(): Promise` -- Initialize the database schema.
- `destroy(): void` -- Free resources.

### IQueueStorage (`@workglow/job-queue`)

- `add(job): Promise` -- Add a job, returns its ID.
- `get(id): Promise` -- Get a job by ID.
- `next(workerId): Promise` -- Atomically claim the next pending job.
- `peek(status?, num?): Promise` -- Peek at jobs without claiming.
- `size(status?): Promise` -- Count jobs by status.
- `complete(job): Promise` -- Update a job's final state.
- `abort(id): Promise` -- Mark a job to be aborted.
- `getByRunId(runId): Promise` -- Get jobs by run ID.
- `saveProgress(id, progress, message, details): Promise` -- Update progress.
- `delete(id): Promise` -- Delete a specific job.
- `deleteJobsByStatusAndAge(status, olderThanMs): Promise` -- TTL cleanup.
- `deleteAll(): Promise` -- Delete all jobs.
- `setupDatabase(): Promise` -- Initialize schema.
- `subscribeToChanges(callback, options?): () => void` -- Subscribe to changes.

### IVectorStorage

Inherits all `ITabularStorage` methods, plus:

- `getVectorDimensions(): number` -- Get the configured vector dimension.
- `similaritySearch(query, options?): Promise<(Entity & { score: number })[]>` -- Find similar vectors.
- `hybridSearch?(query, options): Promise<(Entity & { score: number })[]>` -- Combined vector + text search.
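
For intuition about what `similaritySearch` options such as `topK` and `scoreThreshold` do, here is a minimal brute-force cosine similarity sketch, in the spirit of how the in-memory backend is described. It is an illustration written for this document and not the library's implementation; `Row`, `cosineSimilarity`, and `bruteForceSearch` are hypothetical names, and metadata filtering is left out.

```typescript
// Hypothetical row shape: an id plus its embedding.
interface Row {
  id: string;
  vector: Float32Array;
}

// Cosine similarity of two equal-length vectors; returns 0 for zero vectors.
function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  if (a.length !== b.length) {
    throw new Error("Vector dimensions do not match");
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  const denom = Math.sqrt(normA) * Math.sqrt(normB);
  return denom === 0 ? 0 : dot / denom;
}

// Score every row, drop rows below the threshold, and return the best topK.
function bruteForceSearch(
  rows: Row[],
  query: Float32Array,
  options: { topK?: number; scoreThreshold?: number } = {}
): Array<Row & { score: number }> {
  const { topK = 10, scoreThreshold = -Infinity } = options;
  return rows
    .map((row) => ({ ...row, score: cosineSimilarity(query, row.vector) }))
    .filter((hit) => hit.score >= scoreThreshold)
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}

const rows: Row[] = [
  { id: "a", vector: new Float32Array([1, 0]) },
  { id: "b", vector: new Float32Array([0, 1]) },
  { id: "c", vector: new Float32Array([1, 1]) },
];

// Row "b" is orthogonal to the query and falls below the threshold.
const hits = bruteForceSearch(rows, new Float32Array([1, 0]), {
  topK: 2,
  scoreThreshold: 0.5,
});
```

A full scan like this costs time proportional to the number of stored vectors per query, which is presumably why the persistent backends in the table above rely on extensions such as sqlite-vec and pgvector instead.
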