Skip to content

Commit e784201

Browse files
authored
2.x: Fix mergeWith not canceling other when the main fails (#6599)
* 2.x: Fix mergeWith not canceling other when the main fails * Switch to OpenJDK compilation as OracleJDK is not available * Add more time to refCount testing * More time again * Looks like 250ms is still not enough, let's loop
1 parent 5550b63 commit e784201

15 files changed

+249
-25
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
language: java
22
jdk:
3-
- oraclejdk8
3+
- openjdk8
44

55
# force upgrade Java8 as per https://github.com/travis-ci/travis-ci/issues/4042 (fixes compilation issue)
66
#addons:

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void onNext(T t) {
8686

8787
@Override
8888
public void onError(Throwable ex) {
89-
SubscriptionHelper.cancel(mainSubscription);
89+
DisposableHelper.dispose(otherObserver);
9090
HalfSerializer.onError(downstream, ex, this, error);
9191
}
9292

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void onNext(T t) {
143143
@Override
144144
public void onError(Throwable ex) {
145145
if (error.addThrowable(ex)) {
146-
SubscriptionHelper.cancel(mainSubscription);
146+
DisposableHelper.dispose(otherObserver);
147147
drain();
148148
} else {
149149
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void onNext(T t) {
143143
@Override
144144
public void onError(Throwable ex) {
145145
if (error.addThrowable(ex)) {
146-
SubscriptionHelper.cancel(mainSubscription);
146+
DisposableHelper.dispose(otherObserver);
147147
drain();
148148
} else {
149149
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void onNext(T t) {
8080

8181
@Override
8282
public void onError(Throwable ex) {
83-
DisposableHelper.dispose(mainDisposable);
83+
DisposableHelper.dispose(otherObserver);
8484
HalfSerializer.onError(downstream, ex, this, error);
8585
}
8686

src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void onNext(T t) {
106106
@Override
107107
public void onError(Throwable ex) {
108108
if (error.addThrowable(ex)) {
109-
DisposableHelper.dispose(mainDisposable);
109+
DisposableHelper.dispose(otherObserver);
110110
drain();
111111
} else {
112112
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void onNext(T t) {
106106
@Override
107107
public void onError(Throwable ex) {
108108
if (error.addThrowable(ex)) {
109-
DisposableHelper.dispose(mainDisposable);
109+
DisposableHelper.dispose(otherObserver);
110110
drain();
111111
} else {
112112
RxJavaPlugins.onError(ex);

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,40 @@ public void run() {
136136
ts.assertResult(1);
137137
}
138138
}
139+
140+
@Test
141+
public void cancelOtherOnMainError() {
142+
PublishProcessor<Integer> pp = PublishProcessor.create();
143+
CompletableSubject cs = CompletableSubject.create();
144+
145+
TestSubscriber<Integer> ts = pp.mergeWith(cs).test();
146+
147+
assertTrue(pp.hasSubscribers());
148+
assertTrue(cs.hasObservers());
149+
150+
pp.onError(new TestException());
151+
152+
ts.assertFailure(TestException.class);
153+
154+
assertFalse("main has observers!", pp.hasSubscribers());
155+
assertFalse("other has observers", cs.hasObservers());
156+
}
157+
158+
@Test
159+
public void cancelMainOnOtherError() {
160+
PublishProcessor<Integer> pp = PublishProcessor.create();
161+
CompletableSubject cs = CompletableSubject.create();
162+
163+
TestSubscriber<Integer> ts = pp.mergeWith(cs).test();
164+
165+
assertTrue(pp.hasSubscribers());
166+
assertTrue(cs.hasObservers());
167+
168+
cs.onError(new TestException());
169+
170+
ts.assertFailure(TestException.class);
171+
172+
assertFalse("main has observers!", pp.hasSubscribers());
173+
assertFalse("other has observers", cs.hasObservers());
174+
}
139175
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,4 +401,40 @@ public void onNext(Integer t) {
401401
ts.assertValueCount(Flowable.bufferSize());
402402
ts.assertComplete();
403403
}
404+
405+
@Test
406+
public void cancelOtherOnMainError() {
407+
PublishProcessor<Integer> pp = PublishProcessor.create();
408+
MaybeSubject<Integer> ms = MaybeSubject.create();
409+
410+
TestSubscriber<Integer> ts = pp.mergeWith(ms).test();
411+
412+
assertTrue(pp.hasSubscribers());
413+
assertTrue(ms.hasObservers());
414+
415+
pp.onError(new TestException());
416+
417+
ts.assertFailure(TestException.class);
418+
419+
assertFalse("main has observers!", pp.hasSubscribers());
420+
assertFalse("other has observers", ms.hasObservers());
421+
}
422+
423+
@Test
424+
public void cancelMainOnOtherError() {
425+
PublishProcessor<Integer> pp = PublishProcessor.create();
426+
MaybeSubject<Integer> ms = MaybeSubject.create();
427+
428+
TestSubscriber<Integer> ts = pp.mergeWith(ms).test();
429+
430+
assertTrue(pp.hasSubscribers());
431+
assertTrue(ms.hasObservers());
432+
433+
ms.onError(new TestException());
434+
435+
ts.assertFailure(TestException.class);
436+
437+
assertFalse("main has observers!", pp.hasSubscribers());
438+
assertFalse("other has observers", ms.hasObservers());
439+
}
404440
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,4 +397,40 @@ public void onNext(Integer t) {
397397
ts.assertValueCount(Flowable.bufferSize());
398398
ts.assertComplete();
399399
}
400+
401+
@Test
402+
public void cancelOtherOnMainError() {
403+
PublishProcessor<Integer> pp = PublishProcessor.create();
404+
SingleSubject<Integer> ss = SingleSubject.create();
405+
406+
TestSubscriber<Integer> ts = pp.mergeWith(ss).test();
407+
408+
assertTrue(pp.hasSubscribers());
409+
assertTrue(ss.hasObservers());
410+
411+
pp.onError(new TestException());
412+
413+
ts.assertFailure(TestException.class);
414+
415+
assertFalse("main has observers!", pp.hasSubscribers());
416+
assertFalse("other has observers", ss.hasObservers());
417+
}
418+
419+
@Test
420+
public void cancelMainOnOtherError() {
421+
PublishProcessor<Integer> pp = PublishProcessor.create();
422+
SingleSubject<Integer> ss = SingleSubject.create();
423+
424+
TestSubscriber<Integer> ts = pp.mergeWith(ss).test();
425+
426+
assertTrue(pp.hasSubscribers());
427+
assertTrue(ss.hasObservers());
428+
429+
ss.onError(new TestException());
430+
431+
ts.assertFailure(TestException.class);
432+
433+
assertFalse("main has observers!", pp.hasSubscribers());
434+
assertFalse("other has observers", ss.hasObservers());
435+
}
400436
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,40 @@ protected void subscribeActual(Observer<? super Integer> observer) {
135135
.test()
136136
.assertResult(1);
137137
}
138+
139+
@Test
140+
public void cancelOtherOnMainError() {
141+
PublishSubject<Integer> ps = PublishSubject.create();
142+
CompletableSubject cs = CompletableSubject.create();
143+
144+
TestObserver<Integer> to = ps.mergeWith(cs).test();
145+
146+
assertTrue(ps.hasObservers());
147+
assertTrue(cs.hasObservers());
148+
149+
ps.onError(new TestException());
150+
151+
to.assertFailure(TestException.class);
152+
153+
assertFalse("main has observers!", ps.hasObservers());
154+
assertFalse("other has observers", cs.hasObservers());
155+
}
156+
157+
@Test
158+
public void cancelMainOnOtherError() {
159+
PublishSubject<Integer> ps = PublishSubject.create();
160+
CompletableSubject cs = CompletableSubject.create();
161+
162+
TestObserver<Integer> to = ps.mergeWith(cs).test();
163+
164+
assertTrue(ps.hasObservers());
165+
assertTrue(cs.hasObservers());
166+
167+
cs.onError(new TestException());
168+
169+
to.assertFailure(TestException.class);
170+
171+
assertFalse("main has observers!", ps.hasObservers());
172+
assertFalse("other has observers", cs.hasObservers());
173+
}
138174
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,4 +272,39 @@ public void onNext(Integer t) {
272272
to.assertResult(0, 1, 2, 3, 4);
273273
}
274274

275+
@Test
276+
public void cancelOtherOnMainError() {
277+
PublishSubject<Integer> ps = PublishSubject.create();
278+
MaybeSubject<Integer> ms = MaybeSubject.create();
279+
280+
TestObserver<Integer> to = ps.mergeWith(ms).test();
281+
282+
assertTrue(ps.hasObservers());
283+
assertTrue(ms.hasObservers());
284+
285+
ps.onError(new TestException());
286+
287+
to.assertFailure(TestException.class);
288+
289+
assertFalse("main has observers!", ps.hasObservers());
290+
assertFalse("other has observers", ms.hasObservers());
291+
}
292+
293+
@Test
294+
public void cancelMainOnOtherError() {
295+
PublishSubject<Integer> ps = PublishSubject.create();
296+
MaybeSubject<Integer> ms = MaybeSubject.create();
297+
298+
TestObserver<Integer> to = ps.mergeWith(ms).test();
299+
300+
assertTrue(ps.hasObservers());
301+
assertTrue(ms.hasObservers());
302+
303+
ms.onError(new TestException());
304+
305+
to.assertFailure(TestException.class);
306+
307+
assertFalse("main has observers!", ps.hasObservers());
308+
assertFalse("other has observers", ms.hasObservers());
309+
}
275310
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,4 +263,40 @@ public void onNext(Integer t) {
263263

264264
to.assertResult(0, 1, 2, 3, 4);
265265
}
266+
267+
@Test
268+
public void cancelOtherOnMainError() {
269+
PublishSubject<Integer> ps = PublishSubject.create();
270+
SingleSubject<Integer> ss = SingleSubject.create();
271+
272+
TestObserver<Integer> to = ps.mergeWith(ss).test();
273+
274+
assertTrue(ps.hasObservers());
275+
assertTrue(ss.hasObservers());
276+
277+
ps.onError(new TestException());
278+
279+
to.assertFailure(TestException.class);
280+
281+
assertFalse("main has observers!", ps.hasObservers());
282+
assertFalse("other has observers", ss.hasObservers());
283+
}
284+
285+
@Test
286+
public void cancelMainOnOtherError() {
287+
PublishSubject<Integer> ps = PublishSubject.create();
288+
SingleSubject<Integer> ss = SingleSubject.create();
289+
290+
TestObserver<Integer> to = ps.mergeWith(ss).test();
291+
292+
assertTrue(ps.hasObservers());
293+
assertTrue(ss.hasObservers());
294+
295+
ss.onError(new TestException());
296+
297+
to.assertFailure(TestException.class);
298+
299+
assertFalse("main has observers!", ps.hasObservers());
300+
assertFalse("other has observers", ss.hasObservers());
301+
}
266302
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ protected void subscribeActual(Observer<? super Integer> observer) {
630630
@Test
631631
public void replayNoLeak() throws Exception {
632632
System.gc();
633-
Thread.sleep(100);
633+
Thread.sleep(250);
634634

635635
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
636636

@@ -646,7 +646,7 @@ public Object call() throws Exception {
646646
source.subscribe();
647647

648648
System.gc();
649-
Thread.sleep(100);
649+
Thread.sleep(250);
650650

651651
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
652652

@@ -657,7 +657,7 @@ public Object call() throws Exception {
657657
@Test
658658
public void replayNoLeak2() throws Exception {
659659
System.gc();
660-
Thread.sleep(100);
660+
Thread.sleep(250);
661661

662662
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
663663

@@ -680,7 +680,7 @@ public Object call() throws Exception {
680680
d2 = null;
681681

682682
System.gc();
683-
Thread.sleep(100);
683+
Thread.sleep(250);
684684

685685
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
686686

@@ -701,7 +701,7 @@ static final class ExceptionData extends Exception {
701701
@Test
702702
public void publishNoLeak() throws Exception {
703703
System.gc();
704-
Thread.sleep(100);
704+
Thread.sleep(250);
705705

706706
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
707707

@@ -716,10 +716,19 @@ public Object call() throws Exception {
716716

717717
source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer());
718718

719-
System.gc();
720-
Thread.sleep(100);
719+
long after = 0L;
721720

722-
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
721+
for (int i = 0; i < 10; i++) {
722+
System.gc();
723+
724+
after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
725+
726+
if (start + 20 * 1000 * 1000 > after) {
727+
break;
728+
}
729+
730+
Thread.sleep(100);
731+
}
723732

724733
source = null;
725734
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
@@ -728,7 +737,7 @@ public Object call() throws Exception {
728737
@Test
729738
public void publishNoLeak2() throws Exception {
730739
System.gc();
731-
Thread.sleep(100);
740+
Thread.sleep(250);
732741

733742
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
734743

@@ -751,7 +760,7 @@ public Object call() throws Exception {
751760
d2 = null;
752761

753762
System.gc();
754-
Thread.sleep(100);
763+
Thread.sleep(250);
755764

756765
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
757766

0 commit comments

Comments
 (0)