Skip to content

Commit 184a17b

Browse files
authored
2.x: Fix window() with start/end selector not disposing/cancelling properly (#6398)
* 2.x: Fix window() with s/e selector not disposing/cancelling properly * Fix cancellation upon backpressure problem/handler crash
1 parent 7fffa00 commit 184a17b

File tree

4 files changed

+119
-17
lines changed

4 files changed

+119
-17
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ static final class WindowBoundaryMainSubscriber<T, B, V>
7171

7272
final AtomicLong windows = new AtomicLong();
7373

74+
final AtomicBoolean stopWindows = new AtomicBoolean();
75+
7476
WindowBoundaryMainSubscriber(Subscriber<? super Flowable<T>> actual,
7577
Publisher<B> open, Function<? super B, ? extends Publisher<V>> close, int bufferSize) {
7678
super(actual, new MpscLinkedQueue<Object>());
@@ -89,14 +91,13 @@ public void onSubscribe(Subscription s) {
8991

9092
downstream.onSubscribe(this);
9193

92-
if (cancelled) {
94+
if (stopWindows.get()) {
9395
return;
9496
}
9597

9698
OperatorWindowBoundaryOpenSubscriber<T, B> os = new OperatorWindowBoundaryOpenSubscriber<T, B>(this);
9799

98100
if (boundary.compareAndSet(null, os)) {
99-
windows.getAndIncrement();
100101
s.request(Long.MAX_VALUE);
101102
open.subscribe(os);
102103
}
@@ -177,7 +178,12 @@ public void request(long n) {
177178

178179
@Override
179180
public void cancel() {
180-
cancelled = true;
181+
if (stopWindows.compareAndSet(false, true)) {
182+
DisposableHelper.dispose(boundary);
183+
if (windows.decrementAndGet() == 0) {
184+
upstream.cancel();
185+
}
186+
}
181187
}
182188

183189
void dispose() {
@@ -236,7 +242,7 @@ void drainLoop() {
236242
continue;
237243
}
238244

239-
if (cancelled) {
245+
if (stopWindows.get()) {
240246
continue;
241247
}
242248

@@ -250,7 +256,7 @@ void drainLoop() {
250256
produced(1);
251257
}
252258
} else {
253-
cancelled = true;
259+
cancel();
254260
a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests"));
255261
continue;
256262
}
@@ -260,7 +266,7 @@ void drainLoop() {
260266
try {
261267
p = ObjectHelper.requireNonNull(close.apply(wo.open), "The publisher supplied is null");
262268
} catch (Throwable e) {
263-
cancelled = true;
269+
cancel();
264270
a.onError(e);
265271
continue;
266272
}

src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ static final class WindowBoundaryMainObserver<T, B, V>
6969

7070
final AtomicLong windows = new AtomicLong();
7171

72+
final AtomicBoolean stopWindows = new AtomicBoolean();
73+
7274
WindowBoundaryMainObserver(Observer<? super Observable<T>> actual,
7375
ObservableSource<B> open, Function<? super B, ? extends ObservableSource<V>> close, int bufferSize) {
7476
super(actual, new MpscLinkedQueue<Object>());
@@ -87,14 +89,13 @@ public void onSubscribe(Disposable d) {
8789

8890
downstream.onSubscribe(this);
8991

90-
if (cancelled) {
92+
if (stopWindows.get()) {
9193
return;
9294
}
9395

9496
OperatorWindowBoundaryOpenObserver<T, B> os = new OperatorWindowBoundaryOpenObserver<T, B>(this);
9597

9698
if (boundary.compareAndSet(null, os)) {
97-
windows.getAndIncrement();
9899
open.subscribe(os);
99100
}
100101
}
@@ -164,12 +165,17 @@ void error(Throwable t) {
164165

165166
@Override
166167
public void dispose() {
167-
cancelled = true;
168+
if (stopWindows.compareAndSet(false, true)) {
169+
DisposableHelper.dispose(boundary);
170+
if (windows.decrementAndGet() == 0) {
171+
upstream.dispose();
172+
}
173+
}
168174
}
169175

170176
@Override
171177
public boolean isDisposed() {
172-
return cancelled;
178+
return stopWindows.get();
173179
}
174180

175181
void disposeBoundary() {
@@ -229,7 +235,7 @@ void drainLoop() {
229235
continue;
230236
}
231237

232-
if (cancelled) {
238+
if (stopWindows.get()) {
233239
continue;
234240
}
235241

@@ -244,7 +250,7 @@ void drainLoop() {
244250
p = ObjectHelper.requireNonNull(close.apply(wo.open), "The ObservableSource supplied is null");
245251
} catch (Throwable e) {
246252
Exceptions.throwIfFatal(e);
247-
cancelled = true;
253+
stopWindows.set(true);
248254
a.onError(e);
249255
continue;
250256
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
import java.util.*;
1919
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import org.junit.*;
2223
import org.reactivestreams.*;
2324

2425
import io.reactivex.*;
25-
import io.reactivex.exceptions.TestException;
26+
import io.reactivex.exceptions.*;
2627
import io.reactivex.functions.*;
2728
import io.reactivex.internal.functions.Functions;
2829
import io.reactivex.internal.subscriptions.BooleanSubscription;
@@ -254,8 +255,8 @@ public Flowable<Integer> apply(Integer t) {
254255

255256
ts.dispose();
256257

257-
// FIXME subject has subscribers because of the open window
258-
assertTrue(open.hasSubscribers());
258+
// Disposing the outer sequence stops the opening of new windows
259+
assertFalse(open.hasSubscribers());
259260
// FIXME subject has subscribers because of the open window
260261
assertTrue(close.hasSubscribers());
261262
}
@@ -430,4 +431,58 @@ protected void subscribeActual(
430431
RxJavaPlugins.reset();
431432
}
432433
}
434+
435+
static Flowable<Integer> flowableDisposed(final AtomicBoolean ref) {
436+
return Flowable.just(1).concatWith(Flowable.<Integer>never())
437+
.doOnCancel(new Action() {
438+
@Override
439+
public void run() throws Exception {
440+
ref.set(true);
441+
}
442+
});
443+
}
444+
445+
@Test
446+
public void mainAndBoundaryDisposeOnNoWindows() {
447+
AtomicBoolean mainDisposed = new AtomicBoolean();
448+
AtomicBoolean openDisposed = new AtomicBoolean();
449+
final AtomicBoolean closeDisposed = new AtomicBoolean();
450+
451+
flowableDisposed(mainDisposed)
452+
.window(flowableDisposed(openDisposed), new Function<Integer, Flowable<Integer>>() {
453+
@Override
454+
public Flowable<Integer> apply(Integer v) throws Exception {
455+
return flowableDisposed(closeDisposed);
456+
}
457+
})
458+
.test()
459+
.assertSubscribed()
460+
.assertNoErrors()
461+
.assertNotComplete()
462+
.dispose();
463+
464+
assertTrue(mainDisposed.get());
465+
assertTrue(openDisposed.get());
466+
assertTrue(closeDisposed.get());
467+
}
468+
469+
@Test
470+
@SuppressWarnings("unchecked")
471+
public void mainWindowMissingBackpressure() {
472+
PublishProcessor<Integer> source = PublishProcessor.create();
473+
PublishProcessor<Integer> boundary = PublishProcessor.create();
474+
475+
TestSubscriber<Flowable<Integer>> ts = source.window(boundary, Functions.justFunction(Flowable.never()))
476+
.test(0L)
477+
;
478+
479+
ts.assertEmpty();
480+
481+
boundary.onNext(1);
482+
483+
ts.assertFailure(MissingBackpressureException.class);
484+
485+
assertFalse(source.hasSubscribers());
486+
assertFalse(boundary.hasSubscribers());
487+
}
433488
}

src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.*;
1919
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import org.junit.*;
2223

@@ -256,8 +257,8 @@ public Observable<Integer> apply(Integer t) {
256257

257258
to.dispose();
258259

259-
// FIXME subject has subscribers because of the open window
260-
assertTrue(open.hasObservers());
260+
// Disposing the outer sequence stops the opening of new windows
261+
assertFalse(open.hasObservers());
261262
// FIXME subject has subscribers because of the open window
262263
assertTrue(close.hasObservers());
263264
}
@@ -423,4 +424,38 @@ protected void subscribeActual(
423424
RxJavaPlugins.reset();
424425
}
425426
}
427+
428+
static Observable<Integer> observableDisposed(final AtomicBoolean ref) {
429+
return Observable.just(1).concatWith(Observable.<Integer>never())
430+
.doOnDispose(new Action() {
431+
@Override
432+
public void run() throws Exception {
433+
ref.set(true);
434+
}
435+
});
436+
}
437+
438+
@Test
439+
public void mainAndBoundaryDisposeOnNoWindows() {
440+
AtomicBoolean mainDisposed = new AtomicBoolean();
441+
AtomicBoolean openDisposed = new AtomicBoolean();
442+
final AtomicBoolean closeDisposed = new AtomicBoolean();
443+
444+
observableDisposed(mainDisposed)
445+
.window(observableDisposed(openDisposed), new Function<Integer, ObservableSource<Integer>>() {
446+
@Override
447+
public ObservableSource<Integer> apply(Integer v) throws Exception {
448+
return observableDisposed(closeDisposed);
449+
}
450+
})
451+
.test()
452+
.assertSubscribed()
453+
.assertNoErrors()
454+
.assertNotComplete()
455+
.dispose();
456+
457+
assertTrue(mainDisposed.get());
458+
assertTrue(openDisposed.get());
459+
assertTrue(closeDisposed.get());
460+
}
426461
}

0 commit comments

Comments
 (0)