Skip to content

Store the timeline in the SledStore #141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 82 additions & 2 deletions matrix_sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ use matrix_sdk_common::{
instant::{Duration, Instant},
presence::PresenceState,
uuid::Uuid,
FromHttpResponseError, UInt,
FromHttpResponseError, Raw, UInt,
};

#[cfg(feature = "encryption")]
Expand Down Expand Up @@ -1051,7 +1051,29 @@ impl Client {
request: impl Into<get_message_events::Request<'_>>,
) -> Result<get_message_events::Response> {
let request = request.into();
self.send(request, None).await
let room_id = request.room_id.clone();

if let Some((to_tkn, requested_events)) = self
.store()
.contains_timeline_events(&room_id, &request)
.await?
{
return Ok(assign!(get_message_events::Response::new(), {
start: Some(request.from.to_string()),
end: Some(to_tkn),
chunk: requested_events.iter().map(|e| Raw::from(e)).collect(),
// TODO: State changes this seems difficult?
state: vec![],
}));
}

let dir = request.dir.clone();

let resp = self.send(request, None).await?;
self.base_client
.receive_messages_response(&room_id, dir, &resp)
.await?;
Ok(resp)
}

/// Send a request to notify the room of a user typing.
Expand Down Expand Up @@ -3099,4 +3121,62 @@ mod test {
}
}
}

#[tokio::test]
async fn messages() {
let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
let client = logged_in_client().await;

let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
)
.with_status(200)
.with_body(test_json::SYNC.to_string())
.match_header("authorization", "Bearer 1234")
.create();

let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/rooms/.*/messages".to_string()),
)
.with_status(200)
.with_body(test_json::ROOM_MESSAGES.to_string())
.match_header("authorization", "Bearer 1234")
.create();

let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));

let _response = client.sync_once(sync_settings).await.unwrap();

assert!(!client
.store()
.get_messages(&room_id, "t392-516_47314_0_7_1_1_1_11444_1")
.await
.unwrap()
.is_empty());

let room_id = room_id!("!roomid:example.com");
let request = matrix_sdk_common::api::r0::message::get_message_events::Request::backward(
&room_id,
"t47429-4392820_219380_26003_2265",
);

let _resp = client.room_messages(request).await.unwrap();

assert_eq!(
client
.store()
.get_messages(&room_id, "t47409-4357353_219380_26003_2265")
.await
.unwrap()
.len(),
3
);
// end: t3336-1714379051_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
// start: t3356-1714663804_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049

// end: t3316-1714212736_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
// start: t3336-1714379051_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
}
}
55 changes: 41 additions & 14 deletions matrix_sdk_base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
sync::Arc,
};

use api::message::get_message_events::Direction;
use matrix_sdk_common::{
api::r0 as api,
deserialized_responses::{
Expand Down Expand Up @@ -495,20 +496,17 @@ impl BaseClient {
},

#[cfg(feature = "encryption")]
AnySyncRoomEvent::Message(message) => {
if let AnySyncMessageEvent::RoomEncrypted(encrypted) = message {
if let Some(olm) = self.olm_machine().await {
if let Ok(decrypted) =
olm.decrypt_room_event(encrypted, room_id).await
{
match decrypted.deserialize() {
Ok(decrypted) => e = decrypted,
Err(e) => {
warn!(
"Error deserializing a decrypted event {:?} ",
e
)
}
AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted(
encrypted,
)) => {
if let Some(olm) = self.olm_machine().await {
if let Ok(decrypted) =
olm.decrypt_room_event(encrypted, room_id).await
{
match decrypted.deserialize() {
Ok(decrypted) => e = decrypted,
Err(e) => {
warn!("Error deserializing a decrypted event {:?} ", e)
}
}
}
Expand All @@ -529,6 +527,19 @@ impl BaseClient {
}
}

// TODO:
// We wait until after the event has been decrypted?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry what are you asking here? The block above already makes sure that all events that can be decrypted during this sync are decrypted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was just making sure that we wanted the events decrypted I figured since the Store can be encrypted that this is Ok but I just wanted to make sure.

if let Some(prev) = timeline.prev_batch.as_deref() {
let needed = self
.store
.unknown_timeline_events(room_id, prev, &timeline.events)
.await?;

if !needed.is_empty() {
changes.handle_sync_timeline(room_id, prev, needed);
}
}

Ok(timeline)
}

Expand Down Expand Up @@ -1046,6 +1057,22 @@ impl BaseClient {
})
}

/// Receive a successful /messages response.
///
/// * `response` - The successful response from /messages.
pub async fn receive_messages_response(
&self,
room_id: &RoomId,
dir: Direction,
resp: &api::message::get_message_events::Response,
) -> Result<()> {
let mut changes = StateChanges::default();
changes.handle_messages_response(room_id, resp, dir);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will need to decrypt the events as well, but that can be left for later on.

self.store().save_changes(&changes).await?;

Ok(())
}

/// Receive a successful filter upload response, the filter id will be
/// stored under the given name in the store.
///
Expand Down
37 changes: 36 additions & 1 deletion matrix_sdk_base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ use std::{

use dashmap::{DashMap, DashSet};
use matrix_sdk_common::{
api::r0::message::get_message_events::{
Request as MessagesRequest, Response as MessagesResponse,
},
async_trait,
events::{
presence::PresenceEvent,
room::member::{MemberEventContent, MembershipState},
AnyBasicEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
AnyBasicEvent, AnyMessageEvent, AnyRoomEvent, AnyStrippedStateEvent, AnySyncMessageEvent,
AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType,
},
identifiers::{RoomId, UserId},
instant::Instant,
Expand Down Expand Up @@ -295,6 +299,15 @@ impl MemoryStore {
#[allow(clippy::map_clone)]
self.stripped_room_info.iter().map(|r| r.clone()).collect()
}

pub async fn unknown_timeline_events<'a>(
&'a self,
_: &RoomId,
_: &str,
_: &'a [AnySyncRoomEvent],
) -> Result<&'a [AnySyncRoomEvent]> {
todo!()
}
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down Expand Up @@ -373,4 +386,26 @@ impl StateStore for MemoryStore {
.and_then(|d| d.get(display_name).map(|d| d.clone()))
.unwrap_or_default())
}

async fn get_messages(&self, _: &RoomId, _: &str) -> Result<Vec<AnySyncMessageEvent>> {
todo!()
}

async fn contains_timeline_events(
&self,
room_id: &RoomId,
req: &MessagesRequest<'_>,
) -> Result<Option<(String, Vec<AnyRoomEvent>)>> {
todo!()
}

async fn unknown_timeline_events<'a>(
&'a self,
room_id: &RoomId,
prev_batch: &str,
events: &'a [AnySyncRoomEvent],
) -> Result<&'a [AnySyncRoomEvent]> {
self.unknown_timeline_events(room_id, prev_batch, events)
.await
}
}
92 changes: 90 additions & 2 deletions matrix_sdk_base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ use std::{

use dashmap::DashMap;
use matrix_sdk_common::{
api::r0::message::get_message_events::{
Direction, Request as MessagesRequest, Response as MessagesResponse,
},
async_trait,
events::{
presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent,
AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent, AnyMessageEvent,
AnyRoomEvent, AnyStrippedStateEvent, AnySyncMessageEvent, AnySyncRoomEvent,
AnySyncStateEvent, EventContent, EventType,
},
identifiers::{RoomId, UserId},
locks::RwLock,
Expand Down Expand Up @@ -180,6 +184,32 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
display_name: &str,
) -> Result<BTreeSet<UserId>>;

/// Get all timeline events that came with this `prev_batch` token.
async fn get_messages(
&self,
room_id: &RoomId,
prev_batch: &str,
) -> Result<Vec<AnySyncMessageEvent>>;

/// Checks if we have the events requested from /messages in the DB.
async fn contains_timeline_events(
&self,
room_id: &RoomId,
req: &MessagesRequest<'_>,
) -> Result<Option<(String, Vec<AnyRoomEvent>)>>;

/// Checks if the message events in this chunk are already contained in the store.
///
/// # Arguments
///
/// * `room_id` - The id of the room we are checking.
async fn unknown_timeline_events<'a>(
&'a self,
room_id: &RoomId,
prev_batch: &str,
events: &'a [AnySyncRoomEvent],
) -> Result<&'a [AnySyncRoomEvent]>;
}

/// A state store wrapper for the SDK.
Expand Down Expand Up @@ -386,6 +416,13 @@ pub struct StateChanges {
pub stripped_members: BTreeMap<RoomId, BTreeMap<UserId, StrippedMemberEvent>>,
/// A map of `RoomId` to `StrippedRoomInfo`.
pub invited_room_info: BTreeMap<RoomId, StrippedRoomInfo>,

/// A mapping of `RoomId` to sync timeline events ordered based on their `prev_batch` token from the /sync
/// response.
pub sync_timeline: BTreeMap<RoomId, BTreeMap<(String, u64), AnySyncRoomEvent>>,
/// A mapping of `RoomId` to timeline events ordered based on their `end` token from the /messages
/// response.
pub messages_timeline: BTreeMap<RoomId, BTreeMap<(String, u64), AnyRoomEvent>>,
}

impl StateChanges {
Expand Down Expand Up @@ -457,4 +494,55 @@ impl StateChanges {
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_string(), event);
}

///
pub fn handle_sync_timeline(
&mut self,
room_id: &RoomId,
prev_batch: &str,
content: &[AnySyncRoomEvent],
) {
for (idx, msg) in content.iter().enumerate() {
let room = self.sync_timeline.entry(room_id.to_owned()).or_default();

room.insert((prev_batch.to_string(), idx as u64), msg.clone());
}
}

///
pub fn handle_messages_response(
&mut self,
room_id: &RoomId,
resp: &MessagesResponse,
dir: Direction,
) {
match dir {
// the end token is how to request older events
// events are in reverse-chronological order
Direction::Backward => {
if let Some(room) = self.room_infos.get_mut(room_id) {
room.set_prev_batch(resp.end.as_deref());
}

if let Some(prev_batch) = &resp.end {
// We reverse so our slice is oldest -> most recent
for (idx, msg) in resp.chunk.iter().rev().enumerate() {
if let Ok(msg) = msg.deserialize() {
let room = self
.messages_timeline
.entry(room_id.to_owned())
.or_default();

room.insert((prev_batch.to_string(), idx as u64), msg.clone());
}
}
}
}
// the start token is the oldest events
// events are in chronological order
Direction::Forward => {
todo!()
}
}
}
}
Loading