Skip to content

Commit 4e4cb16

Browse files
Merge pull request #1884 from benjchristensen/mutable-collect-scan-reduce
Fix Scan/Reduce/Collect Factory Ambiguity
2 parents c5fd708 + c63c76b commit 4e4cb16

File tree

3 files changed

+45
-75
lines changed

3 files changed

+45
-75
lines changed

src/main/java/rx/Observable.java

+10-68
Original file line numberDiff line numberDiff line change
@@ -3459,7 +3459,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
34593459
* <dd>{@code collect} does not operate by default on a particular {@link Scheduler}.</dd>
34603460
* </dl>
34613461
*
3462-
* @param state
3462+
* @param stateFactory
34633463
* the mutable data structure that will collect the items
34643464
* @param collector
34653465
* a function that accepts the {@code state} and an emitted item, and modifies {@code state}
@@ -3468,7 +3468,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
34683468
* into a single mutable data structure
34693469
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#collect">RxJava wiki: collect</a>
34703470
*/
3471-
public final <R> Observable<R> collect(R state, final Action2<R, ? super T> collector) {
3471+
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
34723472
Func2<R, T, R> accumulator = new Func2<R, T, R>() {
34733473

34743474
@Override
@@ -3478,7 +3478,14 @@ public final R call(R state, T value) {
34783478
}
34793479

34803480
};
3481-
return reduce(state, accumulator);
3481+
3482+
/*
3483+
* Discussion and confirmation of implementation at
3484+
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
3485+
*
3486+
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
3487+
*/
3488+
return lift(new OperatorScan<R, T>(stateFactory, accumulator)).last();
34823489
}
34833490

34843491
/**
@@ -5293,40 +5300,6 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
52935300
return scan(initialValue, accumulator).takeLast(1);
52945301
}
52955302

5296-
/**
5297-
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
5298-
* Observable and a specified seed value, then feeds the result of that function along with the second item
5299-
* emitted by an Observable into the same function, and so on until all items have been emitted by the
5300-
* source Observable, emitting the final result from the final call to your function as its sole item.
5301-
* <p>
5302-
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
5303-
* <p>
5304-
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
5305-
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
5306-
* that does a similar operation on lists.
5307-
* <dl>
5308-
* <dt><b>Backpressure Support:</b></dt>
5309-
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
5310-
* them to a single {@code onNext}.</dd>
5311-
* <dt><b>Scheduler:</b></dt>
5312-
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
5313-
* </dl>
5314-
*
5315-
* @param initialValueFactory
5316-
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
5317-
* @param accumulator
5318-
* an accumulator function to be invoked on each item emitted by the source Observable, the
5319-
* result of which will be used in the next accumulator call
5320-
* @return an Observable that emits a single item that is the result of accumulating the output from the
5321-
* items emitted by the source Observable
5322-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
5323-
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
5324-
*/
5325-
public final <R> Observable<R> reduce(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
5326-
return scan(initialValueFactory, accumulator).takeLast(1);
5327-
}
5328-
5329-
53305303
/**
53315304
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
53325305
* <p>
@@ -6359,37 +6332,6 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
63596332
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
63606333
return lift(new OperatorScan<R, T>(initialValue, accumulator));
63616334
}
6362-
6363-
/**
6364-
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
6365-
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
6366-
* the source Observable into the same function, and so on until all items have been emitted by the source
6367-
* Observable, emitting the result of each of these iterations.
6368-
* <p>
6369-
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
6370-
* <p>
6371-
* This sort of function is sometimes called an accumulator.
6372-
* <p>
6373-
* Note that the Observable that results from this method will emit the item returned from
6374-
* {@code initialValueFactory} as its first emitted item.
6375-
* <dl>
6376-
* <dt><b>Scheduler:</b></dt>
6377-
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
6378-
* </dl>
6379-
*
6380-
* @param initialValueFactory
6381-
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
6382-
* @param accumulator
6383-
* an accumulator function to be invoked on each item emitted by the source Observable, whose
6384-
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
6385-
* next accumulator call
6386-
* @return an Observable that emits the item returned from {@code initialValueFactory} followed by the
6387-
* results of each call to the accumulator function
6388-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
6389-
*/
6390-
public final <R> Observable<R> scan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
6391-
return lift(new OperatorScan<R, T>(initialValueFactory, accumulator));
6392-
}
63936335

63946336
/**
63956337
* Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract

src/test/java/rx/ObservableTests.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import rx.exceptions.OnErrorNotImplementedException;
5151
import rx.functions.Action1;
5252
import rx.functions.Action2;
53+
import rx.functions.Func0;
5354
import rx.functions.Func1;
5455
import rx.functions.Func2;
5556
import rx.observables.ConnectableObservable;
@@ -965,23 +966,47 @@ public void testRangeWithScheduler() {
965966

966967
@Test
967968
public void testCollectToList() {
968-
List<Integer> list = Observable.just(1, 2, 3).collect(new ArrayList<Integer>(), new Action2<List<Integer>, Integer>() {
969+
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
970+
971+
@Override
972+
public List<Integer> call() {
973+
return new ArrayList<Integer>();
974+
}
975+
976+
}, new Action2<List<Integer>, Integer>() {
969977

970978
@Override
971979
public void call(List<Integer> list, Integer v) {
972980
list.add(v);
973981
}
974-
}).toBlocking().last();
982+
});
983+
984+
List<Integer> list = o.toBlocking().last();
975985

976986
assertEquals(3, list.size());
977987
assertEquals(1, list.get(0).intValue());
978988
assertEquals(2, list.get(1).intValue());
979989
assertEquals(3, list.get(2).intValue());
990+
991+
// test multiple subscribe
992+
List<Integer> list2 = o.toBlocking().last();
993+
994+
assertEquals(3, list2.size());
995+
assertEquals(1, list2.get(0).intValue());
996+
assertEquals(2, list2.get(1).intValue());
997+
assertEquals(3, list2.get(2).intValue());
980998
}
981999

9821000
@Test
9831001
public void testCollectToString() {
984-
String value = Observable.just(1, 2, 3).collect(new StringBuilder(), new Action2<StringBuilder, Integer>() {
1002+
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
1003+
1004+
@Override
1005+
public StringBuilder call() {
1006+
return new StringBuilder();
1007+
}
1008+
1009+
}, new Action2<StringBuilder, Integer>() {
9851010

9861011
@Override
9871012
public void call(StringBuilder sb, Integer v) {

src/test/java/rx/internal/operators/OperatorScanTest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import rx.Observable;
3838
import rx.Observer;
3939
import rx.Subscriber;
40+
import rx.functions.Action2;
4041
import rx.functions.Func0;
4142
import rx.functions.Func1;
4243
import rx.functions.Func2;
@@ -269,22 +270,24 @@ public void onNext(Integer t) {
269270
assertEquals(101, count.get());
270271
}
271272

273+
/**
274+
* This uses the public API collect which uses scan under the covers.
275+
*/
272276
@Test
273277
public void testSeedFactory() {
274278
Observable<List<Integer>> o = Observable.range(1, 10)
275-
.scan(new Func0<List<Integer>>() {
279+
.collect(new Func0<List<Integer>>() {
276280

277281
@Override
278282
public List<Integer> call() {
279283
return new ArrayList<Integer>();
280284
}
281285

282-
}, new Func2<List<Integer>, Integer, List<Integer>>() {
286+
}, new Action2<List<Integer>, Integer>() {
283287

284288
@Override
285-
public List<Integer> call(List<Integer> list, Integer t2) {
289+
public void call(List<Integer> list, Integer t2) {
286290
list.add(t2);
287-
return list;
288291
}
289292

290293
}).takeLast(1);

0 commit comments

Comments
 (0)