Skip to content

Commit c56fd02

Browse files
committed
use impl/dispatch executors
1 parent d922995 commit c56fd02

File tree

2 files changed

+8
-15
lines changed

2 files changed

+8
-15
lines changed

src/main/clojure/clojure/core/async/flow.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
"starts the entire flow from init values. The processes start paused.
9393
Call 'resume' or 'resume-proc' to start flow. Returns a map with keys:
9494
95-
:report-chan - a core.async chan for reading.'ping' reponses
95+
:report-chan - a core.async chan for reading.'ping' responses
9696
will show up here, as will any explicit ::flow/report outputs
9797
from :transform
9898

src/main/clojure/clojure/core/async/flow/impl.clj

+7-14
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,14 @@
1111
[clojure.core.async.flow :as-alias flow]
1212
[clojure.core.async.flow.spi :as spi]
1313
[clojure.core.async.flow.impl.graph :as graph]
14+
[clojure.core.async.impl.dispatch :as disp]
1415
[clojure.walk :as walk]
1516
[clojure.datafy :as datafy])
1617
(:import [java.util.concurrent Future Executors ExecutorService TimeUnit]
1718
[java.util.concurrent.locks ReentrantLock]))
1819

1920
(set! *warn-on-reflection* true)
2021

21-
;;TODO - something specific, e.g. make aware of JDK version and vthreads
22-
(defonce mixed-exec clojure.lang.Agent/soloExecutor)
23-
(defonce io-exec clojure.lang.Agent/soloExecutor)
24-
(defonce compute-exec clojure.lang.Agent/pooledExecutor)
25-
2622
(defn datafy [x]
2723
(condp instance? x
2824
clojure.lang.Fn (-> x str symbol)
@@ -32,11 +28,9 @@
3228

3329
(defn futurize ^Future [f {:keys [exec]}]
3430
(fn [& args]
35-
(let [^ExecutorService e (case exec
36-
:compute compute-exec
37-
:io io-exec
38-
:mixed mixed-exec
39-
exec)]
31+
(let [^ExecutorService e (if (instance? ExecutorService exec)
32+
exec
33+
(disp/executor-for exec))]
4034
(.submit e ^Callable #(apply f args)))))
4135

4236
(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
@@ -53,12 +47,11 @@
5347

5448
(defn create-flow
5549
"see lib ns for docs"
56-
[{:keys [procs conns mixed-exec io-exec compute-exec]
57-
:or {mixed-exec mixed-exec, io-exec io-exec, compute-exec compute-exec}}]
50+
[{:keys [procs conns mixed-exec io-exec compute-exec]}]
5851
(let [lock (ReentrantLock.)
5952
chans (atom nil)
6053
execs {:mixed mixed-exec :io io-exec :compute compute-exec}
61-
_ (assert (every? #(instance? ExecutorService %) (vals execs))
54+
_ (assert (every? #(or (nil? %) (instance? ExecutorService %)) (vals execs))
6255
"mixed-exe, io-exec and compute-exec must be ExecutorServices")
6356
pdescs (reduce-kv prep-proc {} procs)
6457
allopts (fn [iok] (into {} (mapcat #(map (fn [[k opts]] [[(:pid %) k] opts]) (iok %)) (vals pdescs))))
@@ -136,7 +129,7 @@
136129
resolver (reify spi/Resolver
137130
(get-write-chan [_ coord]
138131
(write-chan coord))
139-
(get-exec [_ context] (execs context)))
132+
(get-exec [_ context] (or (execs context) (disp/executor-for context))))
140133
start-proc
141134
(fn [{:keys [pid proc args ins outs]}]
142135
(try

0 commit comments

Comments
 (0)