[[onyx-core-async]] == onyx-core-async Onyx plugin providing read and write facilities for Clojure core.async. === Installation This plugin is included with Onyx. You do not need to add it as a separate dependency. In your peer boot-up namespace: [source,clojure] ---- (:require [onyx.plugin.core-async]) ---- === Functions ==== read-from-chan Catalog entry: [source,clojure] ---- {:onyx/name :in :onyx/plugin :onyx.plugin.core-async/input :onyx/type :input :onyx/medium :core.async :onyx/batch-size batch-size :onyx/max-peers 1 :onyx/doc "Reads segments from a core.async channel"} ---- Lifecycle entries: [source,clojure] ---- [{:lifecycle/task :your-task-name :lifecycle/calls :my.ns/in-calls} {:lifecycle/task :your-task-name :lifecycle/calls :onyx.plugin.core-async/reader-calls}] ---- There's a little extra baggage with core.async because you need a reference to the channel. Make sure that `my.ns/in-calls` is a map that references a function to inject the channel in: [source,clojure] ---- (def in-chan (chan capacity)) (def in-buffer (atom {})) (defn inject-in-ch [event lifecycle] {:core.async/buffer in-buffer :core.async/chan in-chan}) (def in-calls {:lifecycle/before-task-start inject-in-ch}) ---- ==== write-to-chan Catalog entry: [source,clojure] ---- {:onyx/name :out :onyx/plugin :onyx.plugin.core-async/output :onyx/type :output :onyx/medium :core.async :onyx/batch-size batch-size :onyx/max-peers 1 :onyx/doc "Writes segments to a core.async channel"} ---- Lifecycle entries: [source,clojure] ---- [{:lifecycle/task :your-task-name :lifecycle/calls :my.ns/out-calls} {:lifecycle/task :your-task-name :lifecycle/calls :onyx.plugin.core-async/writer-calls}] ---- Again, as with `read-from-chan`, there's a little extra to do since core.async has some exceptional behavior compared to other plugins: [source,clojure] ---- (def out-chan (chan capacity)) (defn inject-out-ch [event lifecycle] {:core.async/chan out-chan}) (def out-calls {:lifecycle/before-task-start inject-out-ch}) ---- === Utility Functions ==== `take-segments!` This additional function is provided as a utility for removing segments from a channel until nothing is read for a provided timeout (in milliseconds).