Skip to content

Commit 0fe6e55

Browse files
authored
2.x: Fix concatMapSingle & concatMapMaybe dispose-cleanup crash (#5928)
1 parent 63572c7 commit 0fe6e55

8 files changed

+108
-4
lines changed

src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void cancel() {
170170
cancelled = true;
171171
upstream.cancel();
172172
inner.dispose();
173-
if (getAndIncrement() != 0) {
173+
if (getAndIncrement() == 0) {
174174
queue.clear();
175175
item = null;
176176
}

src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void cancel() {
170170
cancelled = true;
171171
upstream.cancel();
172172
inner.dispose();
173-
if (getAndIncrement() != 0) {
173+
if (getAndIncrement() == 0) {
174174
queue.clear();
175175
item = null;
176176
}

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void dispose() {
148148
cancelled = true;
149149
upstream.dispose();
150150
inner.dispose();
151-
if (getAndIncrement() != 0) {
151+
if (getAndIncrement() == 0) {
152152
queue.clear();
153153
item = null;
154154
}

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void dispose() {
148148
cancelled = true;
149149
upstream.dispose();
150150
inner.dispose();
151-
if (getAndIncrement() != 0) {
151+
if (getAndIncrement() == 0) {
152152
queue.clear();
153153
item = null;
154154
}

src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import io.reactivex.exceptions.*;
2828
import io.reactivex.functions.*;
2929
import io.reactivex.internal.functions.Functions;
30+
import io.reactivex.internal.operators.mixed.FlowableConcatMapMaybe.ConcatMapMaybeSubscriber;
3031
import io.reactivex.internal.subscriptions.BooleanSubscription;
32+
import io.reactivex.internal.util.ErrorMode;
3133
import io.reactivex.plugins.RxJavaPlugins;
3234
import io.reactivex.processors.PublishProcessor;
3335
import io.reactivex.schedulers.Schedulers;
@@ -368,4 +370,28 @@ public MaybeSource<? extends Object> apply(Integer v)
368370

369371
assertFalse(pp.hasSubscribers());
370372
}
373+
374+
@Test(timeout = 10000)
375+
public void cancelNoConcurrentClean() {
376+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
377+
ConcatMapMaybeSubscriber<Integer, Integer> operator =
378+
new ConcatMapMaybeSubscriber<Integer, Integer>(
379+
ts, Functions.justFunction(Maybe.<Integer>never()), 16, ErrorMode.IMMEDIATE);
380+
381+
operator.onSubscribe(new BooleanSubscription());
382+
383+
operator.queue.offer(1);
384+
385+
operator.getAndIncrement();
386+
387+
ts.cancel();
388+
389+
assertFalse(operator.queue.isEmpty());
390+
391+
operator.addAndGet(-2);
392+
393+
operator.cancel();
394+
395+
assertTrue(operator.queue.isEmpty());
396+
}
371397
}

src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import io.reactivex.exceptions.*;
2727
import io.reactivex.functions.*;
2828
import io.reactivex.internal.functions.Functions;
29+
import io.reactivex.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber;
2930
import io.reactivex.internal.subscriptions.BooleanSubscription;
31+
import io.reactivex.internal.util.ErrorMode;
3032
import io.reactivex.plugins.RxJavaPlugins;
3133
import io.reactivex.processors.PublishProcessor;
3234
import io.reactivex.subjects.SingleSubject;
@@ -283,4 +285,28 @@ public SingleSource<? extends Object> apply(Integer v)
283285

284286
assertFalse(pp.hasSubscribers());
285287
}
288+
289+
@Test(timeout = 10000)
290+
public void cancelNoConcurrentClean() {
291+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
292+
ConcatMapSingleSubscriber<Integer, Integer> operator =
293+
new ConcatMapSingleSubscriber<Integer, Integer>(
294+
ts, Functions.justFunction(Single.<Integer>never()), 16, ErrorMode.IMMEDIATE);
295+
296+
operator.onSubscribe(new BooleanSubscription());
297+
298+
operator.queue.offer(1);
299+
300+
operator.getAndIncrement();
301+
302+
ts.cancel();
303+
304+
assertFalse(operator.queue.isEmpty());
305+
306+
operator.addAndGet(-2);
307+
308+
operator.cancel();
309+
310+
assertTrue(operator.queue.isEmpty());
311+
}
286312
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import io.reactivex.exceptions.*;
2727
import io.reactivex.functions.*;
2828
import io.reactivex.internal.functions.Functions;
29+
import io.reactivex.internal.operators.mixed.ObservableConcatMapMaybe.ConcatMapMaybeMainObserver;
30+
import io.reactivex.internal.util.ErrorMode;
2931
import io.reactivex.observers.TestObserver;
3032
import io.reactivex.plugins.RxJavaPlugins;
3133
import io.reactivex.schedulers.Schedulers;
@@ -373,4 +375,28 @@ public void scalarEmptySource() {
373375

374376
assertFalse(ms.hasObservers());
375377
}
378+
379+
@Test(timeout = 10000)
380+
public void cancelNoConcurrentClean() {
381+
TestObserver<Integer> to = new TestObserver<Integer>();
382+
ConcatMapMaybeMainObserver<Integer, Integer> operator =
383+
new ConcatMapMaybeMainObserver<Integer, Integer>(
384+
to, Functions.justFunction(Maybe.<Integer>never()), 16, ErrorMode.IMMEDIATE);
385+
386+
operator.onSubscribe(Disposables.empty());
387+
388+
operator.queue.offer(1);
389+
390+
operator.getAndIncrement();
391+
392+
to.dispose();
393+
394+
assertFalse(operator.queue.isEmpty());
395+
396+
operator.addAndGet(-2);
397+
398+
operator.dispose();
399+
400+
assertTrue(operator.queue.isEmpty());
401+
}
376402
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.reactivex.exceptions.*;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.internal.functions.Functions;
28+
import io.reactivex.internal.operators.mixed.ObservableConcatMapSingle.ConcatMapSingleMainObserver;
29+
import io.reactivex.internal.util.ErrorMode;
2830
import io.reactivex.observers.TestObserver;
2931
import io.reactivex.plugins.RxJavaPlugins;
3032
import io.reactivex.subjects.*;
@@ -310,4 +312,28 @@ public void scalarEmptySource() {
310312

311313
assertFalse(ss.hasObservers());
312314
}
315+
316+
@Test(timeout = 10000)
317+
public void cancelNoConcurrentClean() {
318+
TestObserver<Integer> to = new TestObserver<Integer>();
319+
ConcatMapSingleMainObserver<Integer, Integer> operator =
320+
new ConcatMapSingleMainObserver<Integer, Integer>(
321+
to, Functions.justFunction(Single.<Integer>never()), 16, ErrorMode.IMMEDIATE);
322+
323+
operator.onSubscribe(Disposables.empty());
324+
325+
operator.queue.offer(1);
326+
327+
operator.getAndIncrement();
328+
329+
to.cancel();
330+
331+
assertFalse(operator.queue.isEmpty());
332+
333+
operator.addAndGet(-2);
334+
335+
operator.dispose();
336+
337+
assertTrue(operator.queue.isEmpty());
338+
}
313339
}

0 commit comments

Comments
 (0)