Skip to content

Commit baf10bc

Browse files
committed
Fix some type declarations in channels.clj, add timers_test.clj (works)
1 parent b9e8cd6 commit baf10bc

File tree

2 files changed

+42
-15
lines changed

2 files changed

+42
-15
lines changed

src/main/clojure/clojure/core/async/impl/channels.clj

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@
1212
[clojure.core.async.impl.dispatch :as dispatch]
1313
[clojure.core.async.impl.mutex :as mutex])
1414
(:import [System.Collections.Generic |LinkedList`1[System.Object]| |Queue`1[System.Object]|] ;;; [java.util LinkedList Queue]
15-
[clojure.core.async.impl.mutex Lock] ;;; [java.util.concurrent.locks Lock]
15+
[clojure.core.async.impl.mutex ILock] ;;; [java.util.concurrent.locks Lock]
1616
[clojure.lang IDeref]))
1717

1818
(set! *warn-on-reflection* true)
1919

2020
(defmacro assert-unlock [lock test msg]
2121
`(when-not ~test
2222
(.unlock ~lock)
23-
(throw (new InvalidOperationException (str "Assert failed: " ~msg "\n" (pr-str '~test)))))) ;;; AssertionError
23+
(throw (new InvalidOperationException (str "Assert failed: " ~msg "\n" (pr-str '~test)))))) ;;; AssertionError
2424

2525
(defn box [val]
2626
(reify IDeref
@@ -86,31 +86,31 @@
8686
;; this is more straightforward -- enumerator vs iterator again, but no delete
8787
(let [iter (.GetEnumerator puts)]
8888
(when (.MoveNext iter)
89-
(loop [[^Lock putter] (.Current iter)]
89+
(loop [[^ILock putter] (.Current iter)] ;;; ^Lock
9090
(let [put-cb (and (impl/active? putter) (impl/commit putter))]
9191
(.unlock putter)
9292
(when put-cb
9393
(dispatch/run (fn [] (put-cb true))))
9494
(when (.MoveNext iter)
9595
(recur (.Current iter)))))))
96-
(.Clear puts) ;;; .clear
96+
(.Clear puts) ;;; .clear
9797
(impl/close! this))
9898

9999
impl/WritePort
100100
(put!
101101
[this val handler]
102102
(when (nil? val)
103-
(throw (ArgumentException. "Can't put nil on channel"))) ;;; IllegalArgumentException
103+
(throw (ArgumentException. "Can't put nil on channel"))) ;;; IllegalArgumentException
104104
(.lock mutex)
105105
(cleanup this)
106106
(if @closed
107-
(let [^Lock handler handler]
107+
(let [^ILock handler handler] ;;; ^Lock
108108
(.lock handler)
109109
(when (impl/active? handler) (impl/commit handler))
110110
(.unlock handler)
111111
(.unlock mutex)
112112
(box false))
113-
(let [^Lock handler handler]
113+
(let [^ILock handler handler] ;;; ^Lock
114114
(if (and buf (not (impl/full? buf)) (not (= (.Count takes) 0))) ;;; (.isEmpty takes)
115115
(do
116116
(.lock handler)
@@ -135,7 +135,7 @@
135135
[take-cbs (loop [takers []
136136
curr-node (.First takes)]
137137
(if (and curr-node (pos? (count buf)))
138-
(let [^Lock taker curr-node
138+
(let [^ILock taker curr-node ;;; ^Lock
139139
next-node (.Next curr-node)]
140140
(.lock taker)
141141
(let [ret (and (impl/active? taker) (impl/commit taker))]
@@ -166,7 +166,7 @@
166166
nil))))
167167
(let ;;;[iter (.iterator takes)
168168
;;;[put-cb take-cb] (when (.hasNext iter)
169-
;;; (loop [^Lock taker (.next iter)]
169+
;;; (loop [^Lock taker (.next iter)]
170170
;;; (if (< (impl/lock-id handler) (impl/lock-id taker))
171171
;;; (do (.lock handler) (.lock taker))
172172
;;; (do (.lock taker) (.lock handler)))
@@ -181,7 +181,7 @@
181181
;;; (when (.hasNext iter)
182182
;;; (recur (.next iter)))))))]
183183
[[put-cb take-cb] (when (.First takes)
184-
(loop [^Lock taker (.First takes)
184+
(loop [^ILock taker (.First takes) ;;; ^Lock
185185
curr-node (.First takes)]
186186
(if (< (impl/lock-id handler) (impl/lock-id taker))
187187
(do (.lock handler) (.lock taker))
@@ -219,7 +219,7 @@
219219
(do
220220
(when (and (impl/active? handler) (impl/blockable? handler))
221221
(assert-unlock mutex
222-
(< (.Count puts) impl/MAX-QUEUE-SIZE) ;;; .size
222+
(< (.Count puts) impl/MAX-QUEUE-SIZE) ;;; .size
223223
(str "No more than " impl/MAX-QUEUE-SIZE
224224
" pending puts are allowed on a single channel."
225225
" Consider using a windowed buffer."))
@@ -232,7 +232,7 @@
232232
[this handler]
233233
(.lock mutex)
234234
(cleanup this)
235-
(let [^Lock handler handler
235+
(let [^ILock handler handler ;;; ^Lock
236236
commit-handler (fn []
237237
(.lock handler)
238238
(let [take-cb (and (impl/active? handler) (impl/commit handler))]
@@ -260,7 +260,7 @@
260260
(loop [cbs []
261261
curr-node (.First puts)]
262262
(let [next-node (.Next curr-node)
263-
[^Lock putter val] (.Value curr-node)]
263+
[^ILock putter val] (.Value curr-node)] ;;; ^Lock
264264
(.lock putter)
265265
(let [cb (and (impl/active? putter) (impl/commit putter))]
266266
(.unlock putter)
@@ -302,7 +302,7 @@
302302
(when (.First puts)
303303
(loop [curr-node (.First puts)]
304304
(let [next-node (.Next curr-node)
305-
[^Lock putter val] (.Value curr-node)]
305+
[^ILock putter val] (.Value curr-node)] ;;; ^Lock
306306
(if (< (impl/lock-id handler) (impl/lock-id putter))
307307
(do (.lock handler) (.lock putter))
308308
(do (.lock putter) (.lock handler)))
@@ -373,7 +373,7 @@
373373
(when (.First takes)
374374
(loop [curr-node (.First takes)]
375375
(let [next-node (.Next curr-node)
376-
^Lock taker (.Value curr-node)]
376+
^ILock taker (.Value curr-node)] ;;; ^Lock
377377
(.lock taker)
378378
(let [take-cb (and (impl/active? taker) (impl/commit taker))]
379379
(.unlock taker)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
(ns clojure.core.async.timers-test
2+
(:require [clojure.test :refer :all]
3+
[clojure.core.async.impl.timers :refer :all]
4+
[clojure.core.async :as async]))
5+
6+
(deftest timeout-interval-test
7+
(let [start-stamp (.Ticks DateTime/UtcNow) ;;; (System/currentTimeMillis)
8+
test-timeout (timeout 500)]
9+
(is (<= (+ start-stamp (* 500 TimeSpan/TicksPerMillisecond)) ;;; (+ start-stamp 500)
10+
(do (async/<!! test-timeout)
11+
(.Ticks DateTime/UtcNow))) ;;; (System/currentTimeMillis)
12+
"Reading from a timeout channel does not complete until the specified milliseconds have elapsed.")))
13+
14+
(deftest timeout-ordering-test
15+
(let [test-atom (atom [])
16+
timeout-channels [(timeout 800)
17+
(timeout 600)
18+
(timeout 700)
19+
(timeout 500)]
20+
threads (doall (for [i (range 4)]
21+
(let [f #(do (async/<!! (timeout-channels i))
22+
(swap! test-atom conj i))]
23+
(doto (System.Threading.Thread. ^System.Threading.ThreadStart (gen-delegate System.Threading.ThreadStart [] (f))) (.Start)))))] ;;; (Thread. ^Runnable f) (.start)
24+
(doseq [thread threads]
25+
(.Join ^System.Threading.Thread thread)) ;;; .join ^Thread
26+
(is (= @test-atom [3 1 2 0])
27+
"Timeouts close in order determined by their delays, not in order determined by their creation.")))

0 commit comments

Comments
 (0)