Skip to content

Commit 38a87f5

Browse files
Merge pull request #1159 from zsxwing/rxscala-improvement
Rxscala improvement
2 parents dded83c + 8397f74 commit 38a87f5

File tree

3 files changed

+332
-3
lines changed

3 files changed

+332
-3
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.language.implicitConversions
2525

2626
import org.junit.Assert.assertEquals
2727
import org.junit.Assert.assertTrue
28+
import org.junit.Assert.assertFalse
2829
import org.junit.Ignore
2930
import org.junit.Test
3031
import org.scalatest.junit.JUnitSuite
@@ -296,6 +297,28 @@ class RxScalaDemo extends JUnitSuite {
296297
shared.connect
297298
}
298299

300+
@Test def exampleWithPublish2() {
301+
val unshared = Observable.from(1 to 4)
302+
val shared = unshared.publish(0)
303+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
304+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
305+
shared.connect
306+
}
307+
308+
@Test def exampleWithPublish3() {
309+
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2))
310+
o.subscribe(n => println(s"subscriber 1 gets $n"))
311+
o.subscribe(n => println(s"subscriber 2 gets $n"))
312+
Thread.sleep(1000)
313+
}
314+
315+
@Test def exampleWithPublish4() {
316+
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2), -1L)
317+
o.subscribe(n => println(s"subscriber 1 gets $n"))
318+
o.subscribe(n => println(s"subscriber 2 gets $n"))
319+
Thread.sleep(1000)
320+
}
321+
299322
def doLater(waitTime: Duration, action: () => Unit): Unit = {
300323
Observable.interval(waitTime).take(1).subscribe(_ => action())
301324
}
@@ -426,6 +449,15 @@ class RxScalaDemo extends JUnitSuite {
426449
)
427450
}
428451

452+
@Test def dropUntilExample() {
453+
val o = List("Alice", "Bob", "Carlos").toObservable.zip(
454+
Observable.interval(700 millis, IOScheduler())).map(_._1) // emit every 700 millis
455+
val other = List(1).toObservable.delay(1 seconds)
456+
println(
457+
o.dropUntil(other).toBlockingObservable.toList // output List("Bob", "Carlos")
458+
)
459+
}
460+
429461
def square(x: Int): Int = {
430462
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
431463
Thread.sleep(100) // calculating a square is heavy work :)
@@ -550,6 +582,26 @@ class RxScalaDemo extends JUnitSuite {
550582
obs.toBlockingObservable.toIterable.last
551583
}
552584

585+
@Test def doOnTerminateExample(): Unit = {
586+
val o = List("red", "green", "blue").toObservable.doOnTerminate(() => println("terminate"))
587+
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
588+
// red
589+
// green
590+
// blud
591+
// terminate
592+
// onCompleted
593+
}
594+
595+
@Test def finallyDoExample(): Unit = {
596+
val o = List("red", "green", "blue").toObservable.finallyDo(() => println("finally"))
597+
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
598+
// red
599+
// green
600+
// blud
601+
// onCompleted
602+
// finally
603+
}
604+
553605
@Test def timeoutExample(): Unit = {
554606
val other = List(100L, 200L, 300L).toObservable
555607
val result = Observable.interval(100 millis).timeout(50 millis, other).toBlockingObservable.toList
@@ -636,6 +688,24 @@ class RxScalaDemo extends JUnitSuite {
636688
println(m.toBlockingObservable.single)
637689
}
638690

691+
@Test def containsExample(): Unit = {
692+
val o1 = List(1, 2, 3).toObservable.contains(2)
693+
assertTrue(o1.toBlockingObservable.single)
694+
695+
val o2 = List(1, 2, 3).toObservable.contains(4)
696+
assertFalse(o2.toBlockingObservable.single)
697+
}
698+
699+
@Test def repeatExample1(): Unit = {
700+
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6)
701+
assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList)
702+
}
703+
704+
@Test def repeatExample2(): Unit = {
705+
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat(2)
706+
assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList)
707+
}
708+
639709
@Test def retryExample1(): Unit = {
640710
val o : Observable[String] = List("alice", "bob", "carol").toObservable
641711
assertEquals(List("alice", "bob", "carol"), o.retry.toBlockingObservable.toList)
@@ -708,4 +778,27 @@ class RxScalaDemo extends JUnitSuite {
708778
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
709779
}
710780
}
781+
782+
@Test def startWithExample1(): Unit = {
783+
val o1 = List(3, 4).toObservable
784+
val o2 = 1 :: 2 :: o1
785+
assertEquals(List(1, 2, 3, 4), o2.toBlockingObservable.toList)
786+
}
787+
788+
@Test def startWithExample2(): Unit = {
789+
val prepended = List(2, 4).toObservable
790+
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(prepended)
791+
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
792+
}
793+
794+
@Test def startWithExample3(): Unit = {
795+
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(List(2, 4))
796+
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
797+
}
798+
799+
@Test def startWithExample4(): Unit = {
800+
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(Array(2, 4))
801+
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
802+
}
803+
711804
}

0 commit comments

Comments
 (0)