Skip to content

feat(base): Add EventCacheStore::filter_duplicated_events #4659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub trait EventCacheStoreIntegrationTests {

/// Test that removing a room from storage empties all associated data.
async fn test_remove_room(&self);

/// Test that filtering duplicated events works as expected: only event IDs
/// already stored for the given room are reported as duplicates.
async fn test_filter_duplicated_events(&self);
}

fn rebuild_linked_chunk(raws: Vec<RawChunk<Event, Gap>>) -> Option<LinkedChunk<3, Event, Gap>> {
Expand Down Expand Up @@ -502,6 +505,79 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
let r1_linked_chunk = self.reload_linked_chunk(r1).await.unwrap();
assert!(!r1_linked_chunk.is_empty());
}

async fn test_filter_duplicated_events(&self) {
let room_id = room_id!("!r0:matrix.org");
let another_room_id = room_id!("!r1:matrix.org");
let event = |msg: &str| make_test_event(room_id, msg);

let event_comte = event("comté");
let event_brigand = event("brigand du jorat");
let event_raclette = event("raclette");
let event_morbier = event("morbier");
let event_gruyere = event("gruyère");
let event_tome = event("tome");
let event_mont_dor = event("mont d'or");
Comment on lines +514 to +520
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ 🧀


self.handle_linked_chunk_updates(
room_id,
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems {
at: Position::new(CId::new(0), 0),
items: vec![event_comte.clone(), event_brigand.clone()],
},
Update::NewGapChunk {
previous: Some(CId::new(0)),
new: CId::new(1),
next: None,
gap: Gap { prev_token: "brillat-savarin".to_owned() },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I WANT MORE 🧀

},
Update::NewItemsChunk { previous: Some(CId::new(1)), new: CId::new(2), next: None },
Update::PushItems {
at: Position::new(CId::new(2), 0),
items: vec![event_morbier.clone(), event_mont_dor.clone()],
},
],
)
.await
.unwrap();

// Add other events in another room, to ensure filtering take the `room_id` into
// account.
self.handle_linked_chunk_updates(
another_room_id,
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems {
at: Position::new(CId::new(0), 0),
items: vec![event_tome.clone()],
},
],
)
.await
.unwrap();

let duplicated_events = self
.filter_duplicated_events(
room_id,
vec![
event_comte.event_id().unwrap().to_owned(),
event_raclette.event_id().unwrap().to_owned(),
event_morbier.event_id().unwrap().to_owned(),
event_gruyere.event_id().unwrap().to_owned(),
event_tome.event_id().unwrap().to_owned(),
event_mont_dor.event_id().unwrap().to_owned(),
],
)
.await
.unwrap();

assert_eq!(duplicated_events.len(), 3);
assert_eq!(duplicated_events[0], event_comte.event_id().unwrap());
assert_eq!(duplicated_events[1], event_morbier.event_id().unwrap());
assert_eq!(duplicated_events[2], event_mont_dor.event_id().unwrap());
}
}

/// Macro building to allow your `EventCacheStore` implementation to run the
Expand Down Expand Up @@ -584,6 +660,13 @@ macro_rules! event_cache_store_integration_tests {
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_remove_room().await;
}

// Macro-generated wrapper running the shared integration test for
// `EventCacheStore::filter_duplicated_events` against the concrete store.
#[async_test]
async fn test_filter_duplicated_events() {
    let event_cache_store =
        get_event_cache_store().await.unwrap().into_event_cache_store();
    event_cache_store.test_filter_duplicated_events().await;
}
}
};
}
Expand Down
31 changes: 30 additions & 1 deletion crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use matrix_sdk_common::{
};
use ruma::{
time::{Instant, SystemTime},
MxcUri, OwnedMxcUri, RoomId,
MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
};

use super::{
Expand Down Expand Up @@ -148,6 +148,35 @@ impl EventCacheStore for MemoryStore {
Ok(())
}

/// Return the subset of `events` that are already stored for `room_id`
/// (i.e. the duplicates), in store iteration order.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error> {
    use std::collections::HashSet;

    // Collect all duplicated events.
    let inner = self.inner.read().unwrap();

    // Candidate IDs still waiting to be matched. A set gives O(1)
    // membership tests instead of a linear scan of `events` for every
    // stored event, i.e. O(stored + candidates) instead of
    // O(stored × candidates).
    // NOTE: duplicate IDs in the input are collapsed, so each duplicate is
    // reported at most once — same as the original behavior when the store
    // holds each event at most once.
    let mut candidates = events.into_iter().collect::<HashSet<_>>();

    let mut duplicated_events = Vec::new();

    for event in inner.events.unordered_events(room_id) {
        // Every candidate has been matched: no point iterating the rest of
        // the stored events.
        if candidates.is_empty() {
            break;
        }

        if let Some(known_event_id) = event.event_id() {
            // This event exists in the store: it's a duplicate.
            if let Some(duplicated_event) = candidates.take(&known_event_id) {
                duplicated_events.push(duplicated_event);
            }
        }
    }

    Ok(duplicated_events)
}

async fn add_media_content(
&self,
request: &MediaRequestParameters,
Expand Down
18 changes: 17 additions & 1 deletion crates/matrix-sdk-base/src/event_cache/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use matrix_sdk_common::{
linked_chunk::{RawChunk, Update},
AsyncTraitDeps,
};
use ruma::{MxcUri, RoomId};
use ruma::{MxcUri, OwnedEventId, RoomId};

use super::{
media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy},
Expand Down Expand Up @@ -80,6 +80,14 @@ pub trait EventCacheStore: AsyncTraitDeps {
/// using the above [`Self::handle_linked_chunk_updates`] methods.
async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error>;

/// Given a set of event IDs, remove the unique (not-yet-stored) ones and
/// return the duplicated events, i.e. the IDs already present in the
/// store for this room.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error>;

/// Add a media file's content in the media store.
///
/// # Arguments
Expand Down Expand Up @@ -247,6 +255,14 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
self.0.clear_all_rooms_chunks().await.map_err(Into::into)
}

async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error> {
    // Delegate to the wrapped store, converting its concrete error type
    // into the erased one.
    self.0.filter_duplicated_events(room_id, events).await.map_err(Into::into)
}

async fn add_media_content(
&self,
request: &MediaRequestParameters,
Expand Down
50 changes: 50 additions & 0 deletions crates/matrix-sdk-common/src/linked_chunk/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,21 @@ impl<Item, Gap> RelationalLinkedChunk<Item, Gap> {
}
}
}

/// Return an iterator that yields events of a particular room with no
/// particular order.
/// Return an iterator that yields events of a particular room with no
/// particular order.
pub fn unordered_events<'a>(&'a self, room_id: &'a RoomId) -> impl Iterator<Item = &'a Item> {
    // Keep only the rows belonging to `room_id`, then drop the gap rows,
    // yielding the items themselves.
    self.items
        .iter()
        .filter(move |item_row| item_row.room_id == room_id)
        .filter_map(|item_row| match &item_row.item {
            Either::Item(item) => Some(item),
            Either::Gap(..) => None,
        })
}
}

impl<Item, Gap> RelationalLinkedChunk<Item, Gap>
Expand Down Expand Up @@ -1002,4 +1017,39 @@ mod tests {
],
);
}

#[test]
fn test_unordered_events() {
    let room_id = room_id!("!r0:matrix.org");
    let other_room_id = room_id!("!r1:matrix.org");
    let mut relational_linked_chunk = RelationalLinkedChunk::<char, ()>::new();

    // Two item chunks for the room under test.
    relational_linked_chunk.apply_updates(
        room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['a', 'b', 'c'] },
            Update::NewItemsChunk { previous: Some(CId::new(0)), new: CId::new(1), next: None },
            // NOTE(review): this push targets chunk 0's position although
            // chunk 1 was just created — presumably the relational store
            // does not key items by chunk here; confirm.
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['d', 'e', 'f'] },
        ],
    );

    // Items in another room, which must not be yielded for `room_id`.
    relational_linked_chunk.apply_updates(
        other_room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['x', 'y', 'z'] },
        ],
    );

    // Only the six items of `room_id` come back, in insertion order.
    let events: Vec<char> =
        relational_linked_chunk.unordered_events(room_id).copied().collect();

    assert_eq!(events, ['a', 'b', 'c', 'd', 'e', 'f']);
}
}
69 changes: 66 additions & 3 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! A sqlite-based backend for the [`EventCacheStore`].

use std::{borrow::Cow, fmt, path::Path, sync::Arc};
use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
Expand All @@ -33,8 +33,8 @@ use matrix_sdk_base::{
media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior};
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, trace};

Expand Down Expand Up @@ -622,6 +622,69 @@ impl EventCacheStore for SqliteEventCacheStore {
Ok(())
}

/// Return the subset of `events` already present in the `events` table for
/// `room_id` (i.e. the duplicates), via a chunked `SELECT … IN (…)` query.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error> {
    // Select all events that exist in the store, i.e. the duplicates.
    // The room ID is hashed the same way it was when stored, so the
    // `room_id = ?` comparison matches the stored key.
    let room_id = room_id.to_owned();
    let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

    self.acquire()
        .await?
        .with_transaction(move |txn| -> Result<_> {
            // SQLite limits the number of bound variables per statement, so
            // the candidate IDs are processed in chunks.
            // NOTE(review): each chunk binds one extra variable for
            // `room_id` on top of the IDs — confirm `chunk_large_query_over`
            // leaves headroom for it.
            txn.chunk_large_query_over(events, None, move |txn, events| {
                let query = format!(
                    "SELECT event_id FROM events WHERE room_id = ? AND event_id IN ({})",
                    repeat_vars(events.len()),
                );
                let parameters = params_from_iter(
                    // parameter for `room_id = ?`
                    once(
                        hashed_room_id
                            .to_sql()
                            // SAFETY: it cannot fail since `Key::to_sql` never fails
                            .unwrap(),
                    )
                    // parameters for `event_id IN (…)`
                    .chain(events.iter().map(|event| {
                        event
                            .as_str()
                            .to_sql()
                            // SAFETY: it cannot fail since `str::to_sql` never fails
                            .unwrap()
                    })),
                );

                // IDs found in the store for this chunk of candidates.
                let mut duplicated_events = Vec::new();

                for duplicated_event in txn
                    .prepare(&query)?
                    .query_map(parameters, |row| row.get::<_, Option<String>>(0))?
                {
                    let duplicated_event = duplicated_event?;

                    let Some(duplicated_event) = duplicated_event else {
                        // The `event_id` column is `NULL` for this row —
                        // presumably a malformed stored event; skip it.
                        continue;
                    };

                    let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
                        // Normally unreachable, but the event ID has been stored even if it is
                        // malformed, let's skip it.
                        continue;
                    };

                    duplicated_events.push(duplicated_event);
                }

                Ok(duplicated_events)
            })
        })
        .await
}

async fn add_media_content(
&self,
request: &MediaRequestParameters,
Expand Down
Loading