13
13
// limitations under the License.
14
14
15
15
use std:: {
16
- collections:: BTreeSet ,
16
+ collections:: { BTreeMap , BTreeSet , HashMap } ,
17
17
sync:: { Arc , RwLock } ,
18
18
} ;
19
19
20
+ use async_stream:: stream;
20
21
use dashmap:: { DashMap , DashSet } ;
21
22
use futures_core:: stream:: BoxStream ;
22
23
use lru:: LruCache ;
@@ -28,14 +29,14 @@ use ruma::{
28
29
receipt:: Receipt ,
29
30
room:: member:: { MembershipState , RoomMemberEventContent } ,
30
31
AnyGlobalAccountDataEvent , AnyRoomAccountDataEvent , AnyStrippedStateEvent ,
31
- AnySyncStateEvent , EventType ,
32
+ AnySyncMessageEvent , AnySyncRoomEvent , AnySyncStateEvent , EventType , Redact ,
32
33
} ,
33
34
receipt:: ReceiptType ,
34
35
serde:: Raw ,
35
- EventId , MxcUri , RoomId , UserId ,
36
+ EventId , MxcUri , RoomId , RoomVersionId , UserId ,
36
37
} ;
37
38
#[ allow( unused_imports) ]
38
- use tracing:: info;
39
+ use tracing:: { info, warn } ;
39
40
40
41
use super :: { Result , RoomInfo , StateChanges , StateStore } ;
41
42
use crate :: {
@@ -69,6 +70,7 @@ pub struct MemoryStore {
69
70
> ,
70
71
media : Arc < Mutex < LruCache < String , Vec < u8 > > > > ,
71
72
custom : Arc < DashMap < Vec < u8 > , Vec < u8 > > > ,
73
+ room_timeline : Arc < DashMap < Box < RoomId > , TimelineData > > ,
72
74
}
73
75
74
76
impl MemoryStore {
@@ -94,6 +96,7 @@ impl MemoryStore {
94
96
room_event_receipts : Default :: default ( ) ,
95
97
media : Arc :: new ( Mutex :: new ( LruCache :: new ( 100 ) ) ) ,
96
98
custom : DashMap :: new ( ) . into ( ) ,
99
+ room_timeline : Default :: default ( ) ,
97
100
}
98
101
}
99
102
@@ -275,6 +278,116 @@ impl MemoryStore {
275
278
}
276
279
}
277
280
281
+ for ( room, timeline) in & changes. timeline {
282
+ if timeline. sync {
283
+ info ! ( "Save new timeline batch from sync response for {}" , room) ;
284
+ } else {
285
+ info ! ( "Save new timeline batch from messages response for {}" , room) ;
286
+ }
287
+
288
+ let data = if timeline. limited {
289
+ info ! ( "Delete stored timeline for {} because the sync response was limited" , room) ;
290
+ self . room_timeline . remove ( room) ;
291
+ None
292
+ } else if let Some ( mut data) = self . room_timeline . get_mut ( room) {
293
+ if !timeline. sync && Some ( & timeline. start ) != data. end . as_ref ( ) {
294
+ warn ! ( "Drop unexpected timeline batch for {}" , room) ;
295
+ return Ok ( ( ) ) ;
296
+ }
297
+
298
+ // Check if the event already exists in the store
299
+ let mut delete_timeline = false ;
300
+ for event in & timeline. events {
301
+ if let Some ( event_id) = event. event_id ( ) {
302
+ if data. event_id_to_position . contains_key ( & event_id) {
303
+ delete_timeline = true ;
304
+ break ;
305
+ }
306
+ }
307
+ }
308
+
309
+ if delete_timeline {
310
+ info ! ( "Delete stored timeline for {} because of duplicated events" , room) ;
311
+ self . room_timeline . remove ( room) ;
312
+ None
313
+ } else if timeline. sync {
314
+ data. start = timeline. start . clone ( ) ;
315
+ Some ( data)
316
+ } else {
317
+ data. end = timeline. end . clone ( ) ;
318
+ Some ( data)
319
+ }
320
+ } else {
321
+ None
322
+ } ;
323
+
324
+ let mut data = & mut * if let Some ( data) = data {
325
+ data
326
+ } else {
327
+ let data = TimelineData {
328
+ start : timeline. start . clone ( ) ,
329
+ end : timeline. end . clone ( ) ,
330
+ ..Default :: default ( )
331
+ } ;
332
+ self . room_timeline . insert ( room. to_owned ( ) , data) ;
333
+ self . room_timeline . get_mut ( room) . unwrap ( )
334
+ } ;
335
+
336
+ // Create a copy of the events if the stream created via `room_timeline()` isn't
337
+ // fully consumed
338
+ let data_events = Arc :: make_mut ( & mut data. events ) ;
339
+
340
+ if timeline. sync {
341
+ let mut room_version = None ;
342
+ for event in timeline. events . iter ( ) . rev ( ) {
343
+ // Redact events already in store only on sync response
344
+ if let Ok ( AnySyncRoomEvent :: Message ( AnySyncMessageEvent :: RoomRedaction (
345
+ redaction,
346
+ ) ) ) = event. event . deserialize ( )
347
+ {
348
+ if let Some ( position) = data. event_id_to_position . get ( & redaction. redacts ) {
349
+ if let Some ( mut full_event) = data_events. get_mut ( position) {
350
+ let inner_event = full_event. event . deserialize ( ) ?;
351
+ if room_version. is_none ( ) {
352
+ room_version = Some ( self . room_info
353
+ . get ( room)
354
+ . and_then ( |info| {
355
+ info. base_info
356
+ . create
357
+ . as_ref ( )
358
+ . map ( |event| event. room_version . clone ( ) )
359
+ } ) . unwrap_or_else ( || {
360
+ warn ! ( "Unable to find the room version for {}, assume version 9" , room) ;
361
+ RoomVersionId :: V9
362
+ } ) ) ;
363
+ }
364
+
365
+ full_event. event = Raw :: new ( & AnySyncRoomEvent :: from (
366
+ inner_event. redact ( redaction, room_version. as_ref ( ) . unwrap ( ) ) ,
367
+ ) ) ?;
368
+ }
369
+ }
370
+ }
371
+
372
+ data. start_position -= 1 ;
373
+ // Only add event with id to the position map
374
+ if let Some ( event_id) = event. event_id ( ) {
375
+ data. event_id_to_position . insert ( event_id, data. start_position ) ;
376
+ }
377
+ data_events. insert ( data. start_position , event. to_owned ( ) ) ;
378
+ }
379
+ } else {
380
+ for event in timeline. events . iter ( ) {
381
+ data. end_position += 1 ;
382
+ // Only add event with id to the position map
383
+ if let Some ( event_id) = event. event_id ( ) {
384
+ data. event_id_to_position . insert ( event_id, data. end_position ) ;
385
+ }
386
+ data_events. insert ( data. end_position , event. to_owned ( ) ) ;
387
+ }
388
+ }
389
+ }
390
+
278
391
info ! ( "Saved changes in {:?}" , now. elapsed( ) ) ;
279
392
280
393
Ok ( ( ) )
@@ -456,9 +569,32 @@ impl MemoryStore {
456
569
self . stripped_members . remove ( room_id) ;
457
570
self . room_user_receipts . remove ( room_id) ;
458
571
self . room_event_receipts . remove ( room_id) ;
572
+ self . room_timeline . remove ( room_id) ;
459
573
460
574
Ok ( ( ) )
461
575
}
576
+
577
+ async fn room_timeline (
578
+ & self ,
579
+ room_id : & RoomId ,
580
+ ) -> Result < Option < ( BoxStream < ' static , Result < SyncRoomEvent > > , Option < String > ) > > {
581
+ if let Some ( data) = self . room_timeline . get ( room_id) {
582
+ let events = data. events . clone ( ) ;
583
+ let stream = stream ! {
584
+ for item in events. values( ) {
585
+ yield Ok ( item. to_owned( ) ) ;
586
+ }
587
+ } ;
588
+ info ! (
589
+ "Found previously stored timeline for {}, with end token {:?}" ,
590
+ room_id, data. end
591
+ ) ;
592
+ Ok ( Some ( ( Box :: pin ( stream) , data. end . to_owned ( ) ) ) )
593
+ } else {
594
+ info ! ( "No timeline for {} was previously stored" , room_id) ;
595
+ Ok ( None )
596
+ }
597
+ }
462
598
}
463
599
464
600
#[ cfg_attr( target_arch = "wasm32" , async_trait( ?Send ) ) ]
@@ -612,13 +748,22 @@ impl StateStore for MemoryStore {
612
748
613
749
async fn room_timeline (
614
750
& self ,
615
- _room_id : & RoomId ,
751
+ room_id : & RoomId ,
616
752
) -> Result < Option < ( BoxStream < ' static , Result < SyncRoomEvent > > , Option < String > ) > > {
617
- // The `MemoryStore` doesn't cache any events
618
- Ok ( None )
753
+ self . room_timeline ( room_id) . await
619
754
}
620
755
}
621
756
757
+ #[ derive( Debug , Default ) ]
758
+ struct TimelineData {
759
+ pub start : String ,
760
+ pub start_position : isize ,
761
+ pub end : Option < String > ,
762
+ pub end_position : isize ,
763
+ pub events : Arc < BTreeMap < isize , SyncRoomEvent > > ,
764
+ pub event_id_to_position : HashMap < Box < EventId > , isize > ,
765
+ }
766
+
622
767
#[ cfg( test) ]
623
768
mod test {
624
769
0 commit comments