Skip to content

Commit 5103945

Browse files
committed
Add lab.clj and lab_test.clj -- fails on two multiplexer tests, leave for later.
1 parent 0508695 commit 5103945

File tree

2 files changed

+197
-0
lines changed

2 files changed

+197
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
2+
;; The use and distribution terms for this software are covered by the
3+
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
;; which can be found in the file epl-v10.html at the root of this distribution.
5+
;; By using this software in any fashion, you are agreeing to be bound by
6+
;; the terms of this license.
7+
;; You must not remove this notice, or any other, from this software.
8+
9+
(ns ^{:skip-wiki true} clojure.core.async.lab
10+
"core.async HIGHLY EXPERIMENTAL feature exploration
11+
12+
Caveats:
13+
14+
1. Everything defined in this namespace is experimental, and subject
15+
to change or deletion without warning.
16+
17+
2. Many features provided by this namespace are highly coupled to
18+
implementation details of core.async. Potential features which
19+
operate at higher levels of abstraction are suitable for inclusion
20+
in the examples.
21+
22+
3. Features provided by this namespace MAY be promoted to
23+
clojure.core.async at a later point in time, but there is no
24+
guarantee any of them will."
25+
(:require [clojure.core.async :as async]
26+
[clojure.core.async.impl.protocols :as impl]
27+
[clojure.core.async.impl.mutex :as mutex]
28+
[clojure.core.async.impl.dispatch :as dispatch]
29+
[clojure.core.async.impl.channels :as channels])
30+
(:import [System.Collections.Generic |HashSet`1[System.Object]| |ISet`1[System.Object]| ] ;;; [java.util HashSet Set Collection] -- I don't even know why this works. You can use |HashSet`1| below.
31+
[clojure.core.async.impl.mutex ILock])) ;;; [java.util.concurrent.locks Lock]
32+
33+
(deftype MultiplexingReadPort
34+
[^ILock mutex ^|ISet`1| read-ports] ;;; ^Lock ^Set
35+
impl/ReadPort
36+
(take! [this handler]
37+
(if (empty? read-ports)
38+
(channels/box nil)
39+
(do
40+
(.lock mutex)
41+
(let [^ILock handler handler ;;; ^Lock
42+
commit-handler (fn []
43+
(.lock handler)
44+
(let [take-cb (and (impl/active? handler) (impl/commit handler))]
45+
(.unlock handler)
46+
take-cb))
47+
fret (fn [[val alt-port]]
48+
(if (nil? val)
49+
(do (.lock mutex)
50+
(.Remove read-ports alt-port) ;;; .remove
51+
(.unlock mutex)
52+
(impl/take! this handler))
53+
(when-let [take-cb (commit-handler)]
54+
(dispatch/run #(take-cb val)))))
55+
current-ports (seq read-ports)]
56+
(if-let [alt-res (async/do-alts fret current-ports {})]
57+
(let [[val alt-port] @alt-res]
58+
(if (nil? val)
59+
(do (.Remove read-ports alt-port) ;;; .remove
60+
(.unlock mutex)
61+
(recur handler))
62+
(do (.unlock mutex)
63+
(when-let [take-cb (commit-handler)]
64+
(dispatch/run #(take-cb val))))))
65+
(do
66+
(.unlock mutex)
67+
nil)))))))
68+
69+
(defn multiplex
70+
"Returns a multiplexing read port which, when read from, produces a
71+
value from one of ports.
72+
73+
If at read time only one port is available to be read from, the
74+
multiplexing port will return that value. If multiple ports are
75+
available to be read from, the multiplexing port will return one
76+
value from a port chosen non-deterministicly. If no port is
77+
available to be read from, parks execution until a value is
78+
available."
79+
[& ports]
80+
(->MultiplexingReadPort (mutex/mutex) (|HashSet`1|. ^System.Collections.IEnumerable ports))) ;;; (HashSet. ^Collection ports)
81+
82+
(defn- broadcast-write
83+
[port-set val handler]
84+
(if (= (count port-set) 1)
85+
(impl/put! (first port-set) val handler)
86+
(let [clauses (map (fn [port] [port val]) port-set)
87+
recur-step (fn [[_ port]] (broadcast-write (disj port-set port) val handler))]
88+
(when-let [alt-res (async/do-alts recur-step clauses {})]
89+
(recur (disj port-set (second @alt-res))
90+
val
91+
handler)))))
92+
93+
(deftype BroadcastingWritePort
94+
[write-ports]
95+
impl/WritePort
96+
(put! [port val handler]
97+
(broadcast-write write-ports val handler)))
98+
99+
(defn broadcast
100+
"Returns a broadcasting write port which, when written to, writes
101+
the value to each of ports.
102+
103+
Writes to the broadcasting port will park until the value is written
104+
to each of the ports used to create it. For this reason, it is
105+
strongly advised that each of the underlying ports support buffered
106+
writes."
107+
[& ports]
108+
(->BroadcastingWritePort (set ports)))
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
(ns clojure.core.async.lab-test
2+
(:require
3+
[clojure.test :refer [deftest is]]
4+
[clojure.core.async.lab :refer [broadcast multiplex]]
5+
[clojure.core.async :as async]))
6+
7+
;;; Adding a little helper to deal with all the threading calls.
8+
;;; Someday should add this to the clr-specific functions.
9+
10+
(defn start-thread-on [run-fn]
11+
(doto (System.Threading.Thread.
12+
^System.Threading.ThreadStart
13+
(gen-delegate System.Threading.ThreadStart [] (run-fn)))
14+
(.Start)))
15+
16+
17+
(deftest multiplex-test
18+
#_(is (apply = (let [even-chan (async/chan) ;;; TODO -- debug this -gets a "can't cast Boolean to Future
19+
odd-chan (async/chan)
20+
muxer (multiplex even-chan odd-chan)
21+
odds (filter odd? (range 10))
22+
evens (filter even? (range 10))
23+
odd-fn #(doseq [odd odds]
24+
(async/>!! odd-chan odd))
25+
_odd-pusher (start-thread-on odd-fn) ;;; (doto (Thread. ^Runnable odd-fn) (.start))
26+
even-fn #(doseq [even evens]
27+
(async/>!! even-chan even))
28+
_even-pusher (start-thread-on even-fn) ;;; (doto (Thread. ^Runnable even-fn) (.start))
29+
expected (set (range 10))
30+
observed (set (for [_ (range 10)] (async/<!! muxer)))]
31+
[expected observed]))
32+
"Multiplexing multiple channels returns a channel which returns
33+
the values written to each.")
34+
#_(is (let [short-chan (async/chan) ;;; TODO -- debug this -gets a "can't cast Boolean to Future
35+
long-chan (async/chan)
36+
muxer (multiplex short-chan long-chan)
37+
long-fn #(do (dotimes [i 10000]
38+
(async/>!! long-chan i))
39+
(async/close! short-chan))
40+
_long-pusher (start-thread-on long-fn) ;;; (doto (Thread. ^Runnable long-fn) (.start))
41+
short-fn #(do (dotimes [i 10]
42+
(async/>!! short-chan i))
43+
(async/close! short-chan))
44+
_short-pusher (start-thread-on short-fn) ;;; (doto (Thread. ^Runnable short-fn) (.start))
45+
observed (for [_ (range 10010)] (async/<!! muxer))]
46+
(every? identity observed))
47+
"A closed channel will deliver nil, but the multiplexed channel
48+
will never deliver nil until all channels are closed.")
49+
(is (apply = (let [chans (take 5 (repeatedly #(async/chan)))
50+
muxer (apply multiplex chans)]
51+
(doseq [chan chans]
52+
(async/close! chan))
53+
[nil (async/<!! muxer)]))
54+
"When all of a multiplexer's channels are closed, it behaves
55+
like a closed channel on read."))
56+
57+
(deftest broadcast-test
58+
(is (apply = (let [broadcast-receivers (repeatedly 5 #(async/chan 1))
59+
broadcaster (apply broadcast broadcast-receivers)
60+
_ (async/>!! broadcaster :foo)
61+
expected (repeat 5 :foo)
62+
observed (doall (map async/<!! broadcast-receivers))]
63+
[expected observed]))
64+
"Broadcasting to multiple channels returns a channel which will
65+
write to all the target channels.")
66+
(is (apply = (let [broadcast-receivers (repeatedly 5 async/chan)
67+
broadcaster (apply broadcast broadcast-receivers)
68+
read-channels (take 4 broadcast-receivers)
69+
_ (future (async/>!! broadcaster :foo)
70+
(async/>!! broadcaster :bar))
71+
first-reads (doall (map async/<!! read-channels))
72+
timeout-channel (async/timeout 500)
73+
alt-read (async/alts!! (conj read-channels timeout-channel))
74+
expected [(repeat 4 :foo) [nil timeout-channel]]
75+
observed [first-reads alt-read]]
76+
(async/<!! (last broadcast-receivers))
77+
(doseq [channel broadcast-receivers]
78+
(async/<!! channel))
79+
[expected observed]))
80+
"Broadcasts block further writes if one of the channels cannot
81+
complete its write.")
82+
(is (apply = (let [broadcast-receivers (repeatedly 5 #(async/chan 100))
83+
broadcaster (apply broadcast broadcast-receivers)
84+
_ (future (dotimes [i 100]
85+
(async/>!! broadcaster i)))
86+
observed (for [_ (range 100)]
87+
(async/<!! (first broadcast-receivers)))
88+
expected (range 100)]
89+
[expected observed])) "When all channels are sufficiently buffered, reads on one channel are not throttled by reads from other channels."))

0 commit comments

Comments
 (0)