---
name: pyo3-interop
description: Rust↔Python interop architecture in duroxide-python. Use when modifying the PyO3 bridge, adding ScheduledTask types, fixing GIL deadlocks, changing tracing delegation, or debugging block_in_place / with_gil behavior.
---

# PyO3 Interop Architecture

## Overview

duroxide-python bridges Rust's duroxide runtime to Python via PyO3/maturin. The interop has two distinct paths — orchestrations (generator-based, synchronous blocking) and activities (synchronous GIL calls). Getting this wrong causes GIL deadlocks, silent replay corruption, or dropped futures.

## File Map

| File | Role |
|------|------|
| `src/handlers.rs` | Core interop — orchestration handler loop, activity invocation, global context maps, select/race/join, activity cancellation |
| `src/types.rs` | `ScheduledTask` enum — the protocol between Python and Rust |
| `src/lib.rs` | PyO3 module entry point, `#[pyfunction]` trace functions |
| `src/runtime.rs` | `PyRuntime` — wraps `duroxide::Runtime`, global tokio runtime |
| `src/client.rs` | `PyClient` — wraps `duroxide::Client`, all methods with `py.allow_threads()` |
| `src/provider.rs` | `PySqliteProvider` |
| `src/pg_provider.rs` | `PyPostgresProvider` |
| `python/duroxide/__init__.py` | Python wrapper: SqliteProvider, PostgresProvider, Client, Runtime, decorators |
| `python/duroxide/context.py` | OrchestrationContext, ActivityContext |
| `python/duroxide/driver.py` | Generator driver: create_generator, next_step, dispose_generator |

## ⚠️ Critical: GIL Deadlock Problem

This is the most important difference between duroxide-python and duroxide-node. **Get this wrong and the process deadlocks.**

### The Problem

PyO3 holds the GIL when Python calls into Rust `#[pymethods]`. If that method calls `TOKIO_RT.block_on()`, it blocks the thread while holding the GIL. Meanwhile, orchestration handlers running on tokio threads need the GIL via `Python::with_gil()` — **deadlock**.

```
Thread A (Python → Rust):
  client.wait_for_orchestration()
    → PyO3 holds GIL
    → TOKIO_RT.block_on(async { ... })    ← BLOCKS, holding GIL

Thread B (Tokio → Python):
  orchestration handler invoked
    → block_in_place + Python::with_gil() ← BLOCKS, waiting for GIL
```

### The Fix

EVERY method that calls `block_on` must use `py.allow_threads()` to release the GIL before blocking:

```rust
fn wait_for_orchestration(&self, py: Python<'_>, id: String, timeout: u64) -> PyResult<...> {
    py.allow_threads(|| {
        TOKIO_RT.block_on(async {
            self.client.wait_for_orchestration(&id, timeout).await
                .map_err(|e| format!("{e}"))
        })
    })
    .map_err(PyRuntimeError::new_err)
}
```

This pattern is applied to ALL 20+ methods in `client.rs` and `runtime.rs`.

### Error Handling Across the Boundary

`PyErr` is not `Send`, so you can't return `PyResult` from inside `allow_threads`. Pattern:

1. Inside `allow_threads`: map errors to `String` via `.map_err(|e| format!("{e}"))`
2. Outside `allow_threads`: map `String` to `PyErr` via `.map_err(PyRuntimeError::new_err)`

### Rules for ANY New Method

1. **Add `py: Python<'_>` parameter** to the method signature
2. **Wrap `TOKIO_RT.block_on()` in `py.allow_threads(|| { ... })`**
3. **Map errors to `String` inside, to `PyErr` outside**
4. **Never hold the GIL while blocking on tokio** (see the sketch below)
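Put together, a new method follows this shape. A minimal sketch only — `cancel_instance` and the inner `self.client.cancel_instance` call are hypothetical, standing in for whatever duroxide API the method wraps:

```rust
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;

// Lives inside the existing `#[pymethods] impl PyClient` block.
fn cancel_instance(&self, py: Python<'_>, instance_id: String) -> PyResult<()> {
    // Rules 1 + 2: take `py` and release the GIL before blocking on tokio.
    py.allow_threads(|| {
        TOKIO_RT.block_on(async {
            // Rule 3 (inside): `PyErr` is not `Send`, so errors become `String` here.
            self.client
                .cancel_instance(&instance_id) // hypothetical duroxide call
                .await
                .map_err(|e| format!("{e}"))
        })
    })
    // Rule 3 (outside): convert `String` to `PyErr` once the GIL is held again.
    .map_err(PyRuntimeError::new_err)
    // Rule 4 holds by construction: the GIL is never held across `block_on`.
}
```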
## Orchestration Interop (Blocking Generator Loop)

The replay engine calls `poll_once()` on the handler future. If the future isn't ready in one poll, it's **dropped**. The Python call must therefore complete synchronously within a single poll.

**Solution: `block_in_place` + `with_gil`**

```rust
fn call_create_blocking(&self, payload: String) -> PyResult<...> {
    tokio::task::block_in_place(|| {
        Python::with_gil(|py| {
            let result = self.create_fn.call1(py, (payload,))?;
            // parse result...
        })
    })
}
```

### Orchestration Handler Sequence

```
Rust (tokio thread)                         Python (GIL)
───────────────────                         ────────────────────
1. invoke(ctx, input)
   ├─ Store ctx in ORCHESTRATION_CTXS[instance_id]
   ├─ call_create_blocking(payload) ──────► create_generator(payload)
   │    (block_in_place + with_gil)         ├─ Create OrchestrationContext
   │                                        ├─ Create generator: fn(ctx, input)
   │                                        ├─ gen.send(None) → first yield
   │                                        └─ Return {"status": "yielded", "task": ...}
   │◄───────────────────────────────────────┘
   ├─ Loop:
   │   ├─ execute_task(ctx, task)           // Real DurableFuture or replay
   │   ├─ call_next_blocking(result) ─────► next_step(result)
   │   │    (block_in_place + with_gil)     ├─ gen.send(value) or gen.throw(exc)
   │   │                                    └─ Return next task or completion
   │   │◄───────────────────────────────────┘
   │   └─ If completed/error: break
   └─ Remove ctx from ORCHESTRATION_CTXS
```

## Activity Interop (Synchronous GIL Call)

Activities in duroxide-python are **synchronous** functions (unlike duroxide-node's async activities). They run on tokio threads via `block_in_place` + `with_gil`:

```
Rust                                        Python
────                                        ──────
invoke(ctx, input)
  ├─ Generate unique token (act-0, act-1, ...)
  ├─ Store ctx in ACTIVITY_CTXS[token]
  ├─ block_in_place + with_gil ───────────► wrapped_fn(payload)
  │                                         ├─ Parse ctx, create ActivityContext
  │                                         ├─ Call user's function (synchronous)
  │                                         └─ Return JSON result
  │◄────────────────────────────────────────┘
  └─ Remove token from ACTIVITY_CTXS
```

Users can call `asyncio.run()` inside activities if they need async I/O.

## Cross-Thread Tracing

Python trace calls and the Rust contexts they target live on different sides of the boundary: Python holds the GIL wherever it runs, while the Rust contexts live with their handlers on tokio threads. Global `HashMap`s bridge the two:

```rust
static ACTIVITY_CTXS: LazyLock<Mutex<HashMap<String, ActivityContext>>>
static ORCHESTRATION_CTXS: LazyLock<Mutex<HashMap<String, OrchestrationContext>>>
```

**Python calls PyO3 functions that look up the Rust context:**

```python
# In OrchestrationContext (fire-and-forget, no yield)
def trace_info(self, message):
    orchestration_trace_log(self.instance_id, "info", str(message))

# In ActivityContext (fire-and-forget)
def trace_info(self, message):
    activity_trace_log(self._trace_token, "info", str(message))
```

### Rules for Tracing

1. **Never expose `is_replaying` to Python** — Rust `OrchestrationContext.trace()` handles suppression
2. **Always use global maps, not thread-locals** — Python runs on a different thread (sketched below)
3. **Clean up map entries on ALL exit paths** — leaked entries cause stale traces
4. **Use atomic tokens for activities** (not instance_id) — multiple activities for the same instance can run concurrently
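The Rust half of this bridge is a plain `#[pyfunction]` doing a map lookup. A minimal sketch — the `trace_*` methods on the stored context (and `ActivityContext` being in scope) are assumptions, not the exact code in `src/lib.rs`:

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};

use pyo3::prelude::*;

// `ActivityContext` = whatever context type the activity handler stores (assumed).
static ACTIVITY_CTXS: LazyLock<Mutex<HashMap<String, ActivityContext>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Called from Python's `ActivityContext.trace_info` (fire-and-forget).
#[pyfunction]
fn activity_trace_log(token: String, level: String, message: String) {
    // Global map, not a thread-local: Python calls in from a different
    // thread than the tokio thread that stored the context.
    if let Some(ctx) = ACTIVITY_CTXS.lock().unwrap().get(&token) {
        match level.as_str() {
            "info" => ctx.trace_info(&message),
            "warn" => ctx.trace_warn(&message),
            _ => ctx.trace_error(&message),
        }
    }
    // No entry means the activity already cleaned up; drop the trace silently.
}
```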
## ScheduledTask Protocol

Python yields plain dicts. Rust deserializes them via `serde_json` into `ScheduledTask` enum variants:

```rust
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ScheduledTask {
    Activity { name: String, input: String },
    ActivityWithRetry { name: String, input: String, retry: RetryPolicyConfig },
    Timer { delay_ms: u64 },
    WaitEvent { name: String },
    SubOrchestration { name: String, input: String },
    SubOrchestrationWithId { name: String, instance_id: String, input: String },
    SubOrchestrationVersioned { name: String, version: String, input: String },
    Orchestration { name: String, instance_id: String, input: String },
    OrchestrationVersioned { name: String, version: String, instance_id: String, input: String },
    NewGuid,
    UtcNow,
    ContinueAsNew { input: String },
    ContinueAsNewVersioned { input: String, version: String },
    Join { tasks: Vec<ScheduledTask> },
    Select { tasks: Vec<ScheduledTask> },
}
```

### Adding a New ScheduledTask Type

1. Add variant to `ScheduledTask` in `src/types.rs` with correct `serde` attributes (see the wire-format sketch below)
2. Add execution branch in `execute_task()` in `src/handlers.rs`
3. If it should work in `select`/`race`, add branch in `make_select_future()`
4. If it should work in `join`/`all`, add branch in `make_join_future()`
5. Add Python method to `OrchestrationContext` in `python/duroxide/context.py`
6. Add test in `tests/test_e2e.py`
7. Rebuild: `maturin develop`
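Before wiring up `execute_task()`, it helps to pin down the wire format with a round-trip test. A sketch using the existing `Timer` variant, assuming the task dict reaches Rust as a JSON string and that field keys are camelCase like the tag — per the pitfalls table below, that field casing requires a per-variant `serde(rename_all)`:

```rust
#[cfg(test)]
mod tests {
    use super::ScheduledTask;

    #[test]
    fn timer_matches_python_dict_shape() {
        // What Python's `yield {"type": "timer", "delayMs": 5000}` looks
        // like once the driver serializes it for Rust.
        let json = r#"{"type":"timer","delayMs":5000}"#;
        let task: ScheduledTask = serde_json::from_str(json).unwrap();
        assert!(matches!(task, ScheduledTask::Timer { delay_ms: 5000 }));
    }
}
```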
## Global Tokio Runtime

```rust
static TOKIO_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build tokio runtime")
});
```

All async operations go through `TOKIO_RT.block_on()` (with the GIL released via `py.allow_threads()`). No pyo3-async-runtimes needed.

## Provider Polymorphism

PyO3 doesn't support trait objects in constructors. The Python wrapper detects the provider type:

```python
class Runtime:
    def __init__(self, provider, options=None):
        if getattr(provider, "_type", None) == "postgres":
            self._native = PyRuntime.from_postgres(provider._native, options)
        else:
            self._native = PyRuntime.from_sqlite(provider._native, options)
```

## PyO3 Object Mutability

`#[pyclass(get_all)]` makes fields readable but NOT writable from Python. Even with `set_all`, setting a Python `dict` to an `Option` field fails with a type mismatch.

**Solution:** Create pure Python wrapper objects (e.g., `OrchestrationResult`) instead of mutating PyO3 objects:

```python
class OrchestrationResult:
    def __init__(self, status, output=None, error=None):
        self.status = status
        self.output = output  # parsed from JSON
        self.error = error

def _parse_status(raw):
    output = raw.output
    if output is not None:
        output = json.loads(output)
    return OrchestrationResult(status=raw.status, output=output, error=raw.error)
```

## Common Pitfalls

| Pitfall | What Happens | Fix |
|---------|-------------|-----|
| Missing `py.allow_threads()` around `block_on` | GIL deadlock — process hangs forever | Wrap ALL `TOKIO_RT.block_on()` calls |
| Returning `PyErr` from inside `allow_threads` | Compile error — `PyErr` is not `Send` | Map to `String` inside, `PyErr` outside |
| Thread-local for cross-thread context | Lookup returns `None` — traces silently fail | Use global `HashMap` |
| Mutating PyO3 `#[pyclass]` fields from Python | `TypeError` or silently ignored | Use Python wrapper objects |
| `cargo build` instead of `maturin develop` | Python imports stale `.so` — changes don't take effect | Always use `maturin develop` |
| Missing `serde(rename_all)` on new ScheduledTask variants | Deserialization fails — task type not recognized | Match Python naming convention (camelCase in JSON) |

## Build Requirements

```bash
# MUST use maturin (not cargo build) for the Python extension
source .venv/bin/activate
maturin develop            # Debug build + install
maturin develop --release  # Release build + install
maturin build --release    # Build wheel for distribution

# cargo build alone produces a .dylib/.so that Python can't find
```