Skip to content

refactor(timeline): finish the aggregations refactoring #5046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
2821ace
feat(timeline): handle redaction in the pending aggregations
bnjbvr May 15, 2025
e3ff67f
refactor(timeline): rename `Aggregations::apply` to `Aggregations::ap…
bnjbvr May 15, 2025
4a20671
refactor!(timeline): have `TimelineItem::reactions()` return an `Opti…
bnjbvr May 15, 2025
0c4b539
optimize(timeline): don't eagerly clone an `EventTimelineItem` before…
bnjbvr May 15, 2025
24d64a0
refactor(timeline): simplify a bit `mark_aggregation_as_sent` by havi…
bnjbvr May 15, 2025
fd9f017
refactor(timeline): have `Aggregations::try_remove_aggregation` also …
bnjbvr May 15, 2025
013c451
refactor(timeline): use the aggregations manager for handling edits
bnjbvr May 15, 2025
444dfe3
fix(timeline): properly update encryption info upon edit
bnjbvr May 15, 2025
f2ea16b
refactor(timeline): simplify a few functions as a result of not provi…
bnjbvr May 15, 2025
e038659
optimize(timeline): avoid one clone if a new item has pending aggrega…
bnjbvr May 15, 2025
0431d57
chore: make clippy happy
bnjbvr May 15, 2025
4845fef
refactor(timeline): hey, i can actually remove this pending_edits fie…
bnjbvr May 15, 2025
a1e680f
optimize(timeline): find the position of an event by starting from th…
bnjbvr May 15, 2025
af49a41
refactor(timeline): group extracting the reply and thread root
bnjbvr May 19, 2025
99d740a
refactor(timeline): slightly optimize flow for saving a bundled edit
bnjbvr May 19, 2025
73df64e
refactor(timeline): avoid cloning of `relates_to` for room messages
bnjbvr May 19, 2025
2adfa00
refactor(timeline): address review comments
bnjbvr May 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/matrix-sdk-ui/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased] - ReleaseDate

### Refactor

- [**breaking**] [`TimelineItemContent::reactions()`] returns an `Option<&ReactionsByKeyBySender>`
instead of `ReactionsByKeyBySender`. This reflects the fact that some timeline items cannot hold
reactions at all.

### Bug Fixes

- Introduce `Timeline` regions, which helps to remove a class of bugs in the
Expand Down
541 changes: 440 additions & 101 deletions crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,16 @@ mod tests {
origin: RemoteEventOrigin::Sync,
});

let content = RoomMessageEventContent::text_plain("hi");

TimelineItem::new(
TimelineItemKind::Event(EventTimelineItem::new(
owned_user_id!("@u:s.to"),
TimelineDetails::Pending,
timestamp(),
TimelineItemContent::message(
RoomMessageEventContent::text_plain("hi"),
None,
content.msgtype,
content.mentions,
ReactionsByKeyBySender::default(),
None,
None,
Expand Down
15 changes: 3 additions & 12 deletions crates/matrix-sdk-ui/src/timeline/controller/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@

use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
sync::Arc,
};

use matrix_sdk::ring_buffer::RingBuffer;
use ruma::{EventId, OwnedEventId, OwnedUserId, RoomVersionId};
use tracing::trace;

use super::{
super::{subscriber::skip::SkipCount, TimelineItem, TimelineItemKind, TimelineUniqueId},
read_receipts::ReadReceipts,
Aggregations, AllRemoteEvents, ObservableItemsTransaction, PendingEdit,
Aggregations, AllRemoteEvents, ObservableItemsTransaction,
};
use crate::unable_to_decrypt_hook::UtdHookManager;

Expand Down Expand Up @@ -77,11 +75,10 @@ pub(in crate::timeline) struct TimelineMetadata {
pub aggregations: Aggregations,

/// Given an event, what are all the events that are replies to it?
///
/// Only works for remote events *and* replies which are remote-echoed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename replies to remote_replies? Not convinced, just asking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear in remote_replies which side is remote; in this case, it's both the original and the reply which must have been remote-echoed, and I think this information is more clearly described from the code comments + types used (only OwnedEventId, not TimelineEventItemId anywhere). If you don't mind too much, I'll keep this name.

pub replies: HashMap<OwnedEventId, BTreeSet<OwnedEventId>>,

/// Edit events received before the related event they're editing.
pub pending_edits: RingBuffer<PendingEdit>,

/// Identifier of the fully-read event, helping knowing where to introduce
/// the read marker.
pub fully_read_event: Option<OwnedEventId>,
Expand All @@ -100,10 +97,6 @@ pub(in crate::timeline) struct TimelineMetadata {
pub(super) read_receipts: ReadReceipts,
}

/// Maximum number of stash pending edits.
/// SAFETY: 32 is not 0.
const MAX_NUM_STASHED_PENDING_EDITS: NonZeroUsize = NonZeroUsize::new(32).unwrap();

impl TimelineMetadata {
pub(in crate::timeline) fn new(
own_user_id: OwnedUserId,
Expand All @@ -117,7 +110,6 @@ impl TimelineMetadata {
own_user_id,
next_internal_id: Default::default(),
aggregations: Default::default(),
pending_edits: RingBuffer::new(MAX_NUM_STASHED_PENDING_EDITS),
replies: Default::default(),
fully_read_event: Default::default(),
// It doesn't make sense to set this to false until we fill the `fully_read_event`
Expand All @@ -136,7 +128,6 @@ impl TimelineMetadata {
// ids across timeline clears.
self.aggregations.clear();
self.replies.clear();
self.pending_edits.clear();
self.fully_read_event = None;
// We forgot about the fully read marker right above, so wait for a new one
// before attempting to update it for each new timeline item.
Expand Down
115 changes: 41 additions & 74 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(super) use self::{
AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
ObservableItemsTransactionEntry,
},
state::{PendingEdit, PendingEditKind, TimelineState},
state::TimelineState,
state_transaction::TimelineStateTransaction,
};
use super::{
Expand All @@ -84,7 +84,7 @@ use crate::{
unable_to_decrypt_hook::UtdHookManager,
};

mod aggregations;
pub(in crate::timeline) mod aggregations;
mod decryption_retry_task;
mod metadata;
mod observable_items;
Expand Down Expand Up @@ -607,9 +607,7 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
let prev_status = item
.content()
.reactions()
.get(key)
.and_then(|group| group.get(user_id))
.map(|reaction_info| reaction_info.status.clone());
.and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

let Some(prev_status) = prev_status else {
match &item.kind {
Expand Down Expand Up @@ -685,7 +683,7 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
return Ok(false);
};

let mut reactions = item.content().reactions().clone();
let mut reactions = item.content().reactions().cloned().unwrap_or_default();
let reaction_info = reactions.remove_reaction(user_id, key);

if reaction_info.is_some() {
Expand All @@ -708,7 +706,8 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
rfind_event_by_id(&state.items, &annotated_event_id)
{
// Re-add the reaction to the mapping.
let mut reactions = item.content().reactions();
let mut reactions =
item.content().reactions().cloned().unwrap_or_default();
reactions
.entry(key.to_owned())
.or_default()
Expand Down Expand Up @@ -893,45 +892,18 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
// If it was just sent, try to find if it matches a corresponding aggregation,
// and mark it as sent in that case.
if let Some(new_event_id) = new_event_id {
match txn
.meta
.aggregations
.mark_aggregation_as_sent(txn_id.to_owned(), new_event_id.to_owned())
{
MarkAggregationSentResult::MarkedSent { update } => {
trace!("marked aggregation as sent");

if let Some((target, aggregation)) = update {
if let Some((item_pos, item)) =
rfind_event_by_item_id(&txn.items, &target)
{
let mut content = item.content().clone();
match aggregation.apply(&mut content) {
ApplyAggregationResult::UpdatedItem => {
trace!("reapplied aggregation in the event");
let internal_id = item.internal_id.to_owned();
let new_item = item.with_content(content);
txn.items.replace(
item_pos,
TimelineItem::new(new_item, internal_id),
);
txn.commit();
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => {
warn!("when reapplying aggregation just marked as sent: {err}");
}
}
}
}

// Early return: we've found the event to mark as sent, it was an
// aggregation.
return;
}

MarkAggregationSentResult::NotFound => {}
if txn.meta.aggregations.mark_aggregation_as_sent(
txn_id.to_owned(),
new_event_id.to_owned(),
&mut txn.items,
&txn.meta.room_version,
) {
trace!("Aggregation marked as sent");
txn.commit();
return;
}

trace!("Sent aggregation was not found");
}

warn!("Timeline item not found, can't update send state");
Expand Down Expand Up @@ -986,37 +958,26 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {

// Avoid multiple mutable and immutable borrows of the lock guard by explicitly
// dereferencing it once.
let state = &mut *state;
let mut txn = state.transaction();

// Look if this was a local aggregation.
if let Some((target, aggregation)) = state
.meta
.aggregations
.try_remove_aggregation(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
{
let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, target) else {
warn!("missing target item for a local aggregation");
return false;
};

let mut content = item.content().clone();
match aggregation.unapply(&mut content) {
ApplyAggregationResult::UpdatedItem => {
trace!("removed local reaction to local echo");
let internal_id = item.internal_id.clone();
let new_item = item.with_content(content);
state.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => {
warn!("when undoing local aggregation: {err}");
}
let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
&TimelineEventItemId::TransactionId(txn_id.to_owned()),
&mut txn.items,
) {
Ok(val) => val,
Err(err) => {
warn!("error when discarding local echo for an aggregation: {err}");
// The aggregation has been found, it's just that we couldn't discard it.
true
}
};

return true;
if found_aggregation {
txn.commit();
}

false
found_aggregation
}

pub(super) async fn replace_local_echo(
Expand Down Expand Up @@ -1056,9 +1017,9 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
// Replace the local-related state (kind) and the content state.
let new_item = TimelineItem::new(
prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
content,
None,
prev_item.content().reactions(),
content.msgtype,
content.mentions,
prev_item.content().reactions().cloned().unwrap_or_default(),
prev_item.content().thread_root(),
prev_item.content().in_reply_to(),
prev_item.content().thread_summary(),
Expand Down Expand Up @@ -1281,7 +1242,13 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
);

tr.meta.aggregations.add(target.clone(), aggregation.clone());
find_item_and_apply_aggregation(&mut tr.items, &target, aggregation);
find_item_and_apply_aggregation(
&tr.meta.aggregations,
&mut tr.items,
&target,
aggregation,
&tr.meta.room_version,
);

tr.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ impl<'observable_items> ObservableItemsTransaction<'observable_items> {
self.all_remote_events.get_by_event_id(event_id)
}

/// Get the position of an event in the events array by its ID.
pub fn position_by_event_id(&self, event_id: &EventId) -> Option<usize> {
self.all_remote_events.position_by_event_id(event_id)
}

/// Replace a timeline item at position `timeline_item_index` by
/// `timeline_item`.
pub fn replace(
Expand Down Expand Up @@ -1940,6 +1945,17 @@ impl AllRemoteEvents {
self.0.iter().rev().find(|event_meta| event_meta.event_id == event_id)
}

/// Get the position of an event in the events array by its ID.
pub fn position_by_event_id(&self, event_id: &EventId) -> Option<usize> {
// Reverse the iterator to start looking at the end. Since this will give us the
// "reverse" position, reverse the index after finding the event.
self.0
.iter()
.enumerate()
.rev()
.find_map(|(i, event_meta)| (event_meta.event_id == event_id).then_some(i))
}

/// Shift to the right all timeline item indexes that are equal to or
/// greater than `new_timeline_item_index`.
fn increment_all_timeline_item_index_after(&mut self, new_timeline_item_index: usize) {
Expand Down
42 changes: 2 additions & 40 deletions crates/matrix-sdk-ui/src/timeline/controller/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@ use matrix_sdk::{deserialized_responses::TimelineEvent, send_queue::SendHandle};
#[cfg(test)]
use ruma::events::receipt::ReceiptEventContent;
use ruma::{
events::{
poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncTimelineEvent,
},
events::{AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent},
serde::Raw,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
RoomVersionId,
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId,
};
use tracing::{instrument, trace, warn};

Expand Down Expand Up @@ -283,36 +278,3 @@ impl TimelineState {
TimelineStateTransaction::new(&mut self.items, &mut self.meta, self.timeline_focus)
}
}

#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
}

#[derive(Clone)]
pub(in crate::timeline) struct PendingEdit {
pub kind: PendingEditKind,
pub event_json: Raw<AnySyncTimelineEvent>,
}

impl PendingEdit {
pub fn edited_event(&self) -> &EventId {
match &self.kind {
PendingEditKind::RoomMessage(Replacement { event_id, .. })
| PendingEditKind::Poll(Replacement { event_id, .. }) => event_id,
}
}
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for PendingEdit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind {
PendingEditKind::RoomMessage(_) => {
f.debug_struct("RoomMessage").finish_non_exhaustive()
}
PendingEditKind::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
}
}
}
Loading
Loading