@@ -57,7 +57,7 @@ use ruma::{
57
57
} ;
58
58
use tokio:: sync:: {
59
59
broadcast:: { error:: RecvError , Receiver } ,
60
- Mutex , RwLock ,
60
+ mpsc , Mutex , RwLock ,
61
61
} ;
62
62
use tracing:: { error, info, info_span, instrument, trace, warn, Instrument as _, Span } ;
63
63
@@ -129,6 +129,9 @@ pub struct EventCacheDropHandles {
129
129
130
130
/// Task that listens to updates to the user's ignored list.
131
131
ignore_user_list_update_task : JoinHandle < ( ) > ,
132
+
133
+ /// The task used to automatically shrink the linked chunks.
134
+ auto_shrink_linked_chunk_task : JoinHandle < ( ) > ,
132
135
}
133
136
134
137
impl Debug for EventCacheDropHandles {
@@ -141,6 +144,7 @@ impl Drop for EventCacheDropHandles {
141
144
fn drop ( & mut self ) {
142
145
self . listen_updates_task . abort ( ) ;
143
146
self . ignore_user_list_update_task . abort ( ) ;
147
+ self . auto_shrink_linked_chunk_task . abort ( ) ;
144
148
}
145
149
}
146
150
@@ -172,6 +176,7 @@ impl EventCache {
172
176
by_room : Default :: default ( ) ,
173
177
drop_handles : Default :: default ( ) ,
174
178
all_events : Default :: default ( ) ,
179
+ auto_shrink_sender : Default :: default ( ) ,
175
180
} ) ,
176
181
}
177
182
}
@@ -213,7 +218,19 @@ impl EventCache {
213
218
client. subscribe_to_ignore_user_list_changes ( ) ,
214
219
) ) ;
215
220
216
- Arc :: new ( EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task } )
221
+ let ( tx, rx) = mpsc:: channel ( 32 ) ;
222
+
223
+ // Force-initialize the sender in the [`RoomEventCacheInner`].
224
+ self . inner . auto_shrink_sender . get_or_init ( || tx) ;
225
+
226
+ let auto_shrink_linked_chunk_tasks =
227
+ spawn ( Self :: auto_shrink_linked_chunk_task ( self . inner . clone ( ) , rx) ) ;
228
+
229
+ Arc :: new ( EventCacheDropHandles {
230
+ listen_updates_task,
231
+ ignore_user_list_update_task,
232
+ auto_shrink_linked_chunk_task : auto_shrink_linked_chunk_tasks,
233
+ } )
217
234
} ) ;
218
235
219
236
Ok ( ( ) )
@@ -309,6 +326,60 @@ impl EventCache {
309
326
}
310
327
}
311
328
329
+ /// Spawns the task that will listen to auto-shrink notifications.
330
+ ///
331
+ /// The auto-shrink mechanism works this way:
332
+ ///
333
+ /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
334
+ /// increment the active number of listeners to that room, aka
335
+ /// [`RoomEventCacheState::listener_count`].
336
+ /// - When that subscriber is dropped, it will decrement that count; and
337
+ /// notify the task below if it reached 0.
338
+ /// - The task spawned here, owned by the [`EventCacheInner`], will listen
339
+ /// to such notifications that a room may be shrunk. It will attempt an
340
+ /// auto-shrink, by letting the inner state decide whether this is a good
341
+ /// time to do so (new listeners might have spawned in the meanwhile).
342
+ #[ instrument( skip_all) ]
343
+ async fn auto_shrink_linked_chunk_task (
344
+ inner : Arc < EventCacheInner > ,
345
+ mut rx : mpsc:: Receiver < AutoShrinkChannelPayload > ,
346
+ ) {
347
+ while let Some ( room_id) = rx. recv ( ) . await {
348
+ let room = match inner. for_room ( & room_id) . await {
349
+ Ok ( room) => room,
350
+ Err ( err) => {
351
+ warn ! ( for_room = %room_id, "error when getting a RoomEventCache: {err}" ) ;
352
+ continue ;
353
+ }
354
+ } ;
355
+
356
+ let mut state = room. inner . state . write ( ) . await ;
357
+
358
+ match state. auto_shrink_if_no_listeners ( ) . await {
359
+ Ok ( diffs) => {
360
+ if let Some ( diffs) = diffs {
361
+ // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
362
+ // listeners, right? RIGHT? Especially because the state is guarded behind
363
+ // a lock.
364
+ //
365
+ // However, better safe than sorry, and it's cheap to send an update here,
366
+ // so let's do it!
367
+ let _ =
368
+ room. inner . sender . send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
369
+ diffs,
370
+ origin : EventsOrigin :: Cache ,
371
+ } ) ;
372
+ }
373
+ }
374
+
375
+ Err ( err) => {
376
+ // There's not much we can do here, unfortunately.
377
+ warn ! ( for_room = %room_id, "error when attempting to shrink linked chunk: {err}" ) ;
378
+ }
379
+ }
380
+ }
381
+ }
382
+
312
383
/// Return a room-specific view over the [`EventCache`].
313
384
pub ( crate ) async fn for_room (
314
385
& self ,
@@ -534,8 +605,18 @@ struct EventCacheInner {
534
605
535
606
/// Handles to keep alive the task listening to updates.
536
607
drop_handles : OnceLock < Arc < EventCacheDropHandles > > ,
608
+
609
+ /// A sender for notifications that a room *may* need to be auto-shrunk.
610
+ ///
611
+ /// Needs to live here, so it may be passed to each [`RoomEventCache`]
612
+ /// instance.
613
+ ///
614
+ /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
615
+ auto_shrink_sender : OnceLock < mpsc:: Sender < AutoShrinkChannelPayload > > ,
537
616
}
538
617
618
+ type AutoShrinkChannelPayload = OwnedRoomId ;
619
+
539
620
impl EventCacheInner {
540
621
fn client ( & self ) -> Result < Client > {
541
622
self . client . get ( ) . ok_or ( EventCacheError :: ClientDropped )
@@ -644,12 +725,20 @@ impl EventCacheInner {
644
725
RoomVersionId :: V1
645
726
} ) ;
646
727
728
+ // SAFETY: we must have subscribed before reaching this coed, otherwise
729
+ // something is very wrong.
730
+ let auto_shrink_sender =
731
+ self . auto_shrink_sender . get ( ) . cloned ( ) . expect (
732
+ "we must have called `EventCache::subscribe()` before calling here." ,
733
+ ) ;
734
+
647
735
let room_event_cache = RoomEventCache :: new (
648
736
self . client . clone ( ) ,
649
737
room_state,
650
738
room_id. to_owned ( ) ,
651
739
room_version,
652
740
self . all_events . clone ( ) ,
741
+ auto_shrink_sender,
653
742
) ;
654
743
655
744
by_room_guard. insert ( room_id. to_owned ( ) , room_event_cache. clone ( ) ) ;
@@ -718,6 +807,9 @@ pub enum EventsOrigin {
718
807
719
808
/// Events are coming from pagination.
720
809
Pagination ,
810
+
811
+ /// The cause of the change is purely internal to the cache.
812
+ Cache ,
721
813
}
722
814
723
815
#[ cfg( test) ]
0 commit comments