@@ -31,7 +31,7 @@ use tokio::sync::{
31
31
broadcast:: { Receiver , Sender } ,
32
32
Notify , RwLock ,
33
33
} ;
34
- use tracing:: { trace, warn} ;
34
+ use tracing:: { error , trace, warn} ;
35
35
36
36
use super :: {
37
37
paginator:: { Paginator , PaginatorState } ,
@@ -92,21 +92,25 @@ impl RoomEventCache {
92
92
93
93
/// Try to find an event by id in this room.
94
94
pub async fn event ( & self , event_id : & EventId ) -> Option < TimelineEvent > {
95
- if let Some ( ( room_id, event) ) =
95
+ // Search in all loaded or stored events.
96
+ let Ok ( maybe_position_and_event) = self . inner . state . read ( ) . await . event ( event_id) . await
97
+ else {
98
+ error ! ( "Failed to find the event" ) ;
99
+
100
+ return None ;
101
+ } ;
102
+
103
+ if let Some ( event) = maybe_position_and_event. map ( |( _position, event) | event) {
104
+ Some ( event)
105
+ }
106
+ // Search in `AllEventsCache` for known events that are not stored.
107
+ else if let Some ( ( room_id, event) ) =
96
108
self . inner . all_events . read ( ) . await . events . get ( event_id) . cloned ( )
97
109
{
98
- if room_id == self . inner . room_id {
99
- return Some ( event) ;
100
- }
101
- }
102
-
103
- let state = self . inner . state . read ( ) . await ;
104
- for ( _pos, event) in state. events ( ) . revents ( ) {
105
- if event. event_id ( ) . as_deref ( ) == Some ( event_id) {
106
- return Some ( event. clone ( ) ) ;
107
- }
110
+ ( room_id == self . inner . room_id ) . then_some ( event)
111
+ } else {
112
+ None
108
113
}
109
- None
110
114
}
111
115
112
116
/// Try to find an event by id in this room, along with its related events.
@@ -524,11 +528,11 @@ mod private {
524
528
use matrix_sdk_base:: {
525
529
deserialized_responses:: { TimelineEvent , TimelineEventKind } ,
526
530
event_cache:: { store:: EventCacheStoreLock , Event } ,
527
- linked_chunk:: { lazy_loader, ChunkContent , Update } ,
531
+ linked_chunk:: { lazy_loader, ChunkContent , Position , Update } ,
528
532
} ;
529
533
use matrix_sdk_common:: executor:: spawn;
530
534
use once_cell:: sync:: OnceCell ;
531
- use ruma:: { serde:: Raw , OwnedEventId , OwnedRoomId } ;
535
+ use ruma:: { serde:: Raw , EventId , OwnedEventId , OwnedRoomId } ;
532
536
use tracing:: { error, instrument, trace} ;
533
537
534
538
use super :: { events:: RoomEvents , LoadMoreEventsBackwardsOutcome } ;
@@ -850,6 +854,31 @@ mod private {
850
854
& self . events
851
855
}
852
856
857
+ /// Find a single event in this room.
858
+ pub async fn event (
859
+ & self ,
860
+ event_id : & EventId ,
861
+ ) -> Result < Option < ( Position , TimelineEvent ) > , EventCacheError > {
862
+ let room_id = self . room . as_ref ( ) ;
863
+
864
+ // There is supposedly less events loaded in memory than in the store. Let's
865
+ // start by looking up in the `RoomEvents`.
866
+ for ( position, event) in self . events ( ) . revents ( ) {
867
+ if event. event_id ( ) . as_deref ( ) == Some ( event_id) {
868
+ return Ok ( Some ( ( position, event. clone ( ) ) ) ) ;
869
+ }
870
+ }
871
+
872
+ let Some ( store) = self . store . get ( ) else {
873
+ // No store, event is not present.
874
+ return Ok ( None ) ;
875
+ } ;
876
+
877
+ let store = store. lock ( ) . await ?;
878
+
879
+ Ok ( store. find_event ( room_id, event_id) . await ?)
880
+ }
881
+
853
882
/// Gives a temporary mutable handle to the underlying in-memory events,
854
883
/// and will propagate changes to the storage once done.
855
884
///
@@ -1372,33 +1401,33 @@ mod tests {
1372
1401
1373
1402
let ( items, mut stream) = room_event_cache. subscribe ( ) . await ;
1374
1403
1375
- // The rooms knows about some cached events.
1404
+ // The rooms knows about all cached events.
1376
1405
{
1377
- // The chunk containing this event is not loaded yet
1378
- assert ! ( room_event_cache. event( event_id1) . await . is_none( ) ) ;
1379
- // The chunk containing this event **is** loaded.
1406
+ assert ! ( room_event_cache. event( event_id1) . await . is_some( ) ) ;
1380
1407
assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1408
+ }
1381
1409
1382
- // The reloaded room must contain only one event.
1410
+ // But only part of events are loaded from the store
1411
+ {
1412
+ // The room must contain only one event because only one chunk has been loaded.
1383
1413
assert_eq ! ( items. len( ) , 1 ) ;
1384
1414
assert_eq ! ( items[ 0 ] . event_id( ) . unwrap( ) , event_id2) ;
1385
1415
1386
1416
assert ! ( stream. is_empty( ) ) ;
1387
1417
}
1388
1418
1389
- // Let's load more chunks to get all events.
1419
+ // Let's load more chunks to load all events.
1390
1420
{
1391
1421
room_event_cache. pagination ( ) . run_backwards_once ( 20 ) . await . unwrap ( ) ;
1392
1422
1393
1423
assert_let_timeout ! (
1394
1424
Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { diffs, .. } ) = stream. recv( )
1395
1425
) ;
1396
1426
assert_eq ! ( diffs. len( ) , 1 ) ;
1397
- assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: _ } ) ;
1398
-
1399
- // The rooms knows about more cached events.
1400
- assert ! ( room_event_cache. event( event_id1) . await . is_some( ) ) ;
1401
- assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1427
+ assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: event } => {
1428
+ // Here you are `event_id1`!
1429
+ assert_eq!( event. event_id( ) . unwrap( ) , event_id1) ;
1430
+ } ) ;
1402
1431
1403
1432
assert ! ( stream. is_empty( ) ) ;
1404
1433
}
@@ -1520,8 +1549,8 @@ mod tests {
1520
1549
assert_eq ! ( items[ 0 ] . event_id( ) . unwrap( ) , event_id2) ;
1521
1550
assert ! ( stream. is_empty( ) ) ;
1522
1551
1523
- // The event cache knows only one event .
1524
- assert ! ( room_event_cache. event( event_id1) . await . is_none ( ) ) ;
1552
+ // The event cache knows only all events though, even if they aren't loaded .
1553
+ assert ! ( room_event_cache. event( event_id1) . await . is_some ( ) ) ;
1525
1554
assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1526
1555
1527
1556
// Let's paginate to load more events.
@@ -1531,11 +1560,9 @@ mod tests {
1531
1560
Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { diffs, .. } ) = stream. recv( )
1532
1561
) ;
1533
1562
assert_eq ! ( diffs. len( ) , 1 ) ;
1534
- assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: _ } ) ;
1535
-
1536
- // The event cache knows about the two events now!
1537
- assert ! ( room_event_cache. event( event_id1) . await . is_some( ) ) ;
1538
- assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1563
+ assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: event } => {
1564
+ assert_eq!( event. event_id( ) . unwrap( ) , event_id1) ;
1565
+ } ) ;
1539
1566
1540
1567
assert ! ( stream. is_empty( ) ) ;
1541
1568
0 commit comments