@@ -322,6 +322,22 @@ where
322
322
}
323
323
}
324
324
325
+ impl < Item , Gap > Drop for UpdatesSubscriber < Item , Gap > {
326
+ fn drop ( & mut self ) {
327
+ // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
328
+ // This is important so that the garbage collector can do its jobs correctly
329
+ // without a dead dangling reader token.
330
+ if let Some ( updates) = self . updates . upgrade ( ) {
331
+ let mut updates = updates. write ( ) . unwrap ( ) ;
332
+
333
+ // Remove the reader token from `UpdatesInner`.
334
+ // It's safe to ignore the result of `remove` here: `None` means the token was
335
+ // already removed (note: it should be unreachable).
336
+ let _ = updates. last_index_per_reader . remove ( & self . token ) ;
337
+ }
338
+ }
339
+ }
340
+
325
341
#[ cfg( test) ]
326
342
mod tests {
327
343
use std:: {
@@ -563,19 +579,19 @@ mod tests {
563
579
}
564
580
}
565
581
566
- # [ test ]
567
- fn test_updates_stream ( ) {
568
- use super :: Update :: * ;
582
+ struct CounterWaker {
583
+ number_of_wakeup : Mutex < usize > ,
584
+ }
569
585
570
- struct CounterWaker {
571
- number_of_wakeup : Mutex < usize > ,
586
+ impl Wake for CounterWaker {
587
+ fn wake ( self : Arc < Self > ) {
588
+ * self . number_of_wakeup . lock ( ) . unwrap ( ) += 1 ;
572
589
}
590
+ }
573
591
574
- impl Wake for CounterWaker {
575
- fn wake ( self : Arc < Self > ) {
576
- * self . number_of_wakeup . lock ( ) . unwrap ( ) += 1 ;
577
- }
578
- }
592
+ #[ test]
593
+ fn test_updates_stream ( ) {
594
+ use super :: Update :: * ;
579
595
580
596
let counter_waker = Arc :: new ( CounterWaker { number_of_wakeup : Mutex :: new ( 0 ) } ) ;
581
597
let waker = counter_waker. clone ( ) . into ( ) ;
@@ -646,4 +662,161 @@ mod tests {
646
662
// Wakers calls have not changed.
647
663
assert_eq ! ( * counter_waker. number_of_wakeup. lock( ) . unwrap( ) , 2 ) ;
648
664
}
665
+
666
+ #[ test]
667
+ fn test_updates_multiple_streams ( ) {
668
+ use super :: Update :: * ;
669
+
670
+ let counter_waker1 = Arc :: new ( CounterWaker { number_of_wakeup : Mutex :: new ( 0 ) } ) ;
671
+ let counter_waker2 = Arc :: new ( CounterWaker { number_of_wakeup : Mutex :: new ( 0 ) } ) ;
672
+
673
+ let waker1 = counter_waker1. clone ( ) . into ( ) ;
674
+ let waker2 = counter_waker2. clone ( ) . into ( ) ;
675
+
676
+ let mut context1 = Context :: from_waker ( & waker1) ;
677
+ let mut context2 = Context :: from_waker ( & waker2) ;
678
+
679
+ let mut linked_chunk = LinkedChunk :: < 3 , char , ( ) > :: new_with_update_history ( ) ;
680
+
681
+ let updates_subscriber1 = linked_chunk. updates ( ) . unwrap ( ) . subscribe ( ) ;
682
+ pin_mut ! ( updates_subscriber1) ;
683
+
684
+ // Scope for `updates_subscriber2`.
685
+ let updates_subscriber2_token = {
686
+ let updates_subscriber2 = linked_chunk. updates ( ) . unwrap ( ) . subscribe ( ) ;
687
+ pin_mut ! ( updates_subscriber2) ;
688
+
689
+ // No update, streams are pending.
690
+ assert_matches ! ( updates_subscriber1. as_mut( ) . poll_next( & mut context1) , Poll :: Pending ) ;
691
+ assert_eq ! ( * counter_waker1. number_of_wakeup. lock( ) . unwrap( ) , 0 ) ;
692
+ assert_matches ! ( updates_subscriber2. as_mut( ) . poll_next( & mut context2) , Poll :: Pending ) ;
693
+ assert_eq ! ( * counter_waker2. number_of_wakeup. lock( ) . unwrap( ) , 0 ) ;
694
+
695
+ // Let's generate an update.
696
+ linked_chunk. push_items_back ( [ 'a' ] ) ;
697
+
698
+ // The wakers must have been called.
699
+ assert_eq ! ( * counter_waker1. number_of_wakeup. lock( ) . unwrap( ) , 1 ) ;
700
+ assert_eq ! ( * counter_waker2. number_of_wakeup. lock( ) . unwrap( ) , 1 ) ;
701
+
702
+ // There is an update! Right after that, the streams are pending again.
703
+ assert_matches ! (
704
+ updates_subscriber1. as_mut( ) . poll_next( & mut context1) ,
705
+ Poll :: Ready ( Some ( items) ) => {
706
+ assert_eq!(
707
+ items,
708
+ & [ PushItems { at: Position ( ChunkIdentifier ( 0 ) , 0 ) , items: vec![ 'a' ] } ]
709
+ ) ;
710
+ }
711
+ ) ;
712
+ assert_matches ! ( updates_subscriber1. as_mut( ) . poll_next( & mut context1) , Poll :: Pending ) ;
713
+ assert_matches ! (
714
+ updates_subscriber2. as_mut( ) . poll_next( & mut context2) ,
715
+ Poll :: Ready ( Some ( items) ) => {
716
+ assert_eq!(
717
+ items,
718
+ & [ PushItems { at: Position ( ChunkIdentifier ( 0 ) , 0 ) , items: vec![ 'a' ] } ]
719
+ ) ;
720
+ }
721
+ ) ;
722
+ assert_matches ! ( updates_subscriber2. as_mut( ) . poll_next( & mut context2) , Poll :: Pending ) ;
723
+
724
+ // Let's generate two other updates.
725
+ linked_chunk. push_items_back ( [ 'b' ] ) ;
726
+ linked_chunk. push_items_back ( [ 'c' ] ) ;
727
+
728
+ // The waker must have been called only once for the two updates.
729
+ assert_eq ! ( * counter_waker1. number_of_wakeup. lock( ) . unwrap( ) , 2 ) ;
730
+ assert_eq ! ( * counter_waker2. number_of_wakeup. lock( ) . unwrap( ) , 2 ) ;
731
+
732
+ // Let's poll `updates_subscriber1` only.
733
+ assert_matches ! (
734
+ updates_subscriber1. as_mut( ) . poll_next( & mut context1) ,
735
+ Poll :: Ready ( Some ( items) ) => {
736
+ assert_eq!(
737
+ items,
738
+ & [
739
+ PushItems { at: Position ( ChunkIdentifier ( 0 ) , 1 ) , items: vec![ 'b' ] } ,
740
+ PushItems { at: Position ( ChunkIdentifier ( 0 ) , 2 ) , items: vec![ 'c' ] } ,
741
+ ]
742
+ ) ;
743
+ }
744
+ ) ;
745
+ assert_matches ! ( updates_subscriber1. as_mut( ) . poll_next( & mut context1) , Poll :: Pending ) ;
746
+
747
+ // For the sake of this test, we also need to advance the main reader token.
748
+ let _ = linked_chunk. updates ( ) . unwrap ( ) . take ( ) ;
749
+ let _ = linked_chunk. updates ( ) . unwrap ( ) . take ( ) ;
750
+
751
+ // If we inspect the garbage collector state, `a`, `b` and `c` should still be
752
+ // present because not all of them have been consumed by `updates_subscriber2`
753
+ // yet.
754
+ {
755
+ let updates = linked_chunk. updates ( ) . unwrap ( ) ;
756
+
757
+ let inner = updates. inner . read ( ) . unwrap ( ) ;
758
+
759
+ // Inspect number of updates in memory.
760
+ // We get 2 because the garbage collector runs before data are taken, not after:
761
+ // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
762
+ assert_eq ! ( inner. len( ) , 2 ) ;
763
+
764
+ // Inspect the indices.
765
+ let indices = & inner. last_index_per_reader ;
766
+
767
+ assert_eq ! ( indices. get( & updates_subscriber1. token) , Some ( & 2 ) ) ;
768
+ assert_eq ! ( indices. get( & updates_subscriber2. token) , Some ( & 0 ) ) ;
769
+ }
770
+
771
+ // Poll `updates_subscriber1` again: there is no new update so it must be
772
+ // pending.
773
+ assert_matches ! ( updates_subscriber1. as_mut( ) . poll_next( & mut context1) , Poll :: Pending ) ;
774
+
775
+ // The state of the garbage collector is unchanged: `a`, `b` and `c` are still
776
+ // in memory.
777
+ {
778
+ let updates = linked_chunk. updates ( ) . unwrap ( ) ;
779
+
780
+ let inner = updates. inner . read ( ) . unwrap ( ) ;
781
+
782
+ // Inspect number of updates in memory. Value is unchanged.
783
+ assert_eq ! ( inner. len( ) , 2 ) ;
784
+
785
+ // Inspect the indices. They are unchanged.
786
+ let indices = & inner. last_index_per_reader ;
787
+
788
+ assert_eq ! ( indices. get( & updates_subscriber1. token) , Some ( & 2 ) ) ;
789
+ assert_eq ! ( indices. get( & updates_subscriber2. token) , Some ( & 0 ) ) ;
790
+ }
791
+
792
+ updates_subscriber2. token . clone ( )
793
+ // Drop `updates_subscriber2`!
794
+ } ;
795
+
796
+ // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
797
+ // still no new update, but it will run the garbage collector again, and this
798
+ // time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage
799
+ // collector must be empty.
800
+ assert_matches ! ( updates_subscriber1. as_mut( ) . poll_next( & mut context1) , Poll :: Pending ) ;
801
+
802
+ // Inspect the garbage collector.
803
+ {
804
+ let updates = linked_chunk. updates ( ) . unwrap ( ) ;
805
+
806
+ let inner = updates. inner . read ( ) . unwrap ( ) ;
807
+
808
+ // Inspect number of updates in memory.
809
+ assert_eq ! ( inner. len( ) , 0 ) ;
810
+
811
+ // Inspect the indices.
812
+ let indices = & inner. last_index_per_reader ;
813
+
814
+ assert_eq ! ( indices. get( & updates_subscriber1. token) , Some ( & 0 ) ) ;
815
+ assert_eq ! ( indices. get( & updates_subscriber2_token) , None ) ; // token is unknown!
816
+ }
817
+
818
+ // When dropping the `LinkedChunk`, it closes the stream.
819
+ drop ( linked_chunk) ;
820
+ assert_matches ! ( updates_subscriber1. as_mut( ) . poll_next( & mut context1) , Poll :: Ready ( None ) ) ;
821
+ }
649
822
}
0 commit comments