--- name: eio description: "Eio concurrency patterns for OCaml applications. Use when Claude needs to: (1) Write concurrent OCaml code with Eio, (2) Handle network operations with cohttp-eio, (3) Manage resource lifecycles with switches, (4) Implement rate limiting or synchronization, (5) Create parallel operations with fibers, (6) Test async code with Eio_mock, (7) Integrate with bytesrw for streaming, or any other Eio-based concurrency tasks" --- # Eio Concurrency ## Core Concepts ### Why Eio Eio is an effects-based IO library for OCaml 5. Advantages over Lwt/Async: - **Direct-style code**: No monads, concurrent code looks like sequential code - **Performance**: Real stacks, no heap allocations to simulate continuations - **Better backtraces**: Exceptions show proper call traces - **Platform optimization**: Generic API with optimized backends (Linux io_uring, POSIX, Windows) ### Capability-Based Design Pass capabilities explicitly instead of using global resources: ```ocaml type t = { net : _ Eio.Net.t; clock : _ Eio.Time.clock; fs : _ Eio.Path.t; } (* Function signature reveals what resources it needs *) val connect : net:_ Eio.Net.t -> host:string -> connection ``` **Do**: Pass `net`, `clock`, `fs` explicitly—makes dependencies clear and testable. **Don't**: Use global modules like `Unix.gettimeofday` or access ambient resources. ### Structured Concurrency with Switches `Eio.Switch.run` manages resource and fiber lifecycles: ```ocaml Eio.Switch.run @@ fun sw -> let conn = connect ~sw server in (* conn automatically closed when sw exits *) process conn ``` **Do**: Create switches in the smallest possible scope. **Don't**: Take a switch argument if you could create one internally. ### Fibers Lightweight concurrent units running on a single core: ```ocaml (* Run two operations concurrently *) Eio.Fiber.both (fun () -> download file1) (fun () -> download file2) (* Only one fiber executes at a time until one performs an effect *) ``` ### Cancellation Cancellation contexts form a tree. Uncaught exceptions propagate upward, cancelling siblings: ```ocaml (* If one branch fails, the other gets Cancelled *) Eio.Fiber.both (fun () -> may_fail ()) (fun () -> other_work ()) (* receives Cancelled if may_fail raises *) ``` Use `Cancel.protect` for operations that must complete: ```ocaml Eio.Cancel.protect @@ fun () -> (* This won't be cancelled even if parent is *) flush_and_close connection ``` ## Common Patterns ### Concurrent Operations ```ocaml (* Parallel map *) let fetch_all_items api item_ids = Eio.Fiber.List.map (fun id -> Api.item_info api id) item_ids (* Race with timeout *) let with_timeout ~clock duration fn = Eio.Fiber.first (fun () -> Eio.Time.sleep clock duration; Error `Timeout) (fun () -> Ok (fn ())) (* Fork background task attached to switch *) Eio.Fiber.fork ~sw (fun () -> background_work ()) ``` ### File Operations ```ocaml let ( / ) = Eio.Path.( / ) (* Read file *) let content = Eio.Path.load (fs / "config.json") (* Write file with permissions *) Eio.Path.save ~create:(`Or_truncate 0o600) (fs / "data.bin") content (* Create directory if needed *) Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 (fs / "cache") (* Clean up directory tree *) Eio.Path.rmtree (fs / "tmp") (* Check file type *) match Eio.Path.kind ~follow:true path with | `Directory -> ... | `Regular_file -> ... | `Not_found -> ... ``` ### Network Operations ```ocaml (* TCP client *) Eio.Net.connect ~sw net (`Tcp (addr, port)) (* TCP server *) Eio.Net.run_server sock ~on_error:log_error ~max_connections:100 (fun ~sw flow addr -> handle_client ~sw flow) ``` ### HTTPS with cohttp-eio ```ocaml let https_handler uri raw_flow = let tls_config = create_tls_config () in let host = Uri.host uri |> Option.map Domain_name.(host_exn % of_string_exn) in Tls_eio.client_of_flow ?host tls_config raw_flow let client = Cohttp_eio.Client.make net ~https:(Some https_handler) ``` ### Buffered Reading Always use `max_size` to prevent memory exhaustion: ```ocaml let body = Eio.Buf_read.(of_flow ~max_size:10_000_000 flow |> take_all) ``` ### Synchronization Primitives **Mutex**: ```ocaml let mutex = Eio.Mutex.create () Eio.Mutex.use_rw mutex (fun () -> shared_state := new_value) ``` **Semaphore for rate limiting**: ```ocaml let limiter = Eio.Semaphore.make 10 let with_rate_limit fn = Eio.Semaphore.acquire limiter; Fun.protect ~finally:(fun () -> Eio.Semaphore.release limiter) fn ``` **Stream for producer/consumer**: ```ocaml let queue = Eio.Stream.create 100 Eio.Stream.add queue item (* producer *) let item = Eio.Stream.take queue (* consumer, blocks if empty *) ``` **Condition variables** (must check in loop): ```ocaml let cond = Eio.Condition.create () let mutex = Eio.Mutex.create () (* Waiter - MUST use while loop *) Eio.Mutex.use_rw ~protect:false mutex (fun () -> while not (check_condition ()) do Eio.Condition.await cond mutex done) (* Signaler *) Eio.Condition.broadcast cond ``` ## Integration with bytesrw Bridge Eio flows to bytesrw for streaming binary parsing: ```ocaml (* Eio flow → bytesrw reader *) let reader_of_flow ?(buf_len = 4096) flow = let cs = Cstruct.create buf_len in let read () = match Eio.Flow.single_read flow cs with | 0 -> Bytesrw.Bytes.Slice.eod | n -> let buf = Cstruct.to_bytes (Cstruct.sub cs 0 n) in Bytesrw.Bytes.Slice.make buf ~first:0 ~length:n | exception End_of_file -> Bytesrw.Bytes.Slice.eod in Bytesrw.Bytes.Reader.make read (* Then use with Binary.Reader *) let r = Binary.Reader.of_reader (reader_of_flow flow) let frame = Tc_frame.parse r (* effects propagate through *) ``` Note: This involves a copy from Cstruct (Bigarray) to bytes (heap). Unavoidable until OCaml has non-moving bytes. ## RNG Initialization **Critical for crypto**: Initialize RNG before any crypto operations. ```ocaml (* For Eio applications *) Eio_main.run @@ fun env -> Mirage_crypto_rng_eio.run (module Mirage_crypto_rng.Fortuna) env @@ fun () -> (* RNG now available *) let iv = Mirage_crypto_rng.generate 12 in ... (* For Unix applications (simpler but less integrated) *) Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna) ``` Failure to initialize results in predictable random numbers, breaking crypto security. ## Error Handling Errors use `Eio.Io (err, context)` with nested error codes: ```ocaml try Eio.Net.connect ~sw net addr with | Eio.Io (Eio.Net.E (Connection_failure _), _) -> (* Specific error *) Error `Connection_failed | Eio.Io (Eio.Net.E _, _) -> (* Any network error *) Error `Network_error | Eio.Io _ -> (* Any Eio error *) Error `Io_error ``` Add context when re-raising: ```ocaml Eio.Exn.reraise_with_context exn "while connecting to %s" host ``` For tests, hide backend details: ```ocaml Eio.Exn.Backend.show := false ``` ## Testing with Eio_mock Library: `eio.mock` (not `eio_mock`) ### Test Setup Pattern ```ocaml let setup_test f () = Mirage_crypto_rng_unix.use_default (); (* or initialize RNG *) Eio_main.run @@ fun env -> Eio.Switch.run @@ fun sw -> let fs = Eio.Stdenv.fs env in let tmp = Eio.Path.(fs / Filename.get_temp_dir_name () / "test-dir") in (try Eio.Path.rmtree tmp with _ -> ()); Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 tmp; Fun.protect ~finally:(fun () -> try Eio.Path.rmtree tmp with _ -> ()) (fun () -> f ~sw tmp) let test_something = setup_test @@ fun ~sw tmp -> (* test code here *) ``` ### Mock Flow ```ocaml let test_api_call () = Eio_main.run @@ fun _env -> let flow = Eio_mock.Flow.make "response" in (* IMPORTANT: Always end with `Raise End_of_file *) Eio_mock.Flow.on_read flow [ `Return "{\"ok\": true}"; `Raise End_of_file; ]; (* use flow *) ``` ### Simulating Chunked Data ```ocaml let test_partial_reads () = Eio_main.run @@ fun _env -> let flow = Eio_mock.Flow.make "chunked" in Eio_mock.Flow.on_read flow [ `Return "\x00\x01"; (* First 2 bytes *) `Return "\x02\x03"; (* Next 2 bytes *) `Raise End_of_file; ]; (* Parser must handle values spanning chunks *) ``` ### Mock Network ```ocaml let test_network () = Eio_main.run @@ fun _env -> let net = Eio_mock.Net.make "mocknet" in let flow = Eio_mock.Flow.make "conn" in Eio_mock.Net.on_connect net [`Return flow]; Eio_mock.Flow.on_read flow [`Return "data"; `Raise End_of_file]; (* test network operations *) ``` ### Deadlock Detection `Eio_mock.Backend.run` automatically detects deadlocks in tests. ## Good and Bad Examples ### Capability Passing **Bad**: Using global/ambient resources ```ocaml let fetch_data url = let now = Unix.gettimeofday () in (* Global! *) let body = Http.get url in (* Where does network come from? *) { timestamp = now; data = body } ``` **Good**: Explicit capability parameters ```ocaml let fetch_data ~net ~clock url = let now = Eio.Time.now clock in Eio.Switch.run @@ fun sw -> let body = Http.get ~sw net url in { timestamp = now; data = body } ``` ### Switch Scope **Bad**: Taking switch as argument when not needed ```ocaml (* Why does this need a switch? Caller must manage lifetime *) let read_config ~sw ~fs path = Eio.Path.load (fs / path) (* No resources to manage! *) ``` **Good**: Create switch internally when needed ```ocaml (* No switch needed - just reads a file *) let read_config ~fs path = Eio.Path.load (fs / path) (* Switch needed - manages connection lifetime *) let with_connection ~sw ~net addr f = let conn = Eio.Net.connect ~sw net addr in f conn (* conn closed when sw exits *) ``` ### Buffered Reading **Bad**: No size limit allows memory exhaustion ```ocaml let read_body flow = Eio.Buf_read.(of_flow flow |> take_all) (* Unbounded! *) ``` **Good**: Always specify max_size ```ocaml let read_body flow = Eio.Buf_read.(of_flow ~max_size:10_000_000 flow |> take_all) ``` ### Condition Variables **Bad**: Single check can miss wakeups ```ocaml Eio.Mutex.use_rw ~protect:false mutex (fun () -> if not (check_condition ()) then (* Single if! *) Eio.Condition.await cond mutex; do_work ()) ``` **Good**: Loop until condition is true ```ocaml Eio.Mutex.use_rw ~protect:false mutex (fun () -> while not (check_condition ()) do (* Loop handles spurious wakeups *) Eio.Condition.await cond mutex done; do_work ()) ``` ### Error Handling **Bad**: Catching all Eio errors indiscriminately ```ocaml let connect addr = try Eio.Net.connect ~sw net addr with Eio.Io _ -> failwith "connection failed" (* Loses context! *) ``` **Good**: Match specific errors, preserve context ```ocaml let connect addr = try Eio.Net.connect ~sw net addr with | Eio.Io (Eio.Net.E (Connection_failure reason), _) -> Error (`Connection_failed reason) | Eio.Io (Eio.Net.E Connection_refused, _) -> Error `Refused | Eio.Io _ as exn -> Eio.Exn.reraise_with_context exn "connecting to %s" addr ``` ### Mock Testing **Bad**: Mock flow without End_of_file hangs forever ```ocaml let test_read () = Eio_main.run @@ fun _env -> let flow = Eio_mock.Flow.make "test" in Eio_mock.Flow.on_read flow [ `Return "data"; (* Missing End_of_file! Test hangs *) ]; read_all flow ``` **Good**: Always end mock reads with End_of_file ```ocaml let test_read () = Eio_main.run @@ fun _env -> let flow = Eio_mock.Flow.make "test" in Eio_mock.Flow.on_read flow [ `Return "data"; `Raise End_of_file; (* Required! *) ]; read_all flow ``` ### Cancellation Safety **Bad**: Critical cleanup can be cancelled ```ocaml Eio.Fiber.both (fun () -> may_fail ()) (fun () -> do_work (); flush_and_close connection) (* May be cancelled mid-flush! *) ``` **Good**: Protect critical operations ```ocaml Eio.Fiber.both (fun () -> may_fail ()) (fun () -> do_work (); Eio.Cancel.protect @@ fun () -> flush_and_close connection) (* Completes even if cancelled *) ``` ## Common Gotchas | Issue | Problem | Solution | |-------|---------|----------| | Racing reads | Both reads may complete at kernel level | Don't rely on `Fiber.first` for mutual exclusion | | Signal handlers | Most operations risk deadlock | Only `Eio.Condition.broadcast` is safe | | Condition spurious wakeup | May wake without signal | Always check condition in `while` loop | | Mutex in signal handler | Re-entrancy issues | Use conditions instead | | Forgetting End_of_file | Mock flow hangs | Always end mock reads with `Raise End_of_file` | | Missing RNG init | Predictable crypto | Initialize before any crypto operations | ## Library Integration | Library | Purpose | Notes | |---------|---------|-------| | Lwt_eio | Run Lwt + Eio together | Gradual migration path | | Async_eio | Run Async + Eio together | Experimental | | bytesrw | Streaming byte parsing | Requires copy (Cstruct→bytes) | | cohttp-eio | HTTP client/server | Native Eio support | | tls-eio | TLS connections | Use with cohttp-eio for HTTPS | | mirage-crypto-rng | Crypto RNG | Use .unix sublibrary for entropy | ## Best Practices | Practice | Description | |----------|-------------| | Capability passing | Pass `net`, `clock`, `fs` explicitly | | Structured concurrency | Use `Switch.run`, keep scopes small | | Handle errors at boundaries | Convert `Eio.Io` to domain errors | | Buffered reading | Use `Buf_read` with `max_size` | | Parallel with fibers | `Fiber.List.map`, `Fiber.both`, `Fiber.first` | | Isolate blocking | `Eio_unix.run_in_systhread` for non-Eio blocking | | Test with mocks | `Eio_mock` for deterministic tests | | Initialize RNG | Before any crypto operations | ## References - Eio repository: https://github.com/ocaml-multicore/eio - Eio documentation: https://ocaml.org/p/eio/latest/doc/ - cohttp-eio: https://github.com/mirage/ocaml-cohttp - bytesrw: https://erratique.ch/software/bytesrw