Skip to content

Commit 8bbe51d

Browse files
authored
2.x: Fix concatMap{Single|Maybe} null emission on dispose race (#6060)
1 parent 869c2aa commit 8bbe51d

8 files changed

+126
-0
lines changed

src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ void drain() {
218218
if (cancelled) {
219219
queue.clear();
220220
item = null;
221+
break;
221222
}
222223

223224
int s = state;

src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ void drain() {
213213
if (cancelled) {
214214
queue.clear();
215215
item = null;
216+
break;
216217
}
217218

218219
int s = state;

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ void drain() {
199199
if (cancelled) {
200200
queue.clear();
201201
item = null;
202+
break;
202203
}
203204

204205
int s = state;

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ void drain() {
194194
if (cancelled) {
195195
queue.clear();
196196
item = null;
197+
break;
197198
}
198199

199200
int s = state;

src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,35 @@ public void cancelNoConcurrentClean() {
394394

395395
assertTrue(operator.queue.isEmpty());
396396
}
397+
398+
@Test
399+
public void innerSuccessDisposeRace() {
400+
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
401+
402+
final MaybeSubject<Integer> ms = MaybeSubject.create();
403+
404+
final TestSubscriber<Integer> ts = Flowable.just(1)
405+
.hide()
406+
.concatMapMaybe(Functions.justFunction(ms))
407+
.test();
408+
409+
Runnable r1 = new Runnable() {
410+
@Override
411+
public void run() {
412+
ms.onSuccess(1);
413+
}
414+
};
415+
Runnable r2 = new Runnable() {
416+
@Override
417+
public void run() {
418+
ts.dispose();
419+
}
420+
};
421+
422+
TestHelper.race(r1, r2);
423+
424+
ts.assertNoErrors();
425+
}
426+
}
427+
397428
}

src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,4 +309,34 @@ public void cancelNoConcurrentClean() {
309309

310310
assertTrue(operator.queue.isEmpty());
311311
}
312+
313+
@Test
314+
public void innerSuccessDisposeRace() {
315+
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
316+
317+
final SingleSubject<Integer> ss = SingleSubject.create();
318+
319+
final TestSubscriber<Integer> ts = Flowable.just(1)
320+
.hide()
321+
.concatMapSingle(Functions.justFunction(ss))
322+
.test();
323+
324+
Runnable r1 = new Runnable() {
325+
@Override
326+
public void run() {
327+
ss.onSuccess(1);
328+
}
329+
};
330+
Runnable r2 = new Runnable() {
331+
@Override
332+
public void run() {
333+
ts.dispose();
334+
}
335+
};
336+
337+
TestHelper.race(r1, r2);
338+
339+
ts.assertNoErrors();
340+
}
341+
}
312342
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,4 +416,34 @@ public void checkUnboundedInnerQueue() {
416416

417417
to.assertResult(1, 2, 3, 4);
418418
}
419+
420+
@Test
421+
public void innerSuccessDisposeRace() {
422+
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
423+
424+
final MaybeSubject<Integer> ms = MaybeSubject.create();
425+
426+
final TestObserver<Integer> to = Observable.just(1)
427+
.hide()
428+
.concatMapMaybe(Functions.justFunction(ms))
429+
.test();
430+
431+
Runnable r1 = new Runnable() {
432+
@Override
433+
public void run() {
434+
ms.onSuccess(1);
435+
}
436+
};
437+
Runnable r2 = new Runnable() {
438+
@Override
439+
public void run() {
440+
to.dispose();
441+
}
442+
};
443+
444+
TestHelper.race(r1, r2);
445+
446+
to.assertNoErrors();
447+
}
448+
}
419449
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,4 +353,35 @@ public void checkUnboundedInnerQueue() {
353353

354354
to.assertResult(1, 2, 3, 4);
355355
}
356+
357+
@Test
358+
public void innerSuccessDisposeRace() {
359+
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
360+
361+
final SingleSubject<Integer> ss = SingleSubject.create();
362+
363+
final TestObserver<Integer> to = Observable.just(1)
364+
.hide()
365+
.concatMapSingle(Functions.justFunction(ss))
366+
.test();
367+
368+
Runnable r1 = new Runnable() {
369+
@Override
370+
public void run() {
371+
ss.onSuccess(1);
372+
}
373+
};
374+
Runnable r2 = new Runnable() {
375+
@Override
376+
public void run() {
377+
to.dispose();
378+
}
379+
};
380+
381+
TestHelper.race(r1, r2);
382+
383+
to.assertNoErrors();
384+
}
385+
}
386+
356387
}

0 commit comments

Comments
 (0)