Skip to content

Commit df2cdb7

Browse files
authored
3.x: Fix concurrent clear in observeOn while output-fused (#6708)
1 parent 1e7abf2 commit df2cdb7

File tree

4 files changed

+69
-3
lines changed

4 files changed

+69
-3
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public final void cancel() {
154154
upstream.cancel();
155155
worker.dispose();
156156

157-
if (getAndIncrement() == 0) {
157+
if (!outputFused && getAndIncrement() == 0) {
158158
queue.clear();
159159
}
160160
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void dispose() {
145145
disposed = true;
146146
upstream.dispose();
147147
worker.dispose();
148-
if (getAndIncrement() == 0) {
148+
if (!outputFused && getAndIncrement() == 0) {
149149
queue.clear();
150150
}
151151
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn.BaseObserveOnSubscriber;
3636
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
3737
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
38+
import io.reactivex.rxjava3.observers.TestObserver;
3839
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3940
import io.reactivex.rxjava3.processors.*;
4041
import io.reactivex.rxjava3.schedulers.*;
@@ -1938,4 +1939,37 @@ public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() {
19381939

19391940
assertEquals(1, s.disposedCount.get());
19401941
}
1942+
1943+
@Test
1944+
public void fusedNoConcurrentCleanDueToCancel() {
1945+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
1946+
List<Throwable> errors = TestHelper.trackPluginErrors();
1947+
try {
1948+
final UnicastProcessor<Integer> up = UnicastProcessor.create();
1949+
1950+
TestObserver<Integer> to = up.hide()
1951+
.observeOn(Schedulers.io())
1952+
.observeOn(Schedulers.single())
1953+
.unsubscribeOn(Schedulers.computation())
1954+
.firstOrError()
1955+
.test();
1956+
1957+
for (int i = 0; up.hasSubscribers() && i < 10000; i++) {
1958+
up.onNext(i);
1959+
}
1960+
1961+
to
1962+
.awaitDone(5, TimeUnit.SECONDS)
1963+
;
1964+
1965+
if (!errors.isEmpty()) {
1966+
throw new CompositeException(errors);
1967+
}
1968+
1969+
to.assertResult(0);
1970+
} finally {
1971+
RxJavaPlugins.reset();
1972+
}
1973+
}
1974+
}
19411975
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOnTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.reactivex.rxjava3.core.Observable;
3030
import io.reactivex.rxjava3.core.Observer;
3131
import io.reactivex.rxjava3.disposables.*;
32-
import io.reactivex.rxjava3.exceptions.TestException;
32+
import io.reactivex.rxjava3.exceptions.*;
3333
import io.reactivex.rxjava3.functions.*;
3434
import io.reactivex.rxjava3.internal.fuseable.*;
3535
import io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOnTest.DisposeTrackingScheduler;
@@ -815,4 +815,36 @@ public void workerNotDisposedPrematurelyNormalInAsyncOut() {
815815
assertEquals(1, s.disposedCount.get());
816816
}
817817

818+
@Test
819+
public void fusedNoConcurrentCleanDueToCancel() {
820+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
821+
List<Throwable> errors = TestHelper.trackPluginErrors();
822+
try {
823+
final UnicastSubject<Integer> us = UnicastSubject.create();
824+
825+
TestObserver<Integer> to = us.hide()
826+
.observeOn(Schedulers.io())
827+
.observeOn(Schedulers.single())
828+
.unsubscribeOn(Schedulers.computation())
829+
.firstOrError()
830+
.test();
831+
832+
for (int i = 0; us.hasObservers() && i < 10000; i++) {
833+
us.onNext(i);
834+
}
835+
836+
to
837+
.awaitDone(5, TimeUnit.SECONDS)
838+
;
839+
840+
if (!errors.isEmpty()) {
841+
throw new CompositeException(errors);
842+
}
843+
844+
to.assertResult(0);
845+
} finally {
846+
RxJavaPlugins.reset();
847+
}
848+
}
849+
}
818850
}

0 commit comments

Comments
 (0)