我对此进行了一些研究,发现了一些有趣的文章(第一篇和第二篇),然后使用了一些东西pipeline
(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))
(pipeline 4 c2 (filter even?) c1)
(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2
我链接的第二篇文章通过管道函数周围的一些辅助函数使这更清晰:
(defn ncpus []
(.availableProcessors (Runtime/getRuntime)))
(defn parallelism []
(+ (ncpus) 1))
(defn add-transducer
[in xf]
(let [out (chan (buffer 16))]
(pipeline (parallelism) out xf in)
out))
然后您可以简单地将频道与
(def c1 (chan))
(def c2 (add-transducer c1 (filter even?))
为了完成答案,您发现自己可以以类似的方式使用管道:
(defn pipe-trans
[ci xf]
(let [co (chan 1 xf)]
(pipe ci co)
co))
(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))