@@ -202,11 +202,9 @@ private static final class ItemObserver<T> implements Observer<T>, Subscription
202
202
/** Reader-writer lock. */
203
203
protected final ReadWriteLock rwLock ;
204
204
/** The queue. */
205
- public final Queue <Object > queue = new LinkedList <Object >();
205
+ public final Queue <T > queue = new LinkedList <T >();
206
206
/** The list of the other observers. */
207
207
public final List <ItemObserver <T >> all ;
208
- /** The null sentinel value. */
209
- protected static final Object NULL_SENTINEL = new Object ();
210
208
/** The global cancel. */
211
209
protected final Subscription cancel ;
212
210
/** The subscription to the source. */
@@ -252,7 +250,7 @@ public void onNext(T value) {
252
250
if (done ) {
253
251
return ;
254
252
}
255
- queue .add (value != null ? value : NULL_SENTINEL );
253
+ queue .add (value );
256
254
} finally {
257
255
rwLock .readLock ().unlock ();
258
256
}
@@ -297,7 +295,6 @@ public void unsubscribe() {
297
295
toSource .unsubscribe ();
298
296
}
299
297
300
- @ SuppressWarnings ("unchecked" )
301
298
private void runCollector () {
302
299
if (rwLock .writeLock ().tryLock ()) {
303
300
boolean cu = false ;
@@ -311,13 +308,10 @@ private void runCollector() {
311
308
cu = true ;
312
309
return ;
313
310
}
314
- continue ;
311
+ } else {
312
+ T value = io .queue .peek ();
313
+ values .add (value );
315
314
}
316
- Object v = io .queue .peek ();
317
- if (v == NULL_SENTINEL ) {
318
- v = null ;
319
- }
320
- values .add ((T ) v );
321
315
}
322
316
if (values .size () == all .size ()) {
323
317
for (ItemObserver <T > io : all ) {
0 commit comments