Skip to content

Commit 5c6db66

Browse files
Merge pull request #834 from daveray/update-clojure-for-0.17
Update clojure for 0.17
2 parents a1df031 + f74ad55 commit 5c6db66

File tree

7 files changed

+151
-96
lines changed

7 files changed

+151
-96
lines changed

language-adaptors/rxjava-clojure/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,19 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im
4646
(rx/action [] (println "Sequence complete"))))
4747
```
4848

49+
## Using Observable/create
50+
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:
51+
52+
```clojure
53+
; A simple observable that emits 0..9 taking unsubscribe into account
54+
(Observable/create (rx/action [^rx.Subscriber s]
55+
(loop [i 0]
56+
(when (and (< i 10) (.isUnsubscribed s))
57+
(.onNext s i)
58+
(recur (inc i))))
59+
(.onCompleted s)))
60+
```
61+
4962
# Gotchas
5063
Here are a few things to keep in mind when using this interop:
5164

language-adaptors/rxjava-clojure/build.gradle

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,26 @@ jar {
5050
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
5151
}
5252
}
53+
54+
55+
////////////////////////////////////////////////////////////////////////////////
56+
// Define a task that runs an nrepl server. The port is given with the nreplPort
57+
// property:
58+
// gradlew nrepl -PnreplPort=9999
59+
// or put the property in ~/.gradle/gradle.properties
60+
61+
def nreplPortValue = (project.hasProperty('nreplPort') && !project.nreplPort.isEmpty()) ? project.nreplPort : '9999'
62+
configurations { nrepl }
63+
dependencies { nrepl 'org.clojure:tools.nrepl:0.2.2' }
64+
task nrepl(type: JavaExec) {
65+
classpath configurations.nrepl,
66+
project.sourceSets.main.clojure.srcDirs,
67+
project.sourceSets.test.clojure.srcDirs,
68+
sourceSets.main.runtimeClasspath,
69+
sourceSets.test.runtimeClasspath
70+
71+
main = "clojure.main"
72+
args '--eval', "(ns gradle-nrepl (:require [clojure.tools.nrepl.server :refer (start-server stop-server)]))",
73+
'--eval', "(println \"Starting nrepl server on port $nreplPortValue\")",
74+
'--eval', "(def server (start-server :port $nreplPortValue))"
75+
}

language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/http_examples.clj

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
;
22
; Copyright 2013 Netflix, Inc.
3-
;
3+
;
44
; Licensed under the Apache License, Version 2.0 (the "License");
55
; you may not use this file except in compliance with the License.
66
; You may obtain a copy of the License at
77
;
88
; http://www.apache.org/licenses/LICENSE-2.0
9-
;
9+
;
1010
; Unless required by applicable law or agreed to in writing, software
1111
; distributed under the License is distributed on an "AS IS" BASIS,
1212
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,14 +27,14 @@
2727
2828
return Observable<String> of HTML"
2929
(Observable/create
30-
(rx/fn [observer]
30+
(rx/action [observer]
3131
(let [f (future
3232
(doseq [articleName wikipediaArticleNames]
3333
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
3434
; after sending response to onnext we complete the sequence
3535
(-> observer .onCompleted))]
3636
; a subscription that cancels the future if unsubscribed
37-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
37+
(.add observer (Subscriptions/create (rx/action [] (future-cancel f))))))))
3838

3939
; To see output
4040
(comment
@@ -52,7 +52,7 @@
5252
5353
return Observable<String> of HTML"
5454
(Observable/create
55-
(rx/fn [observer]
55+
(rx/action [observer]
5656
(let [f (future
5757
(try
5858
(doseq [articleName wikipediaArticleNames]
@@ -62,7 +62,7 @@
6262
; after sending response to onNext we complete the sequence
6363
(-> observer .onCompleted))]
6464
; a subscription that cancels the future if unsubscribed
65-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
65+
(.add observer (Subscriptions/create (rx/action [] (future-cancel f))))))))
6666

6767
; To see output
6868
(comment

language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.clj

Lines changed: 54 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
;
22
; Copyright 2013 Netflix, Inc.
3-
;
3+
;
44
; Licensed under the Apache License, Version 2.0 (the "License");
55
; you may not use this file except in compliance with the License.
66
; You may obtain a copy of the License at
77
;
88
; http://www.apache.org/licenses/LICENSE-2.0
9-
;
9+
;
1010
; Unless required by applicable law or agreed to in writing, software
1111
; distributed under the License is distributed on an "AS IS" BASIS,
1212
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -58,53 +58,37 @@
5858
; Custom Observable
5959
; --------------------------------------------------
6060

61-
(defn customObservableBlocking []
62-
"This example shows a custom Observable that blocks
63-
when subscribed to (does not spawn an extra thread).
61+
(defn customObservable
62+
"This example shows a custom Observable. Note the
63+
.isUnsubscribed check so that it can be stopped early.
6464
6565
returns Observable<String>"
66+
[]
6667
(Observable/create
67-
(rx/fn [observer]
68-
(doseq [x (range 50)] (-> observer (.onNext (str "value_" x))))
68+
(rx/action [^rx.Subscriber s]
69+
(loop [x (range 50)]
70+
(when (and (not (.isUnsubscribed s)) x)
71+
; TODO
72+
(println "HERE " (.isUnsubscribed s) (first x))
73+
(-> s (.onNext (str "value_" (first x))))
74+
(recur (next x))))
6975
; after sending all values we complete the sequence
70-
(-> observer .onCompleted)
71-
; return a NoOpSubsription since this blocks and thus
72-
; can't be unsubscribed from
73-
(Subscriptions/empty))))
76+
(-> s .onCompleted))))
7477

7578
; To see output
7679
(comment
77-
(.subscribe (customObservableBlocking) (rx/action* println)))
78-
79-
(defn customObservableNonBlocking []
80-
"This example shows a custom Observable that does not block
81-
when subscribed to as it spawns a separate thread.
82-
83-
returns Observable<String>"
84-
(Observable/create
85-
(rx/fn [observer]
86-
(let [f (future
87-
(doseq [x (range 50)]
88-
(-> observer (.onNext (str "anotherValue_" x))))
89-
; after sending all values we complete the sequence
90-
(-> observer .onCompleted))]
91-
; return a subscription that cancels the future
92-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
93-
94-
; To see output
95-
(comment
96-
(.subscribe (customObservableNonBlocking) (rx/action* println)))
97-
80+
(.subscribe (customObservable) (rx/action* println)))
9881

9982
; --------------------------------------------------
10083
; Composition - Simple
10184
; --------------------------------------------------
10285

103-
(defn simpleComposition []
104-
"Asynchronously calls 'customObservableNonBlocking' and defines
86+
(defn simpleComposition
87+
"Calls 'customObservable' and defines
10588
a chain of operators to apply to the callback sequence."
89+
[]
10690
(->
107-
(customObservableNonBlocking)
91+
(customObservable)
10892
(.skip 10)
10993
(.take 5)
11094
(.map (rx/fn [v] (str v "_transformed")))
@@ -119,66 +103,73 @@
119103
; Composition - Multiple async calls combined
120104
; --------------------------------------------------
121105

122-
(defn getUser [userId]
106+
(defn getUser
123107
"Asynchronously fetch user data
124108
125109
return Observable<Map>"
110+
[userId]
126111
(Observable/create
127-
(rx/fn [observer]
112+
(rx/action [^rx.Subscriber s]
128113
(let [f (future
129114
(try
130115
; simulate fetching user data via network service call with latency
131116
(Thread/sleep 60)
132-
(-> observer (.onNext {:user-id userId
133-
:name "Sam Harris"
134-
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
135-
(-> observer .onCompleted)
136-
(catch Exception e (-> observer (.onError e))))) ]
117+
(-> s (.onNext {:user-id userId
118+
:name "Sam Harris"
119+
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
120+
(-> s .onCompleted)
121+
(catch Exception e
122+
(-> s (.onError e))))) ]
137123
; a subscription that cancels the future if unsubscribed
138-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
124+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
139125

140-
(defn getVideoBookmark [userId, videoId]
126+
(defn getVideoBookmark
141127
"Asynchronously fetch bookmark for video
142128
143129
return Observable<Integer>"
130+
[userId, videoId]
144131
(Observable/create
145-
(rx/fn [observer]
132+
(rx/action [^rx.Subscriber s]
146133
(let [f (future
147134
(try
148135
; simulate fetching user data via network service call with latency
149136
(Thread/sleep 20)
150-
(-> observer (.onNext {:video-id videoId
151-
; 50/50 chance of giving back position 0 or 0-2500
152-
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
153-
(-> observer .onCompleted)
154-
(catch Exception e (-> observer (.onError e)))))]
137+
(-> s (.onNext {:video-id videoId
138+
; 50/50 chance of giving back position 0 or 0-2500
139+
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
140+
(-> s .onCompleted)
141+
(catch Exception e
142+
(-> s (.onError e)))))]
155143
; a subscription that cancels the future if unsubscribed
156-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
144+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
157145

158-
(defn getVideoMetadata [videoId, preferredLanguage]
146+
(defn getVideoMetadata
159147
"Asynchronously fetch movie metadata for a given language
160148
return Observable<Map>"
149+
[videoId, preferredLanguage]
161150
(Observable/create
162-
(rx/fn [observer]
151+
(rx/action [^rx.Subscriber s]
163152
(let [f (future
153+
(println "getVideoMetadata " videoId)
164154
(try
165155
; simulate fetching video data via network service call with latency
166156
(Thread/sleep 50)
167157
; contrived metadata for en-us or es-us
168158
(if (= "en-us" preferredLanguage)
169-
(-> observer (.onNext {:video-id videoId
159+
(-> s (.onNext {:video-id videoId
170160
:title "House of Cards: Episode 1"
171161
:director "David Fincher"
172162
:duration 3365})))
173163
(if (= "es-us" preferredLanguage)
174-
(-> observer (.onNext {:video-id videoId
164+
(-> s (.onNext {:video-id videoId
175165
:title "Cámara de Tarjetas: Episodio 1"
176166
:director "David Fincher"
177167
:duration 3365})))
178-
(-> observer .onCompleted)
179-
(catch Exception e (-> observer (.onError e))))) ]
168+
(-> s .onCompleted)
169+
(catch Exception e
170+
(-> s (.onError e))))) ]
180171
; a subscription that cancels the future if unsubscribed
181-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
172+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
182173

183174

184175
(defn getVideoForUser [userId videoId]
@@ -188,8 +179,9 @@
188179
- user data
189180
return Observable<Map>"
190181
(let [user-observable (-> (getUser userId)
191-
(.map (rx/fn [user] {:user-name (:name user)
192-
:language (:preferred-language user)})))
182+
(.map (rx/fn [user]
183+
{:user-name (:name user)
184+
:language (:preferred-language user)})))
193185
bookmark-observable (-> (getVideoBookmark userId videoId)
194186
(.map (rx/fn [bookmark] {:viewed-position (:position bookmark)})))
195187
; getVideoMetadata requires :language from user-observable so nest inside map function
@@ -218,8 +210,6 @@
218210
;
219211
(comment
220212
(-> (getVideoForUser 12345 78965)
221-
(.subscribe
222-
(rx/action [x] (println "--- Object ---\n" x))
223-
(rx/action [e] (println "--- Error ---\n" e))
224-
(rx/action [] (println "--- Completed ---")))))
213+
(.toBlockingObservable)
214+
.single))
225215

0 commit comments

Comments
 (0)