12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: { collections:: BTreeSet , ops :: Not } ;
15
+ use std:: collections:: BTreeSet ;
16
16
17
- use bloomfilter :: Bloom ;
17
+ use growable_bloom_filter :: { GrowableBloom , GrowableBloomBuilder } ;
18
18
use matrix_sdk_base:: deserialized_responses:: SyncTimelineEvent ;
19
- use ruma:: OwnedEventId ;
20
19
21
20
use super :: store:: RoomEvents ;
22
21
23
22
pub struct Deduplicator {
24
- bloom_filter : Bloom < OwnedEventId > ,
23
+ bloom_filter : GrowableBloom ,
25
24
}
26
25
27
26
impl Deduplicator {
28
27
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS : usize = 800_000 ;
29
28
const DESIRED_FALSE_POSITIVE_RATE : f64 = 0.001 ;
30
- const SEED_FOR_HASHER : & ' static [ u8 ; 32 ] = b"matrix_sdk_event_cache_deduptor!" ;
31
29
30
+ /// Create a new `Self`.
32
31
pub fn new ( ) -> Self {
33
32
Self {
34
- bloom_filter : Bloom :: new_for_fp_rate_with_seed (
35
- Self :: APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS ,
36
- Self :: DESIRED_FALSE_POSITIVE_RATE ,
37
- Self :: SEED_FOR_HASHER ,
38
- ) ,
33
+ bloom_filter : GrowableBloomBuilder :: new ( )
34
+ . estimated_insertions ( Self :: APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS )
35
+ . desired_error_ratio ( Self :: DESIRED_FALSE_POSITIVE_RATE )
36
+ . build ( ) ,
39
37
}
40
38
}
41
39
42
- pub fn filter_and_learn < ' a , I > (
40
+ /// Scan a collection of events and detect duplications.
41
+ ///
42
+ /// This method takes a collection of events `events_to_scan` and returns a
43
+ /// new collection of events, where each event is decorated by a
44
+ /// [`Decoration`], so that the caller can decide what to do with these
45
+ /// events.
46
+ ///
47
+ /// Each scanned event will update `Self`'s internal state.
48
+ ///
49
+ /// `existing_events` represents all events of a room that already exist.
50
+ pub fn scan_and_learn < ' a , I > (
43
51
& ' a mut self ,
44
- events : I ,
45
- room_events : & ' a RoomEvents ,
46
- ) -> impl Iterator < Item = I :: Item > + ' a
52
+ events_to_scan : I ,
53
+ existing_events : & ' a RoomEvents ,
54
+ ) -> impl Iterator < Item = Decoration < I :: Item > > + ' a
47
55
where
48
56
I : Iterator < Item = SyncTimelineEvent > + ' a ,
49
57
{
50
58
let mut already_seen = BTreeSet :: new ( ) ;
51
59
52
- events . filter ( move |event| {
60
+ events_to_scan . map ( move |event| {
53
61
let Some ( event_id) = event. event_id ( ) else {
54
- // The event has no `event_id`. Safe path: filter it out.
55
- return false ;
62
+ // The event has no `event_id`.
63
+ return Decoration :: Invalid ( event ) ;
56
64
} ;
57
65
58
66
if self . bloom_filter . check_and_set ( & event_id) {
@@ -64,33 +72,47 @@ impl Deduplicator {
64
72
// iterator itself contains duplicated events! We use a `BTreeSet`, otherwise
65
73
// using a bloom filter again may generate false positives.
66
74
if already_seen. contains ( & event_id) {
67
- // The iterator contains a duplicated `event`. Let's filter it out.
68
- return false ;
75
+ // The iterator contains a duplicated `event`.
76
+ return Decoration :: Duplicated ( event ) ;
69
77
}
70
78
71
79
// Now we can iterate over all events to ensure `event` is not present in
72
- // `room_events`.
73
- let result = room_events
74
- . revents ( )
75
- . any ( |( _position, other_event) | {
76
- other_event. event_id ( ) . as_ref ( ) == Some ( & event_id)
77
- } )
78
- . not ( ) ;
80
+ // `existing_events`.
81
+ let duplicated = existing_events. revents ( ) . any ( |( _position, other_event) | {
82
+ other_event. event_id ( ) . as_ref ( ) == Some ( & event_id)
83
+ } ) ;
79
84
80
85
already_seen. insert ( event_id) ;
81
86
82
- result
87
+ if duplicated {
88
+ Decoration :: Duplicated ( event)
89
+ } else {
90
+ Decoration :: Ok ( event)
91
+ }
83
92
} else {
84
93
already_seen. insert ( event_id) ;
85
94
86
95
// Bloom filter has no false negatives. We are sure the event is NOT present: we
87
96
// can keep it in the iterator.
88
- true
97
+ Decoration :: Ok ( event )
89
98
}
90
99
} )
91
100
}
92
101
}
93
102
103
+ /// Information about a single scanned event.
104
+ #[ derive( Debug ) ]
105
+ pub enum Decoration < I > {
106
+ /// This event is not duplicated.
107
+ Ok ( I ) ,
108
+
109
+ /// This event is duplicated.
110
+ Duplicated ( I ) ,
111
+
112
+ /// This event is invalid (i.e. not well formed).
113
+ Invalid ( I ) ,
114
+ }
115
+
94
116
#[ cfg( test) ]
95
117
mod tests {
96
118
use assert_matches2:: assert_let;
@@ -120,18 +142,18 @@ mod tests {
120
142
let event_2 = sync_timeline_event ( & event_builder, & event_id_2) ;
121
143
122
144
let mut deduplicator = Deduplicator :: new ( ) ;
123
- let room_events = RoomEvents :: new ( ) ;
145
+ let existing_events = RoomEvents :: new ( ) ;
124
146
125
147
let mut events =
126
- deduplicator. filter_and_learn ( [ event_0, event_1, event_2] . into_iter ( ) , & room_events ) ;
148
+ deduplicator. scan_and_learn ( [ event_0, event_1, event_2] . into_iter ( ) , & existing_events ) ;
127
149
128
- assert_let ! ( Some ( event) = events. next( ) ) ;
150
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
129
151
assert_eq ! ( event. event_id( ) , Some ( event_id_0) ) ;
130
152
131
- assert_let ! ( Some ( event) = events. next( ) ) ;
153
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
132
154
assert_eq ! ( event. event_id( ) , Some ( event_id_1) ) ;
133
155
134
- assert_let ! ( Some ( event) = events. next( ) ) ;
156
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
135
157
assert_eq ! ( event. event_id( ) , Some ( event_id_2) ) ;
136
158
137
159
assert ! ( events. next( ) . is_none( ) ) ;
@@ -148,22 +170,25 @@ mod tests {
148
170
let event_1 = sync_timeline_event ( & event_builder, & event_id_1) ;
149
171
150
172
let mut deduplicator = Deduplicator :: new ( ) ;
151
- let room_events = RoomEvents :: new ( ) ;
173
+ let existing_events = RoomEvents :: new ( ) ;
152
174
153
- let mut events = deduplicator. filter_and_learn (
175
+ let mut events = deduplicator. scan_and_learn (
154
176
[
155
177
event_0. clone ( ) , // OK
156
178
event_0, // Not OK
157
179
event_1, // OK
158
180
]
159
181
. into_iter ( ) ,
160
- & room_events ,
182
+ & existing_events ,
161
183
) ;
162
184
163
- assert_let ! ( Some ( event) = events. next( ) ) ;
185
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
186
+ assert_eq ! ( event. event_id( ) , Some ( event_id_0. clone( ) ) ) ;
187
+
188
+ assert_let ! ( Some ( Decoration :: Duplicated ( event) ) = events. next( ) ) ;
164
189
assert_eq ! ( event. event_id( ) , Some ( event_id_0) ) ;
165
190
166
- assert_let ! ( Some ( event) = events. next( ) ) ;
191
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
167
192
assert_eq ! ( event. event_id( ) , Some ( event_id_1) ) ;
168
193
169
194
assert ! ( events. next( ) . is_none( ) ) ;
@@ -182,35 +207,43 @@ mod tests {
182
207
let event_2 = sync_timeline_event ( & event_builder, & event_id_2) ;
183
208
184
209
let mut deduplicator = Deduplicator :: new ( ) ;
185
- let mut room_events = RoomEvents :: new ( ) ;
210
+ let mut existing_events = RoomEvents :: new ( ) ;
186
211
187
- // Simulate `event_1` is inserted inside `room_events `.
212
+ // Simulate that `event_1` has been inserted into `existing_events`.
188
213
{
189
214
let mut events =
190
- deduplicator. filter_and_learn ( [ event_1. clone ( ) ] . into_iter ( ) , & room_events ) ;
215
+ deduplicator. scan_and_learn ( [ event_1. clone ( ) ] . into_iter ( ) , & existing_events ) ;
191
216
192
- assert_let ! ( Some ( event_1) = events. next( ) ) ;
193
- assert_eq ! ( event_1. event_id( ) , Some ( event_id_1) ) ;
217
+ assert_let ! ( Some ( Decoration :: Ok ( event_1) ) = events. next( ) ) ;
218
+ assert_eq ! ( event_1. event_id( ) , Some ( event_id_1. clone ( ) ) ) ;
194
219
195
220
assert ! ( events. next( ) . is_none( ) ) ;
196
221
197
222
drop ( events) ; // make the borrow checker happy.
198
223
199
- // Now we can push `event_1` inside `room_events `.
200
- room_events . push_event ( event_1) ;
224
+ // Now we can push `event_1` into `existing_events`.
225
+ existing_events . push_events ( [ event_1. clone ( ) ] ) ;
201
226
}
202
227
203
228
// `event_1` will be duplicated.
204
229
{
205
- let mut events = deduplicator
206
- . filter_and_learn ( [ event_0, event_1, event_2] . into_iter ( ) , & room_events) ;
207
-
208
- assert_let ! ( Some ( event) = events. next( ) ) ;
230
+ let mut events = deduplicator. scan_and_learn (
231
+ [
232
+ event_0, // OK
233
+ event_1, // Not OK
234
+ event_2, // OK
235
+ ]
236
+ . into_iter ( ) ,
237
+ & existing_events,
238
+ ) ;
239
+
240
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
209
241
assert_eq ! ( event. event_id( ) , Some ( event_id_0) ) ;
210
242
211
- // `event_1` is missing.
243
+ assert_let ! ( Some ( Decoration :: Duplicated ( event) ) = events. next( ) ) ;
244
+ assert_eq ! ( event. event_id( ) , Some ( event_id_1) ) ;
212
245
213
- assert_let ! ( Some ( event) = events. next( ) ) ;
246
+ assert_let ! ( Some ( Decoration :: Ok ( event) ) = events. next( ) ) ;
214
247
assert_eq ! ( event. event_id( ) , Some ( event_id_2) ) ;
215
248
216
249
assert ! ( events. next( ) . is_none( ) ) ;
0 commit comments