Skip to content

Commit 45563a3

Browse files
committed
RxScala: Add more operators to match RxJava
1 parent 9c44701 commit 45563a3

File tree

5 files changed

+335
-16
lines changed

5 files changed

+335
-16
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

+112
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,18 @@ class RxScalaDemo extends JUnitSuite {
379379
waitFor(Olympics.yearTicks)
380380
}
381381

382+
@Test def groupByExample2() {
383+
val medalByYear = Olympics.mountainBikeMedals.groupBy(medal => medal.year, medal => medal.country)
384+
385+
for ((year, countries) <- medalByYear; country <- countries) {
386+
println(s"${year}: ${country}")
387+
}
388+
389+
Olympics.yearTicks.subscribe(year => println(s"\nYear $year starts."))
390+
391+
waitFor(Olympics.yearTicks)
392+
}
393+
382394
@Test def groupByUntilExample() {
383395
val numbers = Observable.interval(250 millis).take(14)
384396
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
@@ -1510,4 +1522,104 @@ class RxScalaDemo extends JUnitSuite {
15101522
o.take(3).toBlocking.foreach(println)
15111523
}
15121524

1525+
@Test def collectExample() {
1526+
val o = Observable.just(1, 1.0, "a", 2, 2.0, "b")
1527+
o.collect { case s: String => "Item: " + s }.foreach(println(_))
1528+
}
1529+
1530+
@Test def usingExample() {
1531+
import scala.io.{Codec, Source}
1532+
1533+
Observable.using { new java.net.URL("http://rxscala.github.io/").openStream() }(
1534+
input => Source.fromInputStream(input)(Codec.UTF8).getLines().toList.toObservable,
1535+
input => input.close
1536+
).foreach(println(_))
1537+
}
1538+
1539+
def createFastObservable: Observable[Int] = {
1540+
Observable {
1541+
subscriber: Subscriber[Int] => {
1542+
(0 to 2000).takeWhile(_ => !subscriber.isUnsubscribed).foreach(subscriber.onNext(_))
1543+
subscriber.onCompleted()
1544+
}
1545+
}
1546+
}
1547+
1548+
@Test def withoutBackpressureExample() {
1549+
val o = createFastObservable
1550+
val l = new CountDownLatch(1)
1551+
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
1552+
override def onStart() {
1553+
request(1)
1554+
}
1555+
1556+
override def onNext(n: Int) {
1557+
println(n)
1558+
Thread.sleep(10) // emulate a slow subscriber
1559+
request(1)
1560+
}
1561+
1562+
override def onError(e: Throwable) {
1563+
e.printStackTrace()
1564+
l.countDown()
1565+
}
1566+
1567+
override def onCompleted() {
1568+
l.countDown()
1569+
}
1570+
})
1571+
l.await()
1572+
}
1573+
1574+
@Test def onBackpressureDropExample() {
1575+
val o = createFastObservable.onBackpressureDrop
1576+
val l = new CountDownLatch(1)
1577+
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
1578+
override def onStart() {
1579+
request(1)
1580+
}
1581+
1582+
override def onNext(n: Int) {
1583+
println(n)
1584+
Thread.sleep(10) // emulate a slow subscriber
1585+
request(1)
1586+
}
1587+
1588+
override def onError(e: Throwable) {
1589+
e.printStackTrace()
1590+
l.countDown()
1591+
}
1592+
1593+
override def onCompleted() {
1594+
l.countDown()
1595+
}
1596+
})
1597+
l.await()
1598+
}
1599+
1600+
@Test def onBackpressureBufferExample() {
1601+
val o = createFastObservable.onBackpressureBuffer
1602+
val l = new CountDownLatch(1)
1603+
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
1604+
override def onStart() {
1605+
request(1)
1606+
}
1607+
1608+
override def onNext(n: Int) {
1609+
println(n)
1610+
Thread.sleep(10) // emulate a slow subscriber
1611+
request(1)
1612+
}
1613+
1614+
override def onError(e: Throwable) {
1615+
e.printStackTrace()
1616+
l.countDown()
1617+
}
1618+
1619+
override def onCompleted() {
1620+
l.countDown()
1621+
}
1622+
})
1623+
l.await()
1624+
}
15131625
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala

+8
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,12 @@ object JavaConversions {
6262
}
6363
}
6464
}
65+
66+
implicit def toJavaTransformer[T, R](transformer: Observable[T] => Observable[R]): rx.Observable.Transformer[T, R] = {
67+
new rx.Observable.Transformer[T, R] {
68+
override def call(o: rx.Observable[_ <: T]): rx.Observable[R] = {
69+
transformer(toScalaObservable(o)).asJavaObservable.asInstanceOf[rx.Observable[R]]
70+
}
71+
}
72+
}
6573
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

+198
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,38 @@ trait Observable[+T]
864864
: Observable[Observable[T]] // SI-7818
865865
}
866866

867+
/**
868+
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
869+
* Observable starts a new window periodically, as determined by the `timeshift` argument or a maximum
870+
* size as specified by the `count` argument (whichever is reached first). It emits
871+
* each window after a fixed timespan, specified by the `timespan` argument. When the source
872+
* Observable completes or Observable completes or encounters an error, the resulting Observable emits the
873+
* current window and propagates the notification from the source Observable.
874+
*
875+
* <img width="640" height="335" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window7.s.png" alt="">
876+
*
877+
* ===Backpressure Support:===
878+
* This operator does not support backpressure as it uses time to control data flow.
879+
*
880+
* ===Scheduler:===
881+
* you specify which `Scheduler` this operator will use
882+
*
883+
* @param timespan the period of time each window collects items before it should be emitted
884+
* @param timeshift the period of time after which a new window will be created
885+
* @param count the maximum size of each window before it should be emitted
886+
* @param scheduler the `Scheduler` to use when determining the end and start of a window
887+
* @return an Observable that emits new windows periodically as a fixed timespan elapses
888+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava wiki: window</a>
889+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window.aspx">MSDN: Observable.Window</a>
890+
*/
891+
def sliding(timespan: Duration, timeshift: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
892+
val span: Long = timespan.length
893+
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
894+
val unit: TimeUnit = timespan.unit
895+
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit, count, scheduler))
896+
: Observable[Observable[T]] // SI-7818
897+
}
898+
867899
/**
868900
* Returns an Observable which only emits those items for which a given predicate holds.
869901
*
@@ -1577,6 +1609,41 @@ trait Observable[+T]
15771609
toScalaObservable[T](asJavaObservable.cache())
15781610
}
15791611

1612+
/**
1613+
* Caches emissions from the source Observable and replays them in order to any subsequent Subscribers.
1614+
* This method has similar behavior to [[Observable.replay]] except that this auto-subscribes to the source
1615+
* Observable rather than returning a [[ConnectableObservable]] for which you must call
1616+
* `connect` to activate the subscription.
1617+
* <p>
1618+
* <img width="640" height="410" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/cache.png" alt="">
1619+
* <p>
1620+
* This is useful when you want an Observable to cache responses and you can't control the
1621+
* `subscribe/unsubscribe` behavior of all the [[Subscriber]]s.
1622+
* <p>
1623+
* When you call `cache`, it does not yet subscribe to the source Observable and so does not yet
1624+
* begin cacheing items. This only happens when the first Subscriber calls the resulting Observable's
1625+
* `subscribe` method.
1626+
* <p>
1627+
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the `cache`
1628+
* Observer so be careful not to use this Observer on Observables that emit an infinite or very large number
1629+
* of items that will use up memory.
1630+
*
1631+
* ===Backpressure Support:===
1632+
* This operator does not support upstream backpressure as it is purposefully requesting and caching everything emitted.
1633+
*
1634+
* ===Scheduler:===
1635+
* `cache` does not operate by default on a particular `Scheduler`.
1636+
*
1637+
* @param capacity hint for number of items to cache (for optimizing underlying data structure)
1638+
* @return an Observable that, when first subscribed to, caches all of its items and notifications for the
1639+
* benefit of subsequent subscribers
1640+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#cache">RxJava wiki: cache</a>
1641+
* @since 0.20
1642+
*/
1643+
def cache(capacity: Int): Observable[T] = {
1644+
toScalaObservable[T](asJavaObservable.cache(capacity))
1645+
}
1646+
15801647
/**
15811648
* Returns a new [[Observable]] that multicasts (shares) the original [[Observable]]. As long a
15821649
* there is more than 1 [[Subscriber]], this [[Observable]] will be subscribed and emitting data.
@@ -2176,6 +2243,41 @@ trait Observable[+T]
21762243
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
21772244
}
21782245

2246+
/**
2247+
* Groups the items emitted by an [[Observable]] according to a specified criterion, and emits these
2248+
* grouped items as `(key, observable)` pairs.
2249+
*
2250+
* <img width="640" height="360" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png" alt="">
2251+
*
2252+
* Note: A `(key, observable)` will cache the items it is to emit until such time as it
2253+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
2254+
* `(key, observable)` pairs that do not concern you. Instead, you can signal to them that they may
2255+
* discard their buffers by applying an operator like `take(0)` to them.
2256+
*
2257+
* ===Backpressure Support:===
2258+
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable"
2259+
* and blocking any one group would block the entire parent stream. If you need backpressure on individual groups
2260+
* then you should use operators such as `nBackpressureDrop` or `@link #onBackpressureBuffer`.</dd>
2261+
* ===Scheduler:===
2262+
* groupBy` does not operate by default on a particular `Scheduler`.
2263+
*
2264+
* @param keySelector a function that extracts the key for each item
2265+
* @param valueSelector a function that extracts the return element for each item
2266+
* @tparam K the key type
2267+
* @tparam V the value type
2268+
* @return an [[Observable]] that emits `(key, observable)` pairs, each of which corresponds to a
2269+
* unique key value and each of which emits those items from the source Observable that share that
2270+
* key value
2271+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
2272+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
2273+
*/
2274+
def groupBy[K, V](keySelector: T => K, valueSelector: T => V): Observable[(K, Observable[V])] = {
2275+
val jo: rx.Observable[rx.observables.GroupedObservable[K, V]] = asJavaObservable.groupBy[K, V](keySelector, valueSelector)
2276+
toScalaObservable[rx.observables.GroupedObservable[K, V]](jo).map {
2277+
go: rx.observables.GroupedObservable[K, V] => (go.getKey, toScalaObservable[V](go))
2278+
}
2279+
}
2280+
21792281
/**
21802282
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
21812283
* according to a function.
@@ -4298,6 +4400,75 @@ trait Observable[+T]
42984400
def nonEmpty: Observable[Boolean] = {
42994401
isEmpty.map(!_)
43004402
}
4403+
4404+
/**
4405+
* Transform an Observable by applying a particular Transformer function to it.
4406+
*
4407+
* This method operates on the Observable itself whereas [[Observable.lift]] operates on the Observable's
4408+
* Subscribers or Observers.
4409+
*
4410+
* If the operator you are creating is designed to act on the individual items emitted by a source
4411+
* Observable, use [[Observable.lift]]. If your operator is designed to transform the source Observable as a whole
4412+
* (for instance, by applying a particular set of existing RxJava operators to it) use `compose`.
4413+
*
4414+
* ===Scheduler:===
4415+
* `compose` does not operate by default on a particular [[Scheduler]].
4416+
*
4417+
* @param transformer implements the function that transforms the source Observable
4418+
* @return the source Observable, transformed by the transformer function
4419+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
4420+
*/
4421+
def compose[R](transformer: Observable[T] => Observable[R]): Observable[R] = {
4422+
toScalaObservable[R](asJavaObservable.compose(toJavaTransformer(transformer)))
4423+
}
4424+
4425+
/**
4426+
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer these
4427+
* items indefinitely until they can be emitted.
4428+
*
4429+
* <img width="640" height="300" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
4430+
*
4431+
* ===Scheduler:===
4432+
* `onBackpressureBuffer` does not operate by default on a particular `Scheduler`.
4433+
*
4434+
* @return the source Observable modified to buffer items to the extent system resources allow
4435+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
4436+
*/
4437+
def onBackpressureBuffer: Observable[T] = {
4438+
toScalaObservable[T](asJavaObservable.onBackpressureBuffer)
4439+
}
4440+
4441+
/**
4442+
* Use this operator when the upstream does not natively support backpressure and you wish to drop
4443+
* `onNext` when unable to handle further events.
4444+
*
4445+
* <img width="640" height="245" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
4446+
*
4447+
* If the downstream request count hits 0 then `onNext` will be dropped until `request(long n)`
4448+
* is invoked again to increase the request count.
4449+
*
4450+
* ===Scheduler:===
4451+
* onBackpressureDrop` does not operate by default on a particular `Scheduler`.
4452+
*
4453+
* @return the source Observable modified to drop `onNext` notifications on overflow
4454+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
4455+
*/
4456+
def onBackpressureDrop: Observable[T] = {
4457+
toScalaObservable[T](asJavaObservable.onBackpressureDrop)
4458+
}
4459+
4460+
/**
4461+
* Return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
4462+
* on which the function is defined.
4463+
*
4464+
* @tparam R the element type of the returned [[Observable]].
4465+
* @param pf the partial function which filters and maps the [[Observable]].
4466+
* @return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
4467+
* on which the function is defined.
4468+
*/
4469+
def collect[R](pf: PartialFunction[T, R]): Observable[R] = {
4470+
filter(pf.isDefinedAt(_)).map(pf)
4471+
}
43014472
}
43024473

43034474
/**
@@ -4746,6 +4917,7 @@ object Observable {
47464917
* @param observableFactory the factory function to obtain an Observable
47474918
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
47484919
*/
4920+
@deprecated("Use `using(=> Resource)(Resource => Observable[T], Resource => Unit)` instead", "0.20.1")
47494921
def using[T, Resource <: Subscription](resourceFactory: () => Resource, observableFactory: Resource => Observable[T]): Observable[T] = {
47504922
class ResourceSubscription(val resource: Resource) extends rx.Subscription {
47514923
def unsubscribe = resource.unsubscribe
@@ -4759,6 +4931,32 @@ object Observable {
47594931
))
47604932
}
47614933

4934+
/**
4935+
* Constructs an Observable that creates a dependent resource object.
4936+
*
4937+
* <img width="640" height="400" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/using.png" alt="" />
4938+
*
4939+
* ===Scheduler:===
4940+
* `using` does not operate by default on a particular `Scheduler`.
4941+
*
4942+
* @param resourceFactory the factory function to create a resource object that depends on the Observable.
4943+
* Note: this is a by-name parameter.
4944+
* @param observableFactory the factory function to create an Observable
4945+
* @param dispose the function that will dispose of the resource
4946+
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
4947+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#using">RxJava wiki: using</a>
4948+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
4949+
*/
4950+
def using[T, Resource](resourceFactory: => Resource)(observableFactory: Resource => Observable[T], dispose: Resource => Unit): Observable[T] = {
4951+
val jResourceFactory = new rx.functions.Func0[Resource] {
4952+
override def call: Resource = resourceFactory
4953+
}
4954+
val jObservableFactory = new rx.functions.Func1[Resource, rx.Observable[_ <: T]] {
4955+
override def call(r: Resource) = observableFactory(r).asJavaObservable
4956+
}
4957+
toScalaObservable[T](rx.Observable.using[T, Resource](jResourceFactory, jObservableFactory, dispose))
4958+
}
4959+
47624960
/**
47634961
* Mirror the one Observable in an Iterable of several Observables that first emits an item.
47644962
*

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,7 @@ private[scala] class WithFilter[+T] (p: T => Boolean, asJava: rx.Observable[_ <:
3838
toScalaObservable[T](asJava.filter((x: T) => p(x) && q(x)))
3939
}
4040

41-
// there is no foreach here, that's only available on BlockingObservable
41+
def foreach(onNext: T => Unit): Unit = {
42+
toScalaObservable[T](asJava.filter(p)).foreach(onNext)
43+
}
4244
}

0 commit comments

Comments
 (0)