@@ -57,7 +57,7 @@ use ruma::{
57
57
} ;
58
58
use tokio:: sync:: {
59
59
broadcast:: { error:: RecvError , Receiver } ,
60
- Mutex , RwLock ,
60
+ mpsc , Mutex , RwLock ,
61
61
} ;
62
62
use tracing:: { error, info, info_span, instrument, trace, warn, Instrument as _, Span } ;
63
63
@@ -129,6 +129,9 @@ pub struct EventCacheDropHandles {
129
129
130
130
/// Task that listens to updates to the user's ignored list.
131
131
ignore_user_list_update_task : JoinHandle < ( ) > ,
132
+
133
+ /// The task used to automatically shrink the linked chunks.
134
+ auto_shrink_linked_chunk_tasks : JoinHandle < ( ) > ,
132
135
}
133
136
134
137
impl Debug for EventCacheDropHandles {
@@ -141,6 +144,7 @@ impl Drop for EventCacheDropHandles {
141
144
fn drop ( & mut self ) {
142
145
self . listen_updates_task . abort ( ) ;
143
146
self . ignore_user_list_update_task . abort ( ) ;
147
+ self . auto_shrink_linked_chunk_tasks . abort ( ) ;
144
148
}
145
149
}
146
150
@@ -172,6 +176,7 @@ impl EventCache {
172
176
by_room : Default :: default ( ) ,
173
177
drop_handles : Default :: default ( ) ,
174
178
all_events : Default :: default ( ) ,
179
+ auto_shrink_sender : Default :: default ( ) ,
175
180
} ) ,
176
181
}
177
182
}
@@ -213,7 +218,19 @@ impl EventCache {
213
218
client. subscribe_to_ignore_user_list_changes ( ) ,
214
219
) ) ;
215
220
216
- Arc :: new ( EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task } )
221
+ let ( tx, rx) = mpsc:: channel ( 32 ) ;
222
+
223
+ // Force-initialize the sender in the [`RoomEventCacheInner`].
224
+ self . inner . auto_shrink_sender . get_or_init ( || tx) ;
225
+
226
+ let auto_shrink_linked_chunk_tasks =
227
+ spawn ( Self :: auto_shrink_linked_chunk_task ( self . inner . clone ( ) , rx) ) ;
228
+
229
+ Arc :: new ( EventCacheDropHandles {
230
+ listen_updates_task,
231
+ ignore_user_list_update_task,
232
+ auto_shrink_linked_chunk_tasks,
233
+ } )
217
234
} ) ;
218
235
219
236
Ok ( ( ) )
@@ -309,6 +326,59 @@ impl EventCache {
309
326
}
310
327
}
311
328
329
+ /// Spawns the task that will listen to auto-shrink notifications.
330
+ ///
331
+ /// The auto-shrink mechanism works this way:
332
+ ///
333
+ /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
334
+ /// get-or-create a new shared `Arc<()>`. When that subscriber is dropped,
335
+ /// and the number of shared references is about to drop to 0, we notify
336
+ /// this task (below) that this is the case, with the room id.
337
+ /// - This task here, owned by the [`EventCacheInner`], will listen to such
338
+ /// notifications that a room may be shrunk; it will look at the number of
339
+ /// shared references again, and will auto-shrink if the number of shared
340
+ /// references is 0.
341
+ #[ instrument( skip_all) ]
342
+ async fn auto_shrink_linked_chunk_task (
343
+ inner : Arc < EventCacheInner > ,
344
+ mut rx : mpsc:: Receiver < AutoShrinkChannelPayload > ,
345
+ ) {
346
+ while let Some ( room_id) = rx. recv ( ) . await {
347
+ let room = match inner. for_room ( & room_id) . await {
348
+ Ok ( room) => room,
349
+ Err ( err) => {
350
+ warn ! ( for_room = %room_id, "error when getting a RoomEventCache: {err}" ) ;
351
+ continue ;
352
+ }
353
+ } ;
354
+
355
+ let mut state = room. inner . state . write ( ) . await ;
356
+
357
+ match state. auto_shrink_if_no_listeners ( ) . await {
358
+ Ok ( diffs) => {
359
+ if let Some ( diffs) = diffs {
360
+ // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
361
+ // listeners, right? RIGHT? Especially because the state is guarded behind
362
+ // a lock.
363
+ //
364
+ // However, better safe than sorry, and it's cheap to send an update here,
365
+ // so let's do it!
366
+ let _ =
367
+ room. inner . sender . send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
368
+ diffs,
369
+ origin : EventsOrigin :: Cache ,
370
+ } ) ;
371
+ }
372
+ }
373
+
374
+ Err ( err) => {
375
+ // There's not much we can do here, unfortunately.
376
+ warn ! ( for_room = %room_id, "error when attempting to shrink linked chunk: {err}" ) ;
377
+ }
378
+ }
379
+ }
380
+ }
381
+
312
382
/// Return a room-specific view over the [`EventCache`].
313
383
pub ( crate ) async fn for_room (
314
384
& self ,
@@ -534,8 +604,18 @@ struct EventCacheInner {
534
604
535
605
/// Handles to keep alive the task listening to updates.
536
606
drop_handles : OnceLock < Arc < EventCacheDropHandles > > ,
607
+
608
+ /// A sender for notifications that a room *may* need to be auto-shrunk.
609
+ ///
610
+ /// Needs to live here, so it may be passed to each [`RoomEventCache`]
611
+ /// instance.
612
+ ///
613
+ /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
614
+ auto_shrink_sender : OnceLock < mpsc:: Sender < AutoShrinkChannelPayload > > ,
537
615
}
538
616
617
+ type AutoShrinkChannelPayload = OwnedRoomId ;
618
+
539
619
impl EventCacheInner {
540
620
fn client ( & self ) -> Result < Client > {
541
621
self . client . get ( ) . ok_or ( EventCacheError :: ClientDropped )
@@ -644,12 +724,20 @@ impl EventCacheInner {
644
724
RoomVersionId :: V1
645
725
} ) ;
646
726
727
+ // SAFETY: we must have subscribed before reaching this coed, otherwise
728
+ // something is very wrong.
729
+ let auto_shrink_sender =
730
+ self . auto_shrink_sender . get ( ) . cloned ( ) . expect (
731
+ "we must have called `EventCache::subscribe()` before calling here." ,
732
+ ) ;
733
+
647
734
let room_event_cache = RoomEventCache :: new (
648
735
self . client . clone ( ) ,
649
736
room_state,
650
737
room_id. to_owned ( ) ,
651
738
room_version,
652
739
self . all_events . clone ( ) ,
740
+ auto_shrink_sender,
653
741
) ;
654
742
655
743
by_room_guard. insert ( room_id. to_owned ( ) , room_event_cache. clone ( ) ) ;
@@ -718,6 +806,9 @@ pub enum EventsOrigin {
718
806
719
807
/// Events are coming from pagination.
720
808
Pagination ,
809
+
810
+ /// The cause of the change is purely internal to the cache.
811
+ Cache ,
721
812
}
722
813
723
814
#[ cfg( test) ]
0 commit comments