Skip to content

fix(event cache): don't remove a gap when it's the only chunk in memory #4779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 11, 2025
4 changes: 4 additions & 0 deletions crates/matrix-sdk-base/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file.

### Features

- [**breaking**] The `Client::subscribe_to_ignore_user_list_changes()` method will now only trigger
whenever the ignored user list has changed from what was previously known, instead of triggering
every time an ignore-user-list event has been received from sync.
([#4779](https://github.com/matrix-org/matrix-rust-sdk/pull/4779))
- [**breaking**] The `MediaRetentionPolicy` can now trigger regular cleanups
with its new `cleanup_frequency` setting.
([#4603](https://github.com/matrix-org/matrix-rust-sdk/pull/4603))
Expand Down
109 changes: 105 additions & 4 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1254,9 +1254,10 @@ impl BaseClient {

{
let _sync_lock = self.sync_lock().lock().await;
let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
self.store.save_changes(&changes).await?;
*self.store.sync_token.write().await = Some(response.next_batch.clone());
self.apply_changes(&changes, room_info_notable_updates);
self.apply_changes(&changes, room_info_notable_updates, prev_ignored_user_list);
}

// Now that all the rooms information have been saved, update the display name
Expand Down Expand Up @@ -1286,10 +1287,17 @@ impl BaseClient {
Ok(response)
}

pub(crate) async fn load_previous_ignored_user_list(
&self,
) -> Option<Raw<IgnoredUserListEvent>> {
self.store().get_account_data_event_static().await.ok().flatten()
}

pub(crate) fn apply_changes(
&self,
changes: &StateChanges,
room_info_notable_updates: BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
prev_ignored_user_list: Option<Raw<IgnoredUserListEvent>>,
) {
if let Some(event) = changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
{
Expand All @@ -1298,8 +1306,27 @@ impl BaseClient {
let user_ids: Vec<String> =
event.content.ignored_users.keys().map(|id| id.to_string()).collect();

self.ignore_user_list_changes.set(user_ids);
// Try to only trigger the observable if the ignored user list has changed,
// from the previous time we've seen it. If we couldn't load the previous event
// for any reason, always trigger.
if let Some(prev_user_ids) =
prev_ignored_user_list.and_then(|raw| raw.deserialize().ok()).map(|event| {
event
.content
.ignored_users
.keys()
.map(|id| id.to_string())
.collect::<Vec<_>>()
})
{
if user_ids != prev_user_ids {
self.ignore_user_list_changes.set(user_ids);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be a good use case for update_if here, though it's not blocking the approval of this patch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think update_if would trigger an update the first time it's set to a non default value, right? or we'd need to initialize the ignore_user_list_changes with an initial value without having it trigger, or something? might be nice actually, but yeah, follow up

}
} else {
self.ignore_user_list_changes.set(user_ids);
}
}

Err(error) => {
error!("Failed to deserialize ignored user list event: {error}")
}
Expand Down Expand Up @@ -1419,8 +1446,9 @@ impl BaseClient {
room_info.mark_members_synced();
changes.add_room(room_info);

let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
self.store.save_changes(&changes).await?;
self.apply_changes(&changes, Default::default());
self.apply_changes(&changes, Default::default(), prev_ignored_user_list);

let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);

Expand Down Expand Up @@ -1758,9 +1786,11 @@ fn handle_room_member_event_for_profiles(

#[cfg(test)]
mod tests {
use assert_matches2::assert_let;
use futures_util::FutureExt as _;
use matrix_sdk_test::{
async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder,
LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
};
use ruma::{
api::client as api, event_id, events::room::member::MembershipState, room_id, serde::Raw,
Expand Down Expand Up @@ -2152,4 +2182,75 @@ mod tests {
assert_eq!(member.display_name().unwrap(), "Invited Alice");
assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
}

#[async_test]
async fn test_ignored_user_list_changes() {
let user_id = user_id!("@alice:example.org");
let client = BaseClient::with_store_config(StoreConfig::new(
"cross-process-store-locks-holder-name".to_owned(),
));
client
.set_session_meta(
SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
#[cfg(feature = "e2e-encryption")]
None,
)
.await
.unwrap();

let mut subscriber = client.subscribe_to_ignore_user_list_changes();
assert!(subscriber.next().now_or_never().is_none());

let mut sync_builder = SyncResponseBuilder::new();
let response = sync_builder
.add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
json!({
"content": {
"ignored_users": {
*BOB: {}
}
},
"type": "m.ignored_user_list",
}),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();

assert_let!(Some(ignored) = subscriber.next().await);
assert_eq!(ignored, [BOB.to_string()]);

// Receive the same response.
let response = sync_builder
.add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
json!({
"content": {
"ignored_users": {
*BOB: {}
}
},
"type": "m.ignored_user_list",
}),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();

// No changes in the ignored list.
assert!(subscriber.next().now_or_never().is_none());

// Now remove Bob from the ignored list.
let response = sync_builder
.add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
json!({
"content": {
"ignored_users": {}
},
"type": "m.ignored_user_list",
}),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();

assert_let!(Some(ignored) = subscriber.next().await);
assert!(ignored.is_empty());
}
}
10 changes: 5 additions & 5 deletions crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2534,7 +2534,7 @@ mod tests {
client
.handle_room_account_data(room_id, &[tag_raw], &mut changes, &mut Default::default())
.await;
client.apply_changes(&changes, Default::default());
client.apply_changes(&changes, Default::default(), None);

// The `RoomInfo` is getting notified.
assert_ready!(room_info_subscriber);
Expand All @@ -2555,7 +2555,7 @@ mod tests {
client
.handle_room_account_data(room_id, &[tag_raw], &mut changes, &mut Default::default())
.await;
client.apply_changes(&changes, Default::default());
client.apply_changes(&changes, Default::default(), None);

// The `RoomInfo` is getting notified.
assert_ready!(room_info_subscriber);
Expand Down Expand Up @@ -2614,7 +2614,7 @@ mod tests {
client
.handle_room_account_data(room_id, &[tag_raw], &mut changes, &mut Default::default())
.await;
client.apply_changes(&changes, Default::default());
client.apply_changes(&changes, Default::default(), None);

// The `RoomInfo` is getting notified.
assert_ready!(room_info_subscriber);
Expand All @@ -2635,7 +2635,7 @@ mod tests {
client
.handle_room_account_data(room_id, &[tag_raw], &mut changes, &mut Default::default())
.await;
client.apply_changes(&changes, Default::default());
client.apply_changes(&changes, Default::default(), None);

// The `RoomInfo` is getting notified.
assert_ready!(room_info_subscriber);
Expand Down Expand Up @@ -3197,7 +3197,7 @@ mod tests {
assert!(room_info_notable_update.try_recv().is_err());

// Then updating the room info will store the event,
client.apply_changes(&changes, room_info_notable_updates);
client.apply_changes(&changes, room_info_notable_updates, None);
assert_eq!(room.latest_event().unwrap().event_id(), event.event_id());

// And wake up the subscriber.
Expand Down
6 changes: 4 additions & 2 deletions crates/matrix-sdk-base/src/sliding_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ impl BaseClient {
.await?;

trace!("ready to submit e2ee changes to store");
let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
self.store.save_changes(&changes).await?;
self.apply_changes(&changes, room_info_notable_updates);
self.apply_changes(&changes, room_info_notable_updates, prev_ignored_user_list);
trace!("applied e2ee changes");

Ok(Some(to_device))
Expand Down Expand Up @@ -324,8 +325,9 @@ impl BaseClient {
changes.ambiguity_maps = ambiguity_cache.cache;

trace!("ready to submit changes to store");
let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
store.save_changes(&changes).await?;
self.apply_changes(&changes, room_info_notable_updates);
self.apply_changes(&changes, room_info_notable_updates, prev_ignored_user_list);
trace!("applied changes");

// Now that all the rooms information have been saved, update the display name
Expand Down
9 changes: 4 additions & 5 deletions crates/matrix-sdk-common/src/linked_chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ mod updates;
use std::{
fmt,
marker::PhantomData,
ops::Not,
ptr::NonNull,
sync::atomic::{AtomicU64, Ordering},
};
Expand Down Expand Up @@ -354,7 +353,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
// We need to update `self.links.last` if and only if `last_chunk` _is not_ the
// first chunk, and _is_ the last chunk (ensured by the `debug_assert!`
// above).
if last_chunk.is_first_chunk().not() {
if !last_chunk.is_first_chunk() {
// Maybe `last_chunk` is the same as the previous `self.links.last` chunk, but
// it's OK.
self.links.last = Some(last_chunk.as_ptr());
Expand Down Expand Up @@ -455,7 +454,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {

// We need to update `self.links.last` if and only if `chunk` _is not_ the first
// chunk, and _is_ the last chunk.
if chunk.is_first_chunk().not() && chunk.is_last_chunk() {
if !chunk.is_first_chunk() && chunk.is_last_chunk() {
// Maybe `chunk` is the same as the previous `self.links.last` chunk, but it's
// OK.
self.links.last = Some(chunk.as_ptr());
Expand Down Expand Up @@ -509,7 +508,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {

// If removing empty chunk is desired, and if the `chunk` can be unlinked, and
// if the `chunk` is not the first one, we can remove it.
if can_unlink_chunk && chunk.is_first_chunk().not() {
if can_unlink_chunk && !chunk.is_first_chunk() {
// Unlink `chunk`.
chunk.unlink(self.updates.as_mut());

Expand Down Expand Up @@ -682,7 +681,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {

// We need to update `self.links.last` if and only if `chunk` _is not_ the first
// chunk, and _is_ the last chunk.
if chunk.is_first_chunk().not() && chunk.is_last_chunk() {
if !chunk.is_first_chunk() && chunk.is_last_chunk() {
// Maybe `chunk` is the same as the previous `self.links.last` chunk, but it's
// OK.
self.links.last = Some(chunk.as_ptr());
Expand Down
37 changes: 33 additions & 4 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,15 @@ impl SqliteEventCacheStore {
}

async fn acquire(&self) -> Result<SqliteAsyncConn> {
Ok(self.pool.get().await?)
let connection = self.pool.get().await?;

// Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key support must be
// enabled on a per-connection basis. Execute it every time we try to get a
// connection, since we can't guarantee a previous connection did enable
// it before.
connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


Ok(connection)
}

fn map_row_to_chunk(
Expand Down Expand Up @@ -302,6 +310,9 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
return Ok(());
}

// Always enable foreign keys for the current connection.
conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

if version < 1 {
// First turn on WAL mode, this can't be done in the transaction, it fails with
// the error message: "cannot change into wal mode from within a transaction".
Expand All @@ -322,9 +333,6 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
}

if version < 3 {
// Enable foreign keys for this database.
conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
txn.set_db_version(3)
Expand Down Expand Up @@ -1915,6 +1923,27 @@ mod tests {
let chunks = store.load_all_chunks(room_id).await.unwrap();
assert!(chunks.is_empty());

// Check that cascading worked. Yes, sqlite, I doubt you.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How dare you?

store
.acquire()
.await
.unwrap()
.with_transaction(|txn| -> rusqlite::Result<_> {
let num_gaps = txn
.prepare("SELECT COUNT(chunk_id) FROM gaps ORDER BY chunk_id")?
.query_row((), |row| row.get::<_, u64>(0))?;
assert_eq!(num_gaps, 0);

let num_events = txn
.prepare("SELECT COUNT(event_id) FROM events ORDER BY chunk_id")?
.query_row((), |row| row.get::<_, u64>(0))?;
assert_eq!(num_events, 0);

Ok(())
})
.await
.unwrap();

// It's okay to re-insert a past event.
store
.handle_linked_chunk_updates(
Expand Down
12 changes: 2 additions & 10 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,15 +640,7 @@ impl EventCacheInner {

let rooms = self.by_room.write().await;
for room in rooms.values() {
// Clear all the room state.
let updates_as_vector_diffs = room.inner.state.write().await.reset().await?;

// Notify all the observers that we've lost track of state. (We ignore the
// error if there aren't any.)
let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Sync,
});
room.clear().await?;
}

Ok(())
Expand Down Expand Up @@ -681,7 +673,7 @@ impl EventCacheInner {
room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
{
// Non-fatal error, try to continue to the next room.
error!("handling joined room update: {err}");
error!(%room_id, "handling joined room update: {err}");
}
}

Expand Down
18 changes: 8 additions & 10 deletions crates/matrix-sdk/src/event_cache/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,15 @@ impl RoomPagination {
let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
// There is a prior gap, let's replace it by new events!
if all_duplicates {
// All the events were duplicated; don't act upon them, and only remove the
// prior gap that we just filled.
trace!("removing previous gap, as all events have been deduplicated");
room_events.remove_empty_chunk_at(gap_id).expect("gap identifier is a valid gap chunk id we read previously")
} else {
trace!("replacing previous gap with the back-paginated events");

// Replace the gap with the events we just deduplicated.
room_events.replace_gap_at(reversed_events.clone(), gap_id)
.expect("gap_identifier is a valid chunk id we read previously")
assert!(reversed_events.is_empty());
}

trace!("replacing previous gap with the back-paginated events");

// Replace the gap with the events we just deduplicated. This might get rid of the
// underlying gap, if the conditions are favorable to us.
room_events.replace_gap_at(reversed_events.clone(), gap_id)
.expect("gap_identifier is a valid chunk id we read previously")
} else if let Some(pos) = first_event_pos {
// No prior gap, but we had some events: assume we need to prepend events
// before those.
Expand Down
Loading
Loading