Skip to content

Commit d3a7350

Browse files
Merge pull request #732 from chrisgrimm/master
Ported groupByUntil function to scala-adapter
2 parents 4f9afe8 + 83aa857 commit d3a7350

File tree

4 files changed

+42
-7
lines changed

4 files changed

+42
-7
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

+12-1
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ class RxScalaDemo extends JUnitSuite {
242242
waitFor(firstMedalOfEachCountry)
243243
}
244244

245+
@Test def groupByUntilExample() {
246+
val numbers = Observable.interval(250 millis) take 14
247+
val grouped = numbers.groupByUntil[Long, Long](
248+
{case x => x % 2},
249+
{case (key, obs) => obs filter {case x => x == 7}}
250+
)
251+
val sequenced = (grouped map {case (key, obs) => obs.toSeq}).flatten
252+
sequenced subscribe {x => println(s"Emitted group: $x")}
253+
}
254+
255+
245256
@Test def olympicsExample() {
246257
val medals = Olympics.mountainBikeMedals.publish
247258
medals.subscribe(println(_))
@@ -449,4 +460,4 @@ class RxScalaDemo extends JUnitSuite {
449460
obs.toBlockingObservable.toIterable.last
450461
}
451462

452-
}
463+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

+24
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,30 @@ trait Observable[+T]
13251325
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
13261326
}
13271327

1328+
/**
1329+
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
1330+
* according to a function.
1331+
*
1332+
* @param f
1333+
* a function that extracts the key from an item
1334+
* @param closings
1335+
* the function that accepts the key of a given group and an observable representing that group, and returns
1336+
* an observable that emits a single Closing when the group should be closed.
1337+
* @tparam K
1338+
* the type of the keys returned by the discriminator function.
1339+
* @tparam Closing
1340+
* the type of the element emitted from the closings observable.
1341+
* @return an Observable that emits `(key, observable)` pairs, where `observable`
1342+
* contains all items for which `f` returned `key` before `closings` emits a value.
1343+
*/
1344+
def groupByUntil[K, Closing](f: T => K, closings: (K, Observable[T])=>Observable[Closing]): Observable[(K, Observable[T])] = {
1345+
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Closing]] =
1346+
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
1347+
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
1348+
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
1349+
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
1350+
}
1351+
13281352
/**
13291353
* Given an Observable that emits Observables, creates a single Observable that
13301354
* emits the items emitted by the most recently published of those Observables.

rxjava-core/src/main/java/rx/Observable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -8008,7 +8008,7 @@ public <U> Observable<T> skipUntil(Observable<U> other) {
80088008
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupByUntil()</a>
80098009
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211932.aspx">MSDN: Observable.GroupByUntil</a>
80108010
*/
8011-
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<TDuration>> durationSelector) {
8011+
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<? extends TDuration>> durationSelector) {
80128012
return groupByUntil(keySelector, Functions.<T>identity(), durationSelector);
80138013
}
80148014

@@ -8029,7 +8029,7 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
80298029
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupByUntil()</a>
80308030
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
80318031
*/
8032-
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
8032+
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<? extends TDuration>> durationSelector) {
80338033
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
80348034
}
80358035

rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ public class OperationGroupByUntil<TSource, TKey, TResult, TDuration> implements
4242
final Observable<TSource> source;
4343
final Func1<? super TSource, ? extends TKey> keySelector;
4444
final Func1<? super TSource, ? extends TResult> valueSelector;
45-
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector;
45+
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<? extends TDuration>> durationSelector;
4646
public OperationGroupByUntil(Observable<TSource> source,
4747
Func1<? super TSource, ? extends TKey> keySelector,
4848
Func1<? super TSource, ? extends TResult> valueSelector,
49-
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector) {
49+
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<? extends TDuration>> durationSelector) {
5050
this.source = source;
5151
this.keySelector = keySelector;
5252
this.valueSelector = valueSelector;
@@ -107,7 +107,7 @@ public void onNext(TSource args) {
107107
}
108108

109109
if (newGroup) {
110-
Observable<TDuration> duration;
110+
Observable<? extends TDuration> duration;
111111
try {
112112
duration = durationSelector.call(g);
113113
} catch (Throwable t) {
@@ -234,4 +234,4 @@ public void onCompleted() {
234234
}
235235

236236
}
237-
}
237+
}

0 commit comments

Comments
 (0)