Skip to content

feat(timeline): handle live aggregations on non-live timelines #5060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,26 +392,22 @@ async fn room_event_cache_updates_task(

RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
trace!("Received new timeline events diffs");

// We shouldn't use the general way of adding events to timelines to
// non-live timelines, such as pinned events or focused timeline.
// These timelines should handle any live updates by themselves.
if !is_live {
continue;
let origin = match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
};

let has_diffs = !diffs.is_empty();

if is_live {
timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
} else {
// Only handle the remote aggregation for a non-live timeline.
timeline_controller.handle_remote_aggregations(diffs, origin).await;
}

timeline_controller
.handle_remote_events_with_diffs(
diffs,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
},
)
.await;

if matches!(origin, EventsOrigin::Cache) {
if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
timeline_controller.retry_event_decryption(None).await;
}
}
Expand Down
16 changes: 16 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,22 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
.await
}

/// Only handle aggregations received as [`VectorDiff`]s.
pub(super) async fn handle_remote_aggregations(
&self,
diffs: Vec<VectorDiff<TimelineEvent>>,
origin: RemoteEventOrigin,
) {
if diffs.is_empty() {
return;
}

let mut state = self.state.write().await;
state
.handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
.await
}

pub(super) async fn clear(&self) {
self.state.write().await.clear();
}
Expand Down
19 changes: 19 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/controller/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,25 @@ impl TimelineState {
transaction.commit();
}

/// Handle remote aggregations on events as [`VectorDiff`]s.
pub(super) async fn handle_remote_aggregations<RoomData>(
&mut self,
diffs: Vec<VectorDiff<TimelineEvent>>,
origin: RemoteEventOrigin,
room_data: &RoomData,
settings: &TimelineSettings,
) where
RoomData: RoomDataProvider,
{
if diffs.is_empty() {
return;
}

let mut transaction = self.transaction();
transaction.handle_remote_aggregations(diffs, origin, room_data, settings).await;
transaction.commit();
}

/// Marks the given event as fully read, using the read marker received from
/// sync.
pub(super) fn handle_fully_read_marker(&mut self, fully_read_event_id: OwnedEventId) {
Expand Down
153 changes: 153 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,159 @@ impl<'a> TimelineStateTransaction<'a> {
self.check_invariants();
}

async fn handle_remote_aggregation<RoomData>(
&mut self,
event: TimelineEvent,
position: TimelineItemPosition,
room_data_provider: &RoomData,
date_divider_adjuster: &mut DateDividerAdjuster,
) where
RoomData: RoomDataProvider,
{
let deserialized = match event.raw().deserialize() {
Ok(deserialized) => deserialized,
Err(err) => {
warn!("Failed to deserialize timeline event: {err}");
return;
}
};

let sender = deserialized.sender().to_owned();
let timestamp = deserialized.origin_server_ts();
let event_id = deserialized.event_id().to_owned();
let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);

if let Some(action @ TimelineAction::HandleAggregation { .. }) = TimelineAction::from_event(
deserialized,
event.raw(),
room_data_provider,
None,
&self.items,
&mut self.meta,
)
.await
{
let encryption_info = event.kind.encryption_info().cloned();

let sender_profile = room_data_provider.profile_from_user_id(&sender).await;

let ctx = TimelineEventContext {
sender,
sender_profile,
timestamp,
// These are not used when handling an aggregation.
read_receipts: Default::default(),
is_highlighted: false,
flow: Flow::Remote {
event_id: event_id.clone(),
raw_event: event.raw().clone(),
encryption_info,
txn_id,
position,
},
// This field is not used when handling an aggregation.
should_add_new_items: false,
};

TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, action).await;
}
}

/// Handle a set of live remote aggregations on events as [`VectorDiff`]s.
///
/// This is like `handle_remote_events`, with two key differences:
/// - it only applies to aggregated events, not all the sync events.
/// - it will also not add the events to the `all_remote_events` array
/// itself.
pub(super) async fn handle_remote_aggregations<RoomData>(
&mut self,
diffs: Vec<VectorDiff<TimelineEvent>>,
origin: RemoteEventOrigin,
room_data_provider: &RoomData,
settings: &TimelineSettings,
) where
RoomData: RoomDataProvider,
{
let mut date_divider_adjuster =
DateDividerAdjuster::new(settings.date_divider_mode.clone());

for diff in diffs {
match diff {
VectorDiff::Append { values: events } => {
for event in events {
self.handle_remote_aggregation(
event,
TimelineItemPosition::End { origin },
room_data_provider,
&mut date_divider_adjuster,
)
.await;
}
}

VectorDiff::PushFront { value: event } => {
self.handle_remote_aggregation(
event,
TimelineItemPosition::Start { origin },
room_data_provider,
&mut date_divider_adjuster,
)
.await;
}

VectorDiff::PushBack { value: event } => {
self.handle_remote_aggregation(
event,
TimelineItemPosition::End { origin },
room_data_provider,
&mut date_divider_adjuster,
)
.await;
}

VectorDiff::Insert { index: event_index, value: event } => {
self.handle_remote_aggregation(
event,
TimelineItemPosition::At { event_index, origin },
room_data_provider,
&mut date_divider_adjuster,
)
.await;
}

VectorDiff::Set { index: event_index, value: event } => {
if let Some(timeline_item_index) = self
.items
.all_remote_events()
.get(event_index)
.and_then(|meta| meta.timeline_item_index)
{
self.handle_remote_aggregation(
event,
TimelineItemPosition::UpdateAt { timeline_item_index },
room_data_provider,
&mut date_divider_adjuster,
)
.await;
} else {
warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
}
}

VectorDiff::Remove { .. } | VectorDiff::Clear => {
// Do nothing. An aggregated redaction comes with a
// redaction event, or as a redacted event in the first
// place.
}

v => unimplemented!("{v:?}"),
}
}

self.adjust_date_dividers(date_divider_adjuster);
self.check_invariants();
}

fn check_invariants(&self) {
self.check_no_duplicate_read_receipts();
self.check_no_unused_unique_ids();
Expand Down
14 changes: 11 additions & 3 deletions crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn test_new_focused() {
}

#[async_test]
async fn test_focused_timeline_does_not_react() {
async fn test_live_aggregations_are_reflected_on_focused_timelines() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client_with_server().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
Expand Down Expand Up @@ -254,8 +254,16 @@ async fn test_focused_timeline_does_not_react() {
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

// Nothing was received by the focused event timeline
assert_pending!(timeline_stream);
// We only receive one updated for the reaction, from the timeline stream.
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]);

let event_item = item.as_event().unwrap();
assert_eq!(event_item.content().as_message().unwrap().body(), "yolo");
let reactions = event_item.content().reactions().cloned().unwrap_or_default();
assert_eq!(reactions.len(), 1);
let _ = reactions["👍"][*BOB];
}

#[async_test]
Expand Down
39 changes: 28 additions & 11 deletions crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async fn test_pinned_timeline_with_pinned_utd_on_sync_contains_it() {
}

#[async_test]
async fn test_edited_events_are_not_reflected_in_sync() {
async fn test_edited_events_are_reflected_in_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!test:localhost");
Expand All @@ -452,11 +452,11 @@ async fn test_edited_events_are_not_reflected_in_sync() {
.server_ts(MilliSecondsSinceUnixEpoch::now())
.into_raw_sync();

// Mock /event for some timeline events
// Mock /event for some timeline events.
mock_events_endpoint(&server, room_id, vec![pinned_event]).await;

// Load initial timeline items: a text message and a `m.room.pinned_events` with
// event $1
// event $1.
let room = PinnedEventsSync::new(room_id)
.with_pinned_event_ids(vec!["$1"])
.mock_and_sync(&client, &server)
Expand All @@ -470,7 +470,7 @@ async fn test_edited_events_are_not_reflected_in_sync() {
"there should be no live back-pagination status for a focused timeline"
);

// Load timeline items
// Load timeline items.
let (items, mut timeline_stream) = timeline.subscribe().await;

assert_eq!(items.len(), 1 + 1); // event item + a date divider
Expand All @@ -479,31 +479,40 @@ async fn test_edited_events_are_not_reflected_in_sync() {
assert_pending!(timeline_stream);

let edited_event = f
.text_msg("edited message!")
.text_msg("* edited message!")
.edit(
event_id!("$1"),
RoomMessageEventContentWithoutRelation::text_plain("* edited message!"),
RoomMessageEventContentWithoutRelation::text_plain("edited message!"),
)
.event_id(event_id!("$2"))
.server_ts(MilliSecondsSinceUnixEpoch::now())
.into_raw_sync();

// Mock /event for some timeline events
// Mock /event for some timeline events.
mock_events_endpoint(&server, room_id, vec![edited_event.clone()]).await;

// Load new pinned event contents from sync, where $2 is and edit on $1
// Load new pinned event contents from sync, where $2 is and edit on $1.
let _ = PinnedEventsSync::new(room_id)
.with_timeline_events(vec![edited_event])
.mock_and_sync(&client, &server)
.await
.expect("Sync failed");

// The edit does not replace the original event
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 1);

// The edit does replace the original event.
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event = value.as_event().unwrap();
assert_eq!(event.event_id().unwrap(), event_id!("$1"));
assert_eq!(event.content().as_message().unwrap().body(), "edited message!");

// That's all, folks!
assert_pending!(timeline_stream);
}

#[async_test]
async fn test_redacted_events_are_not_reflected_in_sync() {
async fn test_redacted_events_are_reflected_in_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!test:localhost");
Expand Down Expand Up @@ -557,7 +566,15 @@ async fn test_redacted_events_are_not_reflected_in_sync() {
.await
.expect("Sync failed");

// The redaction does not replace the original event
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 1);

// The redaction takes place.
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event = value.as_event().unwrap();
assert!(event.content().is_redacted());

// That's all, folks!
assert_pending!(timeline_stream);
}

Expand Down
Loading