Skip to content

Commit d2ae945

Browse files
Merge pull request #1598 from jbripley/rxscala-doonunsubscribe
RxScala: Add mapping to RxJava doOnUnsubscribe
2 parents a590895 + 521c23c commit d2ae945

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,16 @@ class RxScalaDemo extends JUnitSuite {
833833
// onCompleted
834834
}
835835

836+
@Test def doOnSubscribeExample(): Unit = {
837+
val o = List("red", "green", "blue").toObservable.doOnSubscribe { println("subscribed") }
838+
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
839+
// subscribed
840+
// red
841+
// green
842+
// blue
843+
// onCompleted
844+
}
845+
836846
@Test def doOnTerminateExample(): Unit = {
837847
val o = List("red", "green", "blue").toObservable.doOnTerminate { println("terminate") }
838848
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
@@ -843,6 +853,16 @@ class RxScalaDemo extends JUnitSuite {
843853
// onCompleted
844854
}
845855

856+
@Test def doOnUnsubscribeExample(): Unit = {
857+
val o = List("red", "green", "blue").toObservable.doOnUnsubscribe { println("unsubscribed") }
858+
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
859+
// red
860+
// green
861+
// blue
862+
// onCompleted
863+
// unsubscribed
864+
}
865+
846866
@Test def finallyDoExample(): Unit = {
847867
val o = List("red", "green", "blue").toObservable.finallyDo { println("finally") }
848868
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3592,6 +3592,28 @@ trait Observable[+T]
35923592
toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted)))
35933593
}
35943594

3595+
/**
3596+
* Modifies the source `Observable` so that it invokes the given action when it is subscribed from
3597+
* its subscribers. Each subscription will result in an invocation of the given action except when the
3598+
* source `Observable` is reference counted, in which case the source `Observable` will invoke
3599+
* the given action for the first subscription.
3600+
* <p>
3601+
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnSubscribe.png" alt="">
3602+
* <dl>
3603+
* <dt><b>Scheduler:</b></dt>
3604+
* <dd>`onSubscribe` does not operate by default on a particular `Scheduler`.</dd>
3605+
* </dl>
3606+
*
3607+
* @param onSubscribe
3608+
* the action that gets called when an observer subscribes to this `Observable`
3609+
* @return the source `Observable` modified so as to call this Action when appropriate
3610+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#doonsubscribe">RxJava wiki: doOnSubscribe</a>
3611+
* @since 0.20
3612+
*/
3613+
def doOnSubscribe(onSubscribe: => Unit): Observable[T] = {
3614+
toScalaObservable[T](asJavaObservable.doOnSubscribe(() => onSubscribe))
3615+
}
3616+
35953617
/**
35963618
* Modifies an Observable so that it invokes an action when it calls `onCompleted` or `onError`.
35973619
* <p>
@@ -3608,6 +3630,28 @@ trait Observable[+T]
36083630
toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate))
36093631
}
36103632

3633+
/**
3634+
* Modifies the source `Observable` so that it invokes the given action when it is unsubscribed from
3635+
* its subscribers. Each un-subscription will result in an invocation of the given action except when the
3636+
* source `Observable` is reference counted, in which case the source `Observable` will invoke
3637+
* the given action for the very last un-subscription.
3638+
* <p>
3639+
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnUnsubscribe.png" alt="">
3640+
* <dl>
3641+
* <dt><b>Scheduler:</b></dt>
3642+
* <dd>`doOnUnsubscribe` does not operate by default on a particular `Scheduler`.</dd>
3643+
* </dl>
3644+
*
3645+
* @param onUnsubscribe
3646+
* the action that gets called when this `Observable` is unsubscribed
3647+
* @return the source `Observable` modified so as to call this Action when appropriate
3648+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#doonunsubscribe">RxJava wiki: doOnUnsubscribe</a>
3649+
* @since 0.20
3650+
*/
3651+
def doOnUnsubscribe(onUnsubscribe: => Unit): Observable[T] = {
3652+
toScalaObservable[T](asJavaObservable.doOnUnsubscribe(() => onUnsubscribe))
3653+
}
3654+
36113655
/**
36123656
* Given two Observables, mirror the one that first emits an item.
36133657
*

0 commit comments

Comments
 (0)