Skip to content

Commit 6052a02

Browse files
committed
Merge pull request #3627 from JohnWowUs/mergeDelayIterable
1.x: Added MergeDelay operators for Iterable of Observables
2 parents 48517ed + d179699 commit 6052a02

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

src/main/java/rx/Observable.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2211,6 +2211,65 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends Observable<
22112211
return source.lift(OperatorMerge.<T>instance(true, maxConcurrent));
22122212
}
22132213

2214+
/**
2215+
* Flattens an Iterable of Observables into one Observable, in a way that allows an Observer to receive all
2216+
* successfully emitted items from each of the source Observables without being interrupted by an error
2217+
* notification from one of them.
2218+
* <p>
2219+
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
2220+
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
2221+
* error notification until all of the merged Observables have finished emitting items.
2222+
* <p>
2223+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2224+
* <p>
2225+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2226+
* invoke the {@code onError} method of its Observers once.
2227+
* <dl>
2228+
* <dt><b>Scheduler:</b></dt>
2229+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2230+
* </dl>
2231+
*
2232+
* @param sequences
2233+
* the Iterable of Observables
2234+
* @return an Observable that emits items that are the result of flattening the items emitted by the
2235+
* Observables in the Iterable
2236+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2237+
*/
2238+
public static <T> Observable<T> mergeDelayError(Iterable<? extends Observable<? extends T>> sequences) {
2239+
return mergeDelayError(from(sequences));
2240+
}
2241+
2242+
/**
2243+
* Flattens an Iterable of Observables into one Observable, in a way that allows an Observer to receive all
2244+
* successfully emitted items from each of the source Observables without being interrupted by an error
2245+
* notification from one of them, while limiting the number of concurrent subscriptions to these Observables.
2246+
* <p>
2247+
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
2248+
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
2249+
* error notification until all of the merged Observables have finished emitting items.
2250+
* <p>
2251+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2252+
* <p>
2253+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2254+
* invoke the {@code onError} method of its Observers once.
2255+
* <dl>
2256+
* <dt><b>Scheduler:</b></dt>
2257+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2258+
* </dl>
2259+
*
2260+
* @param sequences
2261+
* the Iterable of Observables
2262+
* @param maxConcurrent
2263+
* the maximum number of Observables that may be subscribed to concurrently
2264+
* @return an Observable that emits items that are the result of flattening the items emitted by the
2265+
* Observables in the Iterable
2266+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2267+
*/
2268+
public static <T> Observable<T> mergeDelayError(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent) {
2269+
return mergeDelayError(from(sequences), maxConcurrent);
2270+
}
2271+
2272+
22142273
/**
22152274
* Flattens two Observables into one Observable, in a way that allows an Observer to receive all
22162275
* successfully emitted items from each of the source Observables without being interrupted by an error

src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,23 @@ public void testMergeList() {
272272
verify(stringObserver, times(2)).onNext("hello");
273273
}
274274

275+
// This is pretty much a clone of testMergeList but with the overloaded MergeDelayError for Iterables
276+
@Test
277+
public void mergeIterable() {
278+
final Observable<String> o1 = Observable.create(new TestSynchronousObservable());
279+
final Observable<String> o2 = Observable.create(new TestSynchronousObservable());
280+
List<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();
281+
listOfObservables.add(o1);
282+
listOfObservables.add(o2);
283+
284+
Observable<String> m = Observable.mergeDelayError(listOfObservables);
285+
m.subscribe(stringObserver);
286+
287+
verify(stringObserver, never()).onError(any(Throwable.class));
288+
verify(stringObserver, times(1)).onCompleted();
289+
verify(stringObserver, times(2)).onNext("hello");
290+
}
291+
275292
@Test
276293
public void testMergeArrayWithThreading() {
277294
final TestASynchronousObservable o1 = new TestASynchronousObservable();
@@ -577,4 +594,4 @@ public void call(Long t1) {
577594
assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException);
578595
assertEquals(Arrays.asList(1L, 1L, 1L), requests);
579596
}
580-
}
597+
}

0 commit comments

Comments
 (0)