Skip to content

Commit ab85374

Browse files
Merge pull request #1721 from abersnaze/onBackpressure-request-max-0.20
Bug in the onBackpressure operators
2 parents 0e80124 + b36d833 commit ab85374

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public void request(long n) {
4848
// don't pass through subscriber as we are async and doing queue draining
4949
// a parent being unsubscribed should not affect the children
5050
Subscriber<T> parent = new Subscriber<T>() {
51+
@Override
52+
public void onStart() {
53+
request(Long.MAX_VALUE);
54+
}
5155

5256
@Override
5357
public void onCompleted() {

rxjava-core/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ public void request(long n) {
3636

3737
});
3838
return new Subscriber<T>(child) {
39+
@Override
40+
public void onStart() {
41+
request(Long.MAX_VALUE);
42+
}
3943

4044
@Override
4145
public void onCompleted() {

rxjava-core/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919

20-
import java.util.concurrent.CountDownLatch;
21-
2220
import org.junit.Test;
2321

2422
import rx.Observable;
2523
import rx.Observable.OnSubscribe;
2624
import rx.Observer;
2725
import rx.Subscriber;
26+
import rx.internal.util.RxRingBuffer;
2827
import rx.observers.TestSubscriber;
2928
import rx.schedulers.Schedulers;
3029

30+
import java.util.concurrent.CountDownLatch;
31+
3132
public class OperatorOnBackpressureDropTest {
3233

3334
@Test
@@ -42,6 +43,13 @@ public void testNoBackpressureSupport() {
4243
ts.assertNoErrors();
4344
}
4445

46+
@Test(timeout = 500)
47+
public void testWithObserveOn() throws InterruptedException {
48+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
49+
Observable.range(0, RxRingBuffer.SIZE * 10).onBackpressureDrop().onBackpressureDrop().observeOn(Schedulers.io()).subscribe(ts);
50+
ts.awaitTerminalEvent();
51+
}
52+
4553
@Test(timeout = 500)
4654
public void testFixBackpressureWithBuffer() throws InterruptedException {
4755
final CountDownLatch l1 = new CountDownLatch(100);

0 commit comments

Comments
 (0)