Skip to content

Commit bd83937

Browse files
committed
feat(timeline): handle aggregations only in non-live timelines
1 parent 6f8b744 commit bd83937

File tree

6 files changed

+241
-32
lines changed

6 files changed

+241
-32
lines changed

crates/matrix-sdk-ui/src/timeline/builder.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -392,26 +392,22 @@ async fn room_event_cache_updates_task(
392392

393393
RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
394394
trace!("Received new timeline events diffs");
395-
396-
// We shouldn't use the general way of adding events to timelines to
397-
// non-live timelines, such as pinned events or focused timeline.
398-
// These timelines should handle any live updates by themselves.
399-
if !is_live {
400-
continue;
395+
let origin = match origin {
396+
EventsOrigin::Sync => RemoteEventOrigin::Sync,
397+
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
398+
EventsOrigin::Cache => RemoteEventOrigin::Cache,
399+
};
400+
401+
let has_diffs = !diffs.is_empty();
402+
403+
if is_live {
404+
timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
405+
} else {
406+
// Only handle the remote aggregation for a non-live timeline.
407+
timeline_controller.handle_remote_aggregations(diffs, origin).await;
401408
}
402409

403-
timeline_controller
404-
.handle_remote_events_with_diffs(
405-
diffs,
406-
match origin {
407-
EventsOrigin::Sync => RemoteEventOrigin::Sync,
408-
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
409-
EventsOrigin::Cache => RemoteEventOrigin::Cache,
410-
},
411-
)
412-
.await;
413-
414-
if matches!(origin, EventsOrigin::Cache) {
410+
if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
415411
timeline_controller.retry_event_decryption(None).await;
416412
}
417413
}

crates/matrix-sdk-ui/src/timeline/controller/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,22 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
753753
.await
754754
}
755755

756+
/// Only handle aggregations received as [`VectorDiff`]s.
757+
pub(super) async fn handle_remote_aggregations(
758+
&self,
759+
diffs: Vec<VectorDiff<TimelineEvent>>,
760+
origin: RemoteEventOrigin,
761+
) {
762+
if diffs.is_empty() {
763+
return;
764+
}
765+
766+
let mut state = self.state.write().await;
767+
state
768+
.handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
769+
.await
770+
}
771+
756772
pub(super) async fn clear(&self) {
757773
self.state.write().await.clear();
758774
}

crates/matrix-sdk-ui/src/timeline/controller/state.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ impl TimelineState {
9191
transaction.commit();
9292
}
9393

94+
/// Handle remote aggregations on events as [`VectorDiff`]s.
95+
pub(super) async fn handle_remote_aggregations<RoomData>(
96+
&mut self,
97+
diffs: Vec<VectorDiff<TimelineEvent>>,
98+
origin: RemoteEventOrigin,
99+
room_data: &RoomData,
100+
settings: &TimelineSettings,
101+
) where
102+
RoomData: RoomDataProvider,
103+
{
104+
if diffs.is_empty() {
105+
return;
106+
}
107+
108+
let mut transaction = self.transaction();
109+
transaction.handle_remote_aggregations(diffs, origin, room_data, settings).await;
110+
transaction.commit();
111+
}
112+
94113
/// Marks the given event as fully read, using the read marker received from
95114
/// sync.
96115
pub(super) fn handle_fully_read_marker(&mut self, fully_read_event_id: OwnedEventId) {

crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,159 @@ impl<'a> TimelineStateTransaction<'a> {
177177
self.check_invariants();
178178
}
179179

180+
async fn handle_remote_aggregation<RoomData>(
181+
&mut self,
182+
event: TimelineEvent,
183+
position: TimelineItemPosition,
184+
room_data_provider: &RoomData,
185+
date_divider_adjuster: &mut DateDividerAdjuster,
186+
) where
187+
RoomData: RoomDataProvider,
188+
{
189+
let deserialized = match event.raw().deserialize() {
190+
Ok(deserialized) => deserialized,
191+
Err(err) => {
192+
warn!("Failed to deserialize timeline event: {err}");
193+
return;
194+
}
195+
};
196+
197+
let sender = deserialized.sender().to_owned();
198+
let timestamp = deserialized.origin_server_ts();
199+
let event_id = deserialized.event_id().to_owned();
200+
let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
201+
202+
if let Some(action @ TimelineAction::HandleAggregation { .. }) = TimelineAction::from_event(
203+
deserialized,
204+
&event.raw(),
205+
room_data_provider,
206+
None,
207+
&self.items,
208+
&mut self.meta,
209+
)
210+
.await
211+
{
212+
let encryption_info = event.kind.encryption_info().cloned();
213+
214+
let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
215+
216+
let ctx = TimelineEventContext {
217+
sender,
218+
sender_profile,
219+
timestamp,
220+
// These are not used when handling an aggregation.
221+
read_receipts: Default::default(),
222+
is_highlighted: false,
223+
flow: Flow::Remote {
224+
event_id: event_id.clone(),
225+
raw_event: event.raw().clone(),
226+
encryption_info,
227+
txn_id,
228+
position,
229+
},
230+
// This field is not used when handling an aggregation.
231+
should_add_new_items: false,
232+
};
233+
234+
TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, action).await;
235+
}
236+
}
237+
238+
/// Handle a set of live remote aggregations on events as [`VectorDiff`]s.
239+
///
240+
/// This is like `handle_remote_events`, with two key differences:
241+
/// - it only applies to aggregated events, not all the sync events.
242+
/// - it will also not add the events to the `all_remote_events` array
243+
/// itself.
244+
pub(super) async fn handle_remote_aggregations<RoomData>(
245+
&mut self,
246+
diffs: Vec<VectorDiff<TimelineEvent>>,
247+
origin: RemoteEventOrigin,
248+
room_data_provider: &RoomData,
249+
settings: &TimelineSettings,
250+
) where
251+
RoomData: RoomDataProvider,
252+
{
253+
let mut date_divider_adjuster =
254+
DateDividerAdjuster::new(settings.date_divider_mode.clone());
255+
256+
for diff in diffs {
257+
match diff {
258+
VectorDiff::Append { values: events } => {
259+
for event in events {
260+
self.handle_remote_aggregation(
261+
event,
262+
TimelineItemPosition::End { origin },
263+
room_data_provider,
264+
&mut date_divider_adjuster,
265+
)
266+
.await;
267+
}
268+
}
269+
270+
VectorDiff::PushFront { value: event } => {
271+
self.handle_remote_aggregation(
272+
event,
273+
TimelineItemPosition::Start { origin },
274+
room_data_provider,
275+
&mut date_divider_adjuster,
276+
)
277+
.await;
278+
}
279+
280+
VectorDiff::PushBack { value: event } => {
281+
self.handle_remote_aggregation(
282+
event,
283+
TimelineItemPosition::End { origin },
284+
room_data_provider,
285+
&mut date_divider_adjuster,
286+
)
287+
.await;
288+
}
289+
290+
VectorDiff::Insert { index: event_index, value: event } => {
291+
self.handle_remote_aggregation(
292+
event,
293+
TimelineItemPosition::At { event_index, origin },
294+
room_data_provider,
295+
&mut date_divider_adjuster,
296+
)
297+
.await;
298+
}
299+
300+
VectorDiff::Set { index: event_index, value: event } => {
301+
if let Some(timeline_item_index) = self
302+
.items
303+
.all_remote_events()
304+
.get(event_index)
305+
.and_then(|meta| meta.timeline_item_index)
306+
{
307+
self.handle_remote_aggregation(
308+
event,
309+
TimelineItemPosition::UpdateAt { timeline_item_index },
310+
room_data_provider,
311+
&mut date_divider_adjuster,
312+
)
313+
.await;
314+
} else {
315+
warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
316+
}
317+
}
318+
319+
VectorDiff::Remove { .. } | VectorDiff::Clear => {
320+
// Do nothing. An aggregated redaction comes with a
321+
// redaction event, or as a redacted event in the first
322+
// place.
323+
}
324+
325+
v => unimplemented!("{v:?}"),
326+
}
327+
}
328+
329+
self.adjust_date_dividers(date_divider_adjuster);
330+
self.check_invariants();
331+
}
332+
180333
fn check_invariants(&self) {
181334
self.check_no_duplicate_read_receipts();
182335
self.check_no_unused_unique_ids();

crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ async fn test_new_focused() {
185185
}
186186

187187
#[async_test]
188-
async fn test_focused_timeline_does_not_react() {
188+
async fn test_live_aggregations_are_reflected_on_focused_timelines() {
189189
let room_id = room_id!("!a98sd12bjh:example.org");
190190
let (client, server) = logged_in_client_with_server().await;
191191
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
@@ -254,8 +254,16 @@ async fn test_focused_timeline_does_not_react() {
254254
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
255255
server.reset().await;
256256

257-
// Nothing was received by the focused event timeline
258-
assert_pending!(timeline_stream);
257+
// We only receive one updated for the reaction, from the timeline stream.
258+
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
259+
assert_eq!(timeline_updates.len(), 1);
260+
assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]);
261+
262+
let event_item = item.as_event().unwrap();
263+
assert_eq!(event_item.content().as_message().unwrap().body(), "yolo");
264+
let reactions = event_item.content().reactions().cloned().unwrap_or_default();
265+
assert_eq!(reactions.len(), 1);
266+
let _ = reactions["👍"][*BOB];
259267
}
260268

261269
#[async_test]

crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ async fn test_pinned_timeline_with_pinned_utd_on_sync_contains_it() {
440440
}
441441

442442
#[async_test]
443-
async fn test_edited_events_are_not_reflected_in_sync() {
443+
async fn test_edited_events_are_reflected_in_sync() {
444444
let server = MatrixMockServer::new().await;
445445
let client = server.client_builder().build().await;
446446
let room_id = room_id!("!test:localhost");
@@ -452,11 +452,11 @@ async fn test_edited_events_are_not_reflected_in_sync() {
452452
.server_ts(MilliSecondsSinceUnixEpoch::now())
453453
.into_raw_sync();
454454

455-
// Mock /event for some timeline events
455+
// Mock /event for some timeline events.
456456
mock_events_endpoint(&server, room_id, vec![pinned_event]).await;
457457

458458
// Load initial timeline items: a text message and a `m.room.pinned_events` with
459-
// event $1
459+
// event $1.
460460
let room = PinnedEventsSync::new(room_id)
461461
.with_pinned_event_ids(vec!["$1"])
462462
.mock_and_sync(&client, &server)
@@ -470,7 +470,7 @@ async fn test_edited_events_are_not_reflected_in_sync() {
470470
"there should be no live back-pagination status for a focused timeline"
471471
);
472472

473-
// Load timeline items
473+
// Load timeline items.
474474
let (items, mut timeline_stream) = timeline.subscribe().await;
475475

476476
assert_eq!(items.len(), 1 + 1); // event item + a date divider
@@ -479,31 +479,40 @@ async fn test_edited_events_are_not_reflected_in_sync() {
479479
assert_pending!(timeline_stream);
480480

481481
let edited_event = f
482-
.text_msg("edited message!")
482+
.text_msg("* edited message!")
483483
.edit(
484484
event_id!("$1"),
485-
RoomMessageEventContentWithoutRelation::text_plain("* edited message!"),
485+
RoomMessageEventContentWithoutRelation::text_plain("edited message!"),
486486
)
487487
.event_id(event_id!("$2"))
488488
.server_ts(MilliSecondsSinceUnixEpoch::now())
489489
.into_raw_sync();
490490

491-
// Mock /event for some timeline events
491+
// Mock /event for some timeline events.
492492
mock_events_endpoint(&server, room_id, vec![edited_event.clone()]).await;
493493

494-
// Load new pinned event contents from sync, where $2 is and edit on $1
494+
// Load new pinned event contents from sync, where $2 is and edit on $1.
495495
let _ = PinnedEventsSync::new(room_id)
496496
.with_timeline_events(vec![edited_event])
497497
.mock_and_sync(&client, &server)
498498
.await
499499
.expect("Sync failed");
500500

501-
// The edit does not replace the original event
501+
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
502+
assert_eq!(timeline_updates.len(), 1);
503+
504+
// The edit does replace the original event.
505+
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
506+
let event = value.as_event().unwrap();
507+
assert_eq!(event.event_id().unwrap(), event_id!("$1"));
508+
assert_eq!(event.content().as_message().unwrap().body(), "edited message!");
509+
510+
// That's all, folks!
502511
assert_pending!(timeline_stream);
503512
}
504513

505514
#[async_test]
506-
async fn test_redacted_events_are_not_reflected_in_sync() {
515+
async fn test_redacted_events_are_reflected_in_sync() {
507516
let server = MatrixMockServer::new().await;
508517
let client = server.client_builder().build().await;
509518
let room_id = room_id!("!test:localhost");
@@ -557,7 +566,15 @@ async fn test_redacted_events_are_not_reflected_in_sync() {
557566
.await
558567
.expect("Sync failed");
559568

560-
// The redaction does not replace the original event
569+
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
570+
assert_eq!(timeline_updates.len(), 1);
571+
572+
// The redaction takes place.
573+
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
574+
let event = value.as_event().unwrap();
575+
assert!(event.content().is_redacted());
576+
577+
// That's all, folks!
561578
assert_pending!(timeline_stream);
562579
}
563580

0 commit comments

Comments
 (0)