Skip to content

Commit f74ad55

Browse files
committed
Update Clojure examples to use new Observable/create
1 parent d7df829 commit f74ad55

File tree

3 files changed

+96
-94
lines changed

3 files changed

+96
-94
lines changed

language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/http_examples.clj

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
;
22
; Copyright 2013 Netflix, Inc.
3-
;
3+
;
44
; Licensed under the Apache License, Version 2.0 (the "License");
55
; you may not use this file except in compliance with the License.
66
; You may obtain a copy of the License at
77
;
88
; http://www.apache.org/licenses/LICENSE-2.0
9-
;
9+
;
1010
; Unless required by applicable law or agreed to in writing, software
1111
; distributed under the License is distributed on an "AS IS" BASIS,
1212
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,14 +27,14 @@
2727
2828
return Observable<String> of HTML"
2929
(Observable/create
30-
(rx/fn [observer]
30+
(rx/action [observer]
3131
(let [f (future
3232
(doseq [articleName wikipediaArticleNames]
3333
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
3434
; after sending response to onnext we complete the sequence
3535
(-> observer .onCompleted))]
3636
; a subscription that cancels the future if unsubscribed
37-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
37+
(.add observer (Subscriptions/create (rx/action [] (future-cancel f))))))))
3838

3939
; To see output
4040
(comment
@@ -52,7 +52,7 @@
5252
5353
return Observable<String> of HTML"
5454
(Observable/create
55-
(rx/fn [observer]
55+
(rx/action [observer]
5656
(let [f (future
5757
(try
5858
(doseq [articleName wikipediaArticleNames]
@@ -62,7 +62,7 @@
6262
; after sending response to onNext we complete the sequence
6363
(-> observer .onCompleted))]
6464
; a subscription that cancels the future if unsubscribed
65-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
65+
(.add observer (Subscriptions/create (rx/action [] (future-cancel f))))))))
6666

6767
; To see output
6868
(comment

language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.clj

Lines changed: 54 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
;
22
; Copyright 2013 Netflix, Inc.
3-
;
3+
;
44
; Licensed under the Apache License, Version 2.0 (the "License");
55
; you may not use this file except in compliance with the License.
66
; You may obtain a copy of the License at
77
;
88
; http://www.apache.org/licenses/LICENSE-2.0
9-
;
9+
;
1010
; Unless required by applicable law or agreed to in writing, software
1111
; distributed under the License is distributed on an "AS IS" BASIS,
1212
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -58,53 +58,37 @@
5858
; Custom Observable
5959
; --------------------------------------------------
6060

61-
(defn customObservableBlocking []
62-
"This example shows a custom Observable that blocks
63-
when subscribed to (does not spawn an extra thread).
61+
(defn customObservable
62+
"This example shows a custom Observable. Note the
63+
.isUnsubscribed check so that it can be stopped early.
6464
6565
returns Observable<String>"
66+
[]
6667
(Observable/create
67-
(rx/fn [observer]
68-
(doseq [x (range 50)] (-> observer (.onNext (str "value_" x))))
68+
(rx/action [^rx.Subscriber s]
69+
(loop [x (range 50)]
70+
(when (and (not (.isUnsubscribed s)) x)
71+
; TODO
72+
(println "HERE " (.isUnsubscribed s) (first x))
73+
(-> s (.onNext (str "value_" (first x))))
74+
(recur (next x))))
6975
; after sending all values we complete the sequence
70-
(-> observer .onCompleted)
71-
; return a NoOpSubsription since this blocks and thus
72-
; can't be unsubscribed from
73-
(Subscriptions/empty))))
76+
(-> s .onCompleted))))
7477

7578
; To see output
7679
(comment
77-
(.subscribe (customObservableBlocking) (rx/action* println)))
78-
79-
(defn customObservableNonBlocking []
80-
"This example shows a custom Observable that does not block
81-
when subscribed to as it spawns a separate thread.
82-
83-
returns Observable<String>"
84-
(Observable/create
85-
(rx/fn [observer]
86-
(let [f (future
87-
(doseq [x (range 50)]
88-
(-> observer (.onNext (str "anotherValue_" x))))
89-
; after sending all values we complete the sequence
90-
(-> observer .onCompleted))]
91-
; return a subscription that cancels the future
92-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
93-
94-
; To see output
95-
(comment
96-
(.subscribe (customObservableNonBlocking) (rx/action* println)))
97-
80+
(.subscribe (customObservable) (rx/action* println)))
9881

9982
; --------------------------------------------------
10083
; Composition - Simple
10184
; --------------------------------------------------
10285

103-
(defn simpleComposition []
104-
"Asynchronously calls 'customObservableNonBlocking' and defines
86+
(defn simpleComposition
87+
"Calls 'customObservable' and defines
10588
a chain of operators to apply to the callback sequence."
89+
[]
10690
(->
107-
(customObservableNonBlocking)
91+
(customObservable)
10892
(.skip 10)
10993
(.take 5)
11094
(.map (rx/fn [v] (str v "_transformed")))
@@ -119,66 +103,73 @@
119103
; Composition - Multiple async calls combined
120104
; --------------------------------------------------
121105

122-
(defn getUser [userId]
106+
(defn getUser
123107
"Asynchronously fetch user data
124108
125109
return Observable<Map>"
110+
[userId]
126111
(Observable/create
127-
(rx/fn [observer]
112+
(rx/action [^rx.Subscriber s]
128113
(let [f (future
129114
(try
130115
; simulate fetching user data via network service call with latency
131116
(Thread/sleep 60)
132-
(-> observer (.onNext {:user-id userId
133-
:name "Sam Harris"
134-
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
135-
(-> observer .onCompleted)
136-
(catch Exception e (-> observer (.onError e))))) ]
117+
(-> s (.onNext {:user-id userId
118+
:name "Sam Harris"
119+
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
120+
(-> s .onCompleted)
121+
(catch Exception e
122+
(-> s (.onError e))))) ]
137123
; a subscription that cancels the future if unsubscribed
138-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
124+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
139125

140-
(defn getVideoBookmark [userId, videoId]
126+
(defn getVideoBookmark
141127
"Asynchronously fetch bookmark for video
142128
143129
return Observable<Integer>"
130+
[userId, videoId]
144131
(Observable/create
145-
(rx/fn [observer]
132+
(rx/action [^rx.Subscriber s]
146133
(let [f (future
147134
(try
148135
; simulate fetching user data via network service call with latency
149136
(Thread/sleep 20)
150-
(-> observer (.onNext {:video-id videoId
151-
; 50/50 chance of giving back position 0 or 0-2500
152-
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
153-
(-> observer .onCompleted)
154-
(catch Exception e (-> observer (.onError e)))))]
137+
(-> s (.onNext {:video-id videoId
138+
; 50/50 chance of giving back position 0 or 0-2500
139+
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
140+
(-> s .onCompleted)
141+
(catch Exception e
142+
(-> s (.onError e)))))]
155143
; a subscription that cancels the future if unsubscribed
156-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
144+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
157145

158-
(defn getVideoMetadata [videoId, preferredLanguage]
146+
(defn getVideoMetadata
159147
"Asynchronously fetch movie metadata for a given language
160148
return Observable<Map>"
149+
[videoId, preferredLanguage]
161150
(Observable/create
162-
(rx/fn [observer]
151+
(rx/action [^rx.Subscriber s]
163152
(let [f (future
153+
(println "getVideoMetadata " videoId)
164154
(try
165155
; simulate fetching video data via network service call with latency
166156
(Thread/sleep 50)
167157
; contrived metadata for en-us or es-us
168158
(if (= "en-us" preferredLanguage)
169-
(-> observer (.onNext {:video-id videoId
159+
(-> s (.onNext {:video-id videoId
170160
:title "House of Cards: Episode 1"
171161
:director "David Fincher"
172162
:duration 3365})))
173163
(if (= "es-us" preferredLanguage)
174-
(-> observer (.onNext {:video-id videoId
164+
(-> s (.onNext {:video-id videoId
175165
:title "Cámara de Tarjetas: Episodio 1"
176166
:director "David Fincher"
177167
:duration 3365})))
178-
(-> observer .onCompleted)
179-
(catch Exception e (-> observer (.onError e))))) ]
168+
(-> s .onCompleted)
169+
(catch Exception e
170+
(-> s (.onError e))))) ]
180171
; a subscription that cancels the future if unsubscribed
181-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
172+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
182173

183174

184175
(defn getVideoForUser [userId videoId]
@@ -188,8 +179,9 @@
188179
- user data
189180
return Observable<Map>"
190181
(let [user-observable (-> (getUser userId)
191-
(.map (rx/fn [user] {:user-name (:name user)
192-
:language (:preferred-language user)})))
182+
(.map (rx/fn [user]
183+
{:user-name (:name user)
184+
:language (:preferred-language user)})))
193185
bookmark-observable (-> (getVideoBookmark userId videoId)
194186
(.map (rx/fn [bookmark] {:viewed-position (:position bookmark)})))
195187
; getVideoMetadata requires :language from user-observable so nest inside map function
@@ -218,8 +210,6 @@
218210
;
219211
(comment
220212
(-> (getVideoForUser 12345 78965)
221-
(.subscribe
222-
(rx/action [x] (println "--- Object ---\n" x))
223-
(rx/action [e] (println "--- Error ---\n" e))
224-
(rx/action [] (println "--- Completed ---")))))
213+
(.toBlockingObservable)
214+
.single))
225215

language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.clj

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
;
22
; Copyright 2013 Netflix, Inc.
3-
;
3+
;
44
; Licensed under the Apache License, Version 2.0 (the "License");
55
; you may not use this file except in compliance with the License.
66
; You may obtain a copy of the License at
77
;
88
; http://www.apache.org/licenses/LICENSE-2.0
9-
;
9+
;
1010
; Unless required by applicable law or agreed to in writing, software
1111
; distributed under the License is distributed on an "AS IS" BASIS,
1212
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -108,10 +108,10 @@
108108
"Returns an observable that executes (f observer) in a future, returning a
109109
subscription that will cancel the future."
110110
[f]
111-
(Observable/create (rx/fn [^Observer observer]
111+
(Observable/create (rx/action [^rx.Subscriber s]
112112
(println "Starting f")
113-
(let [f (future (f observer))]
114-
(Subscriptions/create (rx/action [] (future-cancel f)))))))
113+
(let [f (future (f s))]
114+
(.add s (Subscriptions/create (rx/action [] (future-cancel f))))))))
115115

116116
(defn ^Observable get-list-of-lists
117117
"
@@ -120,15 +120,17 @@
120120
Observable<VideoList> is the \"push\" equivalent to List<VideoList>
121121
"
122122
[user-id]
123-
(future-observable (fn [^Observer observer]
123+
(future-observable (fn [^rx.Subscriber s]
124124
(Thread/sleep 180)
125125
(dotimes [i 15]
126-
(.onNext observer (video-list i)))
127-
(.onCompleted observer))))
126+
(.onNext s (video-list i)))
127+
(.onCompleted s))))
128128

129129

130-
(comment (.subscribe (get-list-of-lists 7777)
131-
(rx/action* println)))
130+
(comment (-> (get-list-of-lists 7777)
131+
.toList
132+
.toBlockingObservable
133+
.single))
132134

133135
(defn video-list
134136
[position]
@@ -137,24 +139,28 @@
137139

138140
(defn ^Observable video-list->videos
139141
[{:keys [position] :as video-list}]
140-
(Observable/create (rx/fn [^Observer observer]
142+
(Observable/create (rx/action [^rx.Subscriber s]
141143
(dotimes [i 50]
142-
(.onNext observer (+ (* position 1000) i)))
143-
(.onCompleted observer)
144-
(Subscriptions/empty))))
144+
(.onNext s (+ (* position 1000) i)))
145+
(.onCompleted s))))
145146

146-
(comment (.subscribe (video-list->videos (video-list 2)) (rx/action* println)))
147+
(comment (-> (video-list->videos (video-list 2))
148+
.toList
149+
.toBlockingObservable
150+
.single))
147151

148152
(defn ^Observable video->metadata
149153
[video-id]
150-
(Observable/create (rx/fn [^Observer observer]
151-
(.onNext observer {:title (str "video-" video-id "-title")
152-
:actors ["actor1" "actor2"]
153-
:duration 5428 })
154-
(.onCompleted observer)
155-
(Subscriptions/empty))))
154+
(Observable/create (rx/action [^rx.Subscriber s]
155+
(.onNext s {:title (str "video-" video-id "-title")
156+
:actors ["actor1" "actor2"]
157+
:duration 5428 })
158+
(.onCompleted s))))
156159

157-
(comment (.subscribe (video->metadata 10) (rx/action* println)))
160+
(comment (-> (video->metadata 10)
161+
.toList
162+
.toBlockingObservable
163+
.single))
158164

159165
(defn ^Observable video->bookmark
160166
[video-id user-id]
@@ -165,7 +171,10 @@
165171
(println "onComplete")
166172
(.onCompleted observer))))
167173

168-
(comment (.subscribe (video->bookmark 112345 99999) (rx/action* println)))
174+
(comment (-> (video->bookmark 112345 99999)
175+
.toList
176+
.toBlockingObservable
177+
.single))
169178

170179
(defn ^Observable video->rating
171180
[video-id user-id]
@@ -178,5 +187,8 @@
178187
:actual-star-rating (rand-int 5) })
179188
(.onCompleted observer))))
180189

181-
(comment (.subscribe (video->rating 234345 8888) (rx/action* println)))
190+
(comment (-> (video->rating 234345 8888)
191+
.toList
192+
.toBlockingObservable
193+
.single))
182194

0 commit comments

Comments
 (0)