Skip to content

Commit d7df829

Browse files
committed
Make rx/action implement new OnSubscribe interface
1 parent bae0ffa commit d7df829

File tree

3 files changed

+32
-2
lines changed

3 files changed

+32
-2
lines changed

language-adaptors/rxjava-clojure/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,19 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im
4646
(rx/action [] (println "Sequence complete"))))
4747
```
4848

49+
## Using Observable/create
50+
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:
51+
52+
```clojure
53+
; A simple observable that emits 0..9 taking unsubscribe into account
54+
(Observable/create (rx/action [^rx.Subscriber s]
55+
(loop [i 0]
56+
(when (and (< i 10) (.isUnsubscribed s))
57+
(.onNext s i)
58+
(recur (inc i))))
59+
(.onCompleted s)))
60+
```
61+
4962
# Gotchas
5063
Here are a few things to keep in mind when using this interop:
5164

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/interop.clj

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,19 @@
1919
(reify
2020
; If they want Func1, give them onSubscribe as well so Observable/create can be
2121
; used seemlessly with rx/fn.
22+
; TODO remove this when OnSubscriberFunc is removed
2223
~@(if (and (= prefix "rx.util.functions.Func")
2324
(some #{1} arities))
2425
`(rx.Observable$OnSubscribeFunc
2526
(~'onSubscribe [~'this observer#]
2627
(~f-name observer#))))
2728

29+
; OnSubscribe is just an Action1, so add it to the list of implemented interfaces
30+
; so an action cab be used with Observable/create
31+
~@(if (and (= prefix "rx.util.functions.Action")
32+
(some #{1} arities))
33+
`(rx.Observable$OnSubscribe))
34+
2835
~@(mapcat (clojure.core/fn [n]
2936
(let [ifc-sym (symbol (str prefix n))
3037
arg-syms (map #(symbol (str "v" %)) (range n))]
@@ -94,7 +101,8 @@
94101

95102
(defn action*
96103
"Given function f, returns an object that implements rx.util.functions.Action0-3
97-
by delegating to the given function.
104+
by delegating to the given function. Also implements rx.Observable$OnSubscribe which
105+
is just an Action1.
98106
99107
Example:
100108

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
(testing "implements Action0-3"
7171
(let [calls (atom [])
7272
a (rx/action* #(swap! calls conj (vec %&)))]
73+
(is (instance? rx.Observable$OnSubscribe a))
7374
(is (instance? rx.util.functions.Action0 a))
7475
(is (instance? rx.util.functions.Action1 a))
7576
(is (instance? rx.util.functions.Action2 a))
@@ -114,7 +115,7 @@
114115

115116
(deftest test-basic-usage
116117

117-
(testing "can create an observable"
118+
(testing "can create an observable with old style fn"
118119
(is (= 99
119120
(-> (Observable/create (rx/fn [^rx.Observer o]
120121
(.onNext o 99)
@@ -123,6 +124,14 @@
123124
.toBlockingObservable
124125
.single))))
125126

127+
(testing "can create an observable with new-style action"
128+
(is (= 99
129+
(-> (Observable/create (rx/action [^rx.Subscriber s]
130+
(when-not (.isUnsubscribed s)
131+
(.onNext s 99))
132+
(.onCompleted s)))
133+
.toBlockingObservable
134+
.single))))
126135
(testing "can pass rx/fn to map and friends"
127136
(is (= (+ 1 4 9)
128137
(-> (Observable/from [1 2 3])

0 commit comments

Comments
 (0)