@@ -18,7 +18,10 @@ use std::{
18
18
collections:: BTreeMap ,
19
19
fmt,
20
20
ops:: { Deref , DerefMut } ,
21
- sync:: Arc ,
21
+ sync:: {
22
+ atomic:: { AtomicUsize , Ordering } ,
23
+ Arc ,
24
+ } ,
22
25
} ;
23
26
24
27
use events:: Gap ;
@@ -75,18 +78,15 @@ pub struct RoomEventCacheListener {
75
78
auto_shrink_sender : mpsc:: Sender < AutoShrinkChannelPayload > ,
76
79
77
80
/// Shared instance of the auto-shrinker.
78
- auto_shrinker : Arc < ( ) > ,
81
+ listener_count : Arc < AtomicUsize > ,
79
82
}
80
83
81
84
impl Drop for RoomEventCacheListener {
82
85
fn drop ( & mut self ) {
83
- if Arc :: strong_count ( & self . auto_shrinker ) == 2 {
84
- // There are only two instances of the auto-shrinker: the one in the
85
- // `RoomEventCacheState`, and the one here.
86
- //
87
- // The one here is about to be dropped, such that there'll remain only the one
88
- // in the `RoomEventCacheState`; notify the auto-shrink task that it
89
- // may have to trigger for this room.
86
+ let previous_listener_count = self . listener_count . fetch_sub ( 1 , Ordering :: SeqCst ) ;
87
+ if previous_listener_count == 1 {
88
+ // We were the last instance of the listener; let the auto-shrinker know by
89
+ // notifying it of our room id.
90
90
91
91
let mut room_id = self . room_id . clone ( ) ;
92
92
@@ -145,16 +145,17 @@ impl RoomEventCache {
145
145
/// Subscribe to this room updates, after getting the initial list of
146
146
/// events.
147
147
pub async fn subscribe ( & self ) -> ( Vec < TimelineEvent > , RoomEventCacheListener ) {
148
- let mut state = self . inner . state . write ( ) . await ;
148
+ let state = self . inner . state . read ( ) . await ;
149
149
let events = state. events ( ) . events ( ) . map ( |( _position, item) | item. clone ( ) ) . collect ( ) ;
150
150
151
- let listener_count = state. auto_shrinker . get_or_insert_with ( || Arc :: new ( ( ) ) ) . clone ( ) ;
151
+ state. listener_count . fetch_add ( 1 , Ordering :: SeqCst ) ;
152
+
152
153
let recv = self . inner . sender . subscribe ( ) ;
153
154
let listener = RoomEventCacheListener {
154
155
recv,
155
- auto_shrinker : listener_count,
156
156
room_id : self . inner . room_id . clone ( ) ,
157
157
auto_shrink_sender : self . inner . auto_shrink_sender . clone ( ) ,
158
+ listener_count : state. listener_count . clone ( ) ,
158
159
} ;
159
160
160
161
( events, listener)
@@ -627,7 +628,7 @@ pub(super) enum LoadMoreEventsBackwardsOutcome {
627
628
628
629
// Use a private module to hide `events` to this parent module.
629
630
mod private {
630
- use std:: sync:: Arc ;
631
+ use std:: sync:: { atomic :: AtomicUsize , Arc } ;
631
632
632
633
use eyeball_im:: VectorDiff ;
633
634
use matrix_sdk_base:: {
@@ -669,10 +670,8 @@ mod private {
669
670
/// that upon clearing the timeline events.
670
671
pub waited_for_initial_prev_token : bool ,
671
672
672
- /// A shared auto-shrinker, shared among all the live subscribers.
673
- ///
674
- /// If set to `None`, means there are no
675
- pub ( super ) auto_shrinker : Option < Arc < ( ) > > ,
673
+ /// An atomic count of the current number of listeners.
674
+ pub ( super ) listener_count : Arc < AtomicUsize > ,
676
675
}
677
676
678
677
impl RoomEventCacheState {
@@ -729,7 +728,7 @@ mod private {
729
728
events,
730
729
deduplicator,
731
730
waited_for_initial_prev_token : false ,
732
- auto_shrinker : None ,
731
+ listener_count : Default :: default ( ) ,
733
732
} )
734
733
}
735
734
@@ -911,21 +910,19 @@ mod private {
911
910
Ok ( Some ( self . events . updates_as_vector_diffs ( ) ) )
912
911
}
913
912
914
- /// Automatically shrink the room if there are no listeners.
913
+ /// Automatically shrink the room if there are no listeners, as
914
+ /// indicated by the atomic number of active listeners.
915
915
#[ must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`" ]
916
916
pub ( crate ) async fn auto_shrink_if_no_listeners (
917
917
& mut self ,
918
918
) -> Result < Option < Vec < VectorDiff < TimelineEvent > > > , EventCacheError > {
919
- let mut diffs = None ;
920
- if let Some ( auto_shrinker) = self . auto_shrinker . as_mut ( ) {
919
+ if self . listener_count . load ( std:: sync:: atomic:: Ordering :: SeqCst ) == 0 {
921
920
// If we are the last strong reference to the auto-shrinker, we can shrink the
922
921
// events data structure to its last chunk.
923
- if Arc :: strong_count ( auto_shrinker) == 1 {
924
- self . auto_shrinker = None ;
925
- diffs = self . shrink_to_last_chunk ( ) . await ?;
926
- }
922
+ self . shrink_to_last_chunk ( ) . await
923
+ } else {
924
+ Ok ( None )
927
925
}
928
- Ok ( diffs)
929
926
}
930
927
931
928
/// Removes the bundled relations from an event, if they were present.
@@ -2203,7 +2200,7 @@ mod tests {
2203
2200
{
2204
2201
// Check the inner state: there's no more shared auto-shrinker.
2205
2202
let state = room_event_cache. inner . state . read ( ) . await ;
2206
- assert ! ( state. auto_shrinker . is_none ( ) ) ;
2203
+ assert_eq ! ( state. listener_count . load ( std :: sync :: atomic :: Ordering :: SeqCst ) , 0 ) ;
2207
2204
}
2208
2205
2209
2206
// Getting the events will only give us the latest chunk.
0 commit comments