21
21
22
22
import rx .*;
23
23
import rx .Observable ;
24
- import rx .exceptions .Exceptions ;
25
- import rx .exceptions .OnErrorThrowable ;
24
+ import rx .exceptions .*;
26
25
import rx .functions .*;
26
+ import rx .internal .util .OpenHashSet ;
27
27
import rx .observables .ConnectableObservable ;
28
28
import rx .schedulers .Timestamped ;
29
29
import rx .subscriptions .Subscriptions ;
@@ -314,8 +314,16 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
314
314
/** Indicates a terminated ReplaySubscriber. */
315
315
static final InnerProducer [] TERMINATED = new InnerProducer [0 ];
316
316
317
- /** Tracks the subscribed producers. */
318
- final AtomicReference <InnerProducer []> producers ;
317
+ /** Indicates no further InnerProducers are accepted. */
318
+ volatile boolean terminated ;
319
+ /** Tracks the subscribed producers. Guarded by itself. */
320
+ final OpenHashSet <InnerProducer <T >> producers ;
321
+ /** Contains a copy of the producers. Modified only from the source side. */
322
+ InnerProducer <T >[] producersCache ;
323
+ /** Contains number of modifications to the producers set.*/
324
+ volatile long producersVersion ;
325
+ /** Contains the number of modifications that the producersCache holds. */
326
+ long producersCacheVersion ;
319
327
/**
320
328
* Atomically changed from false to true by connect to make sure the
321
329
* connection is only performed by one thread.
@@ -335,12 +343,19 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
335
343
/** The upstream producer. */
336
344
volatile Producer producer ;
337
345
346
+ /** The queue that holds producers with request changes that need to be coordinated. */
347
+ List <InnerProducer <T >> coordinationQueue ;
348
+ /** Indicate that all request amounts should be considered. */
349
+ boolean coordinateAll ;
350
+
351
+ @ SuppressWarnings ("unchecked" )
338
352
public ReplaySubscriber (AtomicReference <ReplaySubscriber <T >> current ,
339
353
ReplayBuffer <T > buffer ) {
340
354
this .buffer = buffer ;
341
355
342
356
this .nl = NotificationLite .instance ();
343
- this .producers = new AtomicReference <InnerProducer []>(EMPTY );
357
+ this .producers = new OpenHashSet <InnerProducer <T >>();
358
+ this .producersCache = EMPTY ;
344
359
this .shouldConnect = new AtomicBoolean ();
345
360
// make sure the source doesn't produce values until the child subscribers
346
361
// expressed their request amounts
@@ -351,7 +366,15 @@ void init() {
351
366
add (Subscriptions .create (new Action0 () {
352
367
@ Override
353
368
public void call () {
354
- ReplaySubscriber .this .producers .getAndSet (TERMINATED );
369
+ if (!terminated ) {
370
+ synchronized (producers ) {
371
+ if (!terminated ) {
372
+ producers .terminate ();
373
+ producersVersion ++;
374
+ terminated = true ;
375
+ }
376
+ }
377
+ }
355
378
// unlike OperatorPublish, we can't null out the terminated so
356
379
// late subscribers can still get replay
357
380
// current.compareAndSet(ReplaySubscriber.this, null);
@@ -370,76 +393,34 @@ boolean add(InnerProducer<T> producer) {
370
393
if (producer == null ) {
371
394
throw new NullPointerException ();
372
395
}
373
- // the state can change so we do a CAS loop to achieve atomicity
374
- for (;;) {
375
- // get the current producer array
376
- InnerProducer [] c = producers .get ();
377
- // if this subscriber-to-source reached a terminal state by receiving
378
- // an onError or onCompleted, just refuse to add the new producer
379
- if (c == TERMINATED ) {
396
+ if (terminated ) {
397
+ return false ;
398
+ }
399
+ synchronized (producers ) {
400
+ if (terminated ) {
380
401
return false ;
381
402
}
382
- // we perform a copy-on-write logic
383
- int len = c .length ;
384
- InnerProducer [] u = new InnerProducer [len + 1 ];
385
- System .arraycopy (c , 0 , u , 0 , len );
386
- u [len ] = producer ;
387
- // try setting the producers array
388
- if (producers .compareAndSet (c , u )) {
389
- return true ;
390
- }
391
- // if failed, some other operation succeeded (another add, remove or termination)
392
- // so retry
403
+
404
+ producers .add (producer );
405
+ producersVersion ++;
393
406
}
407
+ return true ;
394
408
}
395
409
396
410
/**
397
411
* Atomically removes the given producer from the producers array.
398
412
* @param producer the producer to remove
399
413
*/
400
414
void remove (InnerProducer <T > producer ) {
401
- // the state can change so we do a CAS loop to achieve atomicity
402
- for (;;) {
403
- // let's read the current producers array
404
- InnerProducer [] c = producers .get ();
405
- // if it is either empty or terminated, there is nothing to remove so we quit
406
- if (c == EMPTY || c == TERMINATED ) {
407
- return ;
408
- }
409
- // let's find the supplied producer in the array
410
- // although this is O(n), we don't expect too many child subscribers in general
411
- int j = -1 ;
412
- int len = c .length ;
413
- for (int i = 0 ; i < len ; i ++) {
414
- if (c [i ].equals (producer )) {
415
- j = i ;
416
- break ;
417
- }
418
- }
419
- // we didn't find it so just quit
420
- if (j < 0 ) {
421
- return ;
422
- }
423
- // we do copy-on-write logic here
424
- InnerProducer [] u ;
425
- // we don't create a new empty array if producer was the single inhabitant
426
- // but rather reuse an empty array
427
- if (len == 1 ) {
428
- u = EMPTY ;
429
- } else {
430
- // otherwise, create a new array one less in size
431
- u = new InnerProducer [len - 1 ];
432
- // copy elements being before the given producer
433
- System .arraycopy (c , 0 , u , 0 , j );
434
- // copy elements being after the given producer
435
- System .arraycopy (c , j + 1 , u , j , len - j - 1 );
436
- }
437
- // try setting this new array as
438
- if (producers .compareAndSet (c , u )) {
415
+ if (terminated ) {
416
+ return ;
417
+ }
418
+ synchronized (producers ) {
419
+ if (terminated ) {
439
420
return ;
440
421
}
441
- // if we failed, it means something else happened
442
- // (a concurrent add/remove or termination), we need to retry
422
+ producers . remove ( producer );
423
+ producersVersion ++;
443
424
}
444
425
}
445
426
@@ -450,7 +431,7 @@ public void setProducer(Producer p) {
450
431
throw new IllegalStateException ("Only a single producer can be set on a Subscriber." );
451
432
}
452
433
producer = p ;
453
- manageRequests ();
434
+ manageRequests (null );
454
435
replay ();
455
436
}
456
437
@@ -493,81 +474,157 @@ public void onCompleted() {
493
474
/**
494
475
* Coordinates the request amounts of various child Subscribers.
495
476
*/
496
- void manageRequests () {
477
+ void manageRequests (InnerProducer < T > inner ) {
497
478
// if the upstream has completed, no more requesting is possible
498
479
if (isUnsubscribed ()) {
499
480
return ;
500
481
}
501
482
synchronized (this ) {
502
483
if (emitting ) {
484
+ if (inner != null ) {
485
+ List <InnerProducer <T >> q = coordinationQueue ;
486
+ if (q == null ) {
487
+ q = new ArrayList <InnerProducer <T >>();
488
+ coordinationQueue = q ;
489
+ }
490
+ q .add (inner );
491
+ } else {
492
+ coordinateAll = true ;
493
+ }
503
494
missed = true ;
504
495
return ;
505
496
}
506
497
emitting = true ;
507
498
}
499
+
500
+ long ri = maxChildRequested ;
501
+ long maxTotalRequested ;
502
+
503
+ if (inner != null ) {
504
+ maxTotalRequested = Math .max (ri , inner .totalRequested .get ());
505
+ } else {
506
+ maxTotalRequested = ri ;
507
+
508
+ InnerProducer <T >[] a = copyProducers ();
509
+ for (InnerProducer <T > rp : a ) {
510
+ if (rp != null ) {
511
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
512
+ }
513
+ }
514
+
515
+ }
516
+ makeRequest (maxTotalRequested , ri );
517
+
508
518
for (;;) {
509
519
// if the upstream has completed, no more requesting is possible
510
520
if (isUnsubscribed ()) {
511
521
return ;
512
522
}
513
523
514
- @ SuppressWarnings ("unchecked" )
515
- InnerProducer <T >[] a = producers .get ();
516
-
517
- long ri = maxChildRequested ;
518
- long maxTotalRequests = ri ;
519
-
520
- for (InnerProducer <T > rp : a ) {
521
- maxTotalRequests = Math .max (maxTotalRequests , rp .totalRequested .get ());
524
+ List <InnerProducer <T >> q ;
525
+ boolean all ;
526
+ synchronized (this ) {
527
+ if (!missed ) {
528
+ emitting = false ;
529
+ return ;
530
+ }
531
+ missed = false ;
532
+ q = coordinationQueue ;
533
+ coordinationQueue = null ;
534
+ all = coordinateAll ;
535
+ coordinateAll = false ;
522
536
}
523
537
524
- long ur = maxUpstreamRequested ;
525
- Producer p = producer ;
538
+ ri = maxChildRequested ;
539
+ maxTotalRequested = ri ;
526
540
527
- long diff = maxTotalRequests - ri ;
528
- if (diff != 0 ) {
529
- maxChildRequested = maxTotalRequests ;
530
- if (p != null ) {
531
- if (ur != 0L ) {
532
- maxUpstreamRequested = 0L ;
533
- p .request (ur + diff );
534
- } else {
535
- p .request (diff );
536
- }
537
- } else {
538
- // collect upstream request amounts until there is a producer for them
539
- long u = ur + diff ;
540
- if (u < 0 ) {
541
- u = Long .MAX_VALUE ;
541
+ if (q != null ) {
542
+ for (InnerProducer <T > rp : q ) {
543
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
544
+ }
545
+ }
546
+
547
+ if (all ) {
548
+ InnerProducer <T >[] a = copyProducers ();
549
+ for (InnerProducer <T > rp : a ) {
550
+ if (rp != null ) {
551
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
542
552
}
543
- maxUpstreamRequested = u ;
544
553
}
545
- } else
546
- // if there were outstanding upstream requests and we have a producer
547
- if (ur != 0L && p != null ) {
548
- maxUpstreamRequested = 0L ;
549
- // fire the accumulated requests
550
- p .request (ur );
551
554
}
552
555
553
- synchronized (this ) {
554
- if (!missed ) {
555
- emitting = false ;
556
- return ;
556
+ makeRequest (maxTotalRequested , ri );
557
+ }
558
+ }
559
+
560
+ InnerProducer <T >[] copyProducers () {
561
+ synchronized (producers ) {
562
+ Object [] a = producers .values ();
563
+ int n = a .length ;
564
+ @ SuppressWarnings ("unchecked" )
565
+ InnerProducer <T >[] result = new InnerProducer [n ];
566
+ System .arraycopy (a , 0 , result , 0 , n );
567
+ return result ;
568
+ }
569
+ }
570
+
571
+ void makeRequest (long maxTotalRequests , long previousTotalRequests ) {
572
+ long ur = maxUpstreamRequested ;
573
+ Producer p = producer ;
574
+
575
+ long diff = maxTotalRequests - previousTotalRequests ;
576
+ if (diff != 0 ) {
577
+ maxChildRequested = maxTotalRequests ;
578
+ if (p != null ) {
579
+ if (ur != 0L ) {
580
+ maxUpstreamRequested = 0L ;
581
+ p .request (ur + diff );
582
+ } else {
583
+ p .request (diff );
557
584
}
558
- missed = false ;
585
+ } else {
586
+ // collect upstream request amounts until there is a producer for them
587
+ long u = ur + diff ;
588
+ if (u < 0 ) {
589
+ u = Long .MAX_VALUE ;
590
+ }
591
+ maxUpstreamRequested = u ;
559
592
}
593
+ } else
594
+ // if there were outstanding upstream requests and we have a producer
595
+ if (ur != 0L && p != null ) {
596
+ maxUpstreamRequested = 0L ;
597
+ // fire the accumulated requests
598
+ p .request (ur );
560
599
}
561
600
}
562
601
563
602
/**
564
603
* Tries to replay the buffer contents to all known subscribers.
565
604
*/
605
+ @ SuppressWarnings ("unchecked" )
566
606
void replay () {
567
- @ SuppressWarnings ("unchecked" )
568
- InnerProducer <T >[] a = producers .get ();
569
- for (InnerProducer <T > rp : a ) {
570
- buffer .replay (rp );
607
+ InnerProducer <T >[] pc = producersCache ;
608
+ if (producersCacheVersion != producersVersion ) {
609
+ synchronized (producers ) {
610
+ pc = producersCache ;
611
+ // if the producers hasn't changed do nothing
612
+ // otherwise make a copy of the current set of producers
613
+ Object [] a = producers .values ();
614
+ int n = a .length ;
615
+ if (pc .length != n ) {
616
+ pc = new InnerProducer [n ];
617
+ producersCache = pc ;
618
+ }
619
+ System .arraycopy (a , 0 , pc , 0 , n );
620
+ producersCacheVersion = producersVersion ;
621
+ }
622
+ }
623
+ ReplayBuffer <T > b = buffer ;
624
+ for (InnerProducer <T > rp : pc ) {
625
+ if (rp != null ) {
626
+ b .replay (rp );
627
+ }
571
628
}
572
629
}
573
630
}
@@ -646,7 +703,7 @@ public void request(long n) {
646
703
addTotalRequested (n );
647
704
// if successful, notify the parent dispatcher this child can receive more
648
705
// elements
649
- parent .manageRequests ();
706
+ parent .manageRequests (this );
650
707
651
708
parent .buffer .replay (this );
652
709
return ;
@@ -727,7 +784,7 @@ public void unsubscribe() {
727
784
// let's assume this child had 0 requested before the unsubscription while
728
785
// the others had non-zero. By removing this 'blocking' child, the others
729
786
// are now free to receive events
730
- parent .manageRequests ();
787
+ parent .manageRequests (this );
731
788
}
732
789
}
733
790
}
0 commit comments