feat(event cache): deduplicate using a store when available #4662

Merged: 5 commits, Feb 13, 2025

250 changes: 220 additions & 30 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
@@ -18,11 +18,102 @@
use std::{collections::BTreeSet, fmt, sync::Mutex};

use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use tracing::warn;
use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
use ruma::{OwnedEventId, OwnedRoomId};
use tracing::{debug, warn};

use super::{
room::events::{Event, RoomEvents},
EventCacheError,
};

pub enum Deduplicator {
InMemory(BloomFilterDeduplicator),
PersistentStore(StoreDeduplicator),
}

impl Deduplicator {
/// Create an empty deduplicator instance that uses an internal Bloom
/// filter.
///
/// Such a deduplicator is stateful, with no initial known events; it uses a
/// Bloom filter to learn over time which events are duplicates.
///
/// When the persistent storage of the event cache is enabled by default,
/// this constructor (and the associated variant) will be removed.
pub fn new_memory_based() -> Self {
Self::InMemory(BloomFilterDeduplicator::new())
}

/// Create a new store-based deduplicator that will run queries against the
/// store to find out whether any event is a duplicate.
///
/// This deduplicator is stateless.
///
/// When the persistent storage of the event cache is enabled by default,
/// this will become the default, and [`Deduplicator`] will be replaced
/// with [`StoreDeduplicator`].
pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
Self::PersistentStore(StoreDeduplicator { room_id, store })
}

/// Find duplicates in the given collection of events, and return both the
/// valid events (those with an event id) and the event ids of the
/// duplicate events.
pub async fn filter_duplicate_events(
&self,
events: Vec<Event>,
room_events: &RoomEvents,
) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
match self {
Deduplicator::InMemory(dedup) => Ok(dedup.filter_duplicate_events(events, room_events)),
Deduplicator::PersistentStore(dedup) => dedup.filter_duplicate_events(events).await,
}
}
}
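
The enum above just dispatches between the two strategies. As a rough usage sketch (not part of this PR; how a caller obtains the `EventCacheStoreLock` and picks a variant is assumed here purely for illustration):

```rust
use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
use ruma::OwnedRoomId;

// Sketch only: choose a deduplication strategy depending on whether a
// persistent event cache store is available.
fn choose_deduplicator(room_id: OwnedRoomId, store: Option<EventCacheStoreLock>) -> Deduplicator {
    match store {
        // Persistent storage available: stateless deduplication via store queries.
        Some(store) => Deduplicator::new_store_based(room_id, store),
        // No persistent storage: fall back to the in-memory Bloom filter.
        None => Deduplicator::new_memory_based(),
    }
}
```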

/// A deduplication mechanism based on the persistent storage associated with
/// the event cache.
///
/// It uses queries to the persistent storage to figure out whether events are
/// duplicates, making it entirely stateless.
pub struct StoreDeduplicator {
/// The room this deduplicator applies to.
room_id: OwnedRoomId,
/// The actual event cache store implementation used to query events.
store: EventCacheStoreLock,
}

impl StoreDeduplicator {
async fn filter_duplicate_events(
&self,
mut events: Vec<Event>,
) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
let store = self.store.lock().await?;

// Collect event ids as we "validate" events (i.e. check they have a valid event
// id.)
let mut event_ids = Vec::new();

events.retain(|event| {
if let Some(event_id) = event.event_id() {
event_ids.push(event_id);
true
} else {
false
}
});

// Let the store do its magic ✨
let duplicates = store.filter_duplicated_events(&self.room_id, event_ids).await?;

Ok((events, duplicates))
}
}
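
Conceptually, the store-side check is an ordered intersection between the incoming event ids and the ids already persisted for the room; the store answers it with a query rather than with any in-memory state. A standalone sketch of that idea (illustrative only, not the actual `EventCacheStore` implementation):

```rust
use std::collections::HashSet;

use ruma::OwnedEventId;

/// Sketch: return the incoming ids that are already known, preserving the
/// incoming order. The real deduplicator delegates this to the store's
/// `filter_duplicated_events` query.
fn find_duplicates(known: &HashSet<OwnedEventId>, incoming: &[OwnedEventId]) -> Vec<OwnedEventId> {
    incoming.iter().filter(|id| known.contains(*id)).cloned().collect()
}
```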

/// `BloomFilterDeduplicator` is an efficient type to find duplicated events,
/// using an in-memory cache.
///
/// It uses a [bloom filter] to provide a memory efficient probabilistic answer
/// to: “has event E been seen already?”. False positives are possible, while
@@ -31,47 +122,70 @@ use super::room::events::{Event, RoomEvents};
/// positive or not
///
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
pub struct Deduplicator {
pub struct BloomFilterDeduplicator {
bloom_filter: Mutex<GrowableBloom>,
}
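
As a quick illustration of the probabilistic check this type wraps, here is a minimal sketch using the same `growable_bloom_filter` builder calls that appear in this file (`contains` is assumed to be the crate's membership query; only `insert` is visible in this diff). With the usual sizing formula m = n·ln(1/p)/(ln 2)², the defaults below (1,000 expected insertions at a 1% error rate) come out to roughly 9,600 bits, i.e. on the order of a kilobyte per filter.

```rust
use growable_bloom_filter::GrowableBloomBuilder;

// Sketch: a negative answer from a Bloom filter is definitive ("never seen"),
// while a positive answer may be a false positive, which is why the
// deduplicator confirms positives with a linear search over known events.
fn bloom_sketch() {
    let mut filter = GrowableBloomBuilder::new()
        .estimated_insertions(1_000)
        .desired_error_ratio(0.01)
        .build();

    let seen = "$event_a:example.org".to_owned();
    let unseen = "$event_b:example.org".to_owned();

    filter.insert(&seen);

    assert!(filter.contains(&seen)); // inserted, so always reported as seen
    let _hit = filter.contains(&unseen); // almost always false; never a false negative
}
```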

impl fmt::Debug for Deduplicator {
impl fmt::Debug for BloomFilterDeduplicator {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("Deduplicator").finish_non_exhaustive()
}
}

impl Deduplicator {
impl BloomFilterDeduplicator {
// Note: don't use too high numbers here, or the amount of allocated memory will
// explode. See https://github.com/matrix-org/matrix-rust-sdk/pull/4231 for details.
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000;
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;

/// Create a new `Deduplicator` with no prior knowledge of known events.
#[cfg(test)]
pub fn new() -> Self {
Self::with_initial_events(std::iter::empty())
}

/// Create a new `Deduplicator` filled with initial events.
///
/// This won't detect duplicates in the initial events, only learn about
/// those events.
pub fn with_initial_events<'a>(events: impl Iterator<Item = &'a Event>) -> Self {
let mut bloom_filter = GrowableBloomBuilder::new()
fn new() -> Self {
let bloom_filter = GrowableBloomBuilder::new()
.estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
.desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
.build();
for e in events {
let Some(event_id) = e.event_id() else {
warn!("initial event in deduplicator had no event id");
continue;
};
bloom_filter.insert(event_id);
}
Self { bloom_filter: Mutex::new(bloom_filter) }
}

/// Find duplicates in the given collection of events, and return both the
/// valid events (those with an event id) and the event ids of the
/// duplicate events.
fn filter_duplicate_events(
&self,
events: Vec<Event>,
room_events: &RoomEvents,
) -> (Vec<Event>, Vec<OwnedEventId>) {
let mut duplicated_event_ids = Vec::new();

let events = self
.scan_and_learn(events.into_iter(), room_events)
.filter_map(|decorated_event| match decorated_event {
Decoration::Unique(event) => Some(event),
Decoration::Duplicated(event) => {
debug!(event_id = ?event.event_id(), "Found a duplicated event");

duplicated_event_ids.push(
event
.event_id()
// SAFETY: An event with no ID is decorated as
// `Decoration::Invalid`. Thus, it's
// safe to unwrap the `Option<OwnedEventId>` here.
.expect("The event has no ID"),
);

// Keep the new event!
Some(event)
}
Decoration::Invalid(event) => {
warn!(?event, "Found an event with no ID");
None
}
})
.collect::<Vec<_>>();

(events, duplicated_event_ids)
}

/// Scan a collection of events and detect duplications.
///
/// This method takes a collection of events `new_events_to_scan` and
@@ -82,7 +196,7 @@ impl Deduplicator {
/// Each scanned event will update `Self`'s internal state.
///
/// `existing_events` represents all events of a room that already exist.
pub fn scan_and_learn<'a, I>(
fn scan_and_learn<'a, I>(
&'a self,
new_events_to_scan: I,
existing_events: &'a RoomEvents,
@@ -141,7 +255,7 @@ impl Deduplicator {
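
The body of `scan_and_learn` is collapsed in this view. Going by the doc comment above and the `Decoration` enum below, the per-event decoration amounts to something like the following sketch (illustrative, not the collapsed implementation; `already_seen` stands in for the Bloom filter check plus the linear confirmation against existing events):

```rust
// Sketch of the decoration step applied to each scanned event.
fn decorate(event: Event, already_seen: bool) -> Decoration<Event> {
    match event.event_id() {
        // No event id: the event cannot be deduplicated at all.
        None => Decoration::Invalid(event),
        // Seen before (and confirmed, to rule out a false positive).
        Some(_) if already_seen => Decoration::Duplicated(event),
        // Never seen so far: let it through.
        Some(_) => Decoration::Unique(event),
    }
}
```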

/// Information about the scanned collection of events.
#[derive(Debug)]
pub enum Decoration<I> {
enum Decoration<I> {
/// This event is not duplicated.
Unique(I),

@@ -179,7 +293,7 @@ mod tests {
let event_1 = sync_timeline_event(&event_id_1);
let event_2 = sync_timeline_event(&event_id_2);

let deduplicator = Deduplicator::new();
let deduplicator = BloomFilterDeduplicator::new();
let existing_events = RoomEvents::new();

let mut events =
@@ -205,7 +319,7 @@ mod tests {
let event_0 = sync_timeline_event(&event_id_0);
let event_1 = sync_timeline_event(&event_id_1);

let deduplicator = Deduplicator::new();
let deduplicator = BloomFilterDeduplicator::new();
let existing_events = RoomEvents::new();

let mut events = deduplicator.scan_and_learn(
@@ -240,7 +354,7 @@ mod tests {
let event_1 = sync_timeline_event(&event_id_1);
let event_2 = sync_timeline_event(&event_id_2);

let deduplicator = Deduplicator::new();
let deduplicator = BloomFilterDeduplicator::new();
let mut existing_events = RoomEvents::new();

// Simulate `event_1` is inserted inside `existing_events`.
@@ -305,7 +419,7 @@
let mut dedups = Vec::with_capacity(num_rooms);

for _ in 0..num_rooms {
let dedup = Deduplicator::new();
let dedup = BloomFilterDeduplicator::new();
let existing_events = RoomEvents::new();

for i in 0..num_events {
@@ -319,4 +433,80 @@
dedups.push(dedup);
}
}

#[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
#[matrix_sdk_test::async_test]
async fn test_storage_deduplication() {
use std::sync::Arc;

use matrix_sdk_base::{
event_cache::store::{EventCacheStore as _, MemoryStore},
linked_chunk::{ChunkIdentifier, Position, Update},
};
use matrix_sdk_test::{ALICE, BOB};
use ruma::{event_id, room_id, serde::Raw};

let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

let event_cache_store = Arc::new(MemoryStore::new());

let eid1 = event_id!("$1");
let eid2 = event_id!("$2");
let eid3 = event_id!("$3");

let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
// An invalid event (doesn't have an event id.).
let ev4 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

// Prefill the store with ev1 and ev2.
event_cache_store
.handle_linked_chunk_updates(
room_id,
vec![
// Non empty items chunk.
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![ev1.clone()],
},
// And another items chunk, non-empty again.
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev2.clone()],
},
],
)
.await
.unwrap();

// Wrap the store into its lock.
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

let deduplicator =
StoreDeduplicator { room_id: room_id.to_owned(), store: event_cache_store };

let (valid_events, duplicates) =
deduplicator.filter_duplicate_events(vec![ev1, ev2, ev3, ev4]).await.unwrap();

assert_eq!(valid_events.len(), 3);
assert_eq!(valid_events[0].event_id().as_deref(), Some(eid1));
assert_eq!(valid_events[1].event_id().as_deref(), Some(eid2));
assert_eq!(valid_events[2].event_id().as_deref(), Some(eid3));

assert_eq!(duplicates.len(), 2);
assert_eq!(duplicates[0], eid1);
assert_eq!(duplicates[1], eid2);
}
}
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/event_cache/pagination.rs
@@ -181,7 +181,7 @@ impl RoomPagination {
.collect::<Vec<_>>();

let (new_events, duplicated_event_ids, all_deduplicated) =
state.collect_valid_and_duplicated_events(sync_events.clone().into_iter());
state.collect_valid_and_duplicated_events(sync_events.clone()).await?;

let (backpagination_outcome, sync_timeline_events_diffs) = state
.with_events_mut(move |room_events| {
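
The call-site change above follows from the new deduplicator: the store-based path awaits queries against the event cache store (including taking its lock), so `collect_valid_and_duplicated_events` is now async and fallible, and it takes the events by value rather than as an iterator. Annotated for readability (same lines as in the diff):

```rust
// Before: synchronous and infallible (in-memory Bloom filter only).
// let (new_events, duplicated_event_ids, all_deduplicated) =
//     state.collect_valid_and_duplicated_events(sync_events.clone().into_iter());

// After: awaited, and the store error is propagated to the pagination result.
let (new_events, duplicated_event_ids, all_deduplicated) =
    state.collect_valid_and_duplicated_events(sync_events.clone()).await?;
```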