---
name: cdc
description: Change Data Capture - architecture, entrypoints, bytecode emission, sync engine integration, tests
---

# CDC (Change Data Capture) - Internal Feature Map

## Overview

CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a dedicated CDC table (`turso_cdc` by default). It is per-connection, enabled via PRAGMA, and operates at the bytecode generation (translate) layer. The sync engine consumes CDC records to push local changes to the remote.

## Architecture Diagram

```
User SQL (INSERT/UPDATE/DELETE/DDL)
         |
         v
┌─────────────────────────────────────────────────┐
│  Translate layer (core/translate/)              │
│  ┌───────────────────────────────────────────┐  │
│  │ prepare_cdc_if_necessary()                │  │
│  │   - checks CaptureDataChangesInfo         │  │
│  │   - opens CDC table cursor (OpenWrite)    │  │
│  │   - skips if target == CDC table itself   │  │
│  └───────────────────────────────────────────┘  │
│  ┌───────────────────────────────────────────┐  │
│  │ emit_cdc_insns()                          │  │
│  │   - writes (change_id, change_time,       │  │
│  │     change_type, table_name, id,          │  │
│  │     before, after, updates) into CDC tbl  │  │
│  └───────────────────────────────────────────┘  │
│  + emit_cdc_full_record() /                     │
│    emit_cdc_patch_record()                      │
└─────────────────────────────────────────────────┘
         |
         v
CDC table (turso_cdc or custom name)
         |
         v
┌─────────────────────────────────────────────────┐
│  Sync engine (sync/engine/)                     │
│  DatabaseTape reads CDC table → DatabaseChange  │
│  → apply/revert → push to remote                │
└─────────────────────────────────────────────────┘
```

## Core Data Types

### `CaptureDataChangesMode` + `CaptureDataChangesInfo` - `core/lib.rs`

CDC behavior is controlled by two types:

```rust
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

enum CaptureDataChangesMode {
    Id,     // capture only rowid
    Before, // capture before-image
    After,  // capture after-image
    Full,   // before + after + updates
}

struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,               // CDC table name
    version: Option<CdcVersion>, // schema version (V1 or V2)
}
```

The connection stores `Option<CaptureDataChangesInfo>` - `None` means CDC is off.

Key methods on `CdcVersion`:
- `has_commit_record()` - `self >= V2`, gates COMMIT record emission
- `Display`/`FromStr` - round-trips `"v1"` ↔ `V1`, `"v2"` ↔ `V2`

Key methods on `CaptureDataChangesInfo`:
- `parse(value: &str, version: Option<CdcVersion>)` - parses PRAGMA argument `"<mode>[,<table>]"`, returns `None` for "off"
- `cdc_version()` - returns `CdcVersion` (panics if version is None). Single accessor replacing old `is_v1()`/`is_v2()`/`version()` methods.
- `has_before()` / `has_after()` / `has_updates()` - mode capability checks
- `mode_name()` - returns mode as string

Convenience trait `CaptureDataChangesExt` on `Option<CaptureDataChangesInfo>` provides:
- `has_before()` / `has_after()` / `has_updates()` - delegates to inner, returns false for None
- `table()` - returns `Option<&str>`, None when CDC is off

### CDC Table Schema v1

Default table name: `turso_cdc` (constant `TURSO_CDC_DEFAULT_TABLE_NAME`)

```sql
CREATE TABLE turso_cdc (
    change_id INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER, -- unixepoch()
    change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE
    table_name TEXT,
    id ,                 -- rowid of changed row
    before BLOB,         -- binary record (before-image)
    after BLOB,          -- binary record (after-image)
    updates BLOB         -- binary record of per-column changes
);
```

### CDC Table Schema v2 (current)

```sql
CREATE TABLE turso_cdc (
    change_id INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER,  -- unixepoch()
    change_type INTEGER,  -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name TEXT,
    id ,                  -- rowid of changed row
    before BLOB,          -- binary record (before-image)
    after BLOB,           -- binary record (after-image)
    updates BLOB,         -- binary record of per-column changes
    change_txn_id INTEGER -- transaction ID (groups rows into transactions)
);
```

v2 adds:
- `change_txn_id` column - groups CDC rows by
  transaction. Assigned via `conn_txn_id(candidate)` opcode which get-or-sets a per-connection transaction ID.
- `change_type=2` (COMMIT) records - mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit `COMMIT`.

The CDC table is created at runtime by the `InitCdcVersion` opcode via `CREATE TABLE IF NOT EXISTS`.

### CDC Version Table

When CDC is first enabled, a version tracking table is created:

```sql
CREATE TABLE turso_cdc_version (
    table_name TEXT PRIMARY KEY,
    version TEXT NOT NULL
);
```

Current version: `CDC_VERSION_CURRENT = CdcVersion::V2` (defined in `core/lib.rs`, re-exported from `core/translate/pragma.rs`)

### Version Detection in InitCdcVersion

The `InitCdcVersion` opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it:
- If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking)
- If CDC table doesn't exist → create with current version (v2)
- If version row already exists → use that version as-is

### `DatabaseChange` - `sync/engine/src/types.rs:229-249`

Sync engine's Rust representation of a CDC row. Has `into_apply()` and `into_revert()` methods for forward/backward replay.

### `OperationMode` - `core/translate/emitter.rs`

Used by `emit_cdc_insns()` to determine `change_type` value:
- `INSERT` → 1
- `UPDATE` / `SELECT` → 0
- `DELETE` → -1
- `COMMIT` → 2 (v2 only, emitted by `emit_cdc_commit_insns`)

## Entry Points

### 1. PRAGMA - Enable/Disable CDC

**Set:** `core/translate/pragma.rs`
- Checks MVCC is not enabled (CDC and MVCC are mutually exclusive)
- Parses mode string via `CaptureDataChangesInfo::parse()` with `CDC_VERSION_CURRENT`
- Emits a single `InitCdcVersion` opcode - all CDC setup (table creation, version tracking, state change) happens at execution time

**Get (read current mode):** `core/translate/pragma.rs`
- Returns 3 columns: `mode`, `table`, `version`
- When off: returns `("off", NULL, NULL)`
- When active: returns `(mode_name, table, version)`

**Pragma registration:** `core/pragma.rs` - `UnstableCaptureDataChangesConn` with columns `["mode", "table", "version"]`

### 2. Connection State

**Field:** `core/connection.rs` - `capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>`

**Getter:** `get_capture_data_changes_info()` - returns read guard

**Setter:** `set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)`

**Default:** initialized as `None` (CDC off)

### 3. ProgramBuilder Integration

**Field:** `core/vdbe/builder.rs` - `capture_data_changes_info: Option<CaptureDataChangesInfo>`

**Accessor:** `capture_data_changes_info()` - returns `&Option<CaptureDataChangesInfo>`

**Passed from:** `core/translate/mod.rs` - read from connection when creating builder

### 4. PrepareContext

**Field:** `core/vdbe/mod.rs` - `capture_data_changes: Option<CaptureDataChangesInfo>`

**Set from:** `PrepareContext::from_connection()` - clones from `connection.get_capture_data_changes_info()`

### 5. InitCdcVersion Opcode - `core/vdbe/execute.rs`

Always emitted by PRAGMA SET. Handles all CDC setup at execution time:

1. For "off": stores `None` in `state.pending_cdc_info`, returns early
2. Checks if CDC table already exists (for v1 backward compatibility)
3. Creates CDC table (`CREATE TABLE IF NOT EXISTS ...`) - v2 schema with `change_txn_id` column
4. Creates version table (`CREATE TABLE IF NOT EXISTS turso_cdc_version ...`)
5. Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses `INSERT OR IGNORE` to preserve existing version rows.
6. Reads back actual version from the table
7. Stores computed `CaptureDataChangesInfo` in `state.pending_cdc_info`

The connection's CDC state is **not applied in the opcode**. Instead, `pending_cdc_info` is applied in `halt()` only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.

All table creation is done via nested `conn.prepare()`/`run_ignore_rows()` calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.

## Bytecode Emission (core/translate/emitter.rs)

These are the core CDC code generation functions:

| Function | Purpose |
|----------|---------|
| `prepare_cdc_if_necessary()` | Opens CDC table cursor if CDC is active and target != CDC table |
| `emit_cdc_full_record()` | Reads all columns from cursor into a MakeRecord (for before/after image) |
| `emit_cdc_patch_record()` | Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| `emit_cdc_insns()` | Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| `emit_cdc_commit_insns()` | Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| `emit_cdc_autocommit_commit()` | End-of-statement COMMIT emission. Checks `is_autocommit()` at runtime - only emits COMMIT if in autocommit mode. v2 only. |

### COMMIT Emission Strategy (v2)

Per-row call sites use `emit_cdc_insns()` (no COMMIT).
End-of-statement sites call `emit_cdc_autocommit_commit()`, which checks `is_autocommit()` at runtime:
- **Autocommit mode:** emits a COMMIT record after the statement completes
- **Explicit transaction (`BEGIN...COMMIT`):** skips per-statement COMMIT; the explicit `COMMIT` statement emits the COMMIT record via `emit_cdc_commit_insns()`

This ensures multi-row statements like `INSERT INTO t VALUES (1),(2),(3)` produce one COMMIT at the end, not one per row.

## Integration Points - Where CDC Records Are Emitted

### INSERT - `core/translate/insert.rs`
- **Per-row:** `emit_cdc_insns()` after insert, and before delete for REPLACE/conflict
- **End-of-statement:** `emit_cdc_autocommit_commit()` in `emit_epilogue()` after the insert loop

### UPDATE - `core/translate/emitter.rs`
- **Per-row:** captures before-image, after-image via patch record, emits `emit_cdc_insns()`
- **End-of-statement:** `emit_cdc_autocommit_commit()` after the update loop

### DELETE - `core/translate/emitter.rs`
- **Per-row:** captures before-image and emits `emit_cdc_insns()`
- **End-of-statement:** `emit_cdc_autocommit_commit()` after the delete loop

### UPSERT (ON CONFLICT DO UPDATE) - `core/translate/upsert.rs`
- **Per-row:** `emit_cdc_insns()` for all three cases: pure insert, update after conflict, replace
- No end-of-statement COMMIT - upsert shares INSERT's epilogue

### Schema Changes (DDL) - `core/translate/schema.rs`
- **CREATE TABLE:** `emit_cdc_insns()` (insert into `sqlite_schema`) + `emit_cdc_autocommit_commit()`
- **DROP TABLE:** `emit_cdc_insns()` per-row in metadata loop + `emit_cdc_autocommit_commit()` after loop
- **CREATE INDEX:** `emit_cdc_insns()` + `emit_cdc_autocommit_commit()` (`core/translate/schema.rs`)
- **DROP INDEX:** `emit_cdc_insns()` per-row + `emit_cdc_autocommit_commit()` after loop (`core/translate/index.rs`)

DDL in explicit transactions (`BEGIN; CREATE TABLE t(x); COMMIT`) does NOT emit per-statement COMMIT - the autocommit check prevents it.
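
The autocommit rule behind these call sites can be modeled in a few lines. The following is a minimal sketch, not the translate-layer code: `Stmt` and `cdc_change_types` are hypothetical names introduced for illustration, and the model only tracks which `change_type` values would land in a v2 CDC table. It assumes an explicit `COMMIT` always writes a COMMIT record.

```rust
/// Simplified statements; a hypothetical stand-in for translated programs.
#[derive(Debug, Clone, PartialEq)]
pub enum Stmt {
    Begin,
    Commit,
    /// A DML statement that changes `rows` rows (for example a multi-row INSERT).
    Insert { rows: usize },
}

pub const CHANGE_TYPE_INSERT: i64 = 1;
pub const CHANGE_TYPE_COMMIT: i64 = 2;

/// Returns the `change_type` values a v2 CDC table would receive, in order.
pub fn cdc_change_types(stmts: &[Stmt]) -> Vec<i64> {
    let mut autocommit = true;
    let mut out = Vec::new();
    for stmt in stmts {
        match stmt {
            Stmt::Begin => autocommit = false,
            Stmt::Commit => {
                // Explicit COMMIT: plays the role of emit_cdc_commit_insns().
                out.push(CHANGE_TYPE_COMMIT);
                autocommit = true;
            }
            Stmt::Insert { rows } => {
                // Per-row records: plays the role of emit_cdc_insns().
                out.extend(std::iter::repeat(CHANGE_TYPE_INSERT).take(*rows));
                // End of statement: plays the role of emit_cdc_autocommit_commit().
                if autocommit {
                    out.push(CHANGE_TYPE_COMMIT);
                }
            }
        }
    }
    out
}
```

In this model a three-row insert in autocommit mode and the same three rows spread over two statements inside `BEGIN ... COMMIT` both end with exactly one COMMIT marker, which is the property the per-statement strategy is meant to guarantee.
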
### ALTER TABLE - `core/translate/update.rs`
- Sets `cdc_update_alter_statement` on the update plan when CDC has updates mode

### Views/Triggers - Explicitly excluded
- `core/translate/view.rs` - passes `None` for CDC cursor
- `core/translate/trigger.rs` - passes `None` for CDC cursor

### Subqueries - No CDC
- `core/translate/subquery.rs` - `cdc_cursor_id: None`

## Helper Functions (for reading CDC data)

### `table_columns_json_array(table_name)` - `core/function.rs`, `core/vdbe/execute.rs`

Returns JSON array of column names for a table. Used to interpret binary records.

### `bin_record_json_object(columns_json, blob)` - `core/function.rs`, `core/vdbe/execute.rs`

Decodes a binary record (from `before`/`after`/`updates` columns) into a JSON object using column names.

## Sync Engine Integration

The sync engine is the primary consumer of CDC data.

### DatabaseTape - `sync/engine/src/database_tape.rs`
- **CDC config:** `DEFAULT_CDC_TABLE_NAME = "turso_cdc"`, `DEFAULT_CDC_MODE = "full"`
- **PRAGMA name:** `CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn"`
- **Initialization:** `connect()` sets CDC pragma and caches `cdc_version` from `turso_cdc_version` table. Must be called before `iterate_changes()`.
- **Version caching:** `cdc_version: RwLock<Option<CdcVersion>>` - set by `connect()`, read by `iterate_changes()`. Panics if not set.
- **Iterator:** `DatabaseChangesIterator` reads CDC table in batches, emits `DatabaseTapeOperation`. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. `ignore_schema_changes: true` (default) filters out `sqlite_schema` row changes but not COMMIT records.
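
The iterator's handling of commit boundaries can be illustrated with a small model. This is a sketch under stated assumptions, not the `DatabaseChangesIterator` code: `CdcRow`, `TapeOp`, and `batch_to_ops` are hypothetical names, the row type is reduced to two fields, and the sketch assumes no synthetic Commit is appended for an empty v1 batch.

```rust
/// Simplified CDC row: only the fields this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcRow {
    pub change_type: i64,         // 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    pub table_name: &'static str, // placeholder value for COMMIT rows
}

/// Hypothetical stand-in for `DatabaseTapeOperation`.
#[derive(Debug, Clone, PartialEq)]
pub enum TapeOp {
    Change(CdcRow),
    Commit,
}

/// Converts one batch of CDC rows into tape operations.
pub fn batch_to_ops(
    batch: &[CdcRow],
    has_commit_record: bool,
    ignore_schema_changes: bool,
) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in batch {
        if row.change_type == 2 {
            // v2: a real COMMIT record, checked first so it is never filtered.
            ops.push(TapeOp::Commit);
        } else if ignore_schema_changes && row.table_name == "sqlite_schema" {
            // Schema row changes are dropped when the filter is on.
            continue;
        } else {
            ops.push(TapeOp::Change(row.clone()));
        }
    }
    // v1 has no COMMIT records, so the batch is closed with a synthetic one.
    // Assumption: nothing is appended for an empty batch.
    if !has_commit_record && !batch.is_empty() {
        ops.push(TapeOp::Commit);
    }
    ops
}
```

Checking the COMMIT case before the schema filter is what keeps transaction boundaries intact even when every data row in a transaction touched `sqlite_schema`.
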
### Sync Operations - `sync/engine/src/database_sync_operations.rs`
- **Change counting:** `SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?`

### Sync Engine - `sync/engine/src/database_sync_engine.rs`
- **Initialization:** `open_db()` calls `main_tape.connect(coro)` to ensure CDC is set up and version is cached before any `iterate_changes()` calls.
- During `apply_changes`, checks if CDC table existed, re-creates it after sync

### Replay Generator - `sync/engine/src/database_replay_generator.rs`
- Requires `updates` column to be populated (full mode)

## Bindings CDC Surface

All bindings expose `cdc_operations` as part of sync stats:

| Binding | File |
|---------|------|
| Python | `bindings/python/src/turso_sync.rs` |
| JavaScript | `bindings/javascript/sync/src/lib.rs` |
| JS (generator) | `bindings/javascript/sync/src/generator.rs` |
| Go | `bindings/go/bindings_sync.go` |
| React Native | `bindings/react-native/src/types.ts` |
| SDK Kit (C header) | `sync/sdk-kit/turso_sync.h` |
| SDK Kit (Rust) | `sync/sdk-kit/src/bindings.rs` |

## Tests

- **Integration tests:** `tests/integration/functions/test_cdc.rs` - covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in `tests/integration/functions/mod.rs`.
- **Sync engine tests:** `sync/engine/src/database_tape.rs` - CDC table reads, tape iteration, replay of schema changes.
- **JS binding tests:** `bindings/javascript/sync/packages/{wasm,native}/promise.test.ts`

Run: `cargo test -- test_cdc` (integration) or `cargo test -p turso_sync_engine -- database_tape` (sync engine).

## User-facing Documentation

- **CLI manual page:** `cli/manuals/cdc.md` - accessible via `.manual cdc` in the REPL
- **Database manual:** `docs/manual.md` - CDC section linked in TOC

## Key Design Decisions

1. **Per-connection, not per-database.** Each connection has its own CDC mode and can target different tables.
2. **Bytecode-level implementation.** CDC instructions are emitted alongside the actual DML bytecode during translation - no runtime hooks or triggers.
3. **Self-exclusion.** Changes to the CDC table and `turso_cdc_version` table are never captured (checked in `prepare_cdc_if_necessary`).
4. **Schema changes tracked.** DDL operations are recorded as changes to `sqlite_schema` table.
5. **Binary record format.** Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload).
6. **Transaction-aware.** CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries.
7. **Version tracking.** CDC schema version is recorded in `turso_cdc_version` table and carried in `CaptureDataChangesInfo.version` for future schema evolution.
8. **Atomic PRAGMA.** Connection CDC state is deferred via `pending_cdc_info` in `ProgramState` and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
9. **Per-statement COMMIT (v2).** COMMIT records are emitted once per statement (not per row), using `emit_cdc_autocommit_commit()` which checks `is_autocommit()` at runtime. In explicit transactions, only the final `COMMIT` emits a COMMIT CDC record.
10. **Backward-compatible version detection.** Pre-existing v1 CDC tables (without `turso_cdc_version`) are detected by checking table existence before creation. Existing tables get `CdcVersion::V1` inserted into the version table.
11. **Typed version enum.** `CdcVersion` enum with `#[repr(u8)]` and `Ord`/`PartialOrd` enables feature gating via integer comparison (`has_commit_record()` = `self >= V2`). `Display`/`FromStr` handles database round-trip.
12. **CDC and MVCC mutual exclusion.** Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.
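
The typed version enum from decision 11 can be sketched as follows. The enum shape and derives are taken from the definition shown earlier in this document; the method and trait bodies are assumptions written to match the described behavior (`self >= V2`, and `"v1"`/`"v2"` round-tripping), and the `String` error type is a placeholder rather than the error type used in `core/lib.rs`.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
pub enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// COMMIT records exist from v2 onwards, so the gate is an ordering
    /// comparison and keeps working if later versions are added.
    pub fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Text form as stored in the version table.
        f.write_str(match self {
            CdcVersion::V1 => "v1",
            CdcVersion::V2 => "v2",
        })
    }
}

impl FromStr for CdcVersion {
    type Err = String; // placeholder error type for this sketch

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {}", other)),
        }
    }
}
```

Because the derived `Ord` follows variant declaration order, a comparison such as `version >= CdcVersion::V2` needs no cast to `u8`, and a value read back from the version table can be parsed with `"v2".parse::<CdcVersion>()`.
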