Skip to content

Commit 8414718

Browse files
authored
feat(StateStore): Adding forward and backwards streams for room timelines
Merge pull request #486 from jsparber/timeline_stream
2 parents 2b20055 + 78af96d commit 8414718

File tree

17 files changed

+1968
-65
lines changed

17 files changed

+1968
-65
lines changed

crates/matrix-sdk-base/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ indexeddb_state_store = ["indexed_db_futures", "wasm-bindgen", "pbkdf2", "hmac",
3434
indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"]
3535

3636
[dependencies]
37+
async-stream = "0.3.2"
3738
chacha20poly1305 = { version = "0.9.0", optional = true }
3839
dashmap = "4.0.2"
3940
futures-core = "0.3.15"
4041
futures-util = { version = "0.3.15", default-features = false }
42+
futures-channel = "0.3.15"
4143
hmac = { version = "0.12.0", optional = true }
4244
lru = "0.7.2"
4345
matrix-sdk-common = { version = "0.4.0", path = "../matrix-sdk-common" }

crates/matrix-sdk-base/src/client.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use matrix_sdk_common::locks::Mutex;
3030
use matrix_sdk_common::{
3131
deserialized_responses::{
3232
AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms,
33-
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline,
33+
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice,
3434
},
3535
instant::Instant,
3636
locks::RwLock,
@@ -848,6 +848,16 @@ impl BaseClient {
848848
let notification_count = new_info.unread_notifications.into();
849849
room_info.update_notification_count(notification_count);
850850

851+
let timeline_slice = TimelineSlice::new(
852+
timeline.events.clone(),
853+
next_batch.clone(),
854+
timeline.prev_batch.clone(),
855+
timeline.limited,
856+
true,
857+
);
858+
859+
changes.add_timeline(&room_id, timeline_slice);
860+
851861
new_rooms.join.insert(
852862
room_id,
853863
JoinedRoom::new(
@@ -958,11 +968,33 @@ impl BaseClient {
958968
room.update_summary(room_info.clone())
959969
}
960970
}
971+
961972
for (room_id, room_info) in &changes.stripped_room_infos {
962973
if let Some(room) = self.store.get_stripped_room(room_id) {
963974
room.update_summary(room_info.clone())
964975
}
965976
}
977+
978+
for (room_id, timeline_slice) in &changes.timeline {
979+
if let Some(room) = self.store.get_room(room_id) {
980+
room.add_timeline_slice(timeline_slice).await;
981+
}
982+
}
983+
}
984+
985+
/// Receive a timeline slice obtained from a messages request.
986+
///
987+
/// You should pass only slices requested from the store to this function.
988+
///
989+
/// * `timeline` - The `TimelineSlice`
990+
pub async fn receive_messages(&self, room_id: &RoomId, timeline: TimelineSlice) -> Result<()> {
991+
let mut changes = StateChanges::default();
992+
993+
changes.add_timeline(room_id, timeline);
994+
995+
self.store().save_changes(&changes).await?;
996+
997+
Ok(())
966998
}
967999

9681000
/// Receive a get member events response and convert it to a deserialized

crates/matrix-sdk-base/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub use matrix_sdk_common::*;
3939
pub use crate::{
4040
error::{Error, Result},
4141
session::Session,
42+
timeline_stream::TimelineStreamError,
4243
};
4344

4445
mod client;
@@ -47,6 +48,7 @@ pub mod media;
4748
mod rooms;
4849
mod session;
4950
mod store;
51+
mod timeline_stream;
5052

5153
pub use client::{BaseClient, BaseClientConfig};
5254
#[cfg(feature = "encryption")]

crates/matrix-sdk-base/src/rooms/normal.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
use std::sync::{Arc, RwLock as SyncRwLock};
1616

17+
use dashmap::DashSet;
18+
use futures_channel::mpsc;
19+
use futures_core::stream::Stream;
1720
use futures_util::stream::{self, StreamExt};
21+
use matrix_sdk_common::locks::Mutex;
1822
use ruma::{
1923
api::client::r0::sync::sync_events::RoomSummary as RumaSummary,
2024
events::{
@@ -34,12 +38,13 @@ use ruma::{
3438
EventId, MxcUri, RoomAliasId, RoomId, UserId,
3539
};
3640
use serde::{Deserialize, Serialize};
37-
use tracing::debug;
41+
use tracing::{debug, warn};
3842

3943
use super::{BaseRoomInfo, RoomMember};
4044
use crate::{
41-
deserialized_responses::UnreadNotificationsCount,
45+
deserialized_responses::{SyncRoomEvent, TimelineSlice, UnreadNotificationsCount},
4246
store::{Result as StoreResult, StateStore},
47+
timeline_stream::{TimelineStreamBackward, TimelineStreamError, TimelineStreamForward},
4348
};
4449

4550
/// The underlying room data structure collecting state for joined, left and
@@ -50,6 +55,8 @@ pub struct Room {
5055
own_user_id: Arc<UserId>,
5156
inner: Arc<SyncRwLock<RoomInfo>>,
5257
store: Arc<dyn StateStore>,
58+
forward_timeline_streams: Arc<Mutex<Vec<mpsc::Sender<TimelineSlice>>>>,
59+
backward_timeline_streams: Arc<Mutex<Vec<mpsc::Sender<TimelineSlice>>>>,
5360
}
5461

5562
/// The room summary containing member counts and members that should be used to
@@ -107,6 +114,8 @@ impl Room {
107114
room_id: room_info.room_id.clone(),
108115
store,
109116
inner: Arc::new(SyncRwLock::new(room_info)),
117+
forward_timeline_streams: Default::default(),
118+
backward_timeline_streams: Default::default(),
110119
}
111120
}
112121

@@ -467,6 +476,112 @@ impl Room {
467476
) -> StoreResult<Vec<(Box<UserId>, Receipt)>> {
468477
self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await
469478
}
479+
480+
/// Get two stream into the timeline.
481+
/// First one is forward in time and the second one is backward in time.
482+
pub async fn timeline(
483+
&self,
484+
) -> StoreResult<(
485+
impl Stream<Item = SyncRoomEvent>,
486+
impl Stream<Item = Result<SyncRoomEvent, TimelineStreamError>>,
487+
)> {
488+
// We need to hold the lock while we create the stream so that we don't lose new
489+
// sync responses
490+
let mut forward_timeline_streams = self.forward_timeline_streams.lock().await;
491+
let mut backward_timeline_streams = self.backward_timeline_streams.lock().await;
492+
let sync_token = self.store.get_sync_token().await?;
493+
let event_ids = Arc::new(DashSet::new());
494+
495+
let (backward_stream, backward_sender) = if let Some((stored_events, end_token)) =
496+
self.store.room_timeline(&self.room_id).await?
497+
{
498+
TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
499+
} else {
500+
TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
501+
};
502+
503+
backward_timeline_streams.push(backward_sender);
504+
505+
let (forward_stream, forward_sender) = TimelineStreamForward::new(event_ids);
506+
forward_timeline_streams.push(forward_sender);
507+
508+
Ok((forward_stream, backward_stream))
509+
}
510+
511+
/// Create a stream that returns all events of the room's timeline forward
512+
/// in time.
513+
///
514+
/// If you need also a backward stream you should use
515+
/// [`timeline`][`crate::Room::timeline`]
516+
pub async fn timeline_forward(&self) -> StoreResult<impl Stream<Item = SyncRoomEvent>> {
517+
let mut forward_timeline_streams = self.forward_timeline_streams.lock().await;
518+
let event_ids = Arc::new(DashSet::new());
519+
520+
let (forward_stream, forward_sender) = TimelineStreamForward::new(event_ids);
521+
forward_timeline_streams.push(forward_sender);
522+
523+
Ok(forward_stream)
524+
}
525+
526+
/// Create a stream that returns all events of the room's timeline backward
527+
/// in time.
528+
///
529+
/// If you need also a forward stream you should use
530+
/// [`timeline`][`crate::Room::timeline`]
531+
pub async fn timeline_backward(
532+
&self,
533+
) -> StoreResult<impl Stream<Item = Result<SyncRoomEvent, TimelineStreamError>>> {
534+
let mut backward_timeline_streams = self.backward_timeline_streams.lock().await;
535+
let sync_token = self.store.get_sync_token().await?;
536+
let event_ids = Arc::new(DashSet::new());
537+
538+
let (backward_stream, backward_sender) = if let Some((stored_events, end_token)) =
539+
self.store.room_timeline(&self.room_id).await?
540+
{
541+
TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
542+
} else {
543+
TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
544+
};
545+
546+
backward_timeline_streams.push(backward_sender);
547+
548+
Ok(backward_stream)
549+
}
550+
551+
/// Add a new timeline slice to the timeline streams.
552+
pub async fn add_timeline_slice(&self, timeline: &TimelineSlice) {
553+
if timeline.sync {
554+
let mut streams = self.forward_timeline_streams.lock().await;
555+
let mut remaining_streams = Vec::with_capacity(streams.len());
556+
while let Some(mut forward) = streams.pop() {
557+
if !forward.is_closed() {
558+
if let Err(error) = forward.try_send(timeline.clone()) {
559+
if error.is_full() {
560+
warn!("Drop timeline slice because the limit of the buffer for the forward stream is reached");
561+
}
562+
} else {
563+
remaining_streams.push(forward);
564+
}
565+
}
566+
}
567+
*streams = remaining_streams;
568+
} else {
569+
let mut streams = self.backward_timeline_streams.lock().await;
570+
let mut remaining_streams = Vec::with_capacity(streams.len());
571+
while let Some(mut backward) = streams.pop() {
572+
if !backward.is_closed() {
573+
if let Err(error) = backward.try_send(timeline.clone()) {
574+
if error.is_full() {
575+
warn!("Drop timeline slice because the limit of the buffer for the backward stream is reached");
576+
}
577+
} else {
578+
remaining_streams.push(backward);
579+
}
580+
}
581+
}
582+
*streams = remaining_streams;
583+
}
584+
}
470585
}
471586

472587
/// The underlying pure data structure for joined and left rooms.

0 commit comments

Comments
 (0)