Skip to content

Commit 80d70db

Browse files
Merge pull request #1597 from GeorgiKhomeriki/master
reactivex.io: RxScala getting started examples
2 parents e37047f + dfc789c commit 80d70db

File tree

9 files changed

+175
-0
lines changed

9 files changed

+175
-0
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import rx.lang.scala.Observable
2+
3+
object AsyncObservable extends App {
4+
/**
5+
* This example shows a custom Observable that does not block
6+
* when subscribed to as it spawns a separate thread.
7+
*/
8+
def customObservableNonBlocking(): Observable[String] = {
9+
Observable(
10+
/*
11+
* This 'call' method will be invoked when the Observable is subscribed to.
12+
*
13+
* It spawns a thread to do it asynchronously.
14+
*/
15+
subscriber => {
16+
// For simplicity this example uses a Thread instead of an ExecutorService/ThreadPool
17+
new Thread(new Runnable() {
18+
def run() {
19+
for (i <- 0 to 75) {
20+
if (subscriber.isUnsubscribed) {
21+
return
22+
}
23+
subscriber.onNext("value_" + i)
24+
}
25+
// after sending all values we complete the sequence
26+
if (!subscriber.isUnsubscribed) {
27+
subscriber.onCompleted()
28+
}
29+
}
30+
}).start()
31+
}
32+
)
33+
}
34+
35+
// To see output:
36+
customObservableNonBlocking().subscribe(println(_))
37+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import java.net.URL
2+
import java.util.Scanner
3+
4+
import rx.lang.scala.Observable
5+
6+
object AsyncWiki extends App {
7+
/*
8+
* Fetch a list of Wikipedia articles asynchronously.
9+
*/
10+
def fetchWikipediaArticleAsynchronously(wikipediaArticleNames: String*): Observable[String] = {
11+
Observable(subscriber => {
12+
new Thread(new Runnable() {
13+
def run() {
14+
for (articleName <- wikipediaArticleNames) {
15+
if (subscriber.isUnsubscribed) {
16+
return
17+
}
18+
val url = "http://en.wikipedia.org/wiki/" + articleName
19+
val art = new Scanner(new URL(url).openStream()).useDelimiter("\\A").next()
20+
subscriber.onNext(art)
21+
}
22+
if (!subscriber.isUnsubscribed) {
23+
subscriber.onCompleted()
24+
}
25+
}
26+
}).start()
27+
})
28+
}
29+
30+
fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
31+
.subscribe(art => println("--- Article ---\n" + art.substring(0, 125)))
32+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import java.net.URL
2+
import java.util.Scanner
3+
4+
import rx.lang.scala.Observable
5+
6+
object AsyncWikiErrorHandling extends App {
7+
/*
8+
* Fetch a list of Wikipedia articles asynchronously, with error handling.
9+
*/
10+
def fetchWikipediaArticleAsynchronously(wikipediaArticleNames: String*): Observable[String] = {
11+
Observable(subscriber => {
12+
new Thread(new Runnable() {
13+
def run() {
14+
try {
15+
for (articleName <- wikipediaArticleNames) {
16+
if (subscriber.isUnsubscribed) {
17+
return
18+
}
19+
val url = "http://en.wikipedia.org/wiki/" + articleName
20+
val art = new Scanner(new URL(url).openStream()).useDelimiter("\\A").next()
21+
subscriber.onNext(art)
22+
}
23+
if (!subscriber.isUnsubscribed) {
24+
subscriber.onCompleted()
25+
}
26+
} catch {
27+
case t: Throwable => subscriber.onError(t)
28+
}
29+
}
30+
}).start()
31+
})
32+
}
33+
34+
fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
35+
.subscribe(
36+
art => println("--- Article ---\n" + art.substring(0, 125)),
37+
e => println("--- Error ---\n" + e.getMessage) )
38+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import rx.lang.scala.Observable
2+
3+
object CreateFromSource {
4+
5+
def create() {
6+
val o1 = Observable.just("a", "b", "c")
7+
8+
def list = List(5, 6, 7, 8)
9+
val o2 = Observable.from(list)
10+
11+
val o3 = Observable.just("one object")
12+
}
13+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import rx.lang.scala.Observable
2+
3+
object Hello {
4+
5+
def hello(names: String*) {
6+
Observable.from(names) subscribe { n =>
7+
println(s"Hello $n!")
8+
}
9+
}
10+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import rx.lang.scala.Observable
2+
3+
object SyncObservable extends App {
4+
/**
5+
* This example shows a custom Observable that blocks
6+
* when subscribed to (does not spawn an extra thread).
7+
*/
8+
def customObservableBlocking(): Observable[String] = {
9+
Observable(aSubscriber => {
10+
for (i <- 0 to 50) {
11+
if (!aSubscriber.isUnsubscribed) {
12+
aSubscriber.onNext("value_" + i)
13+
}
14+
}
15+
// after sending all values we complete the sequence
16+
if (!aSubscriber.isUnsubscribed) {
17+
aSubscriber.onCompleted()
18+
}
19+
})
20+
}
21+
22+
// To see output:
23+
customObservableBlocking().subscribe(println(_))
24+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
onNext => value_10_xform
2+
onNext => value_11_xform
3+
onNext => value_12_xform
4+
onNext => value_13_xform
5+
onNext => value_14_xform
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
object Transforming extends App {
2+
/**
3+
* Asynchronously calls 'customObservableNonBlocking' and defines
4+
* a chain of operators to apply to the callback sequence.
5+
*/
6+
def simpleComposition() {
7+
AsyncObservable.customObservableNonBlocking().drop(10).take(5)
8+
.map(stringValue => stringValue + "_xform")
9+
.subscribe(s => println("onNext => " + s))
10+
}
11+
12+
simpleComposition()
13+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
hello("Ben", "George")
2+
Hello Ben!
3+
Hello George!

0 commit comments

Comments
 (0)