Skip to content

Commit fb39b89

Browse files
committed
Merge branch 'master' into dev-io-thread
2 parents 5f6bc42 + 88a7971 commit fb39b89

File tree

3 files changed

+38
-23
lines changed

3 files changed

+38
-23
lines changed

src/main/clojure/clojure/core/async/flow.clj

+36-19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
a set of channels for centralized control, reporting, error-handling,
2222
and execution of the processes
2323
24+
The flow library itself constructs processes, channels and flows. The
25+
user provides configuration data and process logic (step-fns) that
26+
specify how the flow should work.
27+
2428
A flow is constructed from flow configuration data which defines a
2529
directed graph of processes and the connections between
2630
them. Processes describe their I/O requirements and the
@@ -30,9 +34,9 @@
3034
policy decisions regarding process settings, threading, buffering etc.
3135
3236
It is expected that applications will rarely define instances of the
33-
process protocol but instead use the API functions here, 'process'
34-
and 'step-process', that implement the process protocol in terms of
35-
calls to ordinary functions that might include no communication or
37+
process protocol but instead use the API function 'process' that
38+
implements the process protocol in terms of calls to ordinary
39+
functions (step-fns) that might include no communication or
3640
core.async code. In this way the library helps you achieve a strict
3741
separation of your application logic from its execution,
3842
communication, lifecycle, error handling and monitoring.
@@ -44,6 +48,10 @@
4448
the library itself as ::flow/xyz, where ::flow is an alias for
4549
clojure.core.async.flow
4650
51+
Flows support the Clojure 'datafy' protocol to support
52+
observability. See also the 'ping' and 'ping-proc' fns for a live
53+
view of processes.
54+
4755
A process is represented in the flow definition by an implementation
4856
of spi/ProcLauncher that starts it. See the spi docs for
4957
details."
@@ -82,7 +90,7 @@
8290

8391
(defn start
8492
"starts the entire flow from init values. The processes start paused.
85-
Call 'resume' or 'resume-proc' to start flow. returns a map with keys:
93+
Call 'resume' or 'resume-proc' to start flow. Returns a map with keys:
8694
8795
:report-chan - a core.async chan for reading.'ping' reponses
8896
will show up here, as will any explicit ::flow/report outputs
@@ -140,19 +148,25 @@
140148
[g [pid io-id :as coord] msgs] (g/inject g coord msgs))
141149

142150
(defn process
143-
"Given a function of four arities (0-3), or a map of functions
144-
corresponding thereto (described below), returns a launcher that
145-
creates a process compliant with the process protocol (see the
146-
spi/ProcLauncher doc).
147-
148-
The possible arities/entries for fn/map are 0 - :describe, 1
149-
- :init, 2 - :transition and 3 - :transform. This is the core
150-
facility for defining the logic for processes via ordinary
151-
functions. Using a var holding a fn as the 'fn' is the preferred
152-
method for defining a proc, as it enables hot-code-reloading of the
153-
proc logic in a flow, and better names in datafy. You can use the
154-
map form to compose the proc logic from disparate functions or to
155-
leverage the optionality of some of the entry points.
151+
"Given a function of four arities (0-3), aka the 'step-fn', or a map
152+
of functions corresponding thereto (described below), returns a
153+
launcher that creates a process compliant with the process
154+
protocol (see the spi/ProcLauncher doc).
155+
156+
The possible arities/entries for the step-fn/map are
157+
158+
0 - :describe,
159+
1 - :init,
160+
2 - :transition
161+
3 - :transform.
162+
163+
This is the core facility for defining the logic for processes via
164+
ordinary functions. Using a var holding a fn as the 'step-fn' is the
165+
preferred method for defining a proc, as it enables
166+
hot-code-reloading of the proc logic in a flow, and better names in
167+
datafy. You can use the map form to compose the proc logic from
168+
disparate functions or to leverage the optionality of some of the
169+
entry points.
156170
157171
arity 0, or :describe - required, () -> description
158172
where description is a map with keys :params :ins and :outs, each of which
@@ -178,7 +192,10 @@
178192
179193
init will be called once by the process to establish any initial
180194
state. The arg-map will be a map of param->val, as supplied in the
181-
flow def. init must be provided if 'describe' returns :params.
195+
flow def. The key ::flow/pid will be added, mapped to the pid
196+
associated with the process (useful e.g. if the process wants to
197+
refer to itself in reply-to coordinates). init must be provided if
198+
'describe' returns :params.
182199
183200
Optionally, a returned init state may contain the
184201
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
@@ -247,7 +264,7 @@
247264

248265
(defn lift*->step
249266
"given a fn f taking one arg and returning a collection of non-nil
250-
values, create a 'step' fn as needed by step-process, with one input
267+
values, creates a step fn as needed by process, with one input
251268
and one output (named :in and :out), and no state."
252269
[f]
253270
(fn

src/main/clojure/clojure/core/async/flow/impl.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@
143143
(let [chan-map (fn [ks coll] (zipmap (keys ks) (map #(coll [pid %]) (keys ks))))
144144
control-tap (async/chan 10)]
145145
(async/tap control-mult control-tap)
146-
(spi/start proc {:pid pid :args args :resolver resolver
146+
(spi/start proc {:pid pid :args (assoc args ::flow/pid pid) :resolver resolver
147147
:ins (assoc (chan-map ins in-chans)
148148
::flow/control control-tap)
149149
:outs (assoc (chan-map outs out-chans)

src/main/clojure/clojure/core/async/impl/buffers.clj

+1-3
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,4 @@
121121
SlidingBuffer
122122
(datafy [b] (datafy-buffer b))
123123
PromiseBuffer
124-
(datafy [b] (datafy-buffer b))
125-
Object
126-
(datafy [b] nil))
124+
(datafy [b] (datafy-buffer b)))

0 commit comments

Comments
 (0)