Skip to content

Commit bd87341

Browse files
Merge pull request #778 from vadims/fix-zip-race
Fix zip race condition
2 parents 1573df9 + f3dbf3c commit bd87341

File tree

1 file changed

+40
-61
lines changed

1 file changed

+40
-61
lines changed

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 40 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> OnSubscribeFunc<R> zip(Obs
103103
}
104104

105105
public static <R> OnSubscribeFunc<R> zip(Iterable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
106-
ManyObservables<?, R> a = new ManyObservables<Object, R>(ws, zipFunction);
107-
return a;
106+
return new ManyObservables<Object, R>(ws, zipFunction);
108107
}
109108

110109
/**
@@ -203,11 +202,9 @@ private static final class ItemObserver<T> implements Observer<T>, Subscription
203202
/** Reader-writer lock. */
204203
protected final ReadWriteLock rwLock;
205204
/** The queue. */
206-
public final Queue<Object> queue = new LinkedList<Object>();
205+
public final Queue<T> queue = new LinkedList<T>();
207206
/** The list of the other observers. */
208207
public final List<ItemObserver<T>> all;
209-
/** The null sentinel value. */
210-
protected static final Object NULL_SENTINEL = new Object();
211208
/** The global cancel. */
212209
protected final Subscription cancel;
213210
/** The subscription to the source. */
@@ -246,55 +243,18 @@ public ItemObserver(
246243
this.cancel = cancel;
247244
}
248245

249-
@SuppressWarnings("unchecked")
250246
@Override
251247
public void onNext(T value) {
252248
rwLock.readLock().lock();
253249
try {
254250
if (done) {
255251
return;
256252
}
257-
queue.add(value != null ? value : NULL_SENTINEL);
253+
queue.add(value);
258254
} finally {
259255
rwLock.readLock().unlock();
260256
}
261-
// run collector
262-
if (rwLock.writeLock().tryLock()) {
263-
boolean cu = false;
264-
try {
265-
while (true) {
266-
List<T> values = new ArrayList<T>(all.size());
267-
for (ItemObserver<T> io : all) {
268-
if (io.queue.isEmpty()) {
269-
if (io.done) {
270-
observer.onCompleted();
271-
cu = true;
272-
return;
273-
}
274-
continue;
275-
}
276-
Object v = io.queue.peek();
277-
if (v == NULL_SENTINEL) {
278-
v = null;
279-
}
280-
values.add((T) v);
281-
}
282-
if (values.size() == all.size()) {
283-
for (ItemObserver<T> io : all) {
284-
io.queue.poll();
285-
}
286-
observer.onNext(values);
287-
} else {
288-
break;
289-
}
290-
}
291-
} finally {
292-
rwLock.writeLock().unlock();
293-
if (cu) {
294-
cancel.unsubscribe();
295-
}
296-
}
297-
}
257+
runCollector();
298258
}
299259

300260
@Override
@@ -321,23 +281,7 @@ public void onCompleted() {
321281
} finally {
322282
rwLock.readLock().unlock();
323283
}
324-
if (rwLock.writeLock().tryLock()) {
325-
boolean cu = false;
326-
try {
327-
for (ItemObserver<T> io : all) {
328-
if (io.queue.isEmpty() && io.done) {
329-
observer.onCompleted();
330-
cu = true;
331-
return;
332-
}
333-
}
334-
} finally {
335-
rwLock.writeLock().unlock();
336-
if (cu) {
337-
cancel.unsubscribe();
338-
}
339-
}
340-
}
284+
runCollector();
341285
unsubscribe();
342286
}
343287

@@ -351,6 +295,41 @@ public void unsubscribe() {
351295
toSource.unsubscribe();
352296
}
353297

298+
private void runCollector() {
299+
if (rwLock.writeLock().tryLock()) {
300+
boolean cu = false;
301+
try {
302+
while (true) {
303+
List<T> values = new ArrayList<T>(all.size());
304+
for (ItemObserver<T> io : all) {
305+
if (io.queue.isEmpty()) {
306+
if (io.done) {
307+
observer.onCompleted();
308+
cu = true;
309+
return;
310+
}
311+
} else {
312+
T value = io.queue.peek();
313+
values.add(value);
314+
}
315+
}
316+
if (values.size() == all.size()) {
317+
for (ItemObserver<T> io : all) {
318+
io.queue.poll();
319+
}
320+
observer.onNext(values);
321+
} else {
322+
break;
323+
}
324+
}
325+
} finally {
326+
rwLock.writeLock().unlock();
327+
if (cu) {
328+
cancel.unsubscribe();
329+
}
330+
}
331+
}
332+
}
354333
}
355334
}
356335

0 commit comments

Comments
 (0)