Skip to content

Commit a98f71e

Browse files
authored
feat(timeline): handle live aggregations on non-live timelines (#5060)
This makes it possible to handle reactions/redactions/edits/etc. on non-live timelines. As a result, the pinned and focused timelines will now get live reactions/redactions and so on. This makes it possible to also have the thread timelines handle those live events, although it's unclear how it will pane out in the end, when the event cache is also involved.
1 parent 6f8b744 commit a98f71e

File tree

6 files changed

+241
-32
lines changed

6 files changed

+241
-32
lines changed

crates/matrix-sdk-ui/src/timeline/builder.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -392,26 +392,22 @@ async fn room_event_cache_updates_task(
392392

393393
RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
394394
trace!("Received new timeline events diffs");
395-
396-
// We shouldn't use the general way of adding events to timelines to
397-
// non-live timelines, such as pinned events or focused timeline.
398-
// These timelines should handle any live updates by themselves.
399-
if !is_live {
400-
continue;
395+
let origin = match origin {
396+
EventsOrigin::Sync => RemoteEventOrigin::Sync,
397+
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
398+
EventsOrigin::Cache => RemoteEventOrigin::Cache,
399+
};
400+
401+
let has_diffs = !diffs.is_empty();
402+
403+
if is_live {
404+
timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
405+
} else {
406+
// Only handle the remote aggregation for a non-live timeline.
407+
timeline_controller.handle_remote_aggregations(diffs, origin).await;
401408
}
402409

403-
timeline_controller
404-
.handle_remote_events_with_diffs(
405-
diffs,
406-
match origin {
407-
EventsOrigin::Sync => RemoteEventOrigin::Sync,
408-
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
409-
EventsOrigin::Cache => RemoteEventOrigin::Cache,
410-
},
411-
)
412-
.await;
413-
414-
if matches!(origin, EventsOrigin::Cache) {
410+
if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
415411
timeline_controller.retry_event_decryption(None).await;
416412
}
417413
}

crates/matrix-sdk-ui/src/timeline/controller/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,22 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
753753
.await
754754
}
755755

756+
/// Only handle aggregations received as [`VectorDiff`]s.
757+
pub(super) async fn handle_remote_aggregations(
758+
&self,
759+
diffs: Vec<VectorDiff<TimelineEvent>>,
760+
origin: RemoteEventOrigin,
761+
) {
762+
if diffs.is_empty() {
763+
return;
764+
}
765+
766+
let mut state = self.state.write().await;
767+
state
768+
.handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
769+
.await
770+
}
771+
756772
pub(super) async fn clear(&self) {
757773
self.state.write().await.clear();
758774
}

crates/matrix-sdk-ui/src/timeline/controller/state.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ impl TimelineState {
9191
transaction.commit();
9292
}
9393

94+
/// Handle remote aggregations on events as [`VectorDiff`]s.
95+
pub(super) async fn handle_remote_aggregations<RoomData>(
96+
&mut self,
97+
diffs: Vec<VectorDiff<TimelineEvent>>,
98+
origin: RemoteEventOrigin,
99+
room_data: &RoomData,
100+
settings: &TimelineSettings,
101+
) where
102+
RoomData: RoomDataProvider,
103+
{
104+
if diffs.is_empty() {
105+
return;
106+
}
107+
108+
let mut transaction = self.transaction();
109+
transaction.handle_remote_aggregations(diffs, origin, room_data, settings).await;
110+
transaction.commit();
111+
}
112+
94113
/// Marks the given event as fully read, using the read marker received from
95114
/// sync.
96115
pub(super) fn handle_fully_read_marker(&mut self, fully_read_event_id: OwnedEventId) {

crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,159 @@ impl<'a> TimelineStateTransaction<'a> {
177177
self.check_invariants();
178178
}
179179

180+
async fn handle_remote_aggregation<RoomData>(
181+
&mut self,
182+
event: TimelineEvent,
183+
position: TimelineItemPosition,
184+
room_data_provider: &RoomData,
185+
date_divider_adjuster: &mut DateDividerAdjuster,
186+
) where
187+
RoomData: RoomDataProvider,
188+
{
189+
let deserialized = match event.raw().deserialize() {
190+
Ok(deserialized) => deserialized,
191+
Err(err) => {
192+
warn!("Failed to deserialize timeline event: {err}");
193+
return;
194+
}
195+
};
196+
197+
let sender = deserialized.sender().to_owned();
198+
let timestamp = deserialized.origin_server_ts();
199+
let event_id = deserialized.event_id().to_owned();
200+
let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
201+
202+
if let Some(action @ TimelineAction::HandleAggregation { .. }) = TimelineAction::from_event(
203+
deserialized,
204+
event.raw(),
205+
room_data_provider,
206+
None,
207+
&self.items,
208+
&mut self.meta,
209+
)
210+
.await
211+
{
212+
let encryption_info = event.kind.encryption_info().cloned();
213+
214+
let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
215+
216+
let ctx = TimelineEventContext {
217+
sender,
218+
sender_profile,
219+
timestamp,
220+
// These are not used when handling an aggregation.
221+
read_receipts: Default::default(),
222+
is_highlighted: false,
223+
flow: Flow::Remote {
224+
event_id: event_id.clone(),
225+
raw_event: event.raw().clone(),
226+
encryption_info,
227+
txn_id,
228+
position,
229+
},
230+
// This field is not used when handling an aggregation.
231+
should_add_new_items: false,
232+
};
233+
234+
TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, action).await;
235+
}
236+
}
237+
238+
/// Handle a set of live remote aggregations on events as [`VectorDiff`]s.
239+
///
240+
/// This is like `handle_remote_events`, with two key differences:
241+
/// - it only applies to aggregated events, not all the sync events.
242+
/// - it will also not add the events to the `all_remote_events` array
243+
/// itself.
244+
pub(super) async fn handle_remote_aggregations<RoomData>(
245+
&mut self,
246+
diffs: Vec<VectorDiff<TimelineEvent>>,
247+
origin: RemoteEventOrigin,
248+
room_data_provider: &RoomData,
249+
settings: &TimelineSettings,
250+
) where
251+
RoomData: RoomDataProvider,
252+
{
253+
let mut date_divider_adjuster =
254+
DateDividerAdjuster::new(settings.date_divider_mode.clone());
255+
256+
for diff in diffs {
257+
match diff {
258+
VectorDiff::Append { values: events } => {
259+
for event in events {
260+
self.handle_remote_aggregation(
261+
event,
262+
TimelineItemPosition::End { origin },
263+
room_data_provider,
264+
&mut date_divider_adjuster,
265+
)
266+
.await;
267+
}
268+
}
269+
270+
VectorDiff::PushFront { value: event } => {
271+
self.handle_remote_aggregation(
272+
event,
273+
TimelineItemPosition::Start { origin },
274+
room_data_provider,
275+
&mut date_divider_adjuster,
276+
)
277+
.await;
278+
}
279+
280+
VectorDiff::PushBack { value: event } => {
281+
self.handle_remote_aggregation(
282+
event,
283+
TimelineItemPosition::End { origin },
284+
room_data_provider,
285+
&mut date_divider_adjuster,
286+
)
287+
.await;
288+
}
289+
290+
VectorDiff::Insert { index: event_index, value: event } => {
291+
self.handle_remote_aggregation(
292+
event,
293+
TimelineItemPosition::At { event_index, origin },
294+
room_data_provider,
295+
&mut date_divider_adjuster,
296+
)
297+
.await;
298+
}
299+
300+
VectorDiff::Set { index: event_index, value: event } => {
301+
if let Some(timeline_item_index) = self
302+
.items
303+
.all_remote_events()
304+
.get(event_index)
305+
.and_then(|meta| meta.timeline_item_index)
306+
{
307+
self.handle_remote_aggregation(
308+
event,
309+
TimelineItemPosition::UpdateAt { timeline_item_index },
310+
room_data_provider,
311+
&mut date_divider_adjuster,
312+
)
313+
.await;
314+
} else {
315+
warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
316+
}
317+
}
318+
319+
VectorDiff::Remove { .. } | VectorDiff::Clear => {
320+
// Do nothing. An aggregated redaction comes with a
321+
// redaction event, or as a redacted event in the first
322+
// place.
323+
}
324+
325+
v => unimplemented!("{v:?}"),
326+
}
327+
}
328+
329+
self.adjust_date_dividers(date_divider_adjuster);
330+
self.check_invariants();
331+
}
332+
180333
fn check_invariants(&self) {
181334
self.check_no_duplicate_read_receipts();
182335
self.check_no_unused_unique_ids();

crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ async fn test_new_focused() {
185185
}
186186

187187
#[async_test]
188-
async fn test_focused_timeline_does_not_react() {
188+
async fn test_live_aggregations_are_reflected_on_focused_timelines() {
189189
let room_id = room_id!("!a98sd12bjh:example.org");
190190
let (client, server) = logged_in_client_with_server().await;
191191
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
@@ -254,8 +254,16 @@ async fn test_focused_timeline_does_not_react() {
254254
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
255255
server.reset().await;
256256

257-
// Nothing was received by the focused event timeline
258-
assert_pending!(timeline_stream);
257+
// We only receive one updated for the reaction, from the timeline stream.
258+
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
259+
assert_eq!(timeline_updates.len(), 1);
260+
assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]);
261+
262+
let event_item = item.as_event().unwrap();
263+
assert_eq!(event_item.content().as_message().unwrap().body(), "yolo");
264+
let reactions = event_item.content().reactions().cloned().unwrap_or_default();
265+
assert_eq!(reactions.len(), 1);
266+
let _ = reactions["👍"][*BOB];
259267
}
260268

261269
#[async_test]

crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ async fn test_pinned_timeline_with_pinned_utd_on_sync_contains_it() {
440440
}
441441

442442
#[async_test]
443-
async fn test_edited_events_are_not_reflected_in_sync() {
443+
async fn test_edited_events_are_reflected_in_sync() {
444444
let server = MatrixMockServer::new().await;
445445
let client = server.client_builder().build().await;
446446
let room_id = room_id!("!test:localhost");
@@ -452,11 +452,11 @@ async fn test_edited_events_are_not_reflected_in_sync() {
452452
.server_ts(MilliSecondsSinceUnixEpoch::now())
453453
.into_raw_sync();
454454

455-
// Mock /event for some timeline events
455+
// Mock /event for some timeline events.
456456
mock_events_endpoint(&server, room_id, vec![pinned_event]).await;
457457

458458
// Load initial timeline items: a text message and a `m.room.pinned_events` with
459-
// event $1
459+
// event $1.
460460
let room = PinnedEventsSync::new(room_id)
461461
.with_pinned_event_ids(vec!["$1"])
462462
.mock_and_sync(&client, &server)
@@ -470,7 +470,7 @@ async fn test_edited_events_are_not_reflected_in_sync() {
470470
"there should be no live back-pagination status for a focused timeline"
471471
);
472472

473-
// Load timeline items
473+
// Load timeline items.
474474
let (items, mut timeline_stream) = timeline.subscribe().await;
475475

476476
assert_eq!(items.len(), 1 + 1); // event item + a date divider
@@ -479,31 +479,40 @@ async fn test_edited_events_are_not_reflected_in_sync() {
479479
assert_pending!(timeline_stream);
480480

481481
let edited_event = f
482-
.text_msg("edited message!")
482+
.text_msg("* edited message!")
483483
.edit(
484484
event_id!("$1"),
485-
RoomMessageEventContentWithoutRelation::text_plain("* edited message!"),
485+
RoomMessageEventContentWithoutRelation::text_plain("edited message!"),
486486
)
487487
.event_id(event_id!("$2"))
488488
.server_ts(MilliSecondsSinceUnixEpoch::now())
489489
.into_raw_sync();
490490

491-
// Mock /event for some timeline events
491+
// Mock /event for some timeline events.
492492
mock_events_endpoint(&server, room_id, vec![edited_event.clone()]).await;
493493

494-
// Load new pinned event contents from sync, where $2 is and edit on $1
494+
// Load new pinned event contents from sync, where $2 is and edit on $1.
495495
let _ = PinnedEventsSync::new(room_id)
496496
.with_timeline_events(vec![edited_event])
497497
.mock_and_sync(&client, &server)
498498
.await
499499
.expect("Sync failed");
500500

501-
// The edit does not replace the original event
501+
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
502+
assert_eq!(timeline_updates.len(), 1);
503+
504+
// The edit does replace the original event.
505+
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
506+
let event = value.as_event().unwrap();
507+
assert_eq!(event.event_id().unwrap(), event_id!("$1"));
508+
assert_eq!(event.content().as_message().unwrap().body(), "edited message!");
509+
510+
// That's all, folks!
502511
assert_pending!(timeline_stream);
503512
}
504513

505514
#[async_test]
506-
async fn test_redacted_events_are_not_reflected_in_sync() {
515+
async fn test_redacted_events_are_reflected_in_sync() {
507516
let server = MatrixMockServer::new().await;
508517
let client = server.client_builder().build().await;
509518
let room_id = room_id!("!test:localhost");
@@ -557,7 +566,15 @@ async fn test_redacted_events_are_not_reflected_in_sync() {
557566
.await
558567
.expect("Sync failed");
559568

560-
// The redaction does not replace the original event
569+
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
570+
assert_eq!(timeline_updates.len(), 1);
571+
572+
// The redaction takes place.
573+
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
574+
let event = value.as_event().unwrap();
575+
assert!(event.content().is_redacted());
576+
577+
// That's all, folks!
561578
assert_pending!(timeline_stream);
562579
}
563580

0 commit comments

Comments
 (0)