Skip to content

Commit 516c180

Browse files
committed
Fix a little error in buffers, add concurrent, add threadpool
1 parent 3765092 commit 516c180

File tree

4 files changed

+252
-2
lines changed

4 files changed

+252
-2
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
2+
;; The use and distribution terms for this software are covered by the
3+
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
;; which can be found in the file epl-v10.html at the root of this distribution.
5+
;; By using this software in any fashion, you are agreeing to be bound by
6+
;; the terms of this license.
7+
;; You must not remove this notice, or any other, from this software.
8+
9+
(ns ^{:skip-wiki true}
10+
clojure.core.async.impl.buffers
11+
(:require [clojure.core.async.impl.protocols :as impl])
12+
(:import [System.Collections.Generic |LinkedList`1[System.Object]|] ;;; [java.util LinkedList] -- I don't even know why this works. You can use |LinkedList`1| below.
13+
[clojure.lang Counted]))
14+
15+
(set! *warn-on-reflection* true)
16+
17+
(deftype FixedBuffer [^|LinkedList`1| buf ^long n]
18+
impl/ABuffer ;;; Buffer
19+
(full? [_this]
20+
(>= (.Count buf) n)) ;;; .size
21+
(remove! [_this]
22+
(let [v (.Last buf)] (.RemoveLast buf) (when v (.Value v)))) ;;; .removeLast
23+
(add!* [this itm]
24+
(.AddFirst buf itm) ;;; .addFirst
25+
this)
26+
(close-buf! [_this])
27+
Counted
28+
(count [_this]
29+
(.Count buf))) ;;; .size
30+
31+
(defn fixed-buffer [^long n]
32+
(FixedBuffer. (|LinkedList`1|.) n))
33+
34+
35+
(deftype DroppingBuffer [^|LinkedList`1| buf ^long n]
36+
impl/UnblockingBuffer
37+
impl/ABuffer ;;; Buffer
38+
(full? [_this]
39+
false)
40+
(remove! [_this]
41+
(let [v (.Last buf)] (.RemoveLast buf) (when v (.Value v)))) ;;; .removeLast
42+
(add!* [this itm]
43+
(when-not (>= (.Count buf) n) ;;; .size
44+
(.AddFirst buf itm)) ;;; .addFirst
45+
this)
46+
(close-buf! [_this])
47+
Counted
48+
(count [_this]
49+
(.Count buf))) ;;; .size
50+
51+
(defn dropping-buffer [n]
52+
(DroppingBuffer. (|LinkedList`1|.) n))
53+
54+
(deftype SlidingBuffer [^|LinkedList`1| buf ^long n]
55+
impl/UnblockingBuffer
56+
impl/ABuffer ;;; Buffer
57+
(full? [_this]
58+
false)
59+
(remove! [_this]
60+
(let [v (.Last buf)] (.RemoveLast buf) (when v (.Value v)))) ;;; .removeLast
61+
(add!* [this itm]
62+
(when (= (.Count buf) n) ;;; .size
63+
(impl/remove! this))
64+
(.AddFirst buf itm) ;;; .addFirst
65+
this)
66+
(close-buf! [_this])
67+
Counted
68+
(count [_this]
69+
(.Count buf))) ;;; .size
70+
71+
(defn sliding-buffer [n]
72+
(SlidingBuffer. (|LinkedList`1|.) n))
73+
74+
(defonce ^:private NO-VAL (Object.))
75+
(defn- undelivered? [val]
76+
(identical? NO-VAL val))
77+
78+
(deftype PromiseBuffer [^:unsynchronized-mutable val]
79+
impl/UnblockingBuffer
80+
impl/ABuffer ;;; Buffer
81+
(full? [_]
82+
false)
83+
(remove! [_]
84+
val)
85+
(add!* [this itm]
86+
(when (undelivered? val)
87+
(set! val itm))
88+
this)
89+
(close-buf! [_]
90+
(when (undelivered? val)
91+
(set! val nil)))
92+
Counted
93+
(count [_]
94+
(if (undelivered? val) 0 1)))
95+
96+
(defn promise-buffer []
97+
(PromiseBuffer. NO-VAL))

src/main/clojure/clojure/core/async/impl/concurrent.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@
5858
ThreadFactory
5959
(newThread [_this runnable]
6060
(let [body (if init-fn
61-
(gen-delegate ThreadStart [] (init-fn) runnable)
62-
(gen-delegate ThreadStart [] runnable))
61+
(gen-delegate ThreadStart [] (init-fn) (runnable))
62+
(gen-delegate ThreadStart [] (runnable)))
6363
t (Thread. ^ThreadStart body)]
6464
(doto t
6565
(.set_Name (format name-format (swap! counter inc)))
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
2+
;; The use and distribution terms for this software are covered by the
3+
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
;; which can be found in the file epl-v10.html at the root of this distribution.
5+
;; By using this software in any fashion, you are agreeing to be bound by
6+
;; the terms of this license.
7+
;; You must not remove this notice, or any other, from this software.
8+
9+
(ns clojure.core.async.impl.exec.threadpool
10+
(:require [clojure.core.async.impl.protocols :as impl]
11+
[clojure.core.async.impl.concurrent :as conc])
12+
(:import [System.Threading ThreadPool WaitCallback])) ;;; [java.util.concurrent Executors]
13+
14+
(set! *warn-on-reflection* true)
15+
16+
(def ^:private pool-size
17+
"Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a
18+
delay so property can be set from code after core.async namespace is loaded but before
19+
any use of the async thread pool."
20+
(delay (let [v (Environment/GetEnvironmentVariable "clojure.core.async.pool-size") ;;; (or (Long/getLong "clojure.core.async.pool-size") 8)
21+
m (long 0)
22+
b (Int64/TryParse ^String v (by-ref m))]
23+
(if b m 8))))
24+
25+
;;;(defn thread-pool-executor
26+
;;; ([]
27+
;;; (thread-pool-executor nil))
28+
;;; ([init-fn]
29+
;;; (let [executor-svc (Executors/newFixedThreadPool
30+
;;; @pool-size
31+
;;; (conc/counted-thread-factory "async-dispatch-%d" true
32+
;;; {:init-fn init-fn}))]
33+
;;; (reify impl/Executor
34+
;;; (impl/exec [_ r]
35+
;;; (.execute executor-svc ^Runnable r))))))
36+
37+
;;; Given that we are not implementing our own fixed-size thread pool but, rather, using the system thread pool,
38+
;;; here we need only reify impl/Executor to queue a method to the thread pool.
39+
40+
(defn thread-pool-executor
41+
([]
42+
(thread-pool-executor nil))
43+
([init-fn]
44+
(reify impl/Executor
45+
(impl/exec [_ r]
46+
(ThreadPool/QueueUserWorkItem (gen-delegate WaitCallback [_] (when init-fn (init-fn)) (r)))))))
47+
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
2+
;; The use and distribution terms for this software are covered by the
3+
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
;; which can be found in the file epl-v10.html at the root of this distribution.
5+
;; By using this software in any fashion, you are agreeing to be bound by
6+
;; the terms of this license.
7+
;; You must not remove this notice, or any other, from this software.
8+
9+
(ns clojure.core.async.buffers-test
10+
(:require [clojure.test :refer :all]
11+
[clojure.core.async.impl.buffers :refer :all]
12+
[clojure.core.async.impl.protocols :refer [full? add! remove! close-buf!]]))
13+
14+
(defmacro throws? [expr]
15+
`(try
16+
~expr
17+
false
18+
(catch Exception _# true))) ;;; Throwable
19+
20+
(deftest fixed-buffer-tests
21+
(let [fb (fixed-buffer 2)]
22+
(is (= 0 (count fb)))
23+
24+
(add! fb :1)
25+
(is (= 1 (count fb)))
26+
27+
(add! fb :2)
28+
(is (= 2 (count fb)))
29+
30+
(is (= :1 (remove! fb)))
31+
(is (not (full? fb)))
32+
33+
(is (= 1 (count fb)))
34+
(is (= :2 (remove! fb)))
35+
36+
(is (= 0 (count fb)))
37+
(is (throws? (remove! fb)))))
38+
39+
(deftest dropping-buffer-tests
40+
(let [fb (dropping-buffer 2)]
41+
(is (= 0 (count fb)))
42+
43+
(add! fb :1)
44+
(is (= 1 (count fb)))
45+
46+
(add! fb :2)
47+
(is (= 2 (count fb)))
48+
49+
(is (not (full? fb)))
50+
(is (not (throws? (add! fb :3))))
51+
(is (= 2 (count fb)))
52+
53+
(is (= :1 (remove! fb)))
54+
(is (not (full? fb)))
55+
56+
(is (= 1 (count fb)))
57+
(is (= :2 (remove! fb)))
58+
59+
(is (= 0 (count fb)))
60+
(is (throws? (remove! fb)))))
61+
62+
(deftest sliding-buffer-tests
63+
(let [fb (sliding-buffer 2)]
64+
(is (= 0 (count fb)))
65+
66+
(add! fb :1)
67+
(is (= 1 (count fb)))
68+
69+
(add! fb :2)
70+
(is (= 2 (count fb)))
71+
72+
(is (not (full? fb)))
73+
(is (not (throws? (add! fb :3))))
74+
(is (= 2 (count fb)))
75+
76+
(is (= :2 (remove! fb)))
77+
(is (not (full? fb)))
78+
79+
(is (= 1 (count fb)))
80+
(is (= :3 (remove! fb)))
81+
82+
(is (= 0 (count fb)))
83+
(is (throws? (remove! fb)))))
84+
85+
(deftest promise-buffer-tests
86+
(let [pb (promise-buffer)]
87+
(is (= 0 (count pb)))
88+
89+
(add! pb :1)
90+
(is (= 1 (count pb)))
91+
92+
(add! pb :2)
93+
(is (= 1 (count pb)))
94+
95+
(is (not (full? pb)))
96+
(is (not (throws? (add! pb :3))))
97+
(is (= 1 (count pb)))
98+
99+
(is (= :1 (remove! pb)))
100+
(is (not (full? pb)))
101+
102+
(is (= 1 (count pb)))
103+
(is (= :1 (remove! pb)))
104+
105+
(is (= nil (close-buf! pb)))
106+
(is (= :1 (remove! pb)))))

0 commit comments

Comments
 (0)