Skip to content

Commit 84c88bd

Browse files
Merge pull request #1726 from benjchristensen/fix-merge-1723
Fix Merge: backpressure + scalarValueQueue don't play nicely
2 parents 67df049 + 4565858 commit 84c88bd

File tree

2 files changed

+117
-10
lines changed

2 files changed

+117
-10
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
7272
* I'd love to have contributions that improve this class, but keep in mind the performance and GC pressure.
7373
* The benchmarks I use are in the JMH OperatorMergePerf class. GC memory pressure is tested using Java Flight Recorder
7474
* to track object allocation.
75-
*
76-
* TODO There is still a known concurrency bug somewhere either in this class, in SubscriptionIndexedRingBuffer or their relationship.
77-
* See https://github.com/ReactiveX/RxJava/issues/1420 for more information on this.
7875
*/
7976

8077
public OperatorMerge() {
@@ -426,7 +423,7 @@ public void onCompleted() {
426423
boolean c = false;
427424
synchronized (this) {
428425
completed = true;
429-
if (wip == 0) {
426+
if (wip == 0 && (scalarValueQueue == null || scalarValueQueue.isEmpty())) {
430427
c = true;
431428
}
432429
}
@@ -496,7 +493,17 @@ public void request(long n) {
496493
requested = Long.MAX_VALUE;
497494
} else {
498495
REQUESTED.getAndAdd(this, n);
499-
ms.drainQueuesIfNeeded();
496+
if (ms.drainQueuesIfNeeded()) {
497+
boolean sendComplete = false;
498+
synchronized (this) {
499+
if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
500+
sendComplete = true;
501+
}
502+
}
503+
if (sendComplete) {
504+
ms.drainAndComplete();
505+
}
506+
}
500507
}
501508
}
502509

src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static java.util.Arrays.asList;
1819
import static org.junit.Assert.assertEquals;
1920
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertTrue;
@@ -39,13 +40,9 @@
3940
import org.mockito.Mock;
4041
import org.mockito.MockitoAnnotations;
4142

42-
import rx.Observable;
43+
import rx.*;
4344
import rx.Observable.OnSubscribe;
44-
import rx.Observer;
45-
import rx.Scheduler;
4645
import rx.Scheduler.Worker;
47-
import rx.Subscriber;
48-
import rx.Subscription;
4946
import rx.functions.Action0;
5047
import rx.functions.Action1;
5148
import rx.functions.Func1;
@@ -1005,4 +1002,107 @@ public void call(Subscriber<? super Integer> s) {
10051002
assertEquals(10000, ts.getOnNextEvents().size());
10061003
}
10071004

1005+
@Test
1006+
public void shouldCompleteAfterApplyingBackpressure_NormalPath() {
1007+
Observable<Integer> source = Observable.mergeDelayError(Observable.just(Observable.range(1, 2)));
1008+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1009+
subscriber.requestMore(0);
1010+
source.subscribe(subscriber);
1011+
subscriber.requestMore(3); // 1, 2, <complete> - with requestMore(2) we get the 1 and 2 but not the <complete>
1012+
subscriber.assertReceivedOnNext(asList(1, 2));
1013+
subscriber.assertTerminalEvent();
1014+
}
1015+
1016+
@Test
1017+
public void shouldCompleteAfterApplyingBackpressure_FastPath() {
1018+
Observable<Integer> source = Observable.mergeDelayError(Observable.just(Observable.just(1)));
1019+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1020+
subscriber.requestMore(0);
1021+
source.subscribe(subscriber);
1022+
subscriber.requestMore(2); // 1, <complete> - should work as per .._NormalPath above
1023+
subscriber.assertReceivedOnNext(asList(1));
1024+
subscriber.assertTerminalEvent();
1025+
}
1026+
1027+
@Test
1028+
public void shouldNotCompleteIfThereArePendingScalarSynchronousEmissionsWhenTheLastInnerSubscriberCompletes() {
1029+
TestScheduler scheduler = Schedulers.test();
1030+
Observable<Long> source = Observable.mergeDelayError(Observable.just(1L), Observable.timer(1, TimeUnit.SECONDS, scheduler).skip(1));
1031+
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
1032+
subscriber.requestMore(0);
1033+
source.subscribe(subscriber);
1034+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
1035+
subscriber.assertReceivedOnNext(Collections.<Long>emptyList());
1036+
assertEquals(Collections.<Notification<Long>>emptyList(), subscriber.getOnCompletedEvents());
1037+
subscriber.requestMore(1);
1038+
subscriber.assertReceivedOnNext(asList(1L));
1039+
assertEquals(Collections.<Notification<Long>>emptyList(), subscriber.getOnCompletedEvents());
1040+
subscriber.requestMore(1);
1041+
subscriber.assertTerminalEvent();
1042+
}
1043+
1044+
@Test
1045+
public void delayedErrorsShouldBeEmittedWhenCompleteAfterApplyingBackpressure_NormalPath() {
1046+
Throwable exception = new Throwable();
1047+
Observable<Integer> source = Observable.mergeDelayError(Observable.range(1, 2), Observable.<Integer>error(exception));
1048+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1049+
subscriber.requestMore(0);
1050+
source.subscribe(subscriber);
1051+
subscriber.requestMore(3); // 1, 2, <error>
1052+
subscriber.assertReceivedOnNext(asList(1, 2));
1053+
subscriber.assertTerminalEvent();
1054+
assertEquals(asList(exception), subscriber.getOnErrorEvents());
1055+
}
1056+
1057+
@Test
1058+
public void delayedErrorsShouldBeEmittedWhenCompleteAfterApplyingBackpressure_FastPath() {
1059+
Throwable exception = new Throwable();
1060+
Observable<Integer> source = Observable.mergeDelayError(Observable.just(1), Observable.<Integer>error(exception));
1061+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1062+
subscriber.requestMore(0);
1063+
source.subscribe(subscriber);
1064+
subscriber.requestMore(2); // 1, <error>
1065+
subscriber.assertReceivedOnNext(asList(1));
1066+
subscriber.assertTerminalEvent();
1067+
assertEquals(asList(exception), subscriber.getOnErrorEvents());
1068+
}
1069+
1070+
@Test
1071+
public void shouldNotCompleteWhileThereAreStillScalarSynchronousEmissionsInTheQueue() {
1072+
Observable<Integer> source = Observable.merge(Observable.just(1), Observable.just(2));
1073+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1074+
subscriber.requestMore(1);
1075+
source.subscribe(subscriber);
1076+
subscriber.assertReceivedOnNext(asList(1));
1077+
subscriber.requestMore(1);
1078+
subscriber.assertReceivedOnNext(asList(1, 2));
1079+
}
1080+
1081+
@Test
1082+
public void shouldNotReceivedDelayedErrorWhileThereAreStillScalarSynchronousEmissionsInTheQueue() {
1083+
Throwable exception = new Throwable();
1084+
Observable<Integer> source = Observable.mergeDelayError(Observable.just(1), Observable.just(2), Observable.<Integer>error(exception));
1085+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1086+
subscriber.requestMore(1);
1087+
source.subscribe(subscriber);
1088+
subscriber.assertReceivedOnNext(asList(1));
1089+
assertEquals(Collections.<Throwable>emptyList(), subscriber.getOnErrorEvents());
1090+
subscriber.requestMore(1);
1091+
subscriber.assertReceivedOnNext(asList(1, 2));
1092+
assertEquals(asList(exception), subscriber.getOnErrorEvents());
1093+
}
1094+
1095+
@Test
1096+
public void shouldNotReceivedDelayedErrorWhileThereAreStillNormalEmissionsInTheQueue() {
1097+
Throwable exception = new Throwable();
1098+
Observable<Integer> source = Observable.mergeDelayError(Observable.range(1, 2), Observable.range(3, 2), Observable.<Integer>error(exception));
1099+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
1100+
subscriber.requestMore(3);
1101+
source.subscribe(subscriber);
1102+
subscriber.assertReceivedOnNext(asList(1, 2, 3));
1103+
assertEquals(Collections.<Throwable>emptyList(), subscriber.getOnErrorEvents());
1104+
subscriber.requestMore(2);
1105+
subscriber.assertReceivedOnNext(asList(1, 2, 3, 4));
1106+
assertEquals(asList(exception), subscriber.getOnErrorEvents());
1107+
}
10081108
}

0 commit comments

Comments
 (0)