@@ -31,7 +31,7 @@ use tokio::sync::{
31
31
broadcast:: { Receiver , Sender } ,
32
32
Notify , RwLock ,
33
33
} ;
34
- use tracing:: { trace, warn} ;
34
+ use tracing:: { error , trace, warn} ;
35
35
36
36
use super :: {
37
37
paginator:: { Paginator , PaginatorState } ,
@@ -92,21 +92,25 @@ impl RoomEventCache {
92
92
93
93
/// Try to find an event by id in this room.
94
94
pub async fn event ( & self , event_id : & EventId ) -> Option < TimelineEvent > {
95
- if let Some ( ( room_id, event) ) =
95
+ // Search in all loaded or stored events.
96
+ let Ok ( maybe_position_and_event) = self . inner . state . read ( ) . await . event ( event_id) . await
97
+ else {
98
+ error ! ( "Failed to find the event" ) ;
99
+
100
+ return None ;
101
+ } ;
102
+
103
+ if let Some ( event) = maybe_position_and_event. map ( |( _position, event) | event) {
104
+ Some ( event)
105
+ }
106
+ // Search in `AllEventsCache` for known events that are not stored.
107
+ else if let Some ( ( room_id, event) ) =
96
108
self . inner . all_events . read ( ) . await . events . get ( event_id) . cloned ( )
97
109
{
98
- if room_id == self . inner . room_id {
99
- return Some ( event) ;
100
- }
101
- }
102
-
103
- let state = self . inner . state . read ( ) . await ;
104
- for ( _pos, event) in state. events ( ) . revents ( ) {
105
- if event. event_id ( ) . as_deref ( ) == Some ( event_id) {
106
- return Some ( event. clone ( ) ) ;
107
- }
110
+ ( room_id == self . inner . room_id ) . then_some ( event)
111
+ } else {
112
+ None
108
113
}
109
- None
110
114
}
111
115
112
116
/// Try to find an event by id in this room, along with its related events.
@@ -549,11 +553,11 @@ mod private {
549
553
use matrix_sdk_base:: {
550
554
deserialized_responses:: { TimelineEvent , TimelineEventKind } ,
551
555
event_cache:: { store:: EventCacheStoreLock , Event } ,
552
- linked_chunk:: { lazy_loader, ChunkContent , ChunkIdentifierGenerator , Update } ,
556
+ linked_chunk:: { lazy_loader, ChunkContent , ChunkIdentifierGenerator , Position , Update } ,
553
557
} ;
554
558
use matrix_sdk_common:: executor:: spawn;
555
559
use once_cell:: sync:: OnceCell ;
556
- use ruma:: { serde:: Raw , OwnedEventId , OwnedRoomId } ;
560
+ use ruma:: { serde:: Raw , EventId , OwnedEventId , OwnedRoomId } ;
557
561
use tracing:: { debug, error, instrument, trace} ;
558
562
559
563
use super :: { events:: RoomEvents , LoadMoreEventsBackwardsOutcome } ;
@@ -938,6 +942,31 @@ mod private {
938
942
& self . events
939
943
}
940
944
945
+ /// Find a single event in this room.
946
+ pub async fn event (
947
+ & self ,
948
+ event_id : & EventId ,
949
+ ) -> Result < Option < ( Position , TimelineEvent ) > , EventCacheError > {
950
+ let room_id = self . room . as_ref ( ) ;
951
+
952
+ // There is supposedly less events loaded in memory than in the store. Let's
953
+ // start by looking up in the `RoomEvents`.
954
+ for ( position, event) in self . events ( ) . revents ( ) {
955
+ if event. event_id ( ) . as_deref ( ) == Some ( event_id) {
956
+ return Ok ( Some ( ( position, event. clone ( ) ) ) ) ;
957
+ }
958
+ }
959
+
960
+ let Some ( store) = self . store . get ( ) else {
961
+ // No store, event is not present.
962
+ return Ok ( None ) ;
963
+ } ;
964
+
965
+ let store = store. lock ( ) . await ?;
966
+
967
+ Ok ( store. find_event ( room_id, event_id) . await ?)
968
+ }
969
+
941
970
/// Gives a temporary mutable handle to the underlying in-memory events,
942
971
/// and will propagate changes to the storage once done.
943
972
///
@@ -1460,33 +1489,33 @@ mod tests {
1460
1489
1461
1490
let ( items, mut stream) = room_event_cache. subscribe ( ) . await ;
1462
1491
1463
- // The rooms knows about some cached events.
1492
+ // The rooms knows about all cached events.
1464
1493
{
1465
- // The chunk containing this event is not loaded yet
1466
- assert ! ( room_event_cache. event( event_id1) . await . is_none( ) ) ;
1467
- // The chunk containing this event **is** loaded.
1494
+ assert ! ( room_event_cache. event( event_id1) . await . is_some( ) ) ;
1468
1495
assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1496
+ }
1469
1497
1470
- // The reloaded room must contain only one event.
1498
+ // But only part of events are loaded from the store
1499
+ {
1500
+ // The room must contain only one event because only one chunk has been loaded.
1471
1501
assert_eq ! ( items. len( ) , 1 ) ;
1472
1502
assert_eq ! ( items[ 0 ] . event_id( ) . unwrap( ) , event_id2) ;
1473
1503
1474
1504
assert ! ( stream. is_empty( ) ) ;
1475
1505
}
1476
1506
1477
- // Let's load more chunks to get all events.
1507
+ // Let's load more chunks to load all events.
1478
1508
{
1479
1509
room_event_cache. pagination ( ) . run_backwards_once ( 20 ) . await . unwrap ( ) ;
1480
1510
1481
1511
assert_let_timeout ! (
1482
1512
Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { diffs, .. } ) = stream. recv( )
1483
1513
) ;
1484
1514
assert_eq ! ( diffs. len( ) , 1 ) ;
1485
- assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: _ } ) ;
1486
-
1487
- // The rooms knows about more cached events.
1488
- assert ! ( room_event_cache. event( event_id1) . await . is_some( ) ) ;
1489
- assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1515
+ assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: event } => {
1516
+ // Here you are `event_id1`!
1517
+ assert_eq!( event. event_id( ) . unwrap( ) , event_id1) ;
1518
+ } ) ;
1490
1519
1491
1520
assert ! ( stream. is_empty( ) ) ;
1492
1521
}
@@ -1608,8 +1637,8 @@ mod tests {
1608
1637
assert_eq ! ( items[ 0 ] . event_id( ) . unwrap( ) , event_id2) ;
1609
1638
assert ! ( stream. is_empty( ) ) ;
1610
1639
1611
- // The event cache knows only one event .
1612
- assert ! ( room_event_cache. event( event_id1) . await . is_none ( ) ) ;
1640
+ // The event cache knows only all events though, even if they aren't loaded .
1641
+ assert ! ( room_event_cache. event( event_id1) . await . is_some ( ) ) ;
1613
1642
assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1614
1643
1615
1644
// Let's paginate to load more events.
@@ -1619,11 +1648,9 @@ mod tests {
1619
1648
Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { diffs, .. } ) = stream. recv( )
1620
1649
) ;
1621
1650
assert_eq ! ( diffs. len( ) , 1 ) ;
1622
- assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: _ } ) ;
1623
-
1624
- // The event cache knows about the two events now!
1625
- assert ! ( room_event_cache. event( event_id1) . await . is_some( ) ) ;
1626
- assert ! ( room_event_cache. event( event_id2) . await . is_some( ) ) ;
1651
+ assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: event } => {
1652
+ assert_eq!( event. event_id( ) . unwrap( ) , event_id1) ;
1653
+ } ) ;
1627
1654
1628
1655
assert ! ( stream. is_empty( ) ) ;
1629
1656
0 commit comments