Skip to content

event cache: add a way to get an event by ID #3682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use matrix_sdk_common::executor::{spawn, JoinHandle};
use ruma::{
events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
serde::Raw,
OwnedEventId, OwnedRoomId, RoomId,
EventId, OwnedEventId, OwnedRoomId, RoomId,
};
use tokio::sync::{
broadcast::{error::RecvError, Receiver, Sender},
Expand Down Expand Up @@ -196,6 +196,23 @@ impl EventCache {
Ok(())
}

/// Try to find an event by its ID in all the rooms.
// NOTE: this does a linear scan, so it could be slow. If performance
// requires it, we could use a direct mapping of event id -> event, and keep it
// in memory until we store it on disk (and this becomes a SQL query by id).
pub async fn event(&self, event_id: &EventId) -> Option<SyncTimelineEvent> {
let by_room = self.inner.by_room.read().await;
for room in by_room.values() {
let events = room.inner.events.read().await;
for (_pos, event) in events.revents() {
if event.event_id().as_deref() == Some(event_id) {
return Some(event.clone());
}
}
}
None
}

#[instrument(skip_all)]
async fn ignore_user_list_update_task(
inner: Arc<EventCacheInner>,
Expand Down Expand Up @@ -759,13 +776,13 @@ pub enum EventsOrigin {
mod tests {
use assert_matches2::assert_matches;
use futures_util::FutureExt as _;
use matrix_sdk_base::sync::JoinedRoomUpdate;
use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
use matrix_sdk_test::async_test;
use ruma::{room_id, serde::Raw};
use ruma::{event_id, room_id, serde::Raw, user_id};
use serde_json::json;

use super::{EventCacheError, RoomEventCacheUpdate};
use crate::test_utils::logged_in_client;
use crate::test_utils::{assert_event_matches_msg, events::EventFactory, logged_in_client};

#[async_test]
async fn test_must_explicitly_subscribe() {
Expand Down Expand Up @@ -828,4 +845,60 @@ mod tests {

assert!(stream.recv().now_or_never().is_none());
}

#[async_test]
async fn test_get_event_by_id() {
let client = logged_in_client(None).await;
let room1 = room_id!("!galette:saucisse.bzh");
let room2 = room_id!("!crepe:saucisse.bzh");

let event_cache = client.event_cache();
event_cache.subscribe().unwrap();

// Insert two rooms with a few events.
let f = EventFactory::new().room(room1).sender(user_id!("@ben:saucisse.bzh"));

let eid1 = event_id!("$1");
let eid2 = event_id!("$2");
let eid3 = event_id!("$3");

let joined_room_update1 = JoinedRoomUpdate {
timeline: Timeline {
events: vec![
f.text_msg("hey").event_id(eid1).into(),
f.text_msg("you").event_id(eid2).into(),
],
..Default::default()
},
..Default::default()
};

let joined_room_update2 = JoinedRoomUpdate {
timeline: Timeline {
events: vec![f.text_msg("bjr").event_id(eid3).into()],
..Default::default()
},
..Default::default()
};

let mut updates = RoomUpdates::default();
updates.join.insert(room1.to_owned(), joined_room_update1);
updates.join.insert(room2.to_owned(), joined_room_update2);

// Have the event cache handle them.
event_cache.inner.handle_room_updates(updates).await.unwrap();

// Now retrieve all the events one by one.
let found1 = event_cache.event(eid1).await.unwrap();
assert_event_matches_msg(&found1, "hey");

let found2 = event_cache.event(eid2).await.unwrap();
assert_event_matches_msg(&found2, "you");

let found3 = event_cache.event(eid3).await.unwrap();
assert_event_matches_msg(&found3, "bjr");

// An unknown event won't be found.
assert!(event_cache.event(event_id!("$unknown")).await.is_none());
}
}
66 changes: 2 additions & 64 deletions crates/matrix-sdk/src/event_cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt, iter::once};
use std::fmt;

use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;

use super::linked_chunk::{
Chunk, ChunkIdentifier, Error, Iter, IterBackward, LinkedChunk, Position,
};
use super::linked_chunk::{Chunk, ChunkIdentifier, Error, Iter, LinkedChunk, Position};

#[derive(Clone, Debug)]
pub struct Gap {
Expand All @@ -39,7 +37,6 @@ impl Default for RoomEvents {
}
}

#[allow(dead_code)]
impl RoomEvents {
pub fn new() -> Self {
Self { chunks: LinkedChunk::new() }
Expand All @@ -50,16 +47,6 @@ impl RoomEvents {
self.chunks = LinkedChunk::new();
}

/// Return the number of events.
pub fn len(&self) -> usize {
self.chunks.len()
}

/// Push one event after existing events.
pub fn push_event(&mut self, event: SyncTimelineEvent) {
self.push_events(once(event))
}

/// Push events after all events or gaps.
///
/// The last event in `events` is the most recent one.
Expand Down Expand Up @@ -117,45 +104,13 @@ impl RoomEvents {
self.chunks.chunk_identifier(predicate)
}

/// Search for an item, and return its position.
pub fn event_position<'a, P>(&'a self, predicate: P) -> Option<Position>
where
P: FnMut(&'a SyncTimelineEvent) -> bool,
{
self.chunks.item_position(predicate)
}

/// Iterate over the chunks, backward.
///
/// The most recent chunk comes first.
pub fn rchunks(&self) -> IterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
self.chunks.rchunks()
}

/// Iterate over the chunks, forward.
///
/// The oldest chunk comes first.
pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
self.chunks.chunks()
}

/// Iterate over the chunks, starting from `identifier`, backward.
pub fn rchunks_from(
&self,
identifier: ChunkIdentifier,
) -> Result<IterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, Error> {
self.chunks.rchunks_from(identifier)
}

/// Iterate over the chunks, starting from `identifier`, forward — i.e.
/// to the latest chunk.
pub fn chunks_from(
&self,
identifier: ChunkIdentifier,
) -> Result<Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, Error> {
self.chunks.chunks_from(identifier)
}

/// Iterate over the events, backward.
///
/// The most recent event comes first.
Expand All @@ -169,23 +124,6 @@ impl RoomEvents {
pub fn events(&self) -> impl Iterator<Item = (Position, &SyncTimelineEvent)> {
self.chunks.items()
}

/// Iterate over the events, starting from `position`, backward.
pub fn revents_from(
&self,
position: Position,
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, Error> {
self.chunks.ritems_from(position)
}

/// Iterate over the events, starting from `position`, forward — i.e.
/// to the latest event.
pub fn events_from(
&self,
position: Position,
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, Error> {
self.chunks.items_from(position)
}
}

impl fmt::Debug for RoomEvents {
Expand Down
Loading