Skip to content

Commit 7e1c2d4

Browse files
committed
Add 'startWith' to RxScala
1 parent 1b4e2a8 commit 7e1c2d4

File tree

3 files changed

+88
-2
lines changed

3 files changed

+88
-2
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,4 +756,26 @@ class RxScalaDemo extends JUnitSuite {
756756
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
757757
}
758758
}
759+
760+
@Test def startWithExample1(): Unit = {
761+
val o = List(2, 3).toObservable + 1
762+
assertEquals(List(1, 2, 3), o.toBlockingObservable.toList)
763+
}
764+
765+
@Test def startWithExample2(): Unit = {
766+
val prepended = List(2, 4).toObservable
767+
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(prepended)
768+
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
769+
}
770+
771+
@Test def startWithExample3(): Unit = {
772+
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(List(2, 4))
773+
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
774+
}
775+
776+
@Test def startWithExample4(): Unit = {
777+
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(Array(2, 4))
778+
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
779+
}
780+
759781
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ import collection.JavaConversions._
7777
*/
7878
trait Observable[+T]
7979
{
80+
import scala.collection.JavaConverters._
8081
import scala.collection.Seq
8182
import scala.concurrent.duration.{Duration, TimeUnit}
8283
import rx.functions._
@@ -238,6 +239,65 @@ trait Observable[+T]
238239
toScalaObservable(rx.Observable.concat(o1, o2))
239240
}
240241

242+
/**
243+
* Returns an Observable that emits a specified item before it begins to emit items emitted by the source Observable.
244+
* <p>
245+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/startWith.png">
246+
*
247+
* @param elem the item to emit
248+
* @return an Observable that emits the specified item before it begins to emit items emitted by the source Observable
249+
*/
250+
def +[U >: T](elem: U): Observable[U] = {
251+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
252+
toScalaObservable(thisJava.startWith(elem))
253+
}
254+
255+
/**
256+
* Returns an Observable that emits the items in a specified `Observable` before it begins to emit
257+
* items emitted by the source Observable.
258+
* <p>
259+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/startWith.o.png">
260+
*
261+
* @param that an Observable that contains the items you want the modified Observable to emit first
262+
* @return an Observable that emits the items in the specified `Observable` and then emits the items
263+
* emitted by the source Observable
264+
*/
265+
def startWith[U >: T](that: Observable[U]): Observable[U] = {
266+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
267+
val thatJava = that.asJavaObservable.asInstanceOf[rx.Observable[U]]
268+
toScalaObservable(thisJava.startWith(thatJava))
269+
}
270+
271+
/**
272+
* Returns an Observable that emits the items in a specified `Iterable` before it begins to emit items
273+
* emitted by the source Observable.
274+
* <p>
275+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/startWith.png">
276+
*
277+
* @param iterable an Iterable that contains the items you want the modified Observable to emit first
278+
* @return an Observable that emits the items in the specified `Iterable` and then emits the items
279+
* emitted by the source Observable
280+
*/
281+
def startWith[U >: T](iterable: Iterable[U]): Observable[U] = {
282+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
283+
toScalaObservable(thisJava.startWith(iterable.asJava))
284+
}
285+
286+
/**
287+
* Returns an Observable that emits the items in a specified `Iterable`, on a specified `Scheduler`, before it begins to emit items emitted by the source Observable.
288+
* <p>
289+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/startWith.s.png">
290+
*
291+
* @param iterable an Iterable that contains the items you want the modified Observable to emit first
292+
* @param scheduler the Scheduler to emit the prepended values on
293+
* @return an Observable that emits the items in the specified `Iterable`, on a specified `Scheduler`, and then emits the items
294+
* emitted by the source Observable
295+
*/
296+
def startWith[U >: T](iterable: Iterable[U], scheduler: Scheduler): Observable[U] = {
297+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
298+
toScalaObservable(thisJava.startWith(iterable.asJava, scalaSchedulerToJavaScheduler(scheduler)))
299+
}
300+
241301
/**
242302
* Returns an Observable that emits the items emitted by several Observables, one after the
243303
* other.

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,11 @@ class CompletenessTest extends JUnitSuite {
100100
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
101101
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
102102
"skipUntil(Observable[U])" -> "dropUntil(Observable[E])",
103-
"startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]",
103+
"startWith(Array[T])" -> "startWith(Iterable[U])",
104+
"startWith(Array[T], Scheduler)" -> "startWith(Iterable[U], Scheduler)",
105+
"startWith(Iterable[T])" -> "startWith(Iterable[U])",
106+
"startWith(Iterable[T], Scheduler)" -> "startWith(Iterable[U], Scheduler)",
107+
"startWith(Observable[T])" -> "startWith(Observable[U])",
104108
"skipLast(Int)" -> "dropRight(Int)",
105109
"skipLast(Long, TimeUnit)" -> "dropRight(Duration)",
106110
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
@@ -159,7 +163,7 @@ class CompletenessTest extends JUnitSuite {
159163
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]"
160164
) ++ List.iterate("T", 9)(s => s + ", T").map(
161165
// all 9 overloads of startWith:
162-
"startWith(" + _ + ")" -> "[unnecessary because we can just use `++` instead]"
166+
"startWith(" + _ + ")" -> "[unnecessary because we can just use `+` instead]"
163167
).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
164168
// concat 2-9
165169
"concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]"

0 commit comments

Comments
 (0)