Skip to content

Commit b03e07e

Browse files
Merge pull request #919 from benjchristensen/zip-bugfix-868
BugFix: Zip Never Completes When Zero Observables
2 parents 9c5a04c + 243bd3a commit b03e07e

File tree

3 files changed

+110
-2
lines changed

3 files changed

+110
-2
lines changed

rxjava-core/src/main/java/rx/operators/OperatorZip.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,14 @@ public OperatorZip(Func9 f) {
106106
public Subscriber<? super Observable[]> call(final Subscriber<? super R> observer) {
107107
return new Subscriber<Observable[]>(observer) {
108108

109+
boolean started = false;
110+
109111
@Override
110112
public void onCompleted() {
111-
// we only complete once a child Observable completes or errors
113+
if (!started) {
114+
// this means we have not received a valid onNext before termination so we emit the onCompleted
115+
observer.onCompleted();
116+
}
112117
}
113118

114119
@Override
@@ -118,7 +123,12 @@ public void onError(Throwable e) {
118123

119124
@Override
120125
public void onNext(Observable[] observables) {
121-
new Zip<R>(observables, observer, zipFunction).zip();
126+
if (observables == null || observables.length == 0) {
127+
observer.onCompleted();
128+
} else {
129+
started = true;
130+
new Zip<R>(observables, observer, zipFunction).zip();
131+
}
122132
}
123133

124134
};

rxjava-core/src/test/java/rx/ZipTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
package rx;
1717

18+
import static org.junit.Assert.*;
19+
20+
import java.util.Collection;
21+
import java.util.Collections;
1822
import java.util.HashMap;
1923
import java.util.Map;
2024

@@ -31,6 +35,7 @@
3135
import rx.functions.Action1;
3236
import rx.functions.Func1;
3337
import rx.functions.Func2;
38+
import rx.functions.FuncN;
3439
import rx.observables.GroupedObservable;
3540

3641
public class ZipTests {
@@ -91,6 +96,31 @@ public void testCovarianceOfZip() {
9196
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
9297
}
9398

99+
/**
100+
* Occasionally zip may be invoked with 0 observables. Test that we don't block indefinitely instead
101+
* of immediately invoking zip with 0 argument.
102+
*
103+
* We now expect an IllegalArgumentException since last() requires at least one value and nothing will be emitted.
104+
*/
105+
@Test(expected = IllegalArgumentException.class)
106+
public void nonBlockingObservable() {
107+
108+
final Object invoked = new Object();
109+
110+
Collection<Observable<Object>> observables = Collections.emptyList();
111+
112+
Observable<Object> result = Observable.zip(observables, new FuncN<Object>() {
113+
@Override
114+
public Object call(final Object... args) {
115+
System.out.println("received: " + args);
116+
assertEquals("No argument should have been passed", 0, args.length);
117+
return invoked;
118+
}
119+
});
120+
121+
assertSame(invoked, result.toBlockingObservable().last());
122+
}
123+
94124
Func2<Media, Rating, ExtendedResult> combine = new Func2<Media, Rating, ExtendedResult>() {
95125
@Override
96126
public ExtendedResult call(Media m, Rating r) {

rxjava-core/src/test/java/rx/operators/OperatorZipTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.Collection;
25+
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@
4243
import rx.functions.Func3;
4344
import rx.functions.FuncN;
4445
import rx.functions.Functions;
46+
import rx.observers.TestSubscriber;
4547
import rx.subjects.PublishSubject;
4648
import rx.subscriptions.Subscriptions;
4749

@@ -986,6 +988,72 @@ public void call(String s) {
986988
assertEquals("OnCompleted_null-OnCompleted_null", list.get(3));
987989
}
988990

991+
@Test
992+
public void testZipEmptyObservables() {
993+
994+
Observable<String> o = Observable.zip(Observable.<Integer> empty(), Observable.<String> empty(), new Func2<Integer, String, String>() {
995+
996+
@Override
997+
public String call(Integer t1, String t2) {
998+
return t1 + "-" + t2;
999+
}
1000+
1001+
});
1002+
1003+
final ArrayList<String> list = new ArrayList<String>();
1004+
o.subscribe(new Action1<String>() {
1005+
1006+
@Override
1007+
public void call(String s) {
1008+
System.out.println(s);
1009+
list.add(s);
1010+
}
1011+
});
1012+
1013+
assertEquals(0, list.size());
1014+
}
1015+
1016+
@Test
1017+
public void testZipEmptyList() {
1018+
1019+
final Object invoked = new Object();
1020+
Collection<Observable<Object>> observables = Collections.emptyList();
1021+
1022+
Observable<Object> o = Observable.zip(observables, new FuncN<Object>() {
1023+
@Override
1024+
public Object call(final Object... args) {
1025+
assertEquals("No argument should have been passed", 0, args.length);
1026+
return invoked;
1027+
}
1028+
});
1029+
1030+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
1031+
o.subscribe(ts);
1032+
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
1033+
ts.assertReceivedOnNext(Collections.emptyList());
1034+
}
1035+
1036+
/**
1037+
* Expect IllegalArgumentException instead of blocking forever as zip should emit onCompleted and no onNext
1038+
* and last() expects at least a single response.
1039+
*/
1040+
@Test(expected = IllegalArgumentException.class)
1041+
public void testZipEmptyListBlocking() {
1042+
1043+
final Object invoked = new Object();
1044+
Collection<Observable<Object>> observables = Collections.emptyList();
1045+
1046+
Observable<Object> o = Observable.zip(observables, new FuncN<Object>() {
1047+
@Override
1048+
public Object call(final Object... args) {
1049+
assertEquals("No argument should have been passed", 0, args.length);
1050+
return invoked;
1051+
}
1052+
});
1053+
1054+
o.toBlockingObservable().last();
1055+
}
1056+
9891057
Observable<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());
9901058

9911059
Observable<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {

0 commit comments

Comments
 (0)