Skip to content

Commit 7fffa00

Browse files
authored
2.x: Fix concatEager to dispose sources & clean up properly. (#6405)
1 parent 6e266af commit 7fffa00

File tree

4 files changed

+85
-3
lines changed

4 files changed

+85
-3
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,12 @@ void drainAndCancel() {
176176
}
177177

178178
void cancelAll() {
179-
InnerQueuedSubscriber<R> inner;
179+
InnerQueuedSubscriber<R> inner = current;
180+
current = null;
181+
182+
if (inner != null) {
183+
inner.cancel();
184+
}
180185

181186
while ((inner = subscribers.poll()) != null) {
182187
inner.cancel();

src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,21 @@ public void onComplete() {
162162

163163
@Override
164164
public void dispose() {
165+
if (cancelled) {
166+
return;
167+
}
165168
cancelled = true;
169+
upstream.dispose();
170+
171+
drainAndDispose();
172+
}
173+
174+
void drainAndDispose() {
166175
if (getAndIncrement() == 0) {
167-
queue.clear();
168-
disposeAll();
176+
do {
177+
queue.clear();
178+
disposeAll();
179+
} while (decrementAndGet() != 0);
169180
}
170181
}
171182

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,4 +1333,37 @@ public void arrayDelayErrorMaxConcurrencyErrorDelayed() {
13331333

13341334
ts.assertFailure(TestException.class, 1, 2);
13351335
}
1336+
1337+
@Test
1338+
public void cancelActive() {
1339+
PublishProcessor<Integer> pp1 = PublishProcessor.create();
1340+
PublishProcessor<Integer> pp2 = PublishProcessor.create();
1341+
1342+
TestSubscriber<Integer> ts = Flowable
1343+
.concatEager(Flowable.just(pp1, pp2))
1344+
.test();
1345+
1346+
assertTrue(pp1.hasSubscribers());
1347+
assertTrue(pp2.hasSubscribers());
1348+
1349+
ts.cancel();
1350+
1351+
assertFalse(pp1.hasSubscribers());
1352+
assertFalse(pp2.hasSubscribers());
1353+
}
1354+
1355+
@Test
1356+
public void cancelNoInnerYet() {
1357+
PublishProcessor<Flowable<Integer>> pp1 = PublishProcessor.create();
1358+
1359+
TestSubscriber<Integer> ts = Flowable
1360+
.concatEager(pp1)
1361+
.test();
1362+
1363+
assertTrue(pp1.hasSubscribers());
1364+
1365+
ts.cancel();
1366+
1367+
assertFalse(pp1.hasSubscribers());
1368+
}
13361369
}

src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,4 +1141,37 @@ public void arrayDelayErrorMaxConcurrencyErrorDelayed() {
11411141

11421142
to.assertFailure(TestException.class, 1, 2);
11431143
}
1144+
1145+
@Test
1146+
public void cancelActive() {
1147+
PublishSubject<Integer> ps1 = PublishSubject.create();
1148+
PublishSubject<Integer> ps2 = PublishSubject.create();
1149+
1150+
TestObserver<Integer> to = Observable
1151+
.concatEager(Observable.just(ps1, ps2))
1152+
.test();
1153+
1154+
assertTrue(ps1.hasObservers());
1155+
assertTrue(ps2.hasObservers());
1156+
1157+
to.dispose();
1158+
1159+
assertFalse(ps1.hasObservers());
1160+
assertFalse(ps2.hasObservers());
1161+
}
1162+
1163+
@Test
1164+
public void cancelNoInnerYet() {
1165+
PublishSubject<Observable<Integer>> ps1 = PublishSubject.create();
1166+
1167+
TestObserver<Integer> to = Observable
1168+
.concatEager(ps1)
1169+
.test();
1170+
1171+
assertTrue(ps1.hasObservers());
1172+
1173+
to.dispose();
1174+
1175+
assertFalse(ps1.hasObservers());
1176+
}
11441177
}

0 commit comments

Comments
 (0)