Skip to content

Commit 7f3173b

Browse files
committed
1.x: fix for zip(Obs<Obs<T>>) backpressure problem
Reported in #3492.
1 parent b293751 commit 7f3173b

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

src/main/java/rx/internal/operators/OperatorZip.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,11 @@ public OperatorZip(Func9 f) {
111111
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) {
112112
final Zip<R> zipper = new Zip<R>(child, zipFunction);
113113
final ZipProducer<R> producer = new ZipProducer<R>(zipper);
114-
child.setProducer(producer);
115114
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer);
115+
116+
child.add(subscriber);
117+
child.setProducer(producer);
118+
116119
return subscriber;
117120
}
118121

@@ -124,7 +127,6 @@ private final class ZipSubscriber extends Subscriber<Observable[]> {
124127
final ZipProducer<R> producer;
125128

126129
public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper, ZipProducer<R> producer) {
127-
super(child);
128130
this.child = child;
129131
this.zipper = zipper;
130132
this.producer = producer;

src/test/java/rx/internal/operators/OperatorZipTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,4 +1313,32 @@ public Integer call(Integer t1, Integer t2) {
13131313
ts.assertNoErrors();
13141314
ts.assertReceivedOnNext(Arrays.asList(11));
13151315
}
1316+
1317+
@SuppressWarnings("cast")
1318+
@Test
1319+
public void testZipObservableObservableBackpressure() {
1320+
@SuppressWarnings("unchecked")
1321+
Observable<Integer>[] osArray = new Observable[] {
1322+
Observable.range(0, 10),
1323+
Observable.range(0, 10)
1324+
};
1325+
1326+
Observable<Observable<Integer>> os = (Observable<Observable<Integer>>) Observable.from(osArray);
1327+
Observable<Integer> o1 = Observable.zip(os, new FuncN<Integer>() {
1328+
@Override
1329+
public Integer call(Object... a) {
1330+
return 0;
1331+
}
1332+
});
1333+
1334+
TestSubscriber<Integer> sub1 = TestSubscriber.create(5);
1335+
1336+
o1.subscribe(sub1);
1337+
1338+
sub1.requestMore(5);
1339+
1340+
sub1.assertValueCount(10);
1341+
sub1.assertNoErrors();
1342+
sub1.assertCompleted();
1343+
}
13161344
}

0 commit comments

Comments
 (0)