Skip to content

Commit 70fe91c

Browse files
authored
2.x: Fix concurrent clear() calls when fused chains are canceled (#6677)
1 parent d4eae73 commit 70fe91c

File tree

8 files changed

+208
-16
lines changed

8 files changed

+208
-16
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public void cancel(K key) {
262262
if (groupCount.decrementAndGet() == 0) {
263263
upstream.cancel();
264264

265-
if (getAndIncrement() == 0) {
265+
if (!outputFused && getAndIncrement() == 0) {
266266
queue.clear();
267267
}
268268
}
@@ -288,7 +288,6 @@ void drainFused() {
288288

289289
for (;;) {
290290
if (cancelled.get()) {
291-
q.clear();
292291
return;
293292
}
294293

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void cancel() {
150150
cancelled = true;
151151
upstream.cancel();
152152

153-
if (getAndIncrement() == 0) {
153+
if (!outputFused && getAndIncrement() == 0) {
154154
queue.clear();
155155
}
156156
}

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,6 @@ void drainFused(Subscriber<? super T> a) {
347347
for (;;) {
348348

349349
if (cancelled) {
350-
q.clear();
351350
downstream.lazySet(null);
352351
return;
353352
}
@@ -550,10 +549,11 @@ public void cancel() {
550549

551550
doTerminate();
552551

553-
if (!enableOperatorFusion) {
554-
if (wip.getAndIncrement() == 0) {
552+
downstream.lazySet(null);
553+
if (wip.getAndIncrement() == 0) {
554+
downstream.lazySet(null);
555+
if (!enableOperatorFusion) {
555556
queue.clear();
556-
downstream.lazySet(null);
557557
}
558558
}
559559
}

src/main/java/io/reactivex/subjects/UnicastSubject.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,6 @@ void drainFused(Observer<? super T> a) {
420420

421421
if (disposed) {
422422
downstream.lazySet(null);
423-
q.clear();
424423
return;
425424
}
426425
boolean d = done;
@@ -558,7 +557,9 @@ public void dispose() {
558557
downstream.lazySet(null);
559558
if (wip.getAndIncrement() == 0) {
560559
downstream.lazySet(null);
561-
queue.clear();
560+
if (!enableOperatorFusion) {
561+
queue.clear();
562+
}
562563
}
563564
}
564565
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.io.IOException;
@@ -29,12 +30,13 @@
2930
import com.google.common.cache.*;
3031

3132
import io.reactivex.*;
32-
import io.reactivex.exceptions.TestException;
33+
import io.reactivex.exceptions.*;
3334
import io.reactivex.flowables.GroupedFlowable;
3435
import io.reactivex.functions.*;
3536
import io.reactivex.internal.functions.Functions;
36-
import io.reactivex.internal.fuseable.QueueFuseable;
37+
import io.reactivex.internal.fuseable.*;
3738
import io.reactivex.internal.subscriptions.BooleanSubscription;
39+
import io.reactivex.plugins.RxJavaPlugins;
3840
import io.reactivex.processors.PublishProcessor;
3941
import io.reactivex.schedulers.Schedulers;
4042
import io.reactivex.subjects.PublishSubject;
@@ -2205,4 +2207,83 @@ public void accept(Object object) {
22052207
}};
22062208
return evictingMapFactory;
22072209
}
2210+
2211+
@Test
2212+
public void fusedNoConcurrentCleanDueToCancel() {
2213+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
2214+
List<Throwable> errors = TestHelper.trackPluginErrors();
2215+
try {
2216+
final PublishProcessor<Integer> pp = PublishProcessor.create();
2217+
2218+
final AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>>();
2219+
2220+
final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
2221+
2222+
pp.groupBy(Functions.identity(), Functions.<Integer>identity(), false, 4)
2223+
.subscribe(new FlowableSubscriber<GroupedFlowable<Object, Integer>>() {
2224+
2225+
boolean once;
2226+
2227+
@Override
2228+
public void onNext(GroupedFlowable<Object, Integer> g) {
2229+
if (!once) {
2230+
try {
2231+
GroupedFlowable<Object, Integer> t = qs.get().poll();
2232+
if (t != null) {
2233+
once = true;
2234+
t.subscribe(ts2);
2235+
}
2236+
} catch (Throwable ignored) {
2237+
// not relevant here
2238+
}
2239+
}
2240+
}
2241+
2242+
@Override
2243+
public void onError(Throwable t) {
2244+
}
2245+
2246+
@Override
2247+
public void onComplete() {
2248+
}
2249+
2250+
@Override
2251+
public void onSubscribe(Subscription s) {
2252+
@SuppressWarnings("unchecked")
2253+
QueueSubscription<GroupedFlowable<Object, Integer>> q = (QueueSubscription<GroupedFlowable<Object, Integer>>)s;
2254+
qs.set(q);
2255+
q.requestFusion(QueueFuseable.ANY);
2256+
q.request(1);
2257+
}
2258+
})
2259+
;
2260+
2261+
Runnable r1 = new Runnable() {
2262+
@Override
2263+
public void run() {
2264+
qs.get().cancel();
2265+
qs.get().clear();
2266+
}
2267+
};
2268+
Runnable r2 = new Runnable() {
2269+
@Override
2270+
public void run() {
2271+
ts2.cancel();
2272+
}
2273+
};
2274+
2275+
for (int i = 0; i < 100; i++) {
2276+
pp.onNext(i);
2277+
}
2278+
2279+
TestHelper.race(r1, r2);
2280+
2281+
if (!errors.isEmpty()) {
2282+
throw new CompositeException(errors);
2283+
}
2284+
} finally {
2285+
RxJavaPlugins.reset();
2286+
}
2287+
}
2288+
}
22082289
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@
1515

1616
import static org.junit.Assert.*;
1717

18+
import java.util.List;
1819
import java.util.concurrent.*;
1920
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import org.junit.Test;
2223
import org.reactivestreams.*;
2324

24-
import io.reactivex.Flowable;
25+
import io.reactivex.*;
2526
import io.reactivex.exceptions.*;
2627
import io.reactivex.functions.*;
28+
import io.reactivex.internal.functions.Functions;
2729
import io.reactivex.internal.fuseable.QueueFuseable;
2830
import io.reactivex.internal.subscriptions.BooleanSubscription;
31+
import io.reactivex.observers.TestObserver;
32+
import io.reactivex.plugins.RxJavaPlugins;
33+
import io.reactivex.processors.PublishProcessor;
2934
import io.reactivex.schedulers.Schedulers;
3035
import io.reactivex.subscribers.*;
3136

@@ -307,4 +312,37 @@ public void fusionRejected() {
307312
SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
308313
.assertEmpty();
309314
}
315+
316+
@Test
317+
public void fusedNoConcurrentCleanDueToCancel() {
318+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
319+
List<Throwable> errors = TestHelper.trackPluginErrors();
320+
try {
321+
final PublishProcessor<Integer> pp = PublishProcessor.create();
322+
323+
TestObserver<Integer> to = pp.onBackpressureBuffer(4, false, true)
324+
.observeOn(Schedulers.io())
325+
.map(Functions.<Integer>identity())
326+
.observeOn(Schedulers.single())
327+
.firstOrError()
328+
.test();
329+
330+
for (int i = 0; pp.hasSubscribers(); i++) {
331+
pp.onNext(i);
332+
}
333+
334+
to
335+
.awaitDone(5, TimeUnit.SECONDS)
336+
;
337+
338+
if (!errors.isEmpty()) {
339+
throw new CompositeException(errors);
340+
}
341+
342+
to.assertResult(0);
343+
} finally {
344+
RxJavaPlugins.reset();
345+
}
346+
}
347+
}
310348
}

src/test/java/io/reactivex/processors/UnicastProcessorTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.List;
19+
import java.util.concurrent.TimeUnit;
1920
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import org.junit.Test;
2223

2324
import io.reactivex.*;
2425
import io.reactivex.disposables.Disposable;
25-
import io.reactivex.exceptions.TestException;
26-
import io.reactivex.internal.fuseable.*;
26+
import io.reactivex.exceptions.*;
27+
import io.reactivex.internal.functions.Functions;
28+
import io.reactivex.internal.fuseable.QueueFuseable;
2729
import io.reactivex.internal.subscriptions.BooleanSubscription;
30+
import io.reactivex.observers.TestObserver;
2831
import io.reactivex.plugins.RxJavaPlugins;
32+
import io.reactivex.schedulers.Schedulers;
2933
import io.reactivex.subscribers.*;
3034

3135
public class UnicastProcessorTest extends FlowableProcessorTest<Object> {
@@ -438,4 +442,37 @@ public void unicastSubscriptionBadRequest() {
438442
RxJavaPlugins.reset();
439443
}
440444
}
445+
446+
@Test
447+
public void fusedNoConcurrentCleanDueToCancel() {
448+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
449+
List<Throwable> errors = TestHelper.trackPluginErrors();
450+
try {
451+
final UnicastProcessor<Integer> us = UnicastProcessor.create();
452+
453+
TestObserver<Integer> to = us
454+
.observeOn(Schedulers.io())
455+
.map(Functions.<Integer>identity())
456+
.observeOn(Schedulers.single())
457+
.firstOrError()
458+
.test();
459+
460+
for (int i = 0; us.hasSubscribers(); i++) {
461+
us.onNext(i);
462+
}
463+
464+
to
465+
.awaitDone(5, TimeUnit.SECONDS)
466+
;
467+
468+
if (!errors.isEmpty()) {
469+
throw new CompositeException(errors);
470+
}
471+
472+
to.assertResult(0);
473+
} finally {
474+
RxJavaPlugins.reset();
475+
}
476+
}
477+
}
441478
}

src/test/java/io/reactivex/subjects/UnicastSubjectTest.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@
1717
import static org.mockito.Mockito.mock;
1818

1919
import java.util.List;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import org.junit.Test;
2324

2425
import io.reactivex.*;
2526
import io.reactivex.disposables.*;
26-
import io.reactivex.exceptions.TestException;
27-
import io.reactivex.internal.fuseable.*;
27+
import io.reactivex.exceptions.*;
28+
import io.reactivex.internal.functions.Functions;
29+
import io.reactivex.internal.fuseable.QueueFuseable;
2830
import io.reactivex.observers.*;
2931
import io.reactivex.plugins.RxJavaPlugins;
32+
import io.reactivex.schedulers.Schedulers;
3033

3134
public class UnicastSubjectTest extends SubjectTest<Integer> {
3235

@@ -456,4 +459,37 @@ public void drainFusedFailFastEmpty() {
456459

457460
to.assertEmpty();
458461
}
462+
463+
@Test
464+
public void fusedNoConcurrentCleanDueToCancel() {
465+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
466+
List<Throwable> errors = TestHelper.trackPluginErrors();
467+
try {
468+
final UnicastSubject<Integer> us = UnicastSubject.create();
469+
470+
TestObserver<Integer> to = us
471+
.observeOn(Schedulers.io())
472+
.map(Functions.<Integer>identity())
473+
.observeOn(Schedulers.single())
474+
.firstOrError()
475+
.test();
476+
477+
for (int i = 0; us.hasObservers(); i++) {
478+
us.onNext(i);
479+
}
480+
481+
to
482+
.awaitDone(5, TimeUnit.SECONDS)
483+
;
484+
485+
if (!errors.isEmpty()) {
486+
throw new CompositeException(errors);
487+
}
488+
489+
to.assertResult(0);
490+
} finally {
491+
RxJavaPlugins.reset();
492+
}
493+
}
494+
}
459495
}

0 commit comments

Comments
 (0)