--- name: Concurrency description: This skill should be used when the user asks about "Effect concurrency", "fibers", "Fiber", "forking", "Effect.fork", "Effect.forkDaemon", "parallel execution", "Effect.all concurrency", "Deferred", "Queue", "PubSub", "Semaphore", "Latch", "fiber interruption", "Effect.race", "Effect.raceAll", "concurrent effects", or needs to understand how Effect handles parallel and concurrent execution. version: 1.0.0 --- # Concurrency in Effect ## Overview Effect provides lightweight fiber-based concurrency: - **Fibers** - Lightweight threads managed by Effect runtime - **Structured concurrency** - Parent fibers supervise children - **Safe interruption** - Clean cancellation with resource cleanup - **Concurrent primitives** - Queue, Deferred, Semaphore, PubSub ## Basic Parallel Execution ### Effect.all with Concurrency ```typescript import { Effect } from "effect" // Run in parallel const results = yield* Effect.all( [fetchUser(1), fetchUser(2), fetchUser(3)], { concurrency: "unbounded" } ) // Limit concurrency const results = yield* Effect.all(tasks, { concurrency: 5 }) // Sequential (default) const results = yield* Effect.all(tasks) ``` ### Effect.forEach with Concurrency ```typescript const users = yield* Effect.forEach( userIds, (id) => fetchUser(id), { concurrency: 10 } ) ``` ## Fibers ### Creating Fibers with fork ```typescript const program = Effect.gen(function* () { // Fork creates a new fiber const fiber = yield* Effect.fork(longRunningTask) // Do other work while fiber runs yield* doOtherWork() // Wait for fiber to complete const result = yield* Fiber.join(fiber) }) ``` ### Fork Variants ```typescript // Regular fork - child supervised by parent const fiber = yield* Effect.fork(task) // Daemon fork - runs independently const fiber = yield* Effect.forkDaemon(task) // Fork in specific scope const fiber = yield* Effect.forkIn(scope)(task) // Fork to different executor const fiber = yield* Effect.forkWithErrorHandler(task, onError) ``` ### Fiber Operations ```typescript import { Fiber } from "effect" // Wait for result const result = yield* Fiber.join(fiber) // Wait but don't unwrap (get Exit) const exit = yield* Fiber.await(fiber) // Interrupt fiber yield* Fiber.interrupt(fiber) // Poll without blocking const maybeResult = yield* Fiber.poll(fiber) ``` ## Racing ### Effect.race - First to Complete ```typescript // First successful result wins, others interrupted const fastest = yield* Effect.race( fetchFromServer1(), fetchFromServer2() ) ``` ### Effect.raceAll - Race Many ```typescript const fastest = yield* Effect.raceAll([ fetchFromCDN1(), fetchFromCDN2(), fetchFromCDN3() ]) ``` ### Effect.raceFirst - Include Failures ```typescript // First to complete (success OR failure) const first = yield* Effect.raceFirst(task1, task2) ``` ## Deferred - One-Time Promise ```typescript import { Deferred } from "effect" const program = Effect.gen(function* () { // Create deferred const deferred = yield* Deferred.make() // Fork waiter const fiber = yield* Effect.fork( Effect.gen(function* () { const value = yield* Deferred.await(deferred) yield* Effect.log(`Got: ${value}`) }) ) // Complete the deferred yield* Deferred.succeed(deferred, "Hello!") yield* Fiber.join(fiber) }) ``` ## Queue - Concurrent Queue ```typescript import { Queue } from "effect" const program = Effect.gen(function* () { // Bounded queue (backpressure) const queue = yield* Queue.bounded(100) // Producer yield* Effect.fork( Effect.forEach( [1, 2, 3, 4, 5], (n) => Queue.offer(queue, n) ) ) // Consumer const items = yield* Effect.forEach( Array.from({ length: 5 }), () => Queue.take(queue) ) }) ``` ### Queue Variants ```typescript // Bounded - blocks when full const bounded = yield* Queue.bounded(100) // Unbounded - never blocks producer const unbounded = yield* Queue.unbounded() // Dropping - drops new items when full const dropping = yield* Queue.dropping(100) // Sliding - drops old items when full const sliding = yield* Queue.sliding(100) ``` ## PubSub - Publish/Subscribe ```typescript import { PubSub } from "effect" const program = Effect.gen(function* () { const pubsub = yield* PubSub.bounded(100) // Subscribe creates a queue const sub1 = yield* PubSub.subscribe(pubsub) const sub2 = yield* PubSub.subscribe(pubsub) // Publish to all subscribers yield* PubSub.publish(pubsub, "Hello!") // Each subscriber receives message const msg1 = yield* Queue.take(sub1) const msg2 = yield* Queue.take(sub2) }) ``` ## Semaphore - Limit Concurrency ```typescript import { Effect } from "effect" const program = Effect.gen(function* () { // Create semaphore with 3 permits const semaphore = yield* Effect.makeSemaphore(3) // At most 3 concurrent executions yield* Effect.forEach( tasks, (task) => semaphore.withPermits(1)(task), { concurrency: "unbounded" } ) }) ``` ## Latch - Coordination Point ```typescript import { Latch } from "effect" const program = Effect.gen(function* () { // Create closed latch const latch = yield* Latch.make(false) // Workers wait at latch yield* Effect.fork( Effect.forEach( workers, (worker) => Effect.gen(function* () { yield* Latch.await(latch) yield* worker.start() }), { concurrency: "unbounded" } ) ) // Open latch - all workers proceed yield* Latch.open(latch) }) ``` ## Interruption ### Interrupting Fibers ```typescript const fiber = yield* Effect.fork(longTask) // Later... yield* Fiber.interrupt(fiber) ``` ### Uninterruptible Regions ```typescript // Protect critical section from interruption const critical = Effect.uninterruptible( Effect.gen(function* () { yield* beginTransaction() yield* performOperations() yield* commitTransaction() }) ) ``` ### Interruptible Within Uninterruptible ```typescript const program = Effect.uninterruptible( Effect.gen(function* () { yield* criticalSetup() // This part can be interrupted yield* Effect.interruptible(longOperation) yield* criticalTeardown() }) ) ``` ## Supervision Structured concurrency ensures child fibers are managed: ```typescript const parent = Effect.gen(function* () { const child1 = yield* Effect.fork(task1) const child2 = yield* Effect.fork(task2) // If parent fails/interrupts, children are interrupted yield* failingOperation() }) // child1 and child2 automatically interrupted ``` ### Daemon Fibers Escape supervision with daemon: ```typescript const daemon = yield* Effect.forkDaemon(backgroundTask) // Runs independently of parent ``` ## Common Patterns ### Timeout with Fallback ```typescript const withTimeout = task.pipe( Effect.timeout("5 seconds"), Effect.map(Option.getOrElse(() => defaultValue)) ) ``` ### Worker Pool ```typescript const workerPool = Effect.gen(function* () { const semaphore = yield* Effect.makeSemaphore(numWorkers) return (task: Effect.Effect) => semaphore.withPermits(1)(task) }) ``` ### Parallel with Error Collection ```typescript const results = yield* Effect.all( tasks, { concurrency: "unbounded", mode: "either" // Collect all results } ) ``` ## Best Practices 1. **Use Effect.all concurrency** for simple parallelism 2. **Use Semaphore** to limit concurrent operations 3. **Prefer structured concurrency** over daemon fibers 4. **Handle interruption** in long-running effects 5. **Use Queue for producer/consumer** patterns 6. **Use Deferred for one-time coordination** ## Additional Resources For comprehensive concurrency documentation, consult `${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt`. Search for these sections: - "Fibers" for fiber management - "Basic Concurrency" for parallel execution - "Deferred" for synchronization primitives - "Queue" for concurrent queues - "PubSub" for publish/subscribe - "Semaphore" for concurrency limiting