1
+ (ns clojure.core.pipeline-test
2
+ (:require [clojure.test :refer (deftest is are )]
3
+ [clojure.core.async :refer [<! <!! >!! go-loop thread chan close! to-chan!
4
+ pipeline pipeline-blocking pipeline-async]]))
5
+
6
+ ; ; in Clojure 1.7, use (map f) instead of this
7
+ (defn mapping [f]
8
+ (fn [f1]
9
+ (fn
10
+ ([] (f1 ))
11
+ ([result] (f1 result))
12
+ ([result input]
13
+ (f1 result (f input)))
14
+ ([result input & inputs]
15
+ (f1 result (apply f input inputs))))))
16
+
17
+ (defn pipeline-tester [pipeline-fn n inputs xf]
18
+ (let [cin (to-chan! inputs)
19
+ cout (chan 1 )]
20
+ (pipeline-fn n cout xf cin)
21
+ (<!! (go-loop [acc []]
22
+ (let [val (<! cout)]
23
+ (if (not (nil? val))
24
+ (recur (conj acc val))
25
+ acc))))))
26
+
27
+ (def identity-mapping (mapping identity))
28
+ (defn identity-async [v ch] (thread (>!! ch v) (close! ch)))
29
+
30
+ (deftest test-sizes
31
+ (are [n size]
32
+ (let [r (range size)]
33
+ (and
34
+ (= r (pipeline-tester pipeline n r identity-mapping))
35
+ (= r (pipeline-tester pipeline-blocking n r identity-mapping))
36
+ (= r (pipeline-tester pipeline-async n r identity-async))))
37
+ 1 0
38
+ 1 10
39
+ 10 10
40
+ 20 10
41
+ 5 1000 ))
42
+
43
+ (deftest test-close?
44
+ (doseq [pf [pipeline pipeline-blocking]]
45
+ (let [cout (chan 1 )]
46
+ (pf 5 cout identity-mapping (to-chan! [1 ]) true )
47
+ (is (= 1 (<!! cout)))
48
+ (is (= nil (<!! cout))))
49
+ (let [cout (chan 1 )]
50
+ (pf 5 cout identity-mapping (to-chan! [1 ]) false )
51
+ (is (= 1 (<!! cout)))
52
+ (>!! cout :more )
53
+ (is (= :more (<!! cout))))
54
+ (let [cout (chan 1 )]
55
+ (pf 5 cout identity-mapping (to-chan! [1 ]) nil )
56
+ (is (= 1 (<!! cout)))
57
+ (>!! cout :more )
58
+ (is (= :more (<!! cout))))))
59
+
60
+ (deftest test-ex-handler
61
+ (doseq [pf [pipeline pipeline-blocking]]
62
+ (let [cout (chan 1 )
63
+ chex (chan 1 )
64
+ ex-mapping (mapping (fn [x] (if (= x 3 ) (throw (ex-info " err" {:data x})) x)))
65
+ ex-handler (fn [e] (>!! chex e) :err )]
66
+ (pf 5 cout ex-mapping (to-chan! [1 2 3 4 ]) true ex-handler)
67
+ (is (= 1 (<!! cout)))
68
+ (is (= 2 (<!! cout)))
69
+ (is (= :err (<!! cout)))
70
+ (is (= 4 (<!! cout)))
71
+ (is (= {:data 3 } (ex-data (<!! chex)))))))
72
+
73
+ (defn multiplier-async [v ch]
74
+ (thread
75
+ (dotimes [i v]
76
+ (>!! ch i))
77
+ (close! ch)))
78
+
79
+ (deftest test-af-multiplier
80
+ (is (= [0 0 1 0 1 2 0 1 2 3 ]
81
+ (pipeline-tester pipeline-async 2 (range 1 5 ) multiplier-async))))
82
+
83
+ (def sleep-mapping (mapping #(do (System.Threading.Thread/Sleep %) %))) ; ;; Thread/sleep
84
+
85
+ (deftest test-blocking
86
+ (let [times [2000 50 1000 100 ]]
87
+ (is (= times (pipeline-tester pipeline-blocking 2 times sleep-mapping)))))
88
+
89
+ (defn slow-fib [n]
90
+ (if (< n 2 ) n (+ (slow-fib (- n 1 )) (slow-fib (- n 2 )))))
91
+
92
+ (deftest test-compute
93
+ (let [input (take 50 (cycle (range 15 38 )))]
94
+ (is (= (slow-fib (last input))
95
+ (last (pipeline-tester pipeline 8 input (mapping slow-fib)))))))
96
+
97
+ (deftest test-async
98
+ (is (= (range 1 101 )
99
+ (pipeline-tester pipeline-async 1 (range 100 )
100
+ (fn [v ch] (future (>!! ch (inc v)) (close! ch)))))))
0 commit comments