Skip to content

Commit b304a1f

Browse files
committed
event graph: add an initial implementation of the event graph
This is mostly a demonstration of how to plug this with the timeline, and how little it changes as a result. Remove read receipts
1 parent 25ecf08 commit b304a1f

File tree

14 files changed

+463
-59
lines changed

14 files changed

+463
-59
lines changed
Lines changed: 377 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,377 @@
1+
// Copyright 2024 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! The event graph is an abstraction layer, sitting between the Rust SDK and a
16+
//! final client, that acts as a global observer of all the rooms, gathering and
17+
//! inferring some extra useful information about each room. In particular, this
18+
//! doesn't require subscribing to a specific room to get access to this
19+
//! information.
20+
//!
21+
//! It's intended to be fast, robust and easy to maintain.
22+
//!
23+
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more details about the historical reasons that led us to start writing this.
24+
//!
25+
//! Most of it is still a work-in-progress, as of 2024-01-22.
26+
//!
27+
//! The desired set of features it may eventually implement is the following:
28+
//!
29+
//! - [ ] compute proper unread room counts, and use backpagination to get
30+
//! missing messages/notifications/mentions, if needs be.
31+
//! - [ ] expose that information with a new data structure similar to the
32+
//! `RoomInfo`, and that may update a `RoomListService`.
33+
//! - [ ] provide read receipts for each message.
34+
//! - [ ] backwards and forward pagination, and reconcile results with cached
35+
//! timelines.
36+
//! - [ ] retry decryption upon receiving new keys (from an encryption sync
37+
//! service or from a key backup).
38+
//! - [ ] expose the latest event for a given room.
39+
//! - [ ] caching of events on-disk.
40+
41+
#![forbid(missing_docs)]
42+
43+
use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
44+
45+
use async_trait::async_trait;
46+
use matrix_sdk::{sync::RoomUpdate, Client, Room};
47+
use matrix_sdk_base::{
48+
deserialized_responses::SyncTimelineEvent,
49+
sync::{JoinedRoom, LeftRoom, Timeline},
50+
};
51+
use ruma::{
52+
events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
53+
serde::Raw,
54+
OwnedRoomId, RoomId,
55+
};
56+
use tokio::{
57+
spawn,
58+
sync::{
59+
broadcast::{error::RecvError, Receiver, Sender},
60+
RwLock,
61+
},
62+
task::JoinHandle,
63+
};
64+
use tracing::{debug, error, trace};
65+
66+
/// An error observed in the `EventGraph`.
67+
#[derive(thiserror::Error, Debug)]
68+
pub enum EventGraphError {
69+
/// A room hasn't been found, when trying to create a graph view for that
70+
/// room.
71+
#[error("Room with id {0} not found")]
72+
RoomNotFound(OwnedRoomId),
73+
}
74+
75+
/// A result using the [`EventGraphError`].
76+
pub type Result<T> = std::result::Result<T, EventGraphError>;
77+
78+
/// Hold handles to the tasks spawn by a [`RoomEventGraph`].
79+
struct RoomGraphDropHandles {
80+
listen_updates_task: JoinHandle<()>,
81+
}
82+
83+
impl Drop for RoomGraphDropHandles {
84+
fn drop(&mut self) {
85+
self.listen_updates_task.abort();
86+
}
87+
}
88+
89+
/// An event graph, providing lots of useful functionality for clients.
90+
///
91+
/// See also the module-level comment.
92+
pub struct EventGraph {
93+
/// Reference to the client used to navigate this graph.
94+
client: Client,
95+
/// Lazily-filled cache of live [`RoomEventGraph`], once per room.
96+
by_room: BTreeMap<OwnedRoomId, RoomEventGraph>,
97+
/// Backend used for storage.
98+
store: Arc<dyn EventGraphStore>,
99+
}
100+
101+
impl Debug for EventGraph {
102+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103+
f.debug_struct("EventGraph").finish_non_exhaustive()
104+
}
105+
}
106+
107+
impl EventGraph {
108+
/// Create a new [`EventGraph`] for the given client.
109+
pub fn new(client: Client) -> Self {
110+
let store = Arc::new(MemoryStore::new());
111+
Self { client, by_room: Default::default(), store }
112+
}
113+
114+
/// Return a room-specific view over the [`EventGraph`].
115+
///
116+
/// It may not be found, if the room isn't known to the client.
117+
pub fn for_room(&mut self, room_id: &RoomId) -> Option<RoomEventGraph> {
118+
match self.by_room.get(room_id) {
119+
Some(room) => Some(room.clone()),
120+
None => {
121+
let room = self.client.get_room(room_id)?;
122+
let room_event_graph = RoomEventGraph::new(room, self.store.clone());
123+
self.by_room.insert(room_id.to_owned(), room_event_graph.clone());
124+
Some(room_event_graph)
125+
}
126+
}
127+
}
128+
129+
/// Add an initial set of events to the event graph, reloaded from a cache.
130+
///
131+
/// TODO: temporary for API compat, as the event graph should take care of
132+
/// its own cache.
133+
pub async fn add_initial_events(
134+
&mut self,
135+
room_id: &RoomId,
136+
events: Vec<SyncTimelineEvent>,
137+
) -> Result<()> {
138+
let room_graph = self
139+
.for_room(room_id)
140+
.ok_or_else(|| EventGraphError::RoomNotFound(room_id.to_owned()))?;
141+
room_graph.inner.append_events(events).await?;
142+
Ok(())
143+
}
144+
}
145+
146+
/// A store that can be remember information about the event graph.
147+
///
148+
/// It really acts as a cache, in the sense that clearing the backing data
149+
/// should not have any irremediable effect, other than providing a lesser user
150+
/// experience.
151+
#[async_trait]
152+
pub trait EventGraphStore: Send + Sync {
153+
/// Returns all the known events for the given room.
154+
async fn room_events(&self, room: &RoomId) -> Result<Vec<SyncTimelineEvent>>;
155+
156+
/// Adds all the events to the given room.
157+
async fn add_room_events(&self, room: &RoomId, events: Vec<SyncTimelineEvent>) -> Result<()>;
158+
159+
/// Clear all the events from the given room.
160+
async fn clear_room_events(&self, room: &RoomId) -> Result<()>;
161+
}
162+
163+
struct MemoryStore {
164+
/// All the events per room, in sync order.
165+
by_room: RwLock<BTreeMap<OwnedRoomId, Vec<SyncTimelineEvent>>>,
166+
}
167+
168+
impl MemoryStore {
169+
fn new() -> Self {
170+
Self { by_room: Default::default() }
171+
}
172+
}
173+
174+
#[async_trait]
175+
impl EventGraphStore for MemoryStore {
176+
async fn room_events(&self, room: &RoomId) -> Result<Vec<SyncTimelineEvent>> {
177+
Ok(self.by_room.read().await.get(room).cloned().unwrap_or_default())
178+
}
179+
180+
async fn add_room_events(&self, room: &RoomId, events: Vec<SyncTimelineEvent>) -> Result<()> {
181+
self.by_room.write().await.entry(room.to_owned()).or_default().extend(events);
182+
Ok(())
183+
}
184+
185+
async fn clear_room_events(&self, room: &RoomId) -> Result<()> {
186+
let _ = self.by_room.write().await.remove(room);
187+
Ok(())
188+
}
189+
}
190+
191+
/// A subset of an event graph, for a room.
192+
///
193+
/// Cloning is shallow, and thus is cheap to do.
194+
#[derive(Clone)]
195+
pub struct RoomEventGraph {
196+
inner: Arc<RoomEventGraphInner>,
197+
198+
_drop_handles: Arc<RoomGraphDropHandles>,
199+
}
200+
201+
impl Debug for RoomEventGraph {
202+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203+
f.debug_struct("RoomEventGraph").finish_non_exhaustive()
204+
}
205+
}
206+
207+
impl RoomEventGraph {
208+
/// Create a new [`RoomEventGraph`] using the given room and store.
209+
fn new(room: Room, store: Arc<dyn EventGraphStore>) -> Self {
210+
let (inner, drop_handles) = RoomEventGraphInner::new(room, store);
211+
Self { inner, _drop_handles: drop_handles }
212+
}
213+
214+
/// Subscribe to room updates for this room, after getting the initial list
215+
/// of events. XXX: Could/should it use some kind of `Observable`
216+
/// instead? Or not something async, like explicit handlers as our event
217+
/// handlers?
218+
pub async fn subscribe(
219+
&self,
220+
) -> Result<(Vec<SyncTimelineEvent>, Receiver<RoomEventGraphUpdate>)> {
221+
Ok((
222+
self.inner.store.room_events(self.inner.room.room_id()).await?,
223+
self.inner.sender.subscribe(),
224+
))
225+
}
226+
}
227+
228+
struct RoomEventGraphInner {
229+
sender: Sender<RoomEventGraphUpdate>,
230+
store: Arc<dyn EventGraphStore>,
231+
room: Room,
232+
}
233+
234+
impl RoomEventGraphInner {
235+
/// Creates a new graph for a room, and subscribes to room updates., so as
236+
/// to handle new timeline events.
237+
fn new(room: Room, store: Arc<dyn EventGraphStore>) -> (Arc<Self>, Arc<RoomGraphDropHandles>) {
238+
let sender = Sender::new(32);
239+
240+
let room_graph = Arc::new(Self { room, store, sender });
241+
242+
let listen_updates_task = spawn(Self::listen_task(room_graph.clone()));
243+
244+
(room_graph, Arc::new(RoomGraphDropHandles { listen_updates_task }))
245+
}
246+
247+
async fn handle_joined_room_update(&self, updates: JoinedRoom) -> Result<()> {
248+
self.handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.account_data)
249+
.await?;
250+
Ok(())
251+
}
252+
253+
async fn handle_timeline(
254+
&self,
255+
timeline: Timeline,
256+
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
257+
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
258+
) -> Result<()> {
259+
let room_id = self.room.room_id();
260+
261+
if timeline.limited {
262+
// Ideally we'd try to reconcile existing events against those received in the
263+
// timeline, but we're not there yet. In the meanwhile, clear the
264+
// items from the room. TODO: implement Smart Matching™.
265+
trace!("limited timeline, clearing all previous events");
266+
self.store.clear_room_events(room_id).await?;
267+
let _ = self.sender.send(RoomEventGraphUpdate::Clear);
268+
}
269+
270+
// Add all the events to the backend.
271+
trace!("adding new events");
272+
self.store.add_room_events(room_id, timeline.events.clone()).await?;
273+
274+
// Propagate events to observers.
275+
let _ = self.sender.send(RoomEventGraphUpdate::Append {
276+
events: timeline.events,
277+
prev_batch: timeline.prev_batch,
278+
ephemeral,
279+
account_data,
280+
});
281+
282+
Ok(())
283+
}
284+
285+
async fn handle_left_room_update(&self, updates: LeftRoom) -> Result<()> {
286+
self.handle_timeline(updates.timeline, Vec::new(), Vec::new()).await?;
287+
Ok(())
288+
}
289+
290+
async fn listen_task(this: Arc<Self>) {
291+
// TODO for prototyping, i'm spawning a new task to get the room updates.
292+
// Ideally we'd have something like the whole sync update, a generalisation of
293+
// the room update.
294+
trace!("Spawning the listen task");
295+
296+
let mut update_receiver = this.room.client().subscribe_to_room_updates(this.room.room_id());
297+
298+
loop {
299+
match update_receiver.recv().await {
300+
Ok(update) => {
301+
trace!("Listen task received an update");
302+
303+
match update {
304+
RoomUpdate::Left { updates, .. } => {
305+
if let Err(err) = this.handle_left_room_update(updates).await {
306+
error!("handling left room update: {err}");
307+
}
308+
}
309+
RoomUpdate::Joined { updates, .. } => {
310+
if let Err(err) = this.handle_joined_room_update(updates).await {
311+
error!("handling joined room update: {err}");
312+
}
313+
}
314+
RoomUpdate::Invited { .. } => {
315+
// We don't do anything for invited rooms at this
316+
// point. TODO should
317+
// we?
318+
}
319+
}
320+
}
321+
322+
Err(RecvError::Closed) => {
323+
// The loop terminated successfully.
324+
debug!("Listen task closed");
325+
break;
326+
}
327+
328+
Err(RecvError::Lagged(_)) => {
329+
// Since we've lagged behind updates to this room, we might be out of
330+
// sync with the events, leading to potentially lost events. Play it
331+
// safe here, and clear the cache. It's fine because we can retrigger
332+
// backpagination from the last event at any time, if needs be.
333+
debug!("Listen task lagged, clearing room");
334+
if let Err(err) = this.store.clear_room_events(this.room.room_id()).await {
335+
error!("unable to clear room after room updates lag: {err}");
336+
}
337+
}
338+
}
339+
}
340+
}
341+
342+
/// Append a set of events to the room graph and storage, notifying
343+
/// observers.
344+
async fn append_events(&self, events: Vec<SyncTimelineEvent>) -> Result<()> {
345+
self.store.add_room_events(self.room.room_id(), events.clone()).await?;
346+
347+
let _ = self.sender.send(RoomEventGraphUpdate::Append {
348+
events,
349+
prev_batch: None,
350+
account_data: Default::default(),
351+
ephemeral: Default::default(),
352+
});
353+
354+
Ok(())
355+
}
356+
}
357+
358+
/// An update related to events happened in a room.
359+
#[derive(Clone)]
360+
pub enum RoomEventGraphUpdate {
361+
/// The room has been cleared from events.
362+
Clear,
363+
/// The room has new events.
364+
Append {
365+
/// All the new events that have been added to the room.
366+
events: Vec<SyncTimelineEvent>,
367+
/// XXX: this is temporary, until backpagination lives in the event
368+
/// graph.
369+
prev_batch: Option<String>,
370+
/// XXX: this is temporary, until account data lives in the event graph
371+
/// — or will it live there?
372+
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
373+
/// XXX: this is temporary, until read receipts are handled in the event
374+
/// graph
375+
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
376+
},
377+
}

crates/matrix-sdk-ui/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use ruma::html::HtmlSanitizerMode;
1717
mod events;
1818

1919
pub mod encryption_sync_service;
20+
pub mod event_graph;
2021
pub mod notification_client;
2122
pub mod room_list_service;
2223
pub mod sync_service;

crates/matrix-sdk-ui/src/room_list_service/room.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ impl Room {
128128
self.inner.sliding_sync_room.prev_batch(),
129129
self.inner.sliding_sync_room.timeline_queue(),
130130
)
131+
.await
131132
.track_read_marker_and_receipts()
132133
.build()
133134
.await,

0 commit comments

Comments
 (0)