Skip to content

Commit 886caeb

Browse files
Merge pull request #925 from daveray/rxjava-clojure-bindings-final
Rxjava clojure bindings final
2 parents 4a93b4a + d19bc42 commit 886caeb

File tree

13 files changed

+2767
-32
lines changed

13 files changed

+2767
-32
lines changed

language-adaptors/rxjava-clojure/README.md

Lines changed: 93 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,96 @@
1-
# Clojure Adaptor for RxJava
1+
Clojure bindings for RxJava.
22

3+
# Binaries
4+
5+
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22).
6+
7+
Example for Leiningen:
8+
9+
```clojure
10+
[com.netflix.rxjava/rxjava-clojure "x.y.z"]
11+
```
12+
13+
and for Gradle:
14+
15+
```groovy
16+
compile 'com.netflix.rxjava:rxjava-clojure:x.y.z'
17+
```
18+
19+
and for Maven:
20+
21+
```xml
22+
<dependency>
23+
<groupId>com.netflix.rxjava</groupId>
24+
<artifactId>rxjava-clojure</artifactId>
25+
<version>x.y.z</version>
26+
</dependency>
27+
```
28+
29+
and for Ivy:
30+
31+
```xml
32+
<dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
33+
```
34+
35+
# Clojure Bindings
36+
This library provides convenient, idiomatic Clojure bindings for RxJava.
37+
38+
The bindings try to present an API that will be comfortable and familiar to a Clojure programmer that's familiar with the sequence operations in `clojure.core`. It "fixes" several issues with using RxJava with raw Java interop, for example:
39+
40+
* Argument lists are in the "right" order. So in RxJava, the function applied in `Observable.map` is the second argument, while here it's the first argument with one or more Observables as trailing arguments
41+
* Operators take normal Clojure functions as arguments, bypassing need for the interop described below
42+
* Predicates accomodate Clojure's notion of truth
43+
* Operators are generally names as they would be in `clojure.core` rather than the Rx names
44+
45+
There is no object wrapping going on. That is, all functions return normal `rx.Observable` objects, so you can always drop back to Java interop for anything that's missing in this wrapper.
46+
47+
## Basic Usage
48+
Most functionality resides in the `rx.lang.clojure.core` namespace and for the most part looks like normal Clojure sequence manipulation:
49+
50+
```clojure
51+
(require '[rx.lang.clojure.core :as rx])
52+
53+
(->> my-observable
54+
(rx/map (comp clojure.string/lower-case :first-name))
55+
(rx/map clojure.string/lower-case)
56+
(rx/filter #{"bob"})
57+
(rx/distinct)
58+
(rx/into []))
59+
;=> An Observable that emits a single vector of names
60+
```
61+
62+
Blocking operators, which are useful for testing, but should otherwise be avoided, reside in `rx.lang.clojure.blocking`. For example:
63+
64+
```clojure
65+
(require '[rx.lang.clojure.blocking :as rxb])
66+
67+
(rxb/doseq [{:keys [first-name]} users-observable]
68+
(println "Hey," first-name))
69+
;=> nil
70+
```
71+
72+
## Open Issues
73+
74+
* The missing stuff mentioned below
75+
* `group-by` val-fn variant isn't implemented in RxJava
76+
* There are some functions for defining customer Observables and Operators (`subscriber`, `operator*`, `observable*`). I don't think these are really enough for serious operator implementation, but I'm hesitant to guess at an abstraction at this point. These will probably change dramatically.
77+
78+
## What's Missing
79+
This library is an ongoing work in progress driven primarily by the needs of one team at Netflix. As such some things are currently missing:
80+
81+
* Highly-specific operators that we felt cluttered the API and were easily composed from existing operators, especially since we're in not-Java land. For example, `Observable.sumLong()`.
82+
* Most everything involving schedulers
83+
* Most everything involving time
84+
* `Observable.window` and `Observable.buffer`. Who knows which parts of these beasts to wrap?
85+
86+
Of course, contributions that cover these cases are welcome.
87+
88+
# Low-level Interop
389
This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces.
490

5-
# Basic Usage
91+
## Basic Usage
692

7-
## Requiring the interop namespace
93+
### Requiring the interop namespace
894
The first thing to do is to require the namespace:
995

1096
```clojure
@@ -19,7 +105,7 @@ or, at the REPL:
19105
(require '[rx.lang.clojure.interop :as rx])
20106
```
21107

22-
## Using rx/fn
108+
### Using rx/fn
23109
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:
24110

25111
```clojure
@@ -34,7 +120,7 @@ If you already have a plain old Clojure function you'd like to use, you can pass
34120
(.reduce (rx/fn* +)))
35121
```
36122

37-
## Using rx/action
123+
### Using rx/action
38124
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:
39125

40126
```clojure
@@ -46,7 +132,7 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im
46132
(rx/action [] (println "Sequence complete"))))
47133
```
48134

49-
## Using Observable/create
135+
### Using Observable/create
50136
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:
51137

52138
```clojure
@@ -59,35 +145,10 @@ As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnS
59145
(.onCompleted s)))
60146
```
61147

62-
# Gotchas
148+
## Gotchas
63149
Here are a few things to keep in mind when using this interop:
64150

65151
* Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts
66152
* If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose.
67153
* Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`.
68154

69-
# Binaries
70-
71-
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22).
72-
73-
Example for Maven:
74-
75-
```xml
76-
<dependency>
77-
<groupId>com.netflix.rxjava</groupId>
78-
<artifactId>rxjava-clojure</artifactId>
79-
<version>x.y.z</version>
80-
</dependency>
81-
```
82-
83-
and for Ivy:
84-
85-
```xml
86-
<dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
87-
```
88-
89-
and for Leiningen:
90-
91-
```clojure
92-
[com.netflix.rxjava/rxjava-clojure "x.y.z"]
93-
```
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
(ns rx.lang.clojure.blocking
2+
"Blocking operators and functions. These should never be used in
3+
production code except at the end of an async chain to convert from
4+
rx land back to sync land. For example, to produce a servlet response.
5+
6+
If you use these, you're a bad person.
7+
"
8+
(:refer-clojure :exclude [first into doseq last])
9+
(:require [rx.lang.clojure.interop :as iop] [rx.lang.clojure.core :as rx])
10+
(:import [rx Observable]
11+
[rx.observables BlockingObservable]))
12+
13+
(def ^:private -ns- *ns*)
14+
(set! *warn-on-reflection* true)
15+
16+
(defmacro ^:private with-ex-unwrap
17+
"The blocking ops wrap errors stuff in RuntimeException because of stupid Java.
18+
This tries to unwrap them so callers get the exceptions they expect."
19+
[& body]
20+
`(try
21+
~@body
22+
(catch RuntimeException e#
23+
(throw (or
24+
(and (identical? RuntimeException (class e#))
25+
(.getCause e#))
26+
e#)))))
27+
28+
(defn ^BlockingObservable ->blocking
29+
"Convert an Observable to a BlockingObservable.
30+
31+
If o is already a BlockingObservable it's returned unchanged.
32+
"
33+
[o]
34+
(if (instance? BlockingObservable o)
35+
o
36+
(.toBlockingObservable ^Observable o)))
37+
38+
(defn o->seq
39+
"Returns a lazy sequence of the items emitted by o
40+
41+
See:
42+
rx.observables.BlockingObservable/getIterator
43+
rx.lang.clojure.core/seq->o
44+
"
45+
[o]
46+
(-> (->blocking o)
47+
(.getIterator)
48+
(iterator-seq)))
49+
50+
(defn first
51+
"*Blocks* and waits for the first value emitted by the given observable.
52+
53+
If the Observable is empty, returns nil
54+
55+
If an error is produced it is thrown.
56+
57+
See:
58+
clojure.core/first
59+
rx/first
60+
rx.observables.BlockingObservable/first
61+
"
62+
[observable]
63+
(with-ex-unwrap
64+
(.firstOrDefault (->blocking observable) nil)))
65+
66+
(defn last
67+
"*Blocks* and waits for the last value emitted by the given observable.
68+
69+
If the Observable is empty, returns nil
70+
71+
If an error is produced it is thrown.
72+
73+
See:
74+
clojure.core/last
75+
rx/last
76+
rx.observable.BlockingObservable/last
77+
"
78+
[observable]
79+
(with-ex-unwrap
80+
(.lastOrDefault (->blocking observable) nil)))
81+
82+
(defn single
83+
"*Blocks* and waits for the first value emitted by the given observable.
84+
85+
An error is thrown if zero or more then one value is produced.
86+
"
87+
[observable]
88+
(with-ex-unwrap
89+
(.single (->blocking observable))))
90+
91+
(defn into
92+
"*Blocks* and pours the elements emitted by the given observables into
93+
to.
94+
95+
If an error is produced it is thrown.
96+
97+
See:
98+
clojure.core/into
99+
rx/into
100+
"
101+
[to from-observable]
102+
(with-ex-unwrap
103+
(clojure.core/into to (o->seq from-observable))))
104+
105+
(defn doseq*
106+
"*Blocks* and executes (f x) for each x emitted by xs
107+
108+
Returns nil.
109+
110+
See:
111+
doseq
112+
clojure.core/doseq
113+
"
114+
[xs f]
115+
(with-ex-unwrap
116+
(-> (->blocking xs)
117+
(.forEach (rx.lang.clojure.interop/action* f)))))
118+
119+
(defmacro doseq
120+
"Like clojure.core/doseq except iterates over an observable in a blocking manner.
121+
122+
Unlike clojure.core/doseq, only supports a single binding
123+
124+
Returns nil.
125+
126+
Example:
127+
128+
(rx-blocking/doseq [{:keys [name]} users-observable]
129+
(println \"User:\" name))
130+
131+
See:
132+
doseq*
133+
clojure.core/doseq
134+
"
135+
[bindings & body]
136+
(when (not= (count bindings) 2)
137+
(throw (IllegalArgumentException. (str "sorry, rx/doseq only supports one binding"))))
138+
(let [[k v] bindings]
139+
`(doseq* ~v (fn [~k] ~@body))))
140+

0 commit comments

Comments
 (0)