()((query, ctx) =>
Effect.gen(function* () {
// Create stream
const stream = yield* SearchKernel.pipe(
Effect.flatMap((k) => k.searchStream(query))
)
// Initialize state
ctx.set(statusAtom, 'streaming')
ctx.set(resultsAtom, [])
// Consume stream progressively
yield* Stream.runForEach(stream, (item) =>
Effect.sync(() => {
const prev = ctx.get(resultsAtom)
ctx.set(resultsAtom, [...prev, item])
})
)
// Finalize
ctx.set(statusAtom, 'complete')
})
)
// React component: re-renders on every atom update, so results appear as they stream in.
function SearchResults() {
  const results = useAtomValue(resultsAtom)
  const status = useAtomValue(statusAtom)

  return (
    <div>
      {/* Loading indicator is only visible while the stream is still emitting */}
      {status === 'streaming' && <p>Loading…</p>}
      <ul>
        {results.map((item, index) => (
          // Item shape is not shown here; JSON.stringify keeps the example generic.
          <li key={index}>{JSON.stringify(item)}</li>
        ))}
      </ul>
    </div>
  )
}
```
**Key Pattern:**
1. Create state atoms at module level
2. Stream emits chunks progressively
3. `Stream.runForEach` updates atoms via `ctx.set`
4. React re-renders on each atom update
5. UI shows progressive results
**TMNL Example** (`src/lib/data-manager/v1/atoms/index.ts:206`):
```typescript
export const doSearch = runtimeAtom.fn<{ query: string; limit: number }>()(
({ query, limit }, ctx) =>
Effect.gen(function* () {
const dm = yield* DataManager
const stream = yield* dm.searchStream(query, limit)
ctx.set(statusAtom, 'streaming')
ctx.set(resultsAtom, [])
yield* Stream.runForEach(stream, (result) =>
Effect.sync(() => {
const prev = ctx.get(resultsAtom)
ctx.set(resultsAtom, [...prev, result])
})
)
ctx.set(statusAtom, 'complete')
})
)
```
---
### Pattern 8: Error Handling
**catchAll** — Recover from errors:
```typescript
const recovered = stream.pipe(
Stream.catchAll((error) =>
Stream.succeed({ error: error.message })
)
)
```
**retry** — Retry on failure:
```typescript
const retried = stream.pipe(
Stream.retry(Schedule.exponential('100 millis').pipe(
Schedule.compose(Schedule.recurs(3))
))
)
```
**orElse** — Fallback stream:
```typescript
const withFallback = primaryStream.pipe(
Stream.orElse(() => fallbackStream)
)
```
---
### Pattern 9: Stream Merging & Combining
**mergeAll** — Merge multiple streams:
```typescript
// mergeAll takes an iterable of streams plus a required concurrency option;
// 'unbounded' runs every input stream at the same time.
const merged = Stream.mergeAll([stream1, stream2, stream3], {
  concurrency: 'unbounded'
})
```
**concat** — Concatenate streams:
```typescript
const concatenated = stream1.pipe(
Stream.concat(stream2)
)
```
**zip** — Combine elements pairwise:
```typescript
const zipped = stream1.pipe(
Stream.zip(stream2)
)
// Emits: [a1, b1], [a2, b2], ...
```
**interleave** — Alternate between streams:
```typescript
const interleaved = stream1.pipe(
Stream.interleave(stream2)
)
// Emits: a1, b1, a2, b2, a3, b3, ...
```
---
### Pattern 10: Resource-Managed Streams
**Stream.acquireRelease** — Managed resources:
```typescript
// Assumes `fs` is `node:fs/promises`, so `fs.open` resolves to a FileHandle.
const fileStream = Stream.acquireRelease(
  Effect.tryPromise(() => fs.open('file.txt')),
  // FileHandle.close() returns a Promise; Effect.promise awaits it so the
  // release step actually finishes instead of leaving a floating Promise.
  (handle) => Effect.promise(() => handle.close())
).pipe(
  Stream.flatMap((handle) =>
    // readLines() is an async iterable, so it needs fromAsyncIterable
    // (fromIterable only accepts synchronous iterables).
    Stream.fromAsyncIterable(
      handle.readLines(),
      (cause) => new Error('Failed to read file.txt', { cause })
    )
  )
)
```
**Stream.ensuring** — Run a finalizer when the stream ends (on success, failure, or interruption):
```typescript
const logged = stream.pipe(
Stream.ensuring(
Effect.sync(() => console.log('Stream completed'))
)
)
```
## Examples
### Example 1: Infinite Ticker with Scan
```typescript
import { Stream, Schedule, Effect } from 'effect'
const counter = Stream.fromSchedule(Schedule.spaced('1 second')).pipe(
Stream.scan(0, (count) => count + 1),
Stream.take(10)
)
await Stream.runForEach(counter, (n) =>
Effect.sync(() => console.log(`Tick ${n}`))
).pipe(Effect.runPromise)
```
### Example 2: WebSocket with Error Handling
```typescript
import { Stream, Schedule, Effect } from 'effect'

// Wraps a WebSocket as a Stream: messages become elements, socket errors fail
// the stream, and closing the socket ends it.
const wsStream = Stream.async((emit) => {
  const ws = new WebSocket('wss://example.com')
  ws.onmessage = (event) => emit.single(event.data)
  ws.onerror = () => emit.fail(new Error('Connection failed'))
  ws.onclose = () => emit.end()
  // Returned Effect is the cleanup: it closes the socket when the stream is torn down
  return Effect.sync(() => ws.close())
}).pipe(
  // Retry with exponential backoff, at most 3 times
  Stream.retry(Schedule.exponential('1 second').pipe(
    Schedule.compose(Schedule.recurs(3))
  )),
  // Once retries are exhausted, surface the failure as a final element
  Stream.catchAll((error) =>
    Stream.succeed(`Error: ${error.message}`)
  )
)
```
### Example 3: Batched API Polling
```typescript
import { Stream, Schedule, Effect } from 'effect'

// Polls the users endpoint every 5 seconds, stops after 10 responses,
// and hands them downstream in groups of up to 3.
const pollUsers = Stream.fromSchedule(Schedule.spaced('5 seconds')).pipe(
  Stream.mapEffect(() =>
    Effect.tryPromise(() =>
      fetch('/api/users').then(r => r.json())
    )
  ),
  Stream.take(10),
  // grouped emits a Chunk of up to 3 responses per element. rechunk would only
  // change internal chunking and still deliver responses one at a time.
  Stream.grouped(3)
)

// Each `batch` is a Chunk of responses; the last one may hold fewer than 3.
await Stream.runForEach(pollUsers, (batch) =>
  Effect.sync(() => console.log(`Batch:`, batch))
).pipe(Effect.runPromise)
```
### Example 4: Stream-to-Atom (TMNL Testbed)
```typescript
import { Atom, Result, useAtomValue } from '@effect-atom/atom-react'
import { Stream, Schedule, Effect } from 'effect'

// Create stream atom: the atom's value is a Result tracking the latest emission
const tickerAtom = Atom.make(
  Stream.fromSchedule(Schedule.spaced('1 second')).pipe(
    Stream.scan(0, (n) => n + 1),
    Stream.take(10)
  )
)

// React component: renders one branch per Result state
function Ticker() {
  const result = useAtomValue(tickerAtom)

  if (Result.isInitial(result)) return <div>Starting...</div>
  if (Result.isSuccess(result)) return <div>Count: {result.value}</div>
  // NOTE(review): confirm the failure field name against the installed Result API
  return <div>Error: {result.error.message}</div>
}
```
## Anti-Patterns
### 1. Not Consuming Streams
```typescript
// WRONG — Stream is lazy, nothing happens
const stream = Stream.fromIterable([1, 2, 3]).pipe(
Stream.map((n) => n * 2)
)
// CORRECT — Must consume
const result = await Stream.runCollect(stream).pipe(Effect.runPromise)
```
### 2. Ignoring Errors
```typescript
// WRONG — Errors crash stream
const stream = Stream.fromIterable(urls).pipe(
Stream.mapEffect((url) => Effect.tryPromise(() => fetch(url)))
)
// CORRECT — Handle errors
const stream = Stream.fromIterable(urls).pipe(
Stream.mapEffect((url) =>
Effect.tryPromise(() => fetch(url))
),
Stream.catchAll((error) =>
Stream.succeed({ error: error.message })
)
)
```
### 3. Blocking Operations in map
```typescript
// WRONG — Blocking sync operation
const stream = Stream.fromIterable(items).pipe(
Stream.map((item) => {
const result = await fetchData(item) // ❌ Can't await in map
return result
})
)
// CORRECT — Use mapEffect
const stream = Stream.fromIterable(items).pipe(
Stream.mapEffect((item) =>
Effect.tryPromise(() => fetchData(item))
)
)
```
### 4. Not Cleaning Up Resources
```typescript
// WRONG — No cleanup
Stream.async((emit) => {
const ws = new WebSocket('wss://example.com')
ws.onmessage = (e) => emit.single(e.data)
// No cleanup!
})
// CORRECT — Return cleanup Effect
Stream.async((emit) => {
const ws = new WebSocket('wss://example.com')
ws.onmessage = (e) => emit.single(e.data)
return Effect.sync(() => ws.close())
})
```
## Quick Reference
| Need | Constructor | Example |
|------|-------------|---------|
| From array | `Stream.fromIterable` | `Stream.fromIterable([1, 2, 3])` |
| From async callback | `Stream.async` | `Stream.async((emit) => { ... })` |
| Ticker | `Stream.fromSchedule` | `Stream.fromSchedule(Schedule.spaced('1 second'))` |
| Single Effect | `Stream.fromEffect` | `Stream.fromEffect(fetchUser)` |
| Transform | `Stream.map` | `stream.pipe(Stream.map((n) => n * 2))` |
| Filter | `Stream.filter` | `stream.pipe(Stream.filter((n) => n > 0))` |
| Limit | `Stream.take` | `stream.pipe(Stream.take(10))` |
| Accumulate | `Stream.scan` | `stream.pipe(Stream.scan(0, (s, n) => s + n))` |
| Collect all | `Stream.runCollect` | `Stream.runCollect(stream)` |
| Side effects | `Stream.runForEach` | `Stream.runForEach(stream, (x) => Effect.log(x))` |
| Fold | `Stream.runFold` | `Stream.runFold(stream, 0, (s, n) => s + n)` |
## Related Skills
- **effect-atom-integration** — Integrate streams with React atoms
- **effect-service-authoring** — Use streams in service methods
- **effect-testing-patterns** — Test stream-based code