@@ -54,7 +54,7 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
54
54
private int wip ;
55
55
private boolean completed ;
56
56
57
- private SubscriptionIndexedRingBuffer <InnerSubscriber <T >> childrenSubscribers ;
57
+ private volatile SubscriptionIndexedRingBuffer <InnerSubscriber <T >> childrenSubscribers ;
58
58
59
59
private RxRingBuffer scalarValueQueue = null ;
60
60
@@ -245,11 +245,12 @@ private boolean drainQueuesIfNeeded() {
245
245
emitted = drainScalarValueQueue ();
246
246
drainChildrenQueues ();
247
247
} finally {
248
- if (!releaseEmitLock ()) {
249
- return true ;
250
- }
248
+ boolean moreToDrain = releaseEmitLock ();
251
249
// request outside of lock
252
250
request (emitted );
251
+ if (!moreToDrain ) {
252
+ return true ;
253
+ }
253
254
// otherwise we'll loop and get whatever was added
254
255
}
255
256
} else {
@@ -351,27 +352,29 @@ public void onCompleted() {
351
352
}
352
353
if (c ) {
353
354
// complete outside of lock
354
- actual . onCompleted ();
355
+ drainAndComplete ();
355
356
}
356
357
}
357
358
358
359
void completeInner (InnerSubscriber <T > s ) {
359
- try {
360
- boolean sendOnComplete = false ;
361
- synchronized (this ) {
362
- wip --;
363
- if (wip == 0 && completed ) {
364
- sendOnComplete = true ;
365
- }
366
- }
367
- if (sendOnComplete ) {
368
- actual .onCompleted ();
360
+ boolean sendOnComplete = false ;
361
+ synchronized (this ) {
362
+ wip --;
363
+ if (wip == 0 && completed ) {
364
+ sendOnComplete = true ;
369
365
}
370
- } finally {
371
- childrenSubscribers .remove (s .sindex );
366
+ }
367
+ childrenSubscribers .remove (s .sindex );
368
+ if (sendOnComplete ) {
369
+ drainAndComplete ();
372
370
}
373
371
}
374
372
373
+ private void drainAndComplete () {
374
+ drainQueuesIfNeeded (); // TODO need to confirm whether this is needed or not
375
+ actual .onCompleted ();
376
+ }
377
+
375
378
}
376
379
377
380
private static final class MergeProducer <T > implements Producer {
@@ -403,9 +406,10 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
403
406
final MergeSubscriber <T > parentSubscriber ;
404
407
final MergeProducer <T > producer ;
405
408
/** Make sure the inner termination events are delivered only once. */
406
- volatile int once ;
409
+ volatile int terminated ;
407
410
@ SuppressWarnings ("rawtypes" )
408
- static final AtomicIntegerFieldUpdater <InnerSubscriber > ONCE_UPDATER = AtomicIntegerFieldUpdater .newUpdater (InnerSubscriber .class , "once" );
411
+ static final AtomicIntegerFieldUpdater <InnerSubscriber > ONCE_TERMINATED = AtomicIntegerFieldUpdater .newUpdater (InnerSubscriber .class , "terminated" );
412
+
409
413
private final RxRingBuffer q = RxRingBuffer .getSpmcInstance ();
410
414
/* protected by emitLock */
411
415
int emitted = 0 ;
@@ -426,14 +430,14 @@ public void onNext(T t) {
426
430
@ Override
427
431
public void onError (Throwable e ) {
428
432
// it doesn't go through queues, it immediately onErrors and tears everything down
429
- if (ONCE_UPDATER .compareAndSet (this , 0 , 1 )) {
433
+ if (ONCE_TERMINATED .compareAndSet (this , 0 , 1 )) {
430
434
parentSubscriber .onError (e );
431
435
}
432
436
}
433
437
434
438
@ Override
435
439
public void onCompleted () {
436
- if (ONCE_UPDATER .compareAndSet (this , 0 , 1 )) {
440
+ if (ONCE_TERMINATED .compareAndSet (this , 0 , 1 )) {
437
441
emit (null , true );
438
442
}
439
443
}
0 commit comments