Skip to content

Commit a2835a7

Browse files
committed
feat(sdk): Introduce event_cache::Deduplicator.
This patch introduces `Deduplicator`, an efficient type to detect duplicated events in the event cache. It uses a bloom filter, and decorates a collection of events with `Decoration`, which is an enum that marks whether an event is unique, duplicated or invalid.
1 parent ce9dc73 commit a2835a7

File tree

4 files changed

+269
-0
lines changed

4 files changed

+269
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/matrix-sdk/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ eyeball-im = { workspace = true }
8282
eyre = { version = "0.6.8", optional = true }
8383
futures-core = { workspace = true }
8484
futures-util = { workspace = true }
85+
growable-bloom-filter = { workspace = true }
8586
http = { workspace = true }
8687
imbl = { workspace = true, features = ["serde"] }
8788
indexmap = "2.0.2"
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
// Copyright 2024 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
16+
//! to learn more.
17+
18+
use std::{collections::BTreeSet, sync::Mutex};
19+
20+
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
21+
22+
use super::store::{Event, RoomEvents};
23+
24+
/// `Deduplicator` is an efficient type to find duplicated events.
25+
///
26+
/// It uses a [bloom filter] to provide a memory efficient probabilistic answer
27+
/// to: “has event E been seen already?”. False positives are possible, while
28+
/// false negatives are impossible. In the case of a positive reply, we fall back
29+
/// to a linear (backward) search on all events to check whether it's a false
30+
/// positive or not.
31+
///
32+
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
33+
pub struct Deduplicator {
34+
bloom_filter: Mutex<GrowableBloom>,
35+
}
36+
37+
impl Deduplicator {
38+
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 800_000;
39+
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.001;
40+
41+
/// Create a new `Deduplicator`.
42+
pub fn new() -> Self {
43+
Self {
44+
bloom_filter: Mutex::new(
45+
GrowableBloomBuilder::new()
46+
.estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
47+
.desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
48+
.build(),
49+
),
50+
}
51+
}
52+
53+
/// Scan a collection of events and detect duplications.
54+
///
55+
/// This method takes a collection of events `events_to_scan` and returns a
56+
/// new collection of events, where each event is decorated by a
57+
/// [`Decoration`], so that the caller can decide what to do with these
58+
/// events.
59+
///
60+
/// Each scanned event will update `Self`'s internal state.
61+
///
62+
/// `existing_events` represents all events of a room that already exist.
63+
pub fn scan_and_learn<'a, I>(
64+
&'a self,
65+
events_to_scan: I,
66+
existing_events: &'a RoomEvents,
67+
) -> impl Iterator<Item = Decoration<I::Item>> + 'a
68+
where
69+
I: Iterator<Item = Event> + 'a,
70+
{
71+
let mut already_seen = BTreeSet::new();
72+
73+
events_to_scan.map(move |event| {
74+
let Some(event_id) = event.event_id() else {
75+
// The event has no `event_id`.
76+
return Decoration::Invalid(event);
77+
};
78+
79+
if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
80+
// Bloom filter has false positives. We are NOT sure the event is NOT present.
81+
// Even if the false positive rate is low, we need to iterate over all events to
82+
// ensure it isn't present.
83+
84+
// But first, let's ensure `event` is not a duplicate from `events_to_scan`,
85+
// i.e. if the iterator itself contains duplicated events! We use a `BTreeSet`,
86+
// otherwise using a bloom filter again may generate false positives.
87+
if already_seen.contains(&event_id) {
88+
// The iterator contains a duplicated `event`.
89+
return Decoration::Duplicated(event);
90+
}
91+
92+
// Now we can iterate over all events to ensure `event` is not present in
93+
// `existing_events`.
94+
let duplicated = existing_events.revents().any(|(_position, other_event)| {
95+
other_event.event_id().as_ref() == Some(&event_id)
96+
});
97+
98+
already_seen.insert(event_id);
99+
100+
if duplicated {
101+
Decoration::Duplicated(event)
102+
} else {
103+
Decoration::Unique(event)
104+
}
105+
} else {
106+
already_seen.insert(event_id);
107+
108+
// Bloom filter has no false negatives. We are sure the event is NOT present: we
109+
// can keep it in the iterator.
110+
Decoration::Unique(event)
111+
}
112+
})
113+
}
114+
}
115+
116+
/// Information about the scanned collection of events.
117+
#[derive(Debug)]
118+
pub enum Decoration<I> {
119+
/// This event is not duplicated.
120+
Unique(I),
121+
122+
/// This event is duplicated.
123+
Duplicated(I),
124+
125+
/// This event is invalid (i.e. not well formed).
126+
Invalid(I),
127+
}
128+
129+
#[cfg(test)]
130+
mod tests {
131+
use assert_matches2::assert_let;
132+
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
133+
use matrix_sdk_test::{EventBuilder, ALICE};
134+
use ruma::{events::room::message::RoomMessageEventContent, owned_event_id, EventId};
135+
136+
use super::*;
137+
138+
fn sync_timeline_event(event_builder: &EventBuilder, event_id: &EventId) -> SyncTimelineEvent {
139+
SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id(
140+
*ALICE,
141+
event_id,
142+
RoomMessageEventContent::text_plain("foo"),
143+
))
144+
}
145+
146+
#[test]
147+
fn test_filter_no_duplicate() {
148+
let event_builder = EventBuilder::new();
149+
150+
let event_id_0 = owned_event_id!("$ev0");
151+
let event_id_1 = owned_event_id!("$ev1");
152+
let event_id_2 = owned_event_id!("$ev2");
153+
154+
let event_0 = sync_timeline_event(&event_builder, &event_id_0);
155+
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
156+
let event_2 = sync_timeline_event(&event_builder, &event_id_2);
157+
158+
let deduplicator = Deduplicator::new();
159+
let existing_events = RoomEvents::new();
160+
161+
let mut events =
162+
deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);
163+
164+
assert_let!(Some(Decoration::Unique(event)) = events.next());
165+
assert_eq!(event.event_id(), Some(event_id_0));
166+
167+
assert_let!(Some(Decoration::Unique(event)) = events.next());
168+
assert_eq!(event.event_id(), Some(event_id_1));
169+
170+
assert_let!(Some(Decoration::Unique(event)) = events.next());
171+
assert_eq!(event.event_id(), Some(event_id_2));
172+
173+
assert!(events.next().is_none());
174+
}
175+
176+
#[test]
177+
fn test_filter_duplicates_in_new_events() {
178+
let event_builder = EventBuilder::new();
179+
180+
let event_id_0 = owned_event_id!("$ev0");
181+
let event_id_1 = owned_event_id!("$ev1");
182+
183+
let event_0 = sync_timeline_event(&event_builder, &event_id_0);
184+
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
185+
186+
let deduplicator = Deduplicator::new();
187+
let existing_events = RoomEvents::new();
188+
189+
let mut events = deduplicator.scan_and_learn(
190+
[
191+
event_0.clone(), // OK
192+
event_0, // Not OK
193+
event_1, // OK
194+
]
195+
.into_iter(),
196+
&existing_events,
197+
);
198+
199+
assert_let!(Some(Decoration::Unique(event)) = events.next());
200+
assert_eq!(event.event_id(), Some(event_id_0.clone()));
201+
202+
assert_let!(Some(Decoration::Duplicated(event)) = events.next());
203+
assert_eq!(event.event_id(), Some(event_id_0));
204+
205+
assert_let!(Some(Decoration::Unique(event)) = events.next());
206+
assert_eq!(event.event_id(), Some(event_id_1));
207+
208+
assert!(events.next().is_none());
209+
}
210+
211+
#[test]
212+
fn test_filter_duplicates_with_existing_events() {
213+
let event_builder = EventBuilder::new();
214+
215+
let event_id_0 = owned_event_id!("$ev0");
216+
let event_id_1 = owned_event_id!("$ev1");
217+
let event_id_2 = owned_event_id!("$ev2");
218+
219+
let event_0 = sync_timeline_event(&event_builder, &event_id_0);
220+
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
221+
let event_2 = sync_timeline_event(&event_builder, &event_id_2);
222+
223+
let deduplicator = Deduplicator::new();
224+
let mut existing_events = RoomEvents::new();
225+
226+
// Simulate `event_1` is inserted inside `existing_events`.
227+
{
228+
let mut events =
229+
deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events);
230+
231+
assert_let!(Some(Decoration::Unique(event_1)) = events.next());
232+
assert_eq!(event_1.event_id(), Some(event_id_1.clone()));
233+
234+
assert!(events.next().is_none());
235+
236+
drop(events); // make the borrow checker happy.
237+
238+
// Now we can push `event_1` inside `existing_events`.
239+
existing_events.push_events([event_1]);
240+
}
241+
242+
// `event_1` will be duplicated.
243+
{
244+
let mut events = deduplicator.scan_and_learn(
245+
[
246+
event_0, // OK
247+
event_1, // Not OK
248+
event_2, // Ok
249+
]
250+
.into_iter(),
251+
&existing_events,
252+
);
253+
254+
assert_let!(Some(Decoration::Unique(event)) = events.next());
255+
assert_eq!(event.event_id(), Some(event_id_0));
256+
257+
assert_let!(Some(Decoration::Duplicated(event)) = events.next());
258+
assert_eq!(event.event_id(), Some(event_id_1));
259+
260+
assert_let!(Some(Decoration::Unique(event)) = events.next());
261+
assert_eq!(event.event_id(), Some(event_id_2));
262+
263+
assert!(events.next().is_none());
264+
}
265+
}
266+
}

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span};
5353
use self::paginator::PaginatorError;
5454
use crate::{client::WeakClient, Client};
5555

56+
mod deduplicator;
5657
mod linked_chunk;
5758
mod pagination;
5859
mod room;

0 commit comments

Comments
 (0)