42
42
43
43
use std:: { collections:: BTreeMap , fmt:: Debug , sync:: Arc } ;
44
44
45
- use matrix_sdk:: { sync :: RoomUpdate , Client , Room } ;
45
+ use matrix_sdk:: { Client , Room } ;
46
46
use matrix_sdk_base:: {
47
47
deserialized_responses:: { AmbiguityChange , SyncTimelineEvent } ,
48
48
sync:: { JoinedRoomUpdate , LeftRoomUpdate , Timeline } ,
@@ -54,10 +54,13 @@ use ruma::{
54
54
} ;
55
55
use tokio:: {
56
56
spawn,
57
- sync:: broadcast:: { error:: RecvError , Receiver , Sender } ,
57
+ sync:: {
58
+ broadcast:: { error:: RecvError , Receiver , Sender } ,
59
+ RwLock ,
60
+ } ,
58
61
task:: JoinHandle ,
59
62
} ;
60
- use tracing:: { debug , error, trace} ;
63
+ use tracing:: { error, trace} ;
61
64
62
65
use self :: store:: { EventCacheStore , MemoryStore } ;
63
66
@@ -75,11 +78,17 @@ pub enum EventCacheError {
75
78
pub type Result < T > = std:: result:: Result < T , EventCacheError > ;
76
79
77
80
/// Hold handles to the tasks spawn by a [`RoomEventCache`].
78
- struct RoomCacheDropHandles {
81
+ pub struct EventCacheDropHandles {
79
82
listen_updates_task : JoinHandle < ( ) > ,
80
83
}
81
84
82
- impl Drop for RoomCacheDropHandles {
85
+ impl Debug for EventCacheDropHandles {
86
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
87
+ f. debug_struct ( "EventCacheDropHandles" ) . finish_non_exhaustive ( )
88
+ }
89
+ }
90
+
91
+ impl Drop for EventCacheDropHandles {
83
92
fn drop ( & mut self ) {
84
93
self . listen_updates_task . abort ( ) ;
85
94
}
@@ -89,12 +98,9 @@ impl Drop for RoomCacheDropHandles {
89
98
///
90
99
/// See also the module-level comment.
91
100
pub struct EventCache {
92
- /// Reference to the client used to navigate this cache.
93
- client : Client ,
94
- /// Lazily-filled cache of live [`RoomEventCache`], once per room.
95
- by_room : BTreeMap < OwnedRoomId , RoomEventCache > ,
96
- /// Backend used for storage.
97
- store : Arc < dyn EventCacheStore > ,
101
+ inner : Arc < RwLock < EventCacheInner > > ,
102
+
103
+ drop_handles : Arc < EventCacheDropHandles > ,
98
104
}
99
105
100
106
impl Debug for EventCache {
@@ -106,26 +112,97 @@ impl Debug for EventCache {
106
112
impl EventCache {
107
113
/// Create a new [`EventCache`] for the given client.
108
114
pub fn new ( client : Client ) -> Self {
115
+ let mut room_updates_feed = client. subscribe_to_all_room_updates ( ) ;
116
+
109
117
let store = Arc :: new ( MemoryStore :: new ( ) ) ;
110
- Self { client, by_room : Default :: default ( ) , store }
118
+ let inner =
119
+ Arc :: new ( RwLock :: new ( EventCacheInner { client, by_room : Default :: default ( ) , store } ) ) ;
120
+
121
+ // Spawn the task that will listen to all the room updates at once.
122
+ trace ! ( "Spawning the listen task" ) ;
123
+ let listen_updates_task = spawn ( {
124
+ let inner = inner. clone ( ) ;
125
+
126
+ async move {
127
+ loop {
128
+ match room_updates_feed. recv ( ) . await {
129
+ Ok ( updates) => {
130
+ // We received some room updates. Handle them.
131
+
132
+ // Left rooms.
133
+ for ( room_id, left_room_update) in updates. leave {
134
+ let room = match inner. write ( ) . await . for_room ( & room_id) . await {
135
+ Ok ( room) => room,
136
+ Err ( err) => {
137
+ error ! ( "can't get left room {room_id}: {err}" ) ;
138
+ continue ;
139
+ }
140
+ } ;
141
+
142
+ if let Err ( err) =
143
+ room. inner . handle_left_room_update ( left_room_update) . await
144
+ {
145
+ error ! ( "handling left room update: {err}" ) ;
146
+ }
147
+ }
148
+
149
+ // Joined rooms.
150
+ for ( room_id, joined_room_update) in updates. join {
151
+ let room = match inner. write ( ) . await . for_room ( & room_id) . await {
152
+ Ok ( room) => room,
153
+ Err ( err) => {
154
+ error ! ( "can't get joined room {room_id}: {err}" ) ;
155
+ continue ;
156
+ }
157
+ } ;
158
+
159
+ if let Err ( err) =
160
+ room. inner . handle_joined_room_update ( joined_room_update) . await
161
+ {
162
+ error ! ( "handling joined room update: {err}" ) ;
163
+ }
164
+ }
165
+
166
+ // Invited rooms.
167
+ // TODO: we don't anything with `updates.invite` at
168
+ // this point.
169
+ }
170
+
171
+ Err ( RecvError :: Lagged ( _) ) => {
172
+ // Forget everything we know; we could have missed events, and we have
173
+ // no way to reconcile at the moment!
174
+ // TODO: implement Smart Matching™,
175
+ let mut inner = inner. write ( ) . await ;
176
+ for room_id in inner. by_room . keys ( ) {
177
+ if let Err ( err) = inner. store . clear_room_events ( room_id) . await {
178
+ error ! ( "unable to clear room after room updates lag: {err}" ) ;
179
+ }
180
+ }
181
+ inner. by_room . clear ( ) ;
182
+ }
183
+
184
+ Err ( RecvError :: Closed ) => {
185
+ // The sender has shut down, exit.
186
+ break ;
187
+ }
188
+ }
189
+ }
190
+ }
191
+ } ) ;
192
+
193
+ Self { inner, drop_handles : Arc :: new ( EventCacheDropHandles { listen_updates_task } ) }
111
194
}
112
195
113
196
/// Return a room-specific view over the [`EventCache`].
114
197
///
115
198
/// It may not be found, if the room isn't known to the client.
116
- pub fn for_room ( & mut self , room_id : & RoomId ) -> Result < RoomEventCache > {
117
- match self . by_room . get ( room_id) {
118
- Some ( room) => Ok ( room. clone ( ) ) ,
119
- None => {
120
- let room = self
121
- . client
122
- . get_room ( room_id)
123
- . ok_or_else ( || EventCacheError :: RoomNotFound ( room_id. to_owned ( ) ) ) ?;
124
- let room_event_cache = RoomEventCache :: new ( room, self . store . clone ( ) ) ;
125
- self . by_room . insert ( room_id. to_owned ( ) , room_event_cache. clone ( ) ) ;
126
- Ok ( room_event_cache)
127
- }
128
- }
199
+ pub async fn for_room (
200
+ & self ,
201
+ room_id : & RoomId ,
202
+ ) -> Result < ( RoomEventCache , Arc < EventCacheDropHandles > ) > {
203
+ let room = self . inner . write ( ) . await . for_room ( room_id) . await ?;
204
+
205
+ Ok ( ( room, self . drop_handles . clone ( ) ) )
129
206
}
130
207
131
208
/// Add an initial set of events to the event cache, reloaded from a cache.
@@ -137,20 +214,51 @@ impl EventCache {
137
214
room_id : & RoomId ,
138
215
events : Vec < SyncTimelineEvent > ,
139
216
) -> Result < ( ) > {
140
- let room_cache = self . for_room ( room_id) ?;
217
+ let room_cache = self . inner . write ( ) . await . for_room ( room_id) . await ?;
141
218
room_cache. inner . append_events ( events) . await ?;
142
219
Ok ( ( ) )
143
220
}
144
221
}
145
222
223
+ struct EventCacheInner {
224
+ /// Reference to the client used to navigate this cache.
225
+ client : Client ,
226
+
227
+ /// Lazily-filled cache of live [`RoomEventCache`], once per room.
228
+ by_room : BTreeMap < OwnedRoomId , RoomEventCache > ,
229
+
230
+ /// Backend used for storage.
231
+ store : Arc < dyn EventCacheStore > ,
232
+ }
233
+
234
+ impl EventCacheInner {
235
+ /// Return a room-specific view over the [`EventCache`].
236
+ ///
237
+ /// It may not be found, if the room isn't known to the client.
238
+ async fn for_room ( & mut self , room_id : & RoomId ) -> Result < RoomEventCache > {
239
+ match self . by_room . get ( room_id) {
240
+ Some ( room) => Ok ( room. clone ( ) ) ,
241
+ None => {
242
+ let room = self
243
+ . client
244
+ . get_room ( room_id)
245
+ . ok_or_else ( || EventCacheError :: RoomNotFound ( room_id. to_owned ( ) ) ) ?;
246
+ let room_event_cache = RoomEventCache :: new ( room, self . store . clone ( ) ) ;
247
+
248
+ self . by_room . insert ( room_id. to_owned ( ) , room_event_cache. clone ( ) ) ;
249
+
250
+ Ok ( room_event_cache)
251
+ }
252
+ }
253
+ }
254
+ }
255
+
146
256
/// A subset of an event cache, for a room.
147
257
///
148
258
/// Cloning is shallow, and thus is cheap to do.
149
259
#[ derive( Clone ) ]
150
260
pub struct RoomEventCache {
151
261
inner : Arc < RoomEventCacheInner > ,
152
-
153
- _drop_handles : Arc < RoomCacheDropHandles > ,
154
262
}
155
263
156
264
impl Debug for RoomEventCache {
@@ -162,8 +270,7 @@ impl Debug for RoomEventCache {
162
270
impl RoomEventCache {
163
271
/// Create a new [`RoomEventCache`] using the given room and store.
164
272
fn new ( room : Room , store : Arc < dyn EventCacheStore > ) -> Self {
165
- let ( inner, drop_handles) = RoomEventCacheInner :: new ( room, store) ;
166
- Self { inner, _drop_handles : drop_handles }
273
+ Self { inner : Arc :: new ( RoomEventCacheInner :: new ( room, store) ) }
167
274
}
168
275
169
276
/// Subscribe to room updates for this room, after getting the initial list
@@ -189,14 +296,9 @@ struct RoomEventCacheInner {
189
296
impl RoomEventCacheInner {
190
297
/// Creates a new cache for a room, and subscribes to room updates, so as
191
298
/// to handle new timeline events.
192
- fn new ( room : Room , store : Arc < dyn EventCacheStore > ) -> ( Arc < Self > , Arc < RoomCacheDropHandles > ) {
299
+ fn new ( room : Room , store : Arc < dyn EventCacheStore > ) -> Self {
193
300
let sender = Sender :: new ( 32 ) ;
194
-
195
- let room_cache = Arc :: new ( Self { room, store, sender } ) ;
196
-
197
- let listen_updates_task = spawn ( Self :: listen_task ( room_cache. clone ( ) ) ) ;
198
-
199
- ( room_cache, Arc :: new ( RoomCacheDropHandles { listen_updates_task } ) )
301
+ Self { room, store, sender }
200
302
}
201
303
202
304
async fn handle_joined_room_update ( & self , updates : JoinedRoomUpdate ) -> Result < ( ) > {
@@ -250,58 +352,6 @@ impl RoomEventCacheInner {
250
352
Ok ( ( ) )
251
353
}
252
354
253
- async fn listen_task ( this : Arc < Self > ) {
254
- // TODO for prototyping, i'm spawning a new task to get the room updates.
255
- // Ideally we'd have something like the whole sync update, a generalisation of
256
- // the room update.
257
- trace ! ( "Spawning the listen task" ) ;
258
-
259
- let mut update_receiver = this. room . client ( ) . subscribe_to_room_updates ( this. room . room_id ( ) ) ;
260
-
261
- loop {
262
- match update_receiver. recv ( ) . await {
263
- Ok ( update) => {
264
- trace ! ( "Listen task received an update" ) ;
265
-
266
- match update {
267
- RoomUpdate :: Left { updates, .. } => {
268
- if let Err ( err) = this. handle_left_room_update ( updates) . await {
269
- error ! ( "handling left room update: {err}" ) ;
270
- }
271
- }
272
- RoomUpdate :: Joined { updates, .. } => {
273
- if let Err ( err) = this. handle_joined_room_update ( updates) . await {
274
- error ! ( "handling joined room update: {err}" ) ;
275
- }
276
- }
277
- RoomUpdate :: Invited { .. } => {
278
- // We don't do anything for invited rooms at this
279
- // point. TODO should
280
- // we?
281
- }
282
- }
283
- }
284
-
285
- Err ( RecvError :: Closed ) => {
286
- // The loop terminated successfully.
287
- debug ! ( "Listen task closed" ) ;
288
- break ;
289
- }
290
-
291
- Err ( RecvError :: Lagged ( _) ) => {
292
- // Since we've lagged behind updates to this room, we might be out of
293
- // sync with the events, leading to potentially lost events. Play it
294
- // safe here, and clear the cache. It's fine because we can retrigger
295
- // backpagination from the last event at any time, if needs be.
296
- debug ! ( "Listen task lagged, clearing room" ) ;
297
- if let Err ( err) = this. store . clear_room_events ( this. room . room_id ( ) ) . await {
298
- error ! ( "unable to clear room after room updates lag: {err}" ) ;
299
- }
300
- }
301
- }
302
- }
303
- }
304
-
305
355
/// Append a set of events to the room cache and storage, notifying
306
356
/// observers.
307
357
async fn append_events ( & self , events : Vec < SyncTimelineEvent > ) -> Result < ( ) > {
0 commit comments