
Commit 291665b

feat(event cache): auto-shrink a room event cache's chunk after all listeners are gone
1 parent f3f37a3 commit 291665b
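The idea: every subscriber returned by `RoomEventCache::subscribe` now holds a clone of a shared RAII guard, and the guard's `Drop` shrinks the room's linked chunk back to its last chunk once the final clone goes away. A minimal, self-contained sketch of that pattern (tokio only; `AutoShrink`, `Listener` and the chunk contents are illustrative names, not SDK types):

use std::sync::Arc;

use tokio::sync::Mutex;

/// RAII guard: when the last clone is dropped, spawn a task that "shrinks"
/// the shared state down to its last chunk.
struct AutoShrink {
    chunks: Arc<Mutex<Vec<&'static str>>>,
}

impl Drop for AutoShrink {
    fn drop(&mut self) {
        let chunks = self.chunks.clone();
        // `drop` can't be async, so spawn a detached task to do the work.
        tokio::spawn(async move {
            let mut chunks = chunks.lock().await;
            if let Some(last) = chunks.pop() {
                chunks.clear();
                chunks.push(last);
            }
        });
    }
}

/// Each listener keeps the guard alive; the last listener to go away triggers
/// the shrink.
struct Listener {
    _auto_shrink: Arc<AutoShrink>,
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let chunks = Arc::new(Mutex::new(vec!["chunk 0", "chunk 1", "chunk 2"]));
    let guard = Arc::new(AutoShrink { chunks: chunks.clone() });

    let first = Listener { _auto_shrink: guard.clone() };
    let second = Listener { _auto_shrink: guard.clone() };
    drop(guard); // In the SDK, this extra handle lives in the room's state instead.

    drop(first); // One listener left: nothing happens.
    drop(second); // Last listener gone: the guard's `Drop` runs.

    // Give the detached task a chance to run (same trick as the test below).
    tokio::task::yield_now().await;
    assert_eq!(*chunks.lock().await, vec!["chunk 2"]);
}

The SDK version differs in one respect: the room's state keeps the extra clone of the guard, and the last listener clears it explicitly, as the diff below shows.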

File tree

1 file changed: +229 -8 lines changed

  • crates/matrix-sdk/src/event_cache/room

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 229 additions & 8 deletions
@@ -14,7 +14,12 @@
 
 //! All event cache types for a single room.
 
-use std::{collections::BTreeMap, fmt, sync::Arc};
+use std::{
+    collections::BTreeMap,
+    fmt,
+    ops::{Deref, DerefMut},
+    sync::Arc,
+};
 
 use events::Gap;
 use eyeball_im::VectorDiff;
@@ -27,9 +32,12 @@ use ruma::{
     serde::Raw,
     EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
 };
-use tokio::sync::{
-    broadcast::{Receiver, Sender},
-    Notify, RwLock,
+use tokio::{
+    spawn,
+    sync::{
+        broadcast::{Receiver, Sender},
+        Notify, RwLock,
+    },
 };
 use tracing::{trace, warn};
 
@@ -55,6 +63,84 @@ impl fmt::Debug for RoomEventCache {
     }
 }
 
+/// A RAII data structure that will automatically shrink the current room's
+/// linked chunk on drop.
+pub(super) struct AutoShrinkLinkedChunk {
+    /// The related room event cache, wrapped in an option so as to take it from
+    /// the drop implementation.
+    room_event_cache: Option<RoomEventCache>,
+}
+
+impl Drop for AutoShrinkLinkedChunk {
+    fn drop(&mut self) {
+        let room_event_cache = self.room_event_cache.take().expect("drop called multiple times");
+
+        // I hear you from the future: "but, spawning a detached task in a drop
+        // implementation is real bad! Maybe there will be multiple shrinks
+        // happening at the same time, and that's bad!". However, this can't
+        // happen, because the whole `state` variable is guarded by a fair lock,
+        // which will run queries in the order they happen. Should be fine™.
+        spawn(async move {
+            let mut state = room_event_cache.inner.state.write().await;
+
+            if let Err(err) = state.shrink_to_last_chunk().await {
+                warn!("error when trying to shrink the linked chunk: {err}");
+            }
+        });
+    }
+}
+
+/// Thin wrapper for a room event cache listener, so as to trigger side-effects
+/// when all listeners are gone.
+#[allow(missing_debug_implementations)]
+pub struct RoomEventCacheListener {
+    /// Underlying receiver of the room event cache's updates.
+    recv: Receiver<RoomEventCacheUpdate>,
+
+    /// Shared instance of the auto-shrinker.
+    auto_shrinker: Arc<AutoShrinkLinkedChunk>,
+}
+
+impl Drop for RoomEventCacheListener {
+    fn drop(&mut self) {
+        if Arc::strong_count(&self.auto_shrinker) == 2 {
+            // There are only two instances of the auto-shrinker: the one in the
+            // `RoomEventCacheState`, and the one here.
+
+            // The one here is about to be dropped. The only remaining one will thus be the
+            // one in the [`RoomEventCacheState`]. Reset it manually, so as to
+            // trigger its drop.
+
+            let room_event_cache = self
+                .auto_shrinker
+                .room_event_cache
+                .as_ref()
+                .expect("drop called multiple times")
+                .clone();
+
+            spawn(async move {
+                let mut state = room_event_cache.inner.state.write().await;
+                // Decrease the strong count of the auto-shrinker to 1.
+                state.auto_shrinker = None;
+            });
+        }
+    }
+}
+
+impl Deref for RoomEventCacheListener {
+    type Target = Receiver<RoomEventCacheUpdate>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.recv
+    }
+}
+
+impl DerefMut for RoomEventCacheListener {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.recv
+    }
+}
+
 impl RoomEventCache {
     /// Create a new [`RoomEventCache`] using the given room and store.
     pub(super) fn new(
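The `Arc::strong_count(&self.auto_shrinker) == 2` check relies on a simple invariant: the room's state holds one clone of the guard and every live listener holds one more, so a listener that sees a count of exactly 2 from inside its own `Drop` knows it is the last one. A standalone illustration of that bookkeeping (plain `std`, not SDK code):

use std::sync::Arc;

fn main() {
    struct Guard;

    let in_state = Arc::new(Guard); // the clone kept in the room's state
    let listener_a = in_state.clone(); // first subscriber's clone
    let listener_b = in_state.clone(); // second subscriber's clone
    assert_eq!(Arc::strong_count(&in_state), 3);

    // While `listener_a` is being dropped, its own clone is still alive, so it
    // would observe a count of 3 and do nothing. Once it's gone:
    drop(listener_a);
    assert_eq!(Arc::strong_count(&in_state), 2);

    // While `listener_b` is being dropped, it observes a count of 2 (its clone
    // plus the state's): it is the last listener, so the state's clone can be
    // reset, letting the guard itself finally drop.
    drop(listener_b);
    assert_eq!(Arc::strong_count(&in_state), 1);
}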
@@ -77,11 +163,20 @@ impl RoomEventCache {
 
     /// Subscribe to this room updates, after getting the initial list of
     /// events.
-    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>) {
-        let state = self.inner.state.read().await;
+    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
+        let mut state = self.inner.state.write().await;
         let events = state.events().events().map(|(_position, item)| item.clone()).collect();
 
-        (events, self.inner.sender.subscribe())
+        let auto_shrinker = state
+            .auto_shrinker
+            .get_or_insert_with(|| {
+                Arc::new(AutoShrinkLinkedChunk { room_event_cache: Some(self.clone()) })
+            })
+            .clone();
+        let recv = self.inner.sender.subscribe();
+        let listener = RoomEventCacheListener { recv, auto_shrinker };
+
+        (events, listener)
     }
 
     /// Return a [`RoomPagination`] API object useful for running
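For callers, the change is mostly transparent: `subscribe()` now returns a `RoomEventCacheListener` instead of a bare `Receiver`, but the listener derefs to the receiver, so `recv()` keeps working. A hedged usage sketch (assuming `RoomEventCache` and `RoomEventCacheUpdate` are reachable via `matrix_sdk::event_cache`; `follow_room` is a made-up helper):

use matrix_sdk::event_cache::{RoomEventCache, RoomEventCacheUpdate};

// Made-up helper showing the shape of the new API.
async fn follow_room(room_event_cache: &RoomEventCache) {
    // `subscribe()` now hands back the initial events plus a listener wrapper.
    let (initial_events, mut listener) = room_event_cache.subscribe().await;
    println!("starting with {} events", initial_events.len());

    // `RoomEventCacheListener` derefs to the broadcast `Receiver<RoomEventCacheUpdate>`,
    // so `recv()` resolves through auto-deref as before.
    while let Ok(update) = listener.recv().await {
        match update {
            RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
                println!("got {} timeline diffs", diffs.len());
            }
            _ => {}
        }
    }

    // Once this listener (and any sibling listeners) is dropped, the room's
    // linked chunk is shrunk back to its last chunk by a background task.
}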
@@ -556,7 +651,7 @@ mod private {
     use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
     use tracing::{debug, error, instrument, trace};
 
-    use super::{events::RoomEvents, LoadMoreEventsBackwardsOutcome};
+    use super::{events::RoomEvents, AutoShrinkLinkedChunk, LoadMoreEventsBackwardsOutcome};
    use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
 
     /// State for a single room's event cache.
@@ -584,6 +679,11 @@
         /// the first time we try to run backward pagination. We reset
         /// that upon clearing the timeline events.
         pub waited_for_initial_prev_token: bool,
+
+        /// A shared auto-shrinker, shared among all the live subscribers.
+        ///
+        /// If set to `None`, it means there are no live listeners.
+        pub(super) auto_shrinker: Option<Arc<AutoShrinkLinkedChunk>>,
     }
 
     impl RoomEventCacheState {
@@ -640,6 +740,7 @@ mod private {
                 events,
                 deduplicator,
                 waited_for_initial_prev_token: false,
+                auto_shrinker: None,
             })
         }
 
@@ -1986,4 +2087,124 @@ mod tests {
         assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
         assert!(outcome.reached_start);
     }
+
+    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
+    #[async_test]
+    async fn test_auto_shrink_after_all_subscribers_are_gone() {
+        use std::ops::Not as _;
+
+        use eyeball_im::VectorDiff;
+        use tokio::task::yield_now;
+
+        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
+
+        let room_id = room_id!("!galette:saucisse.bzh");
+
+        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
+
+        let f = EventFactory::new().room(room_id);
+
+        let evid1 = event_id!("$1");
+        let evid2 = event_id!("$2");
+
+        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
+        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
+
+        // Fill the event cache store with an initial linked chunk with 2 event chunks.
+        {
+            let store = client.event_cache_store();
+            let store = store.lock().await.unwrap();
+            store
+                .handle_linked_chunk_updates(
+                    room_id,
+                    vec![
+                        Update::NewItemsChunk {
+                            previous: None,
+                            new: ChunkIdentifier::new(0),
+                            next: None,
+                        },
+                        Update::PushItems {
+                            at: Position::new(ChunkIdentifier::new(0), 0),
+                            items: vec![ev1],
+                        },
+                        Update::NewItemsChunk {
+                            previous: Some(ChunkIdentifier::new(0)),
+                            new: ChunkIdentifier::new(1),
+                            next: None,
+                        },
+                        Update::PushItems {
+                            at: Position::new(ChunkIdentifier::new(1), 0),
+                            items: vec![ev2],
+                        },
+                    ],
+                )
+                .await
+                .unwrap();
+        }
+
+        let event_cache = client.event_cache();
+        event_cache.subscribe().unwrap();
+        event_cache.enable_storage().unwrap();
+
+        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
+        let room = client.get_room(room_id).unwrap();
+        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
+
+        // Sanity check: lazily loaded, so only includes one item at start.
+        let (events1, mut stream1) = room_event_cache.subscribe().await;
+        assert_eq!(events1.len(), 1);
+        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
+        assert!(stream1.is_empty());
+
+        // Force loading the full linked chunk by back-paginating.
+        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
+        assert_eq!(outcome.events.len(), 1);
+        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
+        assert!(outcome.reached_start.not());
+
+        // We also get an update about the loading from the store. Ignore it, for this
+        // test's sake.
+        assert_let_timeout!(
+            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
+        );
+        assert_eq!(diffs.len(), 1);
+        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
+            assert_eq!(value.event_id().as_deref(), Some(evid1));
+        });
+
+        assert!(stream1.is_empty());
+
+        // Have another listener subscribe to the event cache.
+        // Since it's not the first one, and the previous one loaded some more events,
+        // the second listener sees them all.
+        let (events2, stream2) = room_event_cache.subscribe().await;
+        assert_eq!(events2.len(), 2);
+        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
+        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
+        assert!(stream2.is_empty());
+
+        // Drop the first stream, and wait a bit.
+        drop(stream1);
+        yield_now().await;
+
+        // The second stream remains undisturbed.
+        assert!(stream2.is_empty());
+
+        // Now drop the second stream, and wait a bit.
+        drop(stream2);
+        yield_now().await;
+
+        // The linked chunk must have auto-shrunk by now.
+
+        {
+            // Check the inner state: there's no more shared auto-shrinker.
+            let state = room_event_cache.inner.state.read().await;
+            assert!(state.auto_shrinker.is_none());
+        }
+
+        // Getting the events will only give us the latest chunk.
+        let (events3, _stream3) = room_event_cache.subscribe().await;
+        assert_eq!(events3.len(), 1);
+        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
+    }
 }
