---
name: effect-patterns-streams-sinks
description: Effect-TS patterns for streams and sinks. Use when working with stream sinks in Effect-TS applications.
---

# Effect-TS Patterns: Streams & Sinks

This skill provides 6 curated Effect-TS patterns for working with stream sinks.

Use this skill when working on tasks related to:

- Stream sinks
- Best practices in Effect-TS applications
- Real-world patterns and solutions

---

## 🟡 Intermediate Patterns

### Sink Pattern 1: Batch Insert Stream Records into Database

**Rule:** Batch stream records before database operations to improve throughput and reduce transaction overhead.

**Good Example:**

This example demonstrates streaming user records from a paginated API and batching them for efficient database insertion.

```typescript
import { Chunk, Effect, Option, Sink, Stream } from "effect";

interface User {
  readonly id: number;
  readonly name: string;
  readonly email: string;
}

interface PaginatedResponse {
  readonly users: User[];
  readonly nextPage: number | null;
}

// Mock API that returns paginated users
const fetchUserPage = (page: number): Effect.Effect<PaginatedResponse> =>
  Effect.succeed(
    page < 10
      ? {
          users: Array.from({ length: 50 }, (_, i) => ({
            id: page * 50 + i,
            name: `User ${page * 50 + i}`,
            email: `user${page * 50 + i}@example.com`,
          })),
          nextPage: page + 1,
        }
      : { users: [], nextPage: null }
  ).pipe(Effect.delay("10 millis"));

// Mock database insert that takes a batch of users
const insertUserBatch = (users: readonly User[]): Effect.Effect<number> =>
  Effect.sync(() => {
    console.log(`Inserting batch of ${users.length} users`);
    return users.length;
  }).pipe(Effect.delay("50 millis"));

// Create a stream of users from the paginated API
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(0, (page) =>
  fetchUserPage(page).pipe(
    Effect.map(
      (response) =>
        [
          Chunk.fromIterable(response.users),
          response.nextPage !== null
            ? Option.some(response.nextPage)
            : Option.none(),
        ] as const
    )
  )
);

// Sink that inserts each batch of users and counts the inserted records
const batchInsertSink = Sink.foldLeftEffect(
  0,
  (count, chunk: Chunk.Chunk<User>) =>
    Effect.gen(function* () {
      const users = Chunk.toArray(chunk);
      const inserted = yield* insertUserBatch(users);
      return count + inserted;
    })
);

// Run the stream, grouping users into batches of 100 before the sink
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});

Effect.runPromise(program);
```

This pattern:

1. **Creates a stream** of users from a paginated API
2. **Groups the stream** into batches of 100 users with `Stream.grouped`
3. **Inserts each batch** into the database in a single operation
4. **Tracks the total count** of inserted records

The batching happens upstream of the sink: `Stream.grouped` collects elements until the batch size is reached, then emits the complete batch for insertion.

---

**Rationale:**

When consuming a stream of records to persist in a database, collect them into batches before inserting. This reduces the number of database round-trips and transaction overhead, improving overall throughput significantly.
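Batches do not have to be purely size-based. When the source can go quiet, a time-or-size window keeps latency bounded. The following is a minimal sketch (with a hypothetical `insertBatch` helper standing in for a real database call) that uses `Stream.groupedWithin` to flush on whichever limit is hit first:

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical stand-ins for the user source and bulk insert above
const ids = Stream.range(1, 500);
const insertBatch = (batch: ReadonlyArray<number>) =>
  Effect.log(`bulk insert of ${batch.length} rows`).pipe(Effect.as(batch.length));

// Flush a batch when 100 elements accumulate OR one second passes,
// whichever comes first
const program = ids.pipe(
  Stream.groupedWithin(100, "1 second"),
  Stream.mapEffect((chunk) => insertBatch(Chunk.toArray(chunk))),
  Stream.runFold(0, (total, inserted) => total + inserted)
);

Effect.runPromise(program).then((total) => console.log(`Inserted ${total} rows`));
```

The design trade-off is latency versus batch efficiency: a pure size trigger maximizes batch size, while the time bound guarantees that a slow trickle of records still reaches the database promptly.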
---

Inserting records one-by-one is inefficient:

- Each insert is a separate database call (network latency, connection overhead)
- Each insert may be a separate transaction (ACID overhead)
- Resource contention and connection pool exhaustion appear at scale

Batching solves this by:

- Grouping N records into a single bulk insert operation
- Amortizing database overhead across multiple records
- Maintaining throughput even under backpressure
- Enabling efficient transaction semantics for the entire batch

For example, inserting 10,000 records one-by-one might take 100 seconds. Batching in groups of 100 might take just 2-3 seconds.

---

---

### Sink Pattern 2: Write Stream Events to Event Log

**Rule:** Append stream events to an event log with metadata to maintain a complete, ordered record of what happened.

**Good Example:**

This example demonstrates an event sourcing pattern where a stream of user account events is appended to an event log with metadata.

```typescript
import { Data, DateTime, Effect, Sink, Stream } from "effect";

// Event types
type AccountEvent =
  | AccountCreated
  | MoneyDeposited
  | MoneyWithdrawn
  | AccountClosed;

class AccountCreated extends Data.TaggedClass("AccountCreated")<{
  readonly accountId: string;
  readonly owner: string;
  readonly initialBalance: number;
}> {}

class MoneyDeposited extends Data.TaggedClass("MoneyDeposited")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class MoneyWithdrawn extends Data.TaggedClass("MoneyWithdrawn")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class AccountClosed extends Data.TaggedClass("AccountClosed")<{
  readonly accountId: string;
}> {}

// Event envelope with metadata
interface StoredEvent {
  readonly eventId: string; // Unique identifier per event
  readonly eventType: string; // Type of event
  readonly aggregateId: string; // What this event is about
  readonly aggregateType: string; // What kind of thing (Account)
  readonly data: AccountEvent; // Event payload
  readonly metadata: {
    readonly timestamp: number;
    readonly version: number; // Position in the log
    readonly causationId?: string; // What caused this
  };
}

// Mock event log that appends events
const eventLog: StoredEvent[] = [];
let eventVersion = 0;

const appendToEventLog = (
  event: AccountEvent,
  aggregateId: string
): Effect.Effect<StoredEvent> =>
  Effect.gen(function* () {
    const now = yield* DateTime.now;
    const storedEvent: StoredEvent = {
      eventId: `evt-${eventVersion}-${Date.now()}`,
      eventType: event._tag,
      aggregateId,
      aggregateType: "Account",
      data: event,
      metadata: {
        timestamp: DateTime.toEpochMillis(now),
        version: ++eventVersion,
      },
    };

    // Append to the log (simulated)
    eventLog.push(storedEvent);
    console.log(
      `[v${storedEvent.metadata.version}] ${storedEvent.eventType}: ${aggregateId}`
    );
    return storedEvent;
  });

// Simulate a stream of events from various account operations
const accountEvents: Stream.Stream<readonly [string, AccountEvent]> =
  Stream.fromIterable<readonly [string, AccountEvent]>([
    [
      "acc-1",
      new AccountCreated({ accountId: "acc-1", owner: "Alice", initialBalance: 1000 }),
    ],
    ["acc-1", new MoneyDeposited({ accountId: "acc-1", amount: 500 })],
    ["acc-1", new MoneyWithdrawn({ accountId: "acc-1", amount: 200 })],
    [
      "acc-2",
      new AccountCreated({ accountId: "acc-2", owner: "Bob", initialBalance: 2000 }),
    ],
    ["acc-2", new MoneyDeposited({ accountId: "acc-2", amount: 1000 })],
    ["acc-1", new AccountClosed({ accountId: "acc-1" })],
  ]);

// Sink that appends each event to the log
const eventLogSink = Sink.foldLeftEffect(
  0,
  (count: number, [aggregateId, event]: readonly [string, AccountEvent]) =>
    appendToEventLog(event, aggregateId).pipe(Effect.map(() => count + 1))
);
// Run the stream and append all events
const program = Effect.gen(function* () {
  const totalEvents = yield* accountEvents.pipe(Stream.run(eventLogSink));

  console.log(`\nTotal events appended: ${totalEvents}`);
  console.log(`\nEvent log contents:`);
  eventLog.forEach((event) => {
    console.log(`  [v${event.metadata.version}] ${event.eventType}`);
  });
});

Effect.runPromise(program);
```

This pattern:

1. **Defines event types** using tagged classes (AccountCreated, MoneyDeposited, etc.)
2. **Creates event envelopes** with metadata (timestamp, version, causation)
3. **Streams events** from various sources
4. **Appends to the log** with proper versioning and ordering
5. **Maintains history** for reconstruction and audit

---

**Rationale:**

When consuming a stream of events that represent changes in your system, append each event to an event log using `Sink`. Event logs provide immutable, ordered records that enable event sourcing, audit trails, and temporal queries.

---

Event logs are foundational to many patterns:

- **Event Sourcing**: Instead of storing current state, store the sequence of events that led to it
- **Audit Trails**: Complete, tamper-proof record of who did what and when
- **Temporal Queries**: Reconstruct state at any point in time
- **Consistency**: Single source of truth for what happened
- **Replay**: Rebuild state or test changes by replaying events

Unlike batch inserts, which are transactional, event logs are append-only. Each event is immutable once written. This simplicity enables:

- Fast appends (no updates, just sequential writes)
- Natural ordering (events in write order)
- Easy distribution (replicate the log)
- Strong consistency (events are facts that don't change)
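To make the replay point concrete, here is a minimal, self-contained sketch (using a simplified, hypothetical event shape rather than the `StoredEvent` envelope above) that rebuilds an account balance by folding over an ordered log:

```typescript
import { Effect, Stream } from "effect";

// Simplified, hypothetical event shape for illustration only
type LedgerEvent =
  | { readonly _tag: "MoneyDeposited"; readonly amount: number }
  | { readonly _tag: "MoneyWithdrawn"; readonly amount: number };

const log: ReadonlyArray<LedgerEvent> = [
  { _tag: "MoneyDeposited", amount: 500 },
  { _tag: "MoneyWithdrawn", amount: 200 },
];

// Replay: current state is just a fold over the ordered event log
const balance = Stream.fromIterable(log).pipe(
  Stream.runFold(1000, (total, event) =>
    event._tag === "MoneyDeposited" ? total + event.amount : total - event.amount
  )
);

Effect.runPromise(balance).then((b) => console.log(`Replayed balance: ${b}`));
```

Because the log is the source of truth, the same fold can be re-run at any time, against any prefix of the log, to answer "what was the state then".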
---

---

### Sink Pattern 4: Send Stream Records to Message Queue

**Rule:** Stream records to message queues with proper batching and acknowledgment for reliable distributed data flow.

**Good Example:**

This example demonstrates streaming sensor readings and publishing them to a message queue with topic-based partitioning.

```typescript
import { Effect, Sink, Stream } from "effect";

interface SensorReading {
  readonly sensorId: string;
  readonly location: string;
  readonly temperature: number;
  readonly humidity: number;
  readonly timestamp: number;
}

// Mock message queue publisher
interface QueuePublisher {
  readonly publish: (
    topic: string,
    partition: string,
    messages: readonly SensorReading[]
  ) => Effect.Effect<{ acknowledged: number; messageIds: string[] }>;
}

// Create a mock queue publisher
const createMockPublisher = (): QueuePublisher => {
  const publishedMessages: Record<string, SensorReading[]> = {};

  return {
    publish: (topic, partition, messages) =>
      Effect.sync(() => {
        const key = `${topic}/${partition}`;
        publishedMessages[key] = [...(publishedMessages[key] ?? []), ...messages];
        const messageIds = Array.from(
          { length: messages.length },
          (_, i) => `msg-${Date.now()}-${i}`
        );
        console.log(`Published ${messages.length} messages to ${key} (batch)`);
        return { acknowledged: messages.length, messageIds };
      }),
  };
};

// Determine the partition key based on sensor location
const getPartitionKey = (reading: SensorReading): string =>
  reading.location; // Route by location for data locality

// Simulate a stream of sensor readings
const sensorStream: Stream.Stream<SensorReading> = Stream.fromIterable([
  { sensorId: "temp-1", location: "warehouse-a", temperature: 22.5, humidity: 45, timestamp: Date.now() },
  { sensorId: "temp-2", location: "warehouse-b", temperature: 21.0, humidity: 50, timestamp: Date.now() + 100 },
  { sensorId: "temp-3", location: "warehouse-a", temperature: 22.8, humidity: 46, timestamp: Date.now() + 200 },
  { sensorId: "temp-4", location: "warehouse-c", temperature: 20.5, humidity: 55, timestamp: Date.now() + 300 },
  { sensorId: "temp-5", location: "warehouse-b", temperature: 21.2, humidity: 51, timestamp: Date.now() + 400 },
  { sensorId: "temp-6", location: "warehouse-a", temperature: 23.0, humidity: 47, timestamp: Date.now() + 500 },
]);

// Create a sink that batches per partition and publishes to the message queue
const createQueuePublishSink = (
  publisher: QueuePublisher,
  topic: string,
  batchSize: number = 100
) =>
  Sink.foldLeftEffect(
    {
      batches: new Map<string, SensorReading[]>(),
      totalPublished: 0,
    },
    (state, reading: SensorReading) =>
      Effect.gen(function* () {
        const partition = getPartitionKey(reading);
        const batch = state.batches.get(partition) ?? [];
        const newBatch = [...batch, reading];

        if (newBatch.length >= batchSize) {
          // Batch is full, publish it
          const result = yield* publisher.publish(topic, partition, newBatch);
          const newBatches = new Map(state.batches);
          newBatches.delete(partition);
          return {
            batches: newBatches,
            totalPublished: state.totalPublished + result.acknowledged,
          };
        } else {
          // Add to the batch and continue
          const newBatches = new Map(state.batches);
          newBatches.set(partition, newBatch);
          return { ...state, batches: newBatches };
        }
      })
  ).pipe(
    // When the stream ends, publish any remaining partial batches
    Sink.mapEffect((state) =>
      Effect.gen(function* () {
        let finalCount = state.totalPublished;
        for (const [partition, batch] of state.batches) {
          if (batch.length > 0) {
            const result = yield* publisher.publish(topic, partition, batch);
            finalCount += result.acknowledged;
          }
        }
        return finalCount;
      })
    )
  );

// Run the stream and publish to the queue
const program = Effect.gen(function* () {
  const publisher = createMockPublisher();
  const topic = "sensor-readings";

  const published = yield* sensorStream.pipe(
    Stream.run(createQueuePublishSink(publisher, topic, 50)) // Batch size of 50
  );

  console.log(`\nTotal messages published to queue: ${published}`);
});

Effect.runPromise(program);
```

This pattern:

1. **Groups readings by partition** (location) for data locality
2. **Batches records** before publishing (50 at a time)
3. **Publishes batches** to the queue with a partition key
4. **Flushes partial batches** when the stream ends
5. **Tracks acknowledgments** from the queue

---

**Rationale:**

When consuming a stream of events that need to be distributed to other systems, use `Sink` to publish them to a message queue. Message queues provide reliable, scalable distribution with guarantees like ordering and exactly-once semantics.
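The decoupling and backpressure properties can be sketched entirely in-process with Effect's own `Queue`, used here as a stand-in for an external broker (no real broker or client library is assumed). The producing sink suspends on `Queue.offer` whenever the bounded queue is full:

```typescript
import { Effect, Queue, Sink, Stream } from "effect";

const program = Effect.gen(function* () {
  // A small bounded queue: when it fills up, producers are suspended
  const queue = yield* Queue.bounded<number>(16);

  // Consumer fiber draining the queue (stands in for the broker / subscriber)
  yield* Queue.take(queue).pipe(
    Effect.flatMap((n) => Effect.log(`delivered ${n}`)),
    Effect.forever,
    Effect.fork
  );

  // Producer: run the stream into a sink that offers each element to the queue
  yield* Stream.range(1, 100).pipe(
    Stream.run(Sink.forEach((n) => Queue.offer(queue, n)))
  );
});

Effect.runPromise(program);
```

The same shape applies to a real broker client: the publish call in the sink suspends (or batches) when the broker's producer buffer is full, so the stream never outruns the downstream system.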
---

Message queues are the backbone of event-driven architectures:

- **Decoupling**: Producers don't wait for consumers
- **Scalability**: Multiple subscribers can consume independently
- **Durability**: Messages persist even if subscribers are down
- **Ordering**: Maintain event sequence (per partition/topic)
- **Reliability**: Acknowledgments and retries ensure no message loss

Unlike direct writes, which block, queue publishing is asynchronous and enables:

- High-throughput publishing (batch multiple records per operation)
- Backpressure handling (the queue manages flow)
- Multi-subscriber patterns (fan-out)
- Dead letter queues for error handling

---

---

### Sink Pattern 5: Fall Back to Alternative Sink on Failure

**Rule:** Implement fallback sinks to handle failures gracefully and ensure data is persisted even when the primary destination is unavailable.

**Good Example:**

This example demonstrates a system that tries to write order records to a fast in-memory cache first, falls back to the database if the cache fails, and falls back to a dead letter file if the database fails.

```typescript
import { Data, Effect, Either, Sink, Stream } from "effect";

interface Order {
  readonly orderId: string;
  readonly customerId: string;
  readonly total: number;
  readonly timestamp: number;
}

class CacheSinkError extends Data.TaggedError("CacheSinkError")<{
  readonly reason: string;
}> {}

class DatabaseSinkError extends Data.TaggedError("DatabaseSinkError")<{
  readonly reason: string;
}> {}

// Mock in-memory cache writer (fast but limited capacity)
const createCacheWriter = () => {
  const cache: Order[] = [];
  const MAX_CACHE_SIZE = 1000;

  return (order: Order): Effect.Effect<void, CacheSinkError> =>
    Effect.gen(function* () {
      if (cache.length >= MAX_CACHE_SIZE) {
        return yield* Effect.fail(
          new CacheSinkError({
            reason: `Cache full (${cache.length}/${MAX_CACHE_SIZE})`,
          })
        );
      }
      cache.push(order);
      console.log(`[CACHE] Cached order ${order.orderId}`);
    });
};

// Mock database writer (slower but reliable)
const createDatabaseWriter = () => {
  const orders: Order[] = [];

  return (order: Order): Effect.Effect<void, DatabaseSinkError> =>
    Effect.gen(function* () {
      // Simulate occasional database failures
      if (Math.random() < 0.1) {
        return yield* Effect.fail(
          new DatabaseSinkError({ reason: "Connection timeout" })
        );
      }
      orders.push(order);
      console.log(`[DATABASE] Persisted order ${order.orderId}`);
    });
};

// Mock dead letter file writer (always works but slow)
const createDeadLetterWriter = () => {
  const deadLetters: Order[] = [];

  return (order: Order): Effect.Effect<void> =>
    Effect.sync(() => {
      deadLetters.push(order);
      console.log(
        `[DEAD-LETTER] Wrote order ${order.orderId} to dead letter file`
      );
    });
};
// Create a fallback sink that tries cache -> database -> dead letter file
const createFallbackSink = () => {
  const writeToCache = createCacheWriter();
  const writeToDatabase = createDatabaseWriter();
  const writeToDeadLetter = createDeadLetterWriter();

  return Sink.foldLeftEffect(
    { cached: 0, persisted: 0, deadLetters: 0 },
    (state, order: Order) =>
      Effect.gen(function* () {
        // Try the cache first
        const cacheResult = yield* Effect.either(writeToCache(order));
        if (Either.isRight(cacheResult)) {
          return { ...state, cached: state.cached + 1 };
        }
        console.log(
          `[FALLBACK] Cache failed (${cacheResult.left.reason}), trying database`
        );

        // Cache failed, try the database
        const dbResult = yield* Effect.either(writeToDatabase(order));
        if (Either.isRight(dbResult)) {
          return { ...state, persisted: state.persisted + 1 };
        }
        console.log(
          `[FALLBACK] Database failed (${dbResult.left.reason}), falling back to dead letter`
        );

        // Database failed, use the dead letter file
        yield* writeToDeadLetter(order);
        return { ...state, deadLetters: state.deadLetters + 1 };
      })
  ).pipe(
    // Report where every record ended up once the stream completes
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Cached: ${state.cached}`);
        console.log(`  Persisted: ${state.persisted}`);
        console.log(`  Dead Letter: ${state.deadLetters}`);
        return state;
      })
    )
  );
};

// Simulate a stream of orders
const orderStream: Stream.Stream<Order> = Stream.fromIterable([
  { orderId: "order-1", customerId: "cust-1", total: 99.99, timestamp: Date.now() },
  { orderId: "order-2", customerId: "cust-2", total: 149.99, timestamp: Date.now() + 100 },
  { orderId: "order-3", customerId: "cust-1", total: 49.99, timestamp: Date.now() + 200 },
  { orderId: "order-4", customerId: "cust-3", total: 199.99, timestamp: Date.now() + 300 },
  { orderId: "order-5", customerId: "cust-2", total: 89.99, timestamp: Date.now() + 400 },
]);

// Run the stream with the fallback sink
const program = Effect.gen(function* () {
  const result = yield* orderStream.pipe(Stream.run(createFallbackSink()));
  console.log(
    `\nTotal orders processed: ${result.cached + result.persisted + result.deadLetters}`
  );
});

Effect.runPromise(program);
```

This pattern:

1. **Tries the cache first** (fast, limited capacity)
2. **Falls back to the database** if the cache is full
3. **Falls back to the dead letter file** if the database fails
4. **Tracks which destination** was used for each record
5. **Reports a summary** of where the data went

---

**Rationale:**

When consuming a stream to a primary destination that might fail, wrap it in a fallback pattern. If the primary sink fails, automatically redirect the stream to an alternative sink. This enables progressive degradation, where the system degrades gracefully rather than failing completely.

---

Production systems need resilience:

- **Primary failures**: Database down, network timeout, quota exceeded
- **Progressive degradation**: Keep the system running, even at reduced capacity
- **No data loss**: Fallback ensures data is persisted somewhere
- **Operational flexibility**: Choose the fallback based on the failure type
- **Monitoring**: Track when fallbacks are used to alert operators

Without fallback patterns:

- The system fails when the primary destination fails
- Data is lost if the primary is unavailable
- There is no clear signal that degradation occurred

With fallback sinks:

- The stream continues even when the primary fails
- Data is safely persisted to an alternative
- There is a clear audit trail of which sink was used
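When the per-record fallback is simply "try A, otherwise B" and you do not need to track which destination was used, the same idea can be written more compactly with `Effect.orElse`. A minimal sketch with hypothetical `writePrimary` and `writeSecondary` helpers:

```typescript
import { Data, Effect } from "effect";

class PrimaryUnavailable extends Data.TaggedError("PrimaryUnavailable")<{
  readonly reason: string;
}> {}

// Hypothetical per-record writers: the primary fails for odd ids
const writePrimary = (id: number): Effect.Effect<string, PrimaryUnavailable> =>
  id % 2 === 0
    ? Effect.succeed(`primary:${id}`)
    : Effect.fail(new PrimaryUnavailable({ reason: "connection refused" }));

const writeSecondary = (id: number): Effect.Effect<string> =>
  Effect.succeed(`secondary:${id}`);

// Try the primary destination; on any failure, fall back to the secondary
const writeWithFallback = (id: number) =>
  writePrimary(id).pipe(Effect.orElse(() => writeSecondary(id)));

Effect.runPromise(Effect.all([1, 2, 3].map(writeWithFallback))).then(console.log);
```

The explicit `Either` checks in the full example buy you the per-destination counters and audit trail; `Effect.orElse` trades that visibility for brevity.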
---

---

### Sink Pattern 6: Retry Failed Stream Operations

**Rule:** Implement retry strategies in sinks to handle transient failures and improve resilience without manual intervention.

**Good Example:**

This example demonstrates retrying database writes with exponential backoff, tracking attempts, and stopping early on permanent failures.

```typescript
import { Duration, Effect, Either, Sink, Stream } from "effect";

interface UserRecord {
  readonly userId: string;
  readonly name: string;
  readonly email: string;
}

class WriteError extends Error {
  readonly isTransient: boolean;

  constructor(message: string, isTransient: boolean = true) {
    super(message);
    this.name = "WriteError";
    this.isTransient = isTransient;
  }
}

// Mock database that occasionally fails
const database = {
  failureRate: 0.3, // 30% transient failure rate
  permanentFailureRate: 0.05, // 5% permanent failure rate

  insertUser: (user: UserRecord): Effect.Effect<void, WriteError> =>
    Effect.gen(function* () {
      const rand = Math.random();

      // Permanent failure (e.g., constraint violation)
      if (rand < database.permanentFailureRate) {
        return yield* Effect.fail(
          new WriteError(`Permanent: User ${user.userId} already exists`, false)
        );
      }

      // Transient failure (e.g., connection timeout)
      if (rand < database.permanentFailureRate + database.failureRate) {
        return yield* Effect.fail(
          new WriteError(`Transient: Connection timeout writing ${user.userId}`, true)
        );
      }

      // Success
      console.log(`✓ Wrote user ${user.userId}`);
    }),
};

// Retry configuration
interface RetryConfig {
  readonly maxAttempts: number;
  readonly initialDelayMs: number;
  readonly maxDelayMs: number;
  readonly backoffFactor: number;
}

const defaultRetryConfig: RetryConfig = {
  maxAttempts: 5,
  initialDelayMs: 100, // Start with 100ms
  maxDelayMs: 5000, // Cap at 5 seconds
  backoffFactor: 2, // Double each time
};

// Result tracking
interface OperationResult {
  readonly succeeded: number;
  readonly transientFailures: number;
  readonly permanentFailures: number;
  readonly detailedStats: Array<{
    readonly userId: string;
    readonly attempts: number;
    readonly status: "success" | "transient-failed" | "permanent-failed";
  }>;
}

// Create a sink with retry logic
const createRetrySink = (config: RetryConfig) => {
  const initialState: OperationResult = {
    succeeded: 0,
    transientFailures: 0,
    permanentFailures: 0,
    detailedStats: [],
  };

  return Sink.foldLeftEffect(initialState, (state, user: UserRecord) =>
    Effect.gen(function* () {
      // Retry loop
      for (let attempts = 1; attempts <= config.maxAttempts; attempts++) {
        const result = yield* Effect.either(database.insertUser(user));

        if (Either.isRight(result)) {
          // Success!
          console.log(
            `[${user.userId}] Success on attempt ${attempts}/${config.maxAttempts}`
          );
          return {
            ...state,
            succeeded: state.succeeded + 1,
            detailedStats: [
              ...state.detailedStats,
              { userId: user.userId, attempts, status: "success" as const },
            ],
          };
        }

        const error = result.left;

        if (!error.isTransient) {
          // Permanent failure, don't retry
          console.log(`[${user.userId}] Permanent failure: ${error.message}`);
          return {
            ...state,
            permanentFailures: state.permanentFailures + 1,
            detailedStats: [
              ...state.detailedStats,
              { userId: user.userId, attempts, status: "permanent-failed" as const },
            ],
          };
        }

        // Transient failure, retry if attempts remain
        if (attempts < config.maxAttempts) {
          // Calculate the delay with exponential backoff
          let delayMs =
            config.initialDelayMs * Math.pow(config.backoffFactor, attempts - 1);
          delayMs = Math.min(delayMs, config.maxDelayMs);

          // Add jitter (±10%)
          const jitter = delayMs * 0.1;
          delayMs = delayMs + (Math.random() - 0.5) * 2 * jitter;

          console.log(
            `[${user.userId}] Transient failure (attempt ${attempts}/${config.maxAttempts}): ${error.message}`
          );
          console.log(`  Retrying in ${Math.round(delayMs)}ms...`);
          yield* Effect.sleep(Duration.millis(Math.round(delayMs)));
        }
      }

      // All retries exhausted
      console.log(`[${user.userId}] Failed after ${config.maxAttempts} attempts`);
      return {
        ...state,
        transientFailures: state.transientFailures + 1,
        detailedStats: [
          ...state.detailedStats,
          {
            userId: user.userId,
            attempts: config.maxAttempts,
            status: "transient-failed" as const,
          },
        ],
      };
    })
  ).pipe(
    // Report a summary once the stream has been fully consumed
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Succeeded: ${state.succeeded}`);
        console.log(`  Transient Failures: ${state.transientFailures}`);
        console.log(`  Permanent Failures: ${state.permanentFailures}`);
        console.log(`  Total: ${state.detailedStats.length}`);

        // Show details for anything that did not succeed
        const failed = state.detailedStats.filter((s) => s.status !== "success");
        if (failed.length > 0) {
          console.log(`\n[FAILURES]`);
          failed.forEach((stat) => {
            console.log(`  ${stat.userId}: ${stat.attempts} attempts (${stat.status})`);
          });
        }
        return state;
      })
    )
  );
};

// Simulate a stream of users to insert
const userStream: Stream.Stream<UserRecord> = Stream.fromIterable([
  { userId: "user-1", name: "Alice", email: "alice@example.com" },
  { userId: "user-2", name: "Bob", email: "bob@example.com" },
  { userId: "user-3", name: "Charlie", email: "charlie@example.com" },
  { userId: "user-4", name: "Diana", email: "diana@example.com" },
  { userId: "user-5", name: "Eve", email: "eve@example.com" },
]);

// Run the stream with the retry sink
const program = Effect.gen(function* () {
  yield* userStream.pipe(Stream.run(createRetrySink(defaultRetryConfig)));
  console.log(`\nProcessing complete.`);
});

Effect.runPromise(program);
```

This pattern:

1. **Attempts the operation** up to the maximum number of retries
2. **Distinguishes transient from permanent failures**
3. **Uses exponential backoff** to space out retries
4. **Adds jitter** to prevent a thundering herd
5. **Tracks detailed stats** for monitoring
6. **Reports a summary** of outcomes

---

**Rationale:**

When consuming a stream to a destination that may experience transient failures (network timeouts, rate limiting, temporary unavailability), wrap the sink operation with a retry policy. Use exponential backoff to avoid overwhelming a recovering system while still recovering quickly.
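The manual loop above makes every step explicit, but Effect also ships declarative retry combinators. Here is a minimal sketch (with a hypothetical `flakyWrite` effect) that expresses the same policy with `Schedule.exponential`, jitter, a capped attempt count, and a predicate so only transient failures are retried:

```typescript
import { Data, Effect, Schedule } from "effect";

class WriteError extends Data.TaggedError("WriteError")<{
  readonly reason: string;
  readonly isTransient: boolean;
}> {}

// Hypothetical flaky write standing in for the database call above
const flakyWrite = Effect.suspend(() =>
  Math.random() < 0.3
    ? Effect.fail(new WriteError({ reason: "timeout", isTransient: true }))
    : Effect.log("write ok")
);

// Exponential backoff starting at 100ms, doubled each attempt, with jitter,
// capped at 5 total attempts
const policy = Schedule.exponential("100 millis", 2).pipe(
  Schedule.jittered,
  Schedule.intersect(Schedule.recurs(4))
);

const program = flakyWrite.pipe(
  // Only transient failures are retried; permanent ones surface immediately
  Effect.retry({ schedule: policy, while: (error) => error.isTransient }),
  Effect.catchAll((error) => Effect.log(`giving up: ${error.reason}`))
);

Effect.runPromise(program);
```

The hand-rolled loop is still useful when you need per-attempt bookkeeping (as in the example above); the `Schedule`-based form is the lighter option when you only need the policy.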
---

Transient failures are common in distributed systems:

- **Network timeouts**: Temporary connectivity issues resolve themselves
- **Rate limiting**: The service recovers once the rate limit window resets
- **Temporary unavailability**: Services restart or scale up
- **Circuit breaker trips**: The service recovers after a backoff period

Without retry logic:

- Every transient failure causes data loss or stream interruption
- Manual intervention is required to restart
- The system appears less reliable than it actually is

With intelligent retry logic:

- Automatic recovery from transient failures
- Exponential backoff prevents a thundering herd
- Clear visibility into which operations failed permanently
- Data flows continuously despite temporary issues

---

---

### Sink Pattern 3: Write Stream Lines to File

**Rule:** Write streaming lines to a file efficiently using buffered output and proper resource management.

**Good Example:**

This example demonstrates streaming log entries and writing them to a file with buffering. It uses the `FileSystem` service from `@effect/platform`, provided here by `NodeContext.layer` from `@effect/platform-node`.

```typescript
import { FileSystem } from "@effect/platform";
import { NodeContext } from "@effect/platform-node";
import { Effect, Sink, Stream } from "effect";

interface LogEntry {
  readonly level: "debug" | "info" | "warn" | "error";
  readonly message: string;
  readonly timestamp: number;
}

// Format a log entry as a line
const formatLogLine = (entry: LogEntry): string => {
  const iso = new Date(entry.timestamp).toISOString();
  return `[${iso}] ${entry.level.toUpperCase()}: ${entry.message}`;
};

// Simulate a stream of log entries
const logStream: Stream.Stream<LogEntry> = Stream.fromIterable<LogEntry>([
  { level: "info", message: "Server starting", timestamp: Date.now() },
  { level: "debug", message: "Loading config", timestamp: Date.now() + 100 },
  { level: "info", message: "Connected to database", timestamp: Date.now() + 200 },
  { level: "warn", message: "High memory usage detected", timestamp: Date.now() + 300 },
  { level: "info", message: "Processing request", timestamp: Date.now() + 400 },
  { level: "error", message: "Connection timeout", timestamp: Date.now() + 500 },
  { level: "info", message: "Retrying connection", timestamp: Date.now() + 600 },
  { level: "info", message: "Connection restored", timestamp: Date.now() + 700 },
]);

// Create a buffered line-writer sink over an already-open file handle
const createFileWriteSink = (file: FileSystem.File, bufferSize: number = 100) => {
  const encoder = new TextEncoder();
  let buffer: string[] = [];

  // Flush buffered lines to disk
  const flush = Effect.gen(function* () {
    if (buffer.length === 0) return;
    const content = buffer.join("\n") + "\n";
    buffer = [];
    yield* file.writeAll(encoder.encode(content));
  });

  return Sink.foldLeftEffect(0, (count, line: string) =>
    Effect.gen(function* () {
      buffer.push(line);

      // Flush when the buffer reaches the size limit
      if (buffer.length >= bufferSize) {
        yield* flush;
      }
      return count + 1;
    })
  ).pipe(
    // Flush any remaining lines once the stream ends
    Sink.mapEffect((count) => flush.pipe(Effect.as(count)))
  );
};

// Process the log stream
const program = Effect.gen(function* () {
  const fs = yield* FileSystem.FileSystem;
  const filePath = "/tmp/app.log";

  // Clear the file first
  yield* fs.writeFileString(filePath, "");

  // Open the file in append mode; the surrounding scope closes it for us
  const file = yield* fs.open(filePath, { flag: "a" });

  // Stream logs, format them, and write them to the file
  const written = yield* logStream.pipe(
    Stream.map(formatLogLine),
    Stream.run(createFileWriteSink(file, 50)) // Buffer 50 lines before flushing
  );
  console.log(`Wrote ${written} log lines to ${filePath}`);
  // Read the file back to verify
  const content = yield* fs.readFileString(filePath);
  console.log("\nFile contents:");
  console.log(content);
}).pipe(Effect.scoped, Effect.provide(NodeContext.layer));

Effect.runPromise(program);
```

This pattern:

1. **Opens a file** for appending
2. **Buffers log lines** in memory (50 lines before a flush)
3. **Flushes periodically** when the buffer fills or the stream ends
4. **Closes the file** safely using scopes
5. **Tracks the line count** for confirmation

---

**Rationale:**

When consuming a stream of data to persist as lines in a file, use `Sink` with a file writer. Buffer the output for efficiency and ensure proper resource cleanup using Effect's scope management.

---

Writing stream data to files requires:

- **Buffering**: Writing one line at a time is slow; buffer multiple lines before flushing to disk
- **Efficiency**: Reduce system calls and I/O overhead by batching writes
- **Resource management**: Ensure file handles are properly closed, even on errors
- **Ordering**: Maintain the order of lines as they appear in the stream

This pattern is essential for:

- Log files and audit trails
- CSV / JSON Lines export
- Streaming data archival
- Data pipelines with file intermediates
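The resource-management point can be isolated from any real filesystem. Here is a minimal core-Effect sketch, assuming a hypothetical in-memory `Writer` in place of a file handle, showing how `Effect.acquireRelease` ties the writer's lifetime to a scope so the close step runs even if the stream fails part-way through:

```typescript
import { Effect, Sink, Stream } from "effect";

// Hypothetical in-memory writer standing in for a real file handle
interface Writer {
  readonly writeLine: (line: string) => Effect.Effect<void>;
  readonly close: Effect.Effect<void>;
}

const openWriter = (path: string): Effect.Effect<Writer> =>
  Effect.sync(() => {
    const lines: string[] = [];
    return {
      writeLine: (line: string) =>
        Effect.sync(() => {
          lines.push(line);
        }),
      close: Effect.sync(() =>
        console.log(`closed ${path} after ${lines.length} lines`)
      ),
    };
  });

// acquireRelease registers the release step with the surrounding scope
const program = Effect.scoped(
  Effect.gen(function* () {
    const writer = yield* Effect.acquireRelease(
      openWriter("/tmp/example.log"),
      (w) => w.close
    );
    yield* Stream.make("one", "two", "three").pipe(
      Stream.run(Sink.forEach(writer.writeLine))
    );
  })
);

Effect.runPromise(program);
```

The file-based example above follows the same shape: `fs.open` is already scope-managed, so wrapping the program in `Effect.scoped` is what guarantees the handle is closed regardless of how the stream ends.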