# core-async — Complete API Reference

> A standalone, zero-dependency CSP (Communicating Sequential Processes) library for JavaScript.
> Coordinate concurrent async workflows through channels instead of callbacks, event emitters, or promise chains.
> Works with native async/await. Ships as both ESM and CommonJS. Node.js >= 16.

## Install

```
npm i core-async
```

## Import

ESM (recommended):

```js
import { Channel, alts, timeout, filter, map, reduce, compose, merge, delay, subscribe, PubSub, throttle } from 'core-async'
```

CommonJS:

```js
const {
  Channel,
  utils: { alts, timeout, delay, merge, subscribe, throttle },
  transducer: { filter, map, reduce, compose }
} = require('core-async')

// Or use the flat exports (also works in CJS):
const { Channel, alts, timeout, filter, map, PubSub, throttle } = require('core-async')
```

TypeScript — type definitions are included, no @types package needed:

```ts
import { Channel, alts, type PutOptions, type TakeOptions } from 'core-async'

const chan = new Channel()
```

---

## Channel

The Channel is the core primitive. It provides a communication pipe between concurrent async processes. A process puts data onto a channel and another process takes data off it. When no matching counterpart is ready, the operation blocks (awaits).

### Constructor

```js
new Channel()                                  // Unbuffered
new Channel(bufferSize)                        // Buffered
new Channel(transducer)                        // With transducer
new Channel(bufferSize, transducer)            // Buffered + transducer
new Channel(bufferSize, { mode, onClosing })   // Buffered + options
new Channel(bufferSize, transducer, { mode })  // All three
new Channel(transducer, { onClosing })         // Transducer + options
```

Parameters:

- `bufferSize` (Number, optional) — buffer size. Default: 0 (unbuffered). Must be >= 1 when mode is 'sliding' or 'dropping'.
- `transducer` (Function, optional) — transducer applied to every value put onto the channel.
- `options.mode` (String, optional) — one of 'default', 'sliding', 'dropping'. Default: 'default'.
  - `'default'`: put blocks when buffer is full (backpressure)
  - `'sliding'`: oldest buffered value is evicted when buffer is full. put returns true.
  - `'dropping'`: new value is silently dropped when buffer is full. put returns null.
- `options.onClosing` (Function, optional) — callback invoked when close() is called.

### Unbuffered vs Buffered

Unbuffered (buffer = 0): every put blocks until a corresponding take is ready, and vice versa. Strict synchronization.

Buffered: allows up to `bufferSize` puts to complete immediately. Once full, subsequent puts block until a take frees a slot.

```js
import { Channel } from 'core-async'

// Unbuffered — put blocks until take
const unbuffered = new Channel()

;(async () => {
  await unbuffered.put('hello') // Blocks here until someone takes
  console.log('Delivered!')
})()

;(async () => {
  const val = await unbuffered.take() // 'hello' — unblocks the put above
})()

// Buffered — first 2 puts complete immediately
const buffered = new Channel(2)

;(async () => {
  await buffered.put(1) // Immediate (buffer slot 1)
  await buffered.put(2) // Immediate (buffer slot 2)
  await buffered.put(3) // Blocks — buffer full, waits for a take
})()
```

### Dropping and Sliding Modes

```js
import { Channel } from 'core-async'

// Dropping: keeps oldest, drops new values when full
const dropping = new Channel(1, { mode: 'dropping' })

;(async () => {
  await dropping.put('first')  // true (buffered)
  await dropping.put('second') // null (dropped — buffer full)
  console.log(await dropping.take()) // 'first'
})()

// Sliding: keeps newest, evicts oldest when full
const sliding = new Channel(1, { mode: 'sliding' })

;(async () => {
  await sliding.put('first')  // true (buffered)
  await sliding.put('second') // true (evicts 'first')
  console.log(await sliding.take()) // 'second'
})()
```

### chan.put(value, options?)

Asynchronously puts a value onto the channel. Returns Promise.
- Returns `true` — value successfully delivered to a consumer
- Returns `false` — value rejected by a transducer filter
- Returns `null` — channel is closed, or value was dropped (dropping mode)
- **Putting `null` closes the channel.** null is a special sentinel. You cannot send null as data.

Options:

- `timeout` (Number) — milliseconds. If not consumed in time, rejects with error (code 408).

```js
;(async () => {
  const status = await chan.put('hello')
  console.log(status) // true

  // With timeout
  try {
    await chan.put('hello', { timeout: 5000 })
  } catch (err) {
    console.log(err.code)    // 408
    console.log(err.message) // "'put' timed out after 5000 ms. No data was added to the channel."
  }

  // Close the channel by putting null
  await chan.put(null)
})()
```

### chan.take(options?)

Asynchronously takes a value from the channel. Returns Promise.

- Returns the value when one is available
- Returns `null` when the channel is closed

Options:

- `timeout` (Number) — milliseconds. If no value available in time, rejects with error (code 408).

```js
;(async () => {
  const value = await chan.take()

  // With timeout
  try {
    const value = await chan.take({ timeout: 3000 })
  } catch (err) {
    console.log(err.code)    // 408
    console.log(err.message) // "'take' timed out after 3000 ms. No data was taken off the channel."
  }
})()
```

### chan.sput(value)

Synchronous put. Attempts to put without blocking. Only succeeds if a taker is already waiting.

- Returns `true` — delivered immediately to a waiting taker
- Returns `false` — no taker waiting (value is NOT queued)
- Returns `Promise` when the channel has a transducer (because the transducer may be async)

```js
const chan = new Channel()

chan.sput('hello') // false — no taker waiting

chan.take()        // Sets up a waiting taker
chan.sput('hello') // true — delivered immediately
```

### chan.stake()

Synchronous take. Attempts to take without blocking. Only succeeds if a value has already been put.
- Returns the value if available
- Returns `false` if nothing available

```js
const chan = new Channel()

chan.stake()       // false — nothing available

chan.put('hello')  // Queues a value
chan.stake()       // 'hello'
```

### chan.close()

Closes the channel.

- All pending (blocked) take operations resolve with `null`
- All pending (blocked) put operations resolve with `false`
- Future puts resolve with `null`
- Future takes resolve with `null`
- stake() returns `false`
- The onClosing callback (if provided) is invoked

Equivalent to `await chan.put(null)`.

```js
const chan = new Channel()
chan.close()

;(async () => {
  await chan.put('hello') // null (channel closed)
  await chan.take()       // null (channel closed)
})()
```

### Channel State Properties

- `chan.opened` (Boolean) — true until close() is called
- `chan.closing` (Boolean) — true after close() is called
- `chan.closed` (Boolean) — true after close() finishes resolving all pending operations

---

## Transducers

Transducers are composable functions that transform data flowing through a channel. Every value put onto the channel passes through the transducer before being delivered to a taker. All predicate/transform/reduce functions may return Promises.

```js
import { filter, map, reduce, compose } from 'core-async'
```

### filter(predicate)

Creates a transducer that only allows matching values through.
Signature: `filter(predicate: (value: any, index: number) => boolean | Promise) => Transducer`

- Predicate returns true → value passes through
- Predicate returns false → value rejected, put resolves with `false`

```js
import { Channel, filter } from 'core-async'

const chan = new Channel(filter(x => x > 1))

;(async () => {
  console.log(await chan.put(1)) // false (filtered out)
  console.log(await chan.put(2)) // true (accepted)
  console.log(await chan.put(3)) // true (accepted)
})()

;(async () => {
  console.log(await chan.take()) // 2
  console.log(await chan.take()) // 3
})()
```

### map(transform)

Creates a transducer that transforms each value before it enters the channel.

Signature: `map(transform: (value: any, index: number) => any | Promise) => Transducer`

```js
import { Channel, map } from 'core-async'

const chan = new Channel(map(x => x * 10))

;(async () => {
  await chan.put(1)
  await chan.put(2)
})()

;(async () => {
  console.log(await chan.take()) // 10
  console.log(await chan.take()) // 20
})()
```

### reduce(reduceFn, accumulator)

Creates a transducer that accumulates values. Each take receives the running accumulation.

Signature: `reduce(fn: (accumulator: any, value: any, index: number) => any | Promise, initialAccumulator: any) => Transducer`

```js
import { Channel, reduce } from 'core-async'

const chan = new Channel(reduce((acc, x, idx) => ({ total: acc.total + x, idx }), { total: 0 }))

;(async () => {
  await chan.put(1)
  await chan.put(2)
  await chan.put(3)
})()

;(async () => {
  console.log(await chan.take()) // { total: 1, idx: 0 }
  console.log(await chan.take()) // { total: 3, idx: 1 }
  console.log(await chan.take()) // { total: 6, idx: 2 }
})()
```

### compose(...transducers)

Composes multiple transducers into one. Executes left-to-right. If any transducer rejects (e.g., a filter), the rest are skipped and put returns `false`.
Signature: `compose(...transducers: Transducer[]) => Transducer`

```js
import { Channel, compose, filter, map } from 'core-async'

const chan = new Channel(compose(
  filter(x => x > 1),  // Only values > 1
  map(x => x + 10),    // Add 10
  filter(x => x < 13)  // Only values < 13
))

;(async () => {
  await chan.put(1) // Rejected by first filter (1 is not > 1)
  await chan.put(2) // Passes: 2 → filter → 2 → map → 12 → filter → accepted (12 < 13)
  await chan.put(3) // Rejected: 3 → filter → 3 → map → 13 → filter → rejected (13 is not < 13)
})()

;(async () => {
  console.log(await chan.take()) // 12
})()
```

---

## Utilities

```js
import { alts, merge, timeout, delay, subscribe, PubSub, throttle } from 'core-async'
```

### alts(channels)

Race multiple channels. Returns a Promise that resolves when the first value becomes available on any channel. Returns `[value, winnerChannel]`.

Signature: `alts(channels: Channel[]) => Promise<[any, Channel]>`

When multiple channels have values ready simultaneously, alts takes from the one whose value was put first.

```js
import { Channel, alts } from 'core-async'

const chan1 = new Channel()
const chan2 = new Channel()

;(async () => {
  const [value, winner] = await alts([chan1, chan2])
  if (winner === chan1) console.log(`chan1 won: ${value}`)
  else console.log(`chan2 won: ${value}`)
})()

chan1.put('hello')
chan2.put('world')
// Output: "chan1 won: hello"
```

Common pattern — race data against a deadline:

```js
import { Channel, alts, timeout } from 'core-async'

const dataChan = new Channel()

;(async () => {
  const deadline = timeout(3000)
  const [value, winner] = await alts([dataChan, deadline])
  if (winner === deadline) {
    console.log('Timed out after 3 seconds')
  } else {
    console.log(`Got data: ${value}`)
  }
})()
```

### merge(channels)

Merge multiple channels into a single output channel. Values from all input channels are forwarded as they arrive. When an input channel closes, it stops contributing but the output stays open.
Signature: `merge(channels: Channel[]) => Channel`

```js
import { Channel, merge } from 'core-async'

const chan1 = new Channel()
const chan2 = new Channel()
const merged = merge([chan1, chan2])

chan1.put(1)
chan2.put('one')
chan1.put(2)
chan2.put('two')

;(async () => {
  console.log(await merged.take()) // 1
  console.log(await merged.take()) // 'one'
  console.log(await merged.take()) // 2
  console.log(await merged.take()) // 'two'
})()
```

### timeout(time)

Creates a channel that receives the string `'timeout'` after the specified delay. If the channel is closed before the timeout fires, the timer is cancelled.

Signature: `timeout(time: number | [min, max]) => Channel`

- Number: fires after that many milliseconds
- [min, max] array: fires after a random duration between min and max milliseconds

```js
import { timeout } from 'core-async'

;(async () => {
  const t = timeout(5000)
  console.log('Waiting 5 seconds...')
  await t.take()
  console.log('Done!')
})()

// Random timeout between 1-5 seconds
const t = timeout([1000, 5000])
```

### delay(ms)

Returns a Promise that resolves after the specified milliseconds. Simple async sleep.

Signature: `delay(ms: number) => Promise`

```js
import { delay } from 'core-async'

;(async () => {
  console.log('Starting...')
  await delay(2000)
  console.log('2 seconds later!')
})()
```

### subscribe(channel, subscribers)

Routes values from a source channel to subscriber channels based on predicate rules. Each subscriber has a `chan` (Channel) and a `rule` (predicate). When a value is taken from the source, it is put onto every subscriber whose rule returns true. Closing the source channel stops the subscription loop.
Signature: `subscribe(channel: Channel, subscribers: { chan: Channel, rule: (data: any) => boolean } | Array) => void`

```js
import { Channel, subscribe } from 'core-async'

const source = new Channel()
const numbers = new Channel()
const strings = new Channel()

subscribe(source, [
  { chan: numbers, rule: data => typeof data === 'number' },
  { chan: strings, rule: data => typeof data === 'string' }
])

;(async () => {
  while (true) {
    const n = await numbers.take()
    console.log(`Number: ${n}`)
  }
})()

;(async () => {
  while (true) {
    const s = await strings.take()
    console.log(`String: ${s}`)
  }
})()

;[1, 'one', 2, 'two', 3, 'three'].forEach(v => source.put(v))
// Output:
// Number: 1
// String: one
// Number: 2
// String: two
// Number: 3
// String: three
```

### PubSub

Topic-based publish/subscribe system built on channels. Publishers send values to named topics, subscribers receive values from topics they've subscribed to.

Constructor: `new PubSub()`

#### pubSub.pub(topics, value)

Publish a value to one or more topics.

Signature: `pub(topics: string | string[], value: any) => void`

#### pubSub.sub(topics, subscriber)

Subscribe a channel to one or more topics.

Signature: `sub(topics: string | string[], subscriber: Channel) => void`

#### pubSub.unsub(topics, subscriber)

Unsubscribe a channel from one or more topics.

Signature: `unsub(topics: string | string[], subscriber: Channel) => void`

#### pubSub.close()

Closes the PubSub and all internal subscription channels. No further messages will be delivered.
Signature: `close() => void`

Full example:

```js
import { Channel, PubSub } from 'core-async'

const pubSub = new PubSub()
const errors = new Channel()
const metrics = new Channel()

pubSub.sub('error', errors)
pubSub.sub('metric', metrics)

;(async () => {
  while (true) {
    const data = await errors.take()
    console.log(`Error: ${JSON.stringify(data)}`)
  }
})()

;(async () => {
  while (true) {
    const data = await metrics.take()
    console.log(`Metric: ${JSON.stringify(data)}`)
  }
})()

pubSub.pub('error', { code: 500, msg: 'Internal error' })
pubSub.pub('metric', { latency: 42 })
pubSub.pub('error', { code: 404, msg: 'Not found' })
pubSub.pub('metric', { latency: 18 })
// Output:
// Error: {"code":500,"msg":"Internal error"}
// Metric: {"latency":42}
// Error: {"code":404,"msg":"Not found"}
// Metric: {"latency":18}

// Clean up
pubSub.close()
```

### throttle(tasks, buffer)

Execute an array of async tasks with a concurrency limit. Returns a promise that resolves with all results in order. If a task throws, its result is `{ error: }` instead of crashing the whole batch.

Signature: `throttle(tasks: (() => Promise)[], buffer: number) => Promise`

- `tasks` — array of parameterless functions, each returning a Promise
- `buffer` — max concurrent tasks (must be >= 1)

```js
import { throttle } from 'core-async'

const urls = ['https://api.example.com/1', 'https://api.example.com/2', /* ...100 more */]
const tasks = urls.map(url => () => fetch(url).then(r => r.json()))

// Max 10 concurrent requests
const results = await throttle(tasks, 10)
// results[i] is either the response data or { error: } if that task failed
```

---

## Gotchas

1. **Putting null closes the channel.** `null` is a sentinel. `await chan.put(null)` is equivalent to `chan.close()`. Use `undefined`, `{}`, or a custom value if you need to represent "no value."
2. **sput/stake return false, not throw.** When they can't complete immediately, they silently return `false`. The value is NOT queued.
   This is fundamentally different from put/take, which block.
3. **sput returns a Promise when channel has a transducer.** Because the transducer may be async, sput must return `Promise` instead of `boolean` in this case.
4. **Sliding drops oldest, dropping drops newest.** Sliding mode (keeps most recent): oldest value evicted, put returns `true`. Dropping mode (keeps oldest): new value silently dropped, put returns `null`.
5. **Always close channels.** Open channels with blocked put/take operations keep the Node.js event loop alive. Critical in serverless (Lambda) where the runtime bills while the event loop is active.
6. **Timeout errors have code 408.** When put or take times out via the `{ timeout: ms }` option, the thrown error has `err.code === 408`.
7. **Transducer filter rejection.** When a filter transducer rejects a value, put resolves with `false`. In a compose chain, if any transducer rejects, all subsequent transducers are skipped.
8. **async/await is required for blocking semantics.** put() and take() return Promises. Without await, they fire-and-forget. Always use `await chan.put(x)` and `await chan.take()` for sequential behavior.
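
Gotchas 1, 5, and 8 combine into one common shape: a producer that awaits each put and closes the channel when done, and a consumer that loops until take resolves with `null`. The sketch below is a minimal illustration under stated assumptions, not the library's code. `MiniChannel` is a hypothetical stand-in that mimics only the unbuffered behavior documented above, so the example runs without installing anything; with the real library, `new Channel()` from `core-async` would take its place. `producer`, `consumer`, and `run` are likewise illustrative names.

```js
// Hypothetical stand-in for an unbuffered core-async Channel (an assumption
// for this sketch). It covers only what the example needs: put blocks until
// taken, take resolves with null once closed, putting null closes.
class MiniChannel {
  constructor() {
    this.puts = []   // blocked putters: { value, resolve }
    this.takes = []  // blocked takers: resolve functions
    this.closed = false
  }
  put(value) {
    if (value === null) {
      this.close()
      return Promise.resolve(null) // resolved value is an assumption
    }
    if (this.closed) return Promise.resolve(null)
    const taker = this.takes.shift()
    if (taker) {
      taker(value)
      return Promise.resolve(true)
    }
    return new Promise(resolve => this.puts.push({ value, resolve }))
  }
  take() {
    const putter = this.puts.shift()
    if (putter) {
      putter.resolve(true)
      return Promise.resolve(putter.value)
    }
    if (this.closed) return Promise.resolve(null)
    return new Promise(resolve => this.takes.push(resolve))
  }
  close() {
    this.closed = true
    this.takes.splice(0).forEach(resolve => resolve(null)) // pending takes get null
    this.puts.splice(0).forEach(p => p.resolve(false))     // pending puts get false
  }
}

async function producer(chan, items) {
  // Awaiting each put is what makes the producer wait for the consumer (gotcha 8)
  for (const item of items) await chan.put(item)
  // Closing lets the consumer loop terminate (gotcha 5)
  chan.close()
}

async function consumer(chan) {
  const seen = []
  while (true) {
    const value = await chan.take()
    if (value === null) break // null signals "closed", it is never data (gotcha 1)
    seen.push(value)
  }
  return seen
}

async function run() {
  const chan = new MiniChannel()
  const [, seen] = await Promise.all([producer(chan, [1, 2, 3]), consumer(chan)])
  return seen
}

run().then(seen => console.log(seen)) // logs [ 1, 2, 3 ]
```

Because the producer closes the channel, the consumer loop exits on its own and no put or take is left pending to keep the event loop alive.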