Skip to content

Commit 8a3b113

Browse files
committed
feat(sdk): Implement event_cache::Deduplicator.
!wip
1 parent 95ae5d1 commit 8a3b113

File tree

4 files changed

+191
-3
lines changed

4 files changed

+191
-3
lines changed

Cargo.lock

Lines changed: 20 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/matrix-sdk/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ async-channel = "2.2.1"
7575
async-stream = { workspace = true }
7676
async-trait = { workspace = true }
7777
axum = { version = "0.7.4", optional = true }
78+
bloomfilter = { version = "1.0.13", default-features = false }
7879
bytes = "1.1.0"
7980
bytesize = "1.1"
8081
chrono = { version = "0.4.23", optional = true }
crates/matrix-sdk/src/event_cache/deduplicator.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Copyright 2024 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::ops::Not;
16+
17+
use bloomfilter::Bloom;
18+
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
19+
use ruma::OwnedEventId;
20+
21+
use super::store::RoomEvents;
22+
23+
pub struct Deduplicator {
24+
bloom_filter: Bloom<OwnedEventId>,
25+
}
26+
27+
impl Deduplicator {
28+
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 800_000;
29+
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.001;
30+
const SEED_FOR_HASHER: &'static [u8; 32] = b"matrix_sdk_event_cache_deduptor!";
31+
32+
pub fn new() -> Self {
33+
Self {
34+
bloom_filter: Bloom::new_for_fp_rate_with_seed(
35+
Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS,
36+
Self::DESIRED_FALSE_POSITIVE_RATE,
37+
Self::SEED_FOR_HASHER,
38+
),
39+
}
40+
}
41+
42+
pub fn filter_and_learn<'a, I>(
43+
&'a mut self,
44+
events: I,
45+
room_events: &'a RoomEvents,
46+
) -> impl Iterator<Item = I::Item> + 'a
47+
where
48+
I: Iterator<Item = SyncTimelineEvent> + 'a,
49+
{
50+
events.filter(|event| {
51+
let Some(event_id) = event.event_id() else {
52+
// The event has no `event_id`. Safe path: filter it out.
53+
return false;
54+
};
55+
56+
if self.bloom_filter.check_and_set(&event_id) {
57+
// Bloom filter has false positives. We are NOT sure the event
58+
// is NOT present. Even if the false positive rate is low, we
59+
// need to iterate over all events to ensure it isn't present.
60+
61+
room_events
62+
.revents()
63+
.any(|(_position, other_event)| {
64+
other_event.event_id().as_ref() == Some(&event_id)
65+
})
66+
.not()
67+
} else {
68+
// Bloom filter has no false negatives. We are sure the event is NOT present: we
69+
// can keep it in the iterator.
70+
true
71+
}
72+
})
73+
}
74+
}
75+
76+
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use matrix_sdk_test::{EventBuilder, ALICE};
    use ruma::{events::room::message::RoomMessageEventContent, owned_event_id, EventId};

    use super::*;

    /// Builds a plain-text message event sent by `ALICE` with the given
    /// `event_id`, wrapped in a `SyncTimelineEvent`.
    fn sync_timeline_event(event_builder: &EventBuilder, event_id: &EventId) -> SyncTimelineEvent {
        SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id(
            &*ALICE,
            // `event_id` is already a reference: no extra borrow needed.
            event_id,
            RoomMessageEventContent::text_plain("foo"),
        ))
    }

    /// Three distinct events and an empty room: all of them are yielded, in
    /// order.
    #[test]
    fn test_filter_no_duplicate() {
        let event_builder = EventBuilder::new();

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");

        let event_0 = sync_timeline_event(&event_builder, &event_id_0);
        let event_1 = sync_timeline_event(&event_builder, &event_id_1);
        let event_2 = sync_timeline_event(&event_builder, &event_id_2);

        let mut deduplicator = Deduplicator::new();
        let room_events = RoomEvents::new();

        let mut events =
            deduplicator.filter_and_learn([event_0, event_1, event_2].into_iter(), &room_events);

        assert_let!(Some(event) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0));

        assert_let!(Some(event) = events.next());
        assert_eq!(event.event_id(), Some(event_id_1));

        assert_let!(Some(event) = events.next());
        assert_eq!(event.event_id(), Some(event_id_2));

        assert!(events.next().is_none());
    }

    /// An event that is already stored in the room is removed from a later
    /// batch; the other events of that batch are kept.
    #[test]
    fn test_filter_duplicates() {
        let event_builder = EventBuilder::new();

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");

        let event_0 = sync_timeline_event(&event_builder, &event_id_0);
        let event_1 = sync_timeline_event(&event_builder, &event_id_1);
        let event_2 = sync_timeline_event(&event_builder, &event_id_2);

        let mut deduplicator = Deduplicator::new();
        let mut room_events = RoomEvents::new();

        // Simulate `event_1` is inserted inside `room_events`.
        {
            let mut events =
                deduplicator.filter_and_learn([event_1.clone()].into_iter(), &room_events);

            assert_let!(Some(event_1) = events.next());
            assert_eq!(event_1.event_id(), Some(event_id_1));

            assert!(events.next().is_none());

            drop(events); // make the borrow checker happy.

            // Now we can push `event_1` inside `room_events`.
            room_events.push_event(event_1);
        }

        // `event_1` will be duplicated.
        {
            let mut events = deduplicator
                .filter_and_learn([event_0, event_1, event_2].into_iter(), &room_events);

            assert_let!(Some(event) = events.next());
            assert_eq!(event.event_id(), Some(event_id_0));

            // `event_1` is missing.

            assert_let!(Some(event) = events.next());
            assert_eq!(event.event_id(), Some(event_id_2));

            assert!(events.next().is_none());
        }
    }
}

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ use self::{
7777
};
7878
use crate::{client::WeakClient, room::WeakRoom, Client};
7979

80+
mod deduplicator;
8081
mod linked_chunk;
8182
mod pagination;
8283
mod store;

0 commit comments

Comments
 (0)