Skip to content

Commit ce343e3

Browse files
authored
Add dataflow implementation
Formerly corelli
1 parent 2d11df7 commit ce343e3

File tree

7 files changed

+647
-0
lines changed

7 files changed

+647
-0
lines changed

Diff for: README.md

+4
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ channel. Produces an intermediate state only when consuming from output
153153
channel. Useful for maintaining internal state while processing a
154154
stream.
155155

156+
### Dataflow
157+
158+
See [dataflow](./doc/dataflow.org)
159+
156160
## Usage
157161

158162
### Dependency

Diff for: doc/dataflow.org

+188
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
* Dataflow
2+
3+
This namespace is designed to compose simple core.async processes.
4+
It's probably suitable for micro-services.
5+
6+
The configuration and data model are inspired by [[https://github.com/onyx-platform/onyx][Onyx]].
7+
8+
** Motivation
9+
10+
There is usually little need to go through the logistics and ceremony, when
11+
using core.async, to manage the channels, puts and takes, and in most instances,
12+
lifecycle, as it naturally emerges from the topology.
13+
14+
Processes' topologies are usually an emergent phenomenon and not explicitly stated.
15+
There is a mix between topology, business logic, and low level async APIs.
16+
17+
The idea is to separate the topology of the process from logic as much as
18+
possible by providing a data language to describe the data flow, and functions
19+
and other vars are to be resolved when "compiling" the model.
20+
21+
** Data Model
22+
23+
- Edges: core.async channels
24+
- Vertices: processing units, connected by channels. Can be pipes, drivers, sinks.
25+
26+
The graph is describe in terms of two collections:
27+
28+
- Edges: data describing only the channels, including buffer types, buffer functions, size and transducers.
29+
- Nodes: data describing a pipeline between two channels, mult, producer or consumer.
30+
31+
*** Buffers
32+
33+
#+begin_src clojure
34+
{::buffer/type ::buffer/blocking
35+
::buffer/size 8}
36+
37+
{::buffer/type ::buffer/sliding
38+
::buffer/size 8}
39+
40+
{::buffer/type ::buffer/dropping
41+
::buffer/size 8}
42+
#+end_src
43+
44+
*** Channels
45+
46+
#+begin_src clojure
47+
{::chan/name :a
48+
::chan/type ::chan/simple}
49+
50+
{::chan/name :b
51+
::chan/type ::chan/sized
52+
::chan/size 8}
53+
54+
{::chan/name :c
55+
::chan/type ::chan/buffered
56+
::chan/buffer {::buffer/type ::buffer/blocking
57+
::buffer/size 8}}
58+
#+end_src
59+
60+
** Extension
61+
62+
*** Buffers
63+
64+
#+begin_src clojure
65+
(defmethod buffer/-type ::your-new-type [_] ::spec-for-your-type)
66+
67+
(defmethod buffer/-compile ::your-new-type
68+
[{:keys [:buffer/arg1 :buffer/arg2]}]
69+
(your-buffer-fn arg1 arg2))
70+
71+
;;; Example from buffer namespace
72+
73+
(defmethod -compile ::dropping [{:keys [::size]}] (a/dropping-buffer size))
74+
#+end_src
75+
76+
*** Channels
77+
78+
#+begin_src clojure
79+
(defmethod chan/-type ::your-new-type [_] ::spec-for-your-type)
80+
81+
(defmethod chan/-compile ::your-new-type
82+
[{:keys [:chan/arg1 :chan/arg2]}]
83+
(your-chan-fn arg1 arg2))
84+
85+
;; Example from channel namespace
86+
87+
(defmethod -compile ::buffered [{:keys [::buffer]}] (a/chan (buffer/-compile buffer)))
88+
#+end_src
89+
90+
*** Worker nodes
91+
92+
Worker nodes compilers also take an environment argument which contains the channels
93+
94+
#+begin_src clojure
95+
(defmethod node/-type ::your-new-type [_] ::spec-for-your-type)
96+
97+
(defmethod node/-compile ::your-new-type
98+
[{:keys [:node/arg1 :node/arg2]} env]
99+
(your-node-fn arg1 arg2))
100+
101+
;; Example from node namespace
102+
103+
(defmethod -compile ::pipeline-blocking
104+
[{{to ::to from ::from size ::size xf ::xf} ::pipeline} env]
105+
(a/pipeline-blocking size (env to) xf (env from)))
106+
#+end_src
107+
108+
** Usage
109+
110+
*** Require dataflow namespaces
111+
112+
#+begin_src clojure
113+
(require '[more.async.dataflow.node :as node]
114+
'[more.async.dataflow.channel :as chan]
115+
'[more.async.dataflow.buffer :as buffer]
116+
'[more.async.dataflow :as flow])
117+
#+end_src
118+
119+
*** Define a model
120+
121+
- Define model with channels and nodes (can be verified using spec).
122+
- Define the required vars.
123+
- Validate the model using the ~::flow/model~ spec.
124+
- Try compiling the model using ~compile-model~.
125+
126+
*** Example
127+
128+
#+begin_src clojure
129+
(def model
130+
{::channels
131+
[{::chan/name :in
132+
::chan/type ::chan/sized
133+
::chan/size 1}
134+
{::chan/name :out
135+
::chan/type ::chan/sized
136+
::chan/size 1}]
137+
::nodes
138+
[
139+
140+
{::node/name :producer
141+
::node/type ::node/produce
142+
::node/produce
143+
{::node/to :in
144+
::node/async true
145+
::node/fn (let [a (atom 0)]
146+
(fn drive []
147+
(Thread/sleep 1000)
148+
(swap! a inc)))}}
149+
150+
{::node/name :pipeline
151+
::node/type ::node/pipeline-blocking
152+
::node/pipeline
153+
{::node/from :in
154+
::node/to :out
155+
::node/size 4
156+
::node/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}}
157+
158+
{::node/name :consumer
159+
::node/type ::node/consume
160+
::node/consume
161+
{::node/from :out
162+
::node/fn (fn [x] (println :OUT x))
163+
::node/async? true}}]})
164+
165+
(s/valid? ::flow/channels (::channels model))
166+
167+
(s/valid? ::flow/nodes (::nodes model))
168+
169+
(s/valid? ::flow/model model)
170+
171+
(s/valid? ::flow/connected model)
172+
173+
(def system (compile-model model))
174+
175+
(a/close! (:in (::channels system)))
176+
#+end_src
177+
178+
** Status
179+
180+
Experimental. Looking for user reports.
181+
182+
** Roadmap
183+
184+
- [ ] Tests
185+
- [ ] Analyze the topology to find any dangling channels or disconnected pipes before instancing the pipes.
186+
- [ ] Implement ~select~ based on ~alt!~ and/or ~alts!~.
187+
- [ ] Find an idiomatic way to connect a web handler as driver.
188+
- [ ] Refine specs, currently have no way to differentiate transducers from regular functions.

Diff for: src/main/clojure/more/async/dataflow.clj

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
(ns more.async.dataflow
2+
(:require
3+
[more.async.dataflow.node :as node]
4+
[more.async.dataflow.channel :as chan]
5+
[clojure.spec.alpha :as s]
6+
[clojure.core.async :as a]))
7+
8+
(defn connected?
9+
[node chans]
10+
(every?
11+
#(contains? chans (::node/name %))
12+
(node/ports node)))
13+
14+
(defn connected-model?
15+
[{::keys [channels nodes]}]
16+
(let [chans (into #{} (map ::chan/name) channels)]
17+
(every? #(connected? % chans) nodes)))
18+
19+
(s/def ::connected connected-model?)
20+
21+
(s/def ::node (s/multi-spec node/-type ::node/type))
22+
(s/def ::channel (s/multi-spec chan/-type ::chan/type))
23+
24+
(s/def ::channels (s/+ ::channel))
25+
(s/def ::nodes (s/+ ::node))
26+
27+
(s/def ::model (s/keys :req [::channels ::nodes]))
28+
29+
(s/def ::correct-model (s/and ::connected))
30+
31+
(defn- build
32+
[k f specs]
33+
(reduce
34+
(fn [m spec]
35+
(assoc m (get spec k) (f spec)))
36+
{}
37+
specs))
38+
39+
(defn -env
40+
[chans]
41+
(fn [lookup]
42+
(if-some [ch (get chans lookup)]
43+
ch
44+
(throw (ex-info (format "Channel %s not found" lookup) chans)))))
45+
46+
(defn compile-model
47+
[{::keys [channels nodes env]
48+
:or {env -env}}]
49+
(let [chans (build ::chan/name chan/-compile channels)
50+
env (env chans)
51+
workers (build ::node/name #(node/-compile % env) nodes)]
52+
{::channels chans ::nodes workers}))
53+
54+
(s/fdef compile-model
55+
:args (s/cat :model ::model))
56+
57+
(comment
58+
(def model
59+
{::channels
60+
[{::chan/name :in
61+
::chan/type ::chan/sized
62+
::chan/size 1}
63+
{::chan/name :out
64+
::chan/type ::chan/sized
65+
::chan/size 1}]
66+
::nodes
67+
[
68+
69+
{::node/name :producer
70+
::node/type ::node/produce
71+
::node/produce
72+
{::node/to :in
73+
::node/async true
74+
::node/fn (let [a (atom 0)]
75+
(fn drive []
76+
(Thread/sleep 1000)
77+
(swap! a inc)))}}
78+
79+
{::node/name :pipeline
80+
::node/type ::node/pipeline-blocking
81+
::node/pipeline
82+
{::node/from :in
83+
::node/to :out
84+
::node/size 4
85+
::node/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}}
86+
87+
{::node/name :consumer
88+
::node/type ::node/consume
89+
::node/consume
90+
{::node/from :out
91+
::node/fn (fn [x] (println :OUT x))
92+
::node/async? true}}]})
93+
94+
(s/valid? ::channels (::channels model))
95+
96+
(s/valid? ::nodes (::nodes model))
97+
98+
(s/valid? ::model model)
99+
100+
(s/valid? ::connected model)
101+
102+
(def system (compile-model model))
103+
104+
(a/close! (:in (::channels system))))

Diff for: src/main/clojure/more/async/dataflow/buffer.clj

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
(ns more.async.dataflow.buffer
2+
(:require
3+
[clojure.core.async :as a]
4+
[clojure.spec.alpha :as s]))
5+
6+
7+
(s/def ::size int?)
8+
9+
(s/def ::fixed-buffer (s/keys :req [::size]))
10+
11+
(defmulti -type ::type)
12+
(defmethod -type ::blocking [_] (s/keys :req [::size]))
13+
(defmethod -type ::sliding [_] (s/keys :req [::size]))
14+
(defmethod -type ::dropping [_] (s/keys :req [::size]))
15+
16+
(defmulti -compile ::type)
17+
(defmethod -compile ::blocking [{:keys [::size]}] (a/buffer size))
18+
(defmethod -compile ::sliding [{:keys [::size]}] (a/sliding-buffer size))
19+
(defmethod -compile ::dropping [{:keys [::size]}] (a/dropping-buffer size))
20+
21+
(comment
22+
(-compile {::type ::blocking
23+
::size 1})
24+
(s/explain-data ::type {::type ::simple
25+
::name :in}))

Diff for: src/main/clojure/more/async/dataflow/channel.clj

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
(ns more.async.dataflow.channel
2+
(:require
3+
[more.async.dataflow.buffer :as buffer]
4+
[clojure.spec.alpha :as s]
5+
[clojure.core.async :as a]
6+
[clojure.data]))
7+
8+
(s/def ::name (s/or :keyword keyword?
9+
:string string?
10+
:number number?
11+
:symbol symbol?))
12+
13+
(s/def ::buffer (s/multi-spec buffer/-type ::buffer/type))
14+
15+
(s/def ::buffered-chan (s/keys :req [::name ::buffer]))
16+
17+
(defmulti -type ::type)
18+
19+
(defmethod -type ::simple [_] (s/keys :req [::name]))
20+
(defmethod -type ::sized [_] (s/keys :req [::size ::name]))
21+
(defmethod -type ::buffered [_] (s/keys :req [::name ::buffer]))
22+
23+
(s/def ::chan (s/multi-spec -type ::type))
24+
25+
(defmulti -compile ::type)
26+
27+
(defmethod -compile ::simple [_] (a/chan))
28+
(defmethod -compile ::sized [{:keys [::size]}] (a/chan size))
29+
(defmethod -compile ::buffered [{:keys [::buffer]}] (a/chan (buffer/-compile buffer)))
30+
31+
(comment
32+
(-compile {::type ::buffered
33+
::buffer {::buffer/type ::buffer/sliding
34+
::buffer/size 2}}))
35+
36+
(comment
37+
38+
(s/explain-data ::chan {::type ::simple
39+
::name :in})
40+
41+
(s/explain-data ::type {::type ::sized
42+
::size 1
43+
::name :in})
44+
45+
(s/explain-data ::type {::type ::sized
46+
::name :in
47+
::size 1})
48+
49+
(s/explain-data ::buffer {::buffer/size 1
50+
::buffer/type ::buffer/blocking})
51+
52+
(s/explain-data ::type {::name :out
53+
::type ::buffered
54+
::buffer
55+
{::buffer/size 1
56+
::buffer/type ::buffer/blocking}}))

0 commit comments

Comments
 (0)