Skip to content

Commit 340bed9

Browse files
matgabrielakarnokd
authored andcommitted
FlowableWithLatestFrom forgets request (#5494)
* FlowableWithLatestFrom forgets request * Add unit test and correct testBackpressure * Revert indentation change * Revert import change
1 parent cd62d62 commit 340bed9

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public void onNext(T t) {
8181
return;
8282
}
8383
actual.onNext(r);
84+
} else{
85+
request(1);
8486
}
8587
}
8688

src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ public void testNoDownstreamUnsubscribe() {
261261

262262
@Test
263263
public void testBackpressure() {
264-
Flowable<Integer> source = Flowable.range(1, 10);
264+
PublishProcessor<Integer> source = PublishProcessor.create();
265265
PublishProcessor<Integer> other = PublishProcessor.create();
266266

267267
Flowable<Integer> result = source.withLatestFrom(other, COMBINER);
@@ -274,17 +274,24 @@ public void testBackpressure() {
274274

275275
ts.request(1);
276276

277+
source.onNext(1);
278+
277279
assertTrue("Other has no observers!", other.hasSubscribers());
278280

279281
ts.assertNoValues();
280282

281283
other.onNext(1);
282284

283-
ts.request(1);
285+
source.onNext(2);
284286

285287
ts.assertValue((2 << 8) + 1);
286288

287289
ts.request(5);
290+
source.onNext(3);
291+
source.onNext(4);
292+
source.onNext(5);
293+
source.onNext(6);
294+
source.onNext(7);
288295
ts.assertValues(
289296
(2 << 8) + 1, (3 << 8) + 1, (4 << 8) + 1, (5 << 8) + 1,
290297
(6 << 8) + 1, (7 << 8) + 1
@@ -717,4 +724,30 @@ public void zeroOtherCombinerReturnsNull() {
717724
.test()
718725
.assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value");
719726
}
727+
728+
@Test
729+
public void testSingleRequestNotForgottenWhenNoData() {
730+
PublishProcessor<Integer> source = PublishProcessor.create();
731+
PublishProcessor<Integer> other = PublishProcessor.create();
732+
733+
Flowable<Integer> result = source.withLatestFrom(other, COMBINER);
734+
735+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0L);
736+
737+
result.subscribe(ts);
738+
739+
ts.request(1);
740+
741+
source.onNext(1);
742+
743+
ts.assertNoValues();
744+
745+
other.onNext(1);
746+
747+
ts.assertNoValues();
748+
749+
source.onNext(2);
750+
751+
ts.assertValue((2 << 8) + 1);
752+
}
720753
}

0 commit comments

Comments
 (0)