Skip to content

Commit 3721666

Browse files
committed
1.x: optimize merge/flatMap for empty sources (#3761)
1 parent 4f2271d commit 3721666

File tree

2 files changed

+119
-0
lines changed

2 files changed

+119
-0
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ public void onNext(Observable<? extends T> t) {
237237
if (t == null) {
238238
return;
239239
}
240+
if (t == Observable.empty()) {
241+
emitEmpty();
242+
} else
240243
if (t instanceof ScalarSynchronousObservable) {
241244
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
242245
} else {
@@ -247,6 +250,16 @@ public void onNext(Observable<? extends T> t) {
247250
}
248251
}
249252

253+
void emitEmpty() {
254+
int produced = scalarEmissionCount + 1;
255+
if (produced == scalarEmissionLimit) {
256+
scalarEmissionCount = 0;
257+
this.requestMore(produced);
258+
} else {
259+
scalarEmissionCount = produced;
260+
}
261+
}
262+
250263
private void reportError() {
251264
List<Throwable> list = new ArrayList<Throwable>(errors);
252265
if (list.size() == 1) {

src/test/java/rx/internal/operators/OperatorFlatMapTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.Observer;
3030
import rx.exceptions.TestException;
3131
import rx.functions.*;
32+
import rx.internal.util.RxRingBuffer;
3233
import rx.observers.TestSubscriber;
3334
import rx.schedulers.Schedulers;
3435

@@ -544,4 +545,109 @@ public Observable<Integer> call(Integer t) {
544545
ts.assertValueCount(n * 2);
545546
}
546547
}
548+
549+
@Test
550+
public void justEmptyMixture() {
551+
TestSubscriber<Integer> ts = TestSubscriber.create();
552+
553+
Observable.range(0, 4 * RxRingBuffer.SIZE)
554+
.flatMap(new Func1<Integer, Observable<Integer>>() {
555+
@Override
556+
public Observable<Integer> call(Integer v) {
557+
return (v & 1) == 0 ? Observable.<Integer>empty() : Observable.just(v);
558+
}
559+
})
560+
.subscribe(ts);
561+
562+
ts.assertValueCount(2 * RxRingBuffer.SIZE);
563+
ts.assertNoErrors();
564+
ts.assertCompleted();
565+
566+
int j = 1;
567+
for (Integer v : ts.getOnNextEvents()) {
568+
Assert.assertEquals(j, v.intValue());
569+
570+
j += 2;
571+
}
572+
}
573+
574+
@Test
575+
public void rangeEmptyMixture() {
576+
TestSubscriber<Integer> ts = TestSubscriber.create();
577+
578+
Observable.range(0, 4 * RxRingBuffer.SIZE)
579+
.flatMap(new Func1<Integer, Observable<Integer>>() {
580+
@Override
581+
public Observable<Integer> call(Integer v) {
582+
return (v & 1) == 0 ? Observable.<Integer>empty() : Observable.range(v, 2);
583+
}
584+
})
585+
.subscribe(ts);
586+
587+
ts.assertValueCount(4 * RxRingBuffer.SIZE);
588+
ts.assertNoErrors();
589+
ts.assertCompleted();
590+
591+
int j = 1;
592+
List<Integer> list = ts.getOnNextEvents();
593+
for (int i = 0; i < list.size(); i += 2) {
594+
Assert.assertEquals(j, list.get(i).intValue());
595+
Assert.assertEquals(j + 1, list.get(i + 1).intValue());
596+
597+
j += 2;
598+
}
599+
}
600+
601+
@Test
602+
public void justEmptyMixtureMaxConcurrent() {
603+
TestSubscriber<Integer> ts = TestSubscriber.create();
604+
605+
Observable.range(0, 4 * RxRingBuffer.SIZE)
606+
.flatMap(new Func1<Integer, Observable<Integer>>() {
607+
@Override
608+
public Observable<Integer> call(Integer v) {
609+
return (v & 1) == 0 ? Observable.<Integer>empty() : Observable.just(v);
610+
}
611+
}, 16)
612+
.subscribe(ts);
613+
614+
ts.assertValueCount(2 * RxRingBuffer.SIZE);
615+
ts.assertNoErrors();
616+
ts.assertCompleted();
617+
618+
int j = 1;
619+
for (Integer v : ts.getOnNextEvents()) {
620+
Assert.assertEquals(j, v.intValue());
621+
622+
j += 2;
623+
}
624+
}
625+
626+
@Test
627+
public void rangeEmptyMixtureMaxConcurrent() {
628+
TestSubscriber<Integer> ts = TestSubscriber.create();
629+
630+
Observable.range(0, 4 * RxRingBuffer.SIZE)
631+
.flatMap(new Func1<Integer, Observable<Integer>>() {
632+
@Override
633+
public Observable<Integer> call(Integer v) {
634+
return (v & 1) == 0 ? Observable.<Integer>empty() : Observable.range(v, 2);
635+
}
636+
}, 16)
637+
.subscribe(ts);
638+
639+
ts.assertValueCount(4 * RxRingBuffer.SIZE);
640+
ts.assertNoErrors();
641+
ts.assertCompleted();
642+
643+
int j = 1;
644+
List<Integer> list = ts.getOnNextEvents();
645+
for (int i = 0; i < list.size(); i += 2) {
646+
Assert.assertEquals(j, list.get(i).intValue());
647+
Assert.assertEquals(j + 1, list.get(i + 1).intValue());
648+
649+
j += 2;
650+
}
651+
}
652+
547653
}

0 commit comments

Comments
 (0)