Skip to content

Commit 035b001

Browse files
committed
Merge pull request #3493 from akarnokd/ZipBackpressureFix1x
1.x: fix for zip(Obs<Obs<T>>) backpressure problem
2 parents 2894ed0 + 7f3173b commit 035b001

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

src/main/java/rx/internal/operators/OperatorZip.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,11 @@ public OperatorZip(Func9 f) {
111111
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) {
112112
final Zip<R> zipper = new Zip<R>(child, zipFunction);
113113
final ZipProducer<R> producer = new ZipProducer<R>(zipper);
114-
child.setProducer(producer);
115114
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer);
115+
116+
child.add(subscriber);
117+
child.setProducer(producer);
118+
116119
return subscriber;
117120
}
118121

@@ -124,7 +127,6 @@ private final class ZipSubscriber extends Subscriber<Observable[]> {
124127
final ZipProducer<R> producer;
125128

126129
public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper, ZipProducer<R> producer) {
127-
super(child);
128130
this.child = child;
129131
this.zipper = zipper;
130132
this.producer = producer;

src/test/java/rx/internal/operators/OperatorZipTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,4 +1308,32 @@ public Integer call(Integer t1, Integer t2) {
13081308
ts.assertNoErrors();
13091309
ts.assertReceivedOnNext(Arrays.asList(11));
13101310
}
1311+
1312+
@SuppressWarnings("cast")
1313+
@Test
1314+
public void testZipObservableObservableBackpressure() {
1315+
@SuppressWarnings("unchecked")
1316+
Observable<Integer>[] osArray = new Observable[] {
1317+
Observable.range(0, 10),
1318+
Observable.range(0, 10)
1319+
};
1320+
1321+
Observable<Observable<Integer>> os = (Observable<Observable<Integer>>) Observable.from(osArray);
1322+
Observable<Integer> o1 = Observable.zip(os, new FuncN<Integer>() {
1323+
@Override
1324+
public Integer call(Object... a) {
1325+
return 0;
1326+
}
1327+
});
1328+
1329+
TestSubscriber<Integer> sub1 = TestSubscriber.create(5);
1330+
1331+
o1.subscribe(sub1);
1332+
1333+
sub1.requestMore(5);
1334+
1335+
sub1.assertValueCount(10);
1336+
sub1.assertNoErrors();
1337+
sub1.assertCompleted();
1338+
}
13111339
}

0 commit comments

Comments
 (0)