Skip to content

Commit c2baf93

Browse files
committed
Fix Zip race condition
ItemObserver onNext might not acquire the write lock due to an onCompleted being handled by another thread. When handling onCompleted, the ItemObserver does not check for any values that are ready to be emitted, which might cause OperationZip to never emit OnNext or OnCompleted.
1 parent 1573df9 commit c2baf93

File tree

1 file changed

+42
-57
lines changed

1 file changed

+42
-57
lines changed

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 42 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> OnSubscribeFunc<R> zip(Obs
103103
}
104104

105105
public static <R> OnSubscribeFunc<R> zip(Iterable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
106-
ManyObservables<?, R> a = new ManyObservables<Object, R>(ws, zipFunction);
107-
return a;
106+
return new ManyObservables<Object, R>(ws, zipFunction);
108107
}
109108

110109
/**
@@ -246,7 +245,6 @@ public ItemObserver(
246245
this.cancel = cancel;
247246
}
248247

249-
@SuppressWarnings("unchecked")
250248
@Override
251249
public void onNext(T value) {
252250
rwLock.readLock().lock();
@@ -258,43 +256,7 @@ public void onNext(T value) {
258256
} finally {
259257
rwLock.readLock().unlock();
260258
}
261-
// run collector
262-
if (rwLock.writeLock().tryLock()) {
263-
boolean cu = false;
264-
try {
265-
while (true) {
266-
List<T> values = new ArrayList<T>(all.size());
267-
for (ItemObserver<T> io : all) {
268-
if (io.queue.isEmpty()) {
269-
if (io.done) {
270-
observer.onCompleted();
271-
cu = true;
272-
return;
273-
}
274-
continue;
275-
}
276-
Object v = io.queue.peek();
277-
if (v == NULL_SENTINEL) {
278-
v = null;
279-
}
280-
values.add((T) v);
281-
}
282-
if (values.size() == all.size()) {
283-
for (ItemObserver<T> io : all) {
284-
io.queue.poll();
285-
}
286-
observer.onNext(values);
287-
} else {
288-
break;
289-
}
290-
}
291-
} finally {
292-
rwLock.writeLock().unlock();
293-
if (cu) {
294-
cancel.unsubscribe();
295-
}
296-
}
297-
}
259+
runCollector();
298260
}
299261

300262
@Override
@@ -321,23 +283,7 @@ public void onCompleted() {
321283
} finally {
322284
rwLock.readLock().unlock();
323285
}
324-
if (rwLock.writeLock().tryLock()) {
325-
boolean cu = false;
326-
try {
327-
for (ItemObserver<T> io : all) {
328-
if (io.queue.isEmpty() && io.done) {
329-
observer.onCompleted();
330-
cu = true;
331-
return;
332-
}
333-
}
334-
} finally {
335-
rwLock.writeLock().unlock();
336-
if (cu) {
337-
cancel.unsubscribe();
338-
}
339-
}
340-
}
286+
runCollector();
341287
unsubscribe();
342288
}
343289

@@ -351,6 +297,45 @@ public void unsubscribe() {
351297
toSource.unsubscribe();
352298
}
353299

300+
@SuppressWarnings("unchecked")
301+
private void runCollector() {
302+
if (rwLock.writeLock().tryLock()) {
303+
boolean cu = false;
304+
try {
305+
while (true) {
306+
List<T> values = new ArrayList<T>(all.size());
307+
for (ItemObserver<T> io : all) {
308+
if (io.queue.isEmpty()) {
309+
if (io.done) {
310+
observer.onCompleted();
311+
cu = true;
312+
return;
313+
}
314+
continue;
315+
}
316+
Object v = io.queue.peek();
317+
if (v == NULL_SENTINEL) {
318+
v = null;
319+
}
320+
values.add((T) v);
321+
}
322+
if (values.size() == all.size()) {
323+
for (ItemObserver<T> io : all) {
324+
io.queue.poll();
325+
}
326+
observer.onNext(values);
327+
} else {
328+
break;
329+
}
330+
}
331+
} finally {
332+
rwLock.writeLock().unlock();
333+
if (cu) {
334+
cancel.unsubscribe();
335+
}
336+
}
337+
}
338+
}
354339
}
355340
}
356341

0 commit comments

Comments
 (0)