Skip to content

Commit 5026999

Browse files
authored
3.x: Fix MulticastProcessor not requesting more after limit is reached (#6714)
* 3.x: Fix MulticastProcessor not requesting more after limit is reached * Test for more prefetch values and patterns.
1 parent e4c4903 commit 5026999

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,7 @@ void drain() {
570570
}
571571
}
572572

573+
consumed = c;
573574
missed = wip.addAndGet(-missed);
574575
if (missed == 0) {
575576
break;

src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,4 +784,41 @@ public void noUpstream() {
784784
assertTrue(mp.hasSubscribers());
785785
}
786786

787+
@Test
788+
public void requestUpstreamPrefetchNonFused() {
789+
for (int j = 1; j < 12; j++) {
790+
MulticastProcessor<Integer> mp = MulticastProcessor.create(j, true);
791+
792+
TestSubscriber<Integer> ts = mp.test(0).withTag("Prefetch: " + j);
793+
794+
Flowable.range(1, 10).hide().subscribe(mp);
795+
796+
ts.assertEmpty()
797+
.requestMore(3)
798+
.assertValuesOnly(1, 2, 3)
799+
.requestMore(3)
800+
.assertValuesOnly(1, 2, 3, 4, 5, 6)
801+
.requestMore(4)
802+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
803+
}
804+
}
805+
806+
@Test
807+
public void requestUpstreamPrefetchNonFused2() {
808+
for (int j = 1; j < 12; j++) {
809+
MulticastProcessor<Integer> mp = MulticastProcessor.create(j, true);
810+
811+
TestSubscriber<Integer> ts = mp.test(0).withTag("Prefetch: " + j);
812+
813+
Flowable.range(1, 10).hide().subscribe(mp);
814+
815+
ts.assertEmpty()
816+
.requestMore(2)
817+
.assertValuesOnly(1, 2)
818+
.requestMore(2)
819+
.assertValuesOnly(1, 2, 3, 4)
820+
.requestMore(6)
821+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
822+
}
823+
}
787824
}

0 commit comments

Comments
 (0)