Skip to content

Commit 6dcc9d3

Browse files
committed
Add 'publish' variants to RxScala
1 parent 7e1c2d4 commit 6dcc9d3

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,28 @@ class RxScalaDemo extends JUnitSuite {
297297
shared.connect
298298
}
299299

300+
@Test def exampleWithPublish2() {
301+
val unshared = Observable.from(1 to 4)
302+
val shared = unshared.publish(0)
303+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
304+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
305+
shared.connect
306+
}
307+
308+
@Test def exampleWithPublish3() {
309+
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2))
310+
o.subscribe(n => println(s"subscriber 1 gets $n"))
311+
o.subscribe(n => println(s"subscriber 2 gets $n"))
312+
Thread.sleep(1000)
313+
}
314+
315+
@Test def exampleWithPublish4() {
316+
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2), -1L)
317+
o.subscribe(n => println(s"subscriber 1 gets $n"))
318+
o.subscribe(n => println(s"subscriber 2 gets $n"))
319+
Thread.sleep(1000)
320+
}
321+
300322
def doLater(waitTime: Duration, action: () => Unit): Unit = {
301323
Observable.interval(waitTime).take(1).subscribe(_ => action())
302324
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1145,10 +1145,64 @@ trait Observable[+T]
11451145
*
11461146
* @return an [[rx.lang.scala.observables.ConnectableObservable]].
11471147
*/
1148-
def publish: ConnectableObservable[T] = {
1148+
def publish(): ConnectableObservable[T] = {
11491149
new ConnectableObservable[T](asJavaObservable.publish())
11501150
}
11511151

1152+
1153+
/**
1154+
* Returns an Observable that emits `initialValue` followed by the items emitted by a `ConnectableObservable` that shares a single subscription to the source Observable.
1155+
* <p>
1156+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishConnect.i.png">
1157+
*
1158+
* @param initialValue the initial value to be emitted by the resulting Observable
1159+
* @return a `ConnectableObservable` that shares a single subscription to the underlying Observable and starts with `initialValue`
1160+
*/
1161+
def publish[U >: T](initialValue: U): ConnectableObservable[U] = {
1162+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1163+
new ConnectableObservable[U](thisJava.publish(initialValue))
1164+
}
1165+
1166+
/**
1167+
* Returns an Observable that emits the results of invoking a specified selector on items emitted by a `ConnectableObservable`
1168+
* that shares a single subscription to the underlying sequence.
1169+
* <p>
1170+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishConnect.f.png">
1171+
*
1172+
* @param selector a function that can use the multicasted source sequence as many times as needed, without
1173+
* causing multiple subscriptions to the source sequence. Subscribers to the given source will
1174+
* receive all notifications of the source from the time of the subscription forward.
1175+
* @return an Observable that emits the results of invoking the selector on the items emitted by a `ConnectableObservable`
1176+
* that shares a single subscription to the underlying sequence
1177+
*/
1178+
def publish[U >: T, R](selector: Observable[U] => Observable[R]): Observable[R] = {
1179+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1180+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1181+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1182+
toScalaObservable[R](thisJava.publish(fJava))
1183+
}
1184+
1185+
/**
1186+
* Returns an Observable that emits `initialValue` followed by the results of invoking a specified
1187+
* selector on items emitted by a `ConnectableObservable` that shares a single subscription to the
1188+
* source Observable.
1189+
* <p>
1190+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishConnect.if.png">
1191+
*
1192+
* @param selector a function that can use the multicasted source sequence as many times as needed, without
1193+
* causing multiple subscriptions to the source Observable. Subscribers to the source will
1194+
* receive all notifications of the source from the time of the subscription forward
1195+
* @param initialValue the initial value of the underlying `BehaviorSubject`
1196+
* @return an Observable that emits `initialValue` followed by the results of invoking the selector
1197+
* on a `ConnectableObservable` that shares a single subscription to the underlying Observable
1198+
*/
1199+
def publish[U >: T, R](selector: Observable[U] => Observable[R], initialValue: U): Observable[R] = {
1200+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1201+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1202+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1203+
toScalaObservable[R](thisJava.publish(fJava, initialValue))
1204+
}
1205+
11521206
// TODO add Scala-like aggregate function
11531207

11541208
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ class CompletenessTest extends JUnitSuite {
8989
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
9090
"parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])",
9191
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
92+
"publish()" -> "publish()",
93+
"publish(T)" -> "publish(U)",
94+
"publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[U] => Observable[R])",
95+
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)",
9296
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
9397
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
9498
"retry()" -> "retry()",

0 commit comments

Comments
 (0)