---
name: Streams
description: This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.
version: 1.0.0
---

# Streams in Effect

## Overview

Effect Streams provide:

- **Lazy evaluation** - Elements are produced on demand
- **Resource safety** - Automatic cleanup, even on failure or interruption
- **Backpressure** - Producer/consumer coordination
- **Composition** - Transform, filter, and merge streams
- **Error handling** - Typed errors in the stream pipeline

```typescript
Stream<A, E, R>
// Produces values of type A
// May fail with an error of type E
// Requires an environment of type R
```

## Creating Streams

### From Values

```typescript
import { Effect, Option, Schedule, Stream } from "effect"

// From known values
const numbers = Stream.make(1, 2, 3, 4, 5)

// From an iterable
const fromArray = Stream.fromIterable([1, 2, 3])

// Empty stream
const empty = Stream.empty

// Single value
const single = Stream.succeed(42)

// Infinite stream
const infinite = Stream.iterate(1, (n) => n + 1)
```

### From Effects

```typescript
// A single effect as a one-element stream
const fromEffect = Stream.fromEffect(fetchData())

// Repeat an effect forever
const polling = Stream.repeatEffect(checkStatus())

// Repeat on a schedule
const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)
```

### From Async Sources

```typescript
// From an async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)

// From a callback/event emitter
const fromCallback = Stream.async<number>((emit) => {
  const handler = (value: number) => emit.single(value)
  eventEmitter.on("data", handler)
  return Effect.sync(() => eventEmitter.off("data", handler))
})

// From a queue
const fromQueue = Stream.fromQueue(queue)
```

### Generating Streams
```typescript
// Unfold from a seed
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))

// Range
const range = Stream.range(1, 100)

// Repeat a value
const repeated = Stream.repeatValue("ping").pipe(
  Stream.take(5)
)
```

## Transforming Streams

### map - Transform Elements

```typescript
const doubled = numbers.pipe(
  Stream.map((n) => n * 2)
)

// With an effect
const enriched = users.pipe(
  Stream.mapEffect((user) => fetchProfile(user.id))
)

// Parallel mapping
const parallel = items.pipe(
  Stream.mapEffect(process, { concurrency: 10 })
)
```

### filter - Select Elements

```typescript
const evens = numbers.pipe(
  Stream.filter((n) => n % 2 === 0)
)

// With an effect
const valid = items.pipe(
  Stream.filterEffect((item) => validate(item))
)
```

### flatMap - Nested Streams

```typescript
const expanded = numbers.pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...
```

### take/drop

```typescript
const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))
```

## Combining Streams

### concat - Sequential

```typescript
const combined = Stream.concat(stream1, stream2)
// or, pipeable:
const alsoCombined = stream1.pipe(Stream.concat(stream2))
```

### merge - Interleaved

```typescript
// Emit elements from both streams as they arrive
const merged = Stream.merge(stream1, stream2)

// Merge multiple streams
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })
```

### zip - Pair Elements

```typescript
const zipped = Stream.zip(names, ages)
// Stream<[string, number]>

// With a combining function
const people = Stream.zipWith(
  names,
  ages,
  (name, age) => ({ name, age })
)
```

### interleave

```typescript
const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...
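
// Concretely (a small sketch): unlike merge, interleave alternates
// strictly, starting with the first stream, until both are exhausted
const a = Stream.make(1, 3, 5)
const b = Stream.make(2, 4, 6)
const turns = Stream.interleave(a, b)
// 1, 2, 3, 4, 5, 6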
```

## Consuming Streams

### Running to a Collection

```typescript
// To a collection (returns a Chunk)
const array = yield* Stream.runCollect(numbers)

// First element (returns an Option)
const first = yield* Stream.runHead(numbers)

// Fold/reduce
const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)
```

### Running for Effects

```typescript
// Execute an effect for each element
yield* numbers.pipe(
  Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)

// Drain (consume the stream, ignoring values)
yield* numbers.pipe(Stream.runDrain)
```

### Running to a Sink

```typescript
import { Sink } from "effect"

// Built-in sink
const sum = yield* numbers.pipe(
  Stream.run(Sink.sum)
)

// Collect into a Chunk
const array = yield* numbers.pipe(
  Stream.run(Sink.collectAll())
)
```

## Chunking

Streams process elements in chunks for efficiency:

```typescript
// Group elements into fixed-size chunks
const chunked = numbers.pipe(
  Stream.grouped(10) // Stream of 10-element chunks
)

// Process chunk by chunk
const processed = numbers.pipe(
  Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)

// Re-chunk to a different size
const rechunked = numbers.pipe(
  Stream.rechunk(100)
)
```

## Error Handling

```typescript
// Catch all errors
const safe = stream.pipe(
  Stream.catchAll((error) => Stream.succeed(fallbackValue))
)

// Catch a specific tagged error
const handled = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed(cachedValue)
  )
)

// Retry on failure
const resilient = stream.pipe(
  Stream.retry(Schedule.exponential("1 second"))
)

// On error, switch to a fallback stream
const withFallback = stream.pipe(
  Stream.orElse(() => fallbackStream)
)
```

## Resource Management

```typescript
// Stream with a resource lifecycle; the stream ends when the
// effect fails with Option.none()
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    Stream.repeatEffectOption(
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none())
      })
    )
  )
)

// Scoped streams
const scoped = Stream.scoped(
  Effect.acquireRelease(openConnection, closeConnection)
)
```

## Sinks

Sinks consume stream elements:

```typescript
import { Sink } from "effect"

// Built-in sinks
Sink.sum          // Sum numeric elements
Sink.count        // Count elements
Sink.head()       // First element (as an Option)
Sink.last()       // Last element (as an Option)
Sink.collectAll() // Collect into a Chunk
Sink.forEach(f)   // Run an effect for each element

// Custom sink via a fold
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)
```

## Common Patterns

### Batched Processing

```typescript
const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) =>
    Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
  )
)
```

### Rate Limiting

```typescript
const rateLimited = stream.pipe(
  Stream.throttle({
    cost: () => 1, // cost charged per chunk
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)
```

### Debouncing

```typescript
const debounced = stream.pipe(
  Stream.debounce("500 millis")
)
```

### Windowing

```typescript
// Emit chunks of up to 1000 elements, or whatever arrived within 1 second
const windows = stream.pipe(
  Stream.groupedWithin(1000, "1 second")
)
```

## Best Practices

1. **Use chunking for efficiency** - Batch operations when possible
2. **Handle backpressure** - Use appropriate buffer strategies
3. **Clean up resources** - Use `acquireRelease` for external resources
4. **Process in parallel** - Use the `concurrency` option in `mapEffect`
5. **Handle errors early** - Catch/retry before final consumption

## Additional Resources

For comprehensive stream documentation, consult `${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt`. Search for these sections:

- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks