Skip to content

Commit e75192f

Browse files
committed
feat(sdk): Remove old events if duplicated.
1 parent 8143346 commit e75192f

File tree

2 files changed

+221
-27
lines changed

2 files changed

+221
-27
lines changed

crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,6 +947,13 @@ impl Position {
947947
pub fn index(&self) -> usize {
948948
self.1
949949
}
950+
951+
pub(super) fn move_index_to_the_left(&mut self) {
952+
self.1 = self
953+
.1
954+
.checked_sub(1)
955+
.expect("Cannot move position's index to the left because it's already 0");
956+
}
950957
}
951958

952959
/// An iterator over a [`LinkedChunk`] that traverses the chunk in backward

crates/matrix-sdk/src/event_cache/store.rs

Lines changed: 214 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt;
15+
use std::{cmp::Ordering, fmt};
1616

1717
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
18+
use ruma::OwnedEventId;
1819
use tracing::error;
1920

2021
use super::{
@@ -64,25 +65,34 @@ impl RoomEvents {
6465
///
6566
/// For the moment, duplicated events will be logged but not removed from
6667
/// the resulting iterator.
67-
fn deduplicate<'a, I>(&'a self, events: I) -> impl Iterator<Item = Event> + 'a
68+
fn deduplicate<'a, I>(&'a mut self, events: I) -> (Vec<Event>, Vec<OwnedEventId>)
6869
where
6970
I: Iterator<Item = Event> + 'a,
7071
{
71-
self.deduplicator.scan_and_learn(events, self).map(
72-
|decorated_event| match decorated_event {
73-
Decoration::Ok(event) => event,
72+
let mut duplicated_event_ids = Vec::new();
73+
74+
let deduplicated_events = self
75+
.deduplicator
76+
.scan_and_learn(events, self)
77+
.filter_map(|decorated_event| match decorated_event {
78+
Decoration::Ok(event) => Some(event),
7479
Decoration::Duplicated(event) => {
75-
error!(?event, "Found a duplicated event");
80+
error!(event_id = ?event.event_id(), "Found a duplicated event");
81+
82+
duplicated_event_ids.push(event.event_id().expect("The event has no ID"));
7683

77-
event
84+
// Keep the new event!
85+
Some(event)
7886
}
7987
Decoration::Invalid(event) => {
8088
error!(?event, "Found an invalid event");
8189

82-
event
90+
None
8391
}
84-
},
85-
)
92+
})
93+
.collect();
94+
95+
(deduplicated_events, duplicated_event_ids)
8696
}
8797

8898
/// Push events after all events or gaps.
@@ -92,9 +102,35 @@ impl RoomEvents {
92102
where
93103
I: IntoIterator<Item = Event>,
94104
{
95-
let events = self.deduplicate(events.into_iter()).collect::<Vec<_>>();
105+
let (events, duplicated_event_ids) = self.deduplicate(events.into_iter());
96106

97-
self.chunks.push_items_back(events)
107+
// Remove the _old_ duplicated events!
108+
{
109+
// We don't have to worry the removals can change the position of the existing
110+
// events, because we are pushing all _new_ `events` at the back.
111+
for duplicated_event_id in duplicated_event_ids {
112+
let Some(duplicated_event_position) =
113+
self.revents().find_map(|(position, event)| {
114+
(event.event_id().as_ref() == Some(&duplicated_event_id))
115+
.then_some(position)
116+
})
117+
else {
118+
error!(
119+
?duplicated_event_id,
120+
"A duplicated event has been detected, but it's position seems unknown"
121+
);
122+
123+
continue;
124+
};
125+
126+
self.chunks
127+
.remove_item_at(duplicated_event_position)
128+
.expect("Failed to remove an event we have just found");
129+
}
130+
}
131+
132+
// Push new `events`.
133+
self.chunks.push_items_back(events);
98134
}
99135

100136
/// Push a gap after all events or gaps.
@@ -103,11 +139,67 @@ impl RoomEvents {
103139
}
104140

105141
/// Insert events at a specified position.
106-
pub fn insert_events_at<I>(&mut self, events: I, position: Position) -> Result<(), Error>
142+
pub fn insert_events_at<I>(&mut self, events: I, mut position: Position) -> Result<(), Error>
107143
where
108144
I: IntoIterator<Item = Event>,
109145
{
110-
let events = self.deduplicate(events.into_iter()).collect::<Vec<_>>();
146+
let (events, duplicated_event_ids) = self.deduplicate(events.into_iter());
147+
148+
// Remove the _old_ duplicated events!
149+
{
150+
// We **have to worry** the removals can change the position of the existing
151+
// events. We **have** to update the `position` argument value for each removal.
152+
for duplicated_event_id in duplicated_event_ids {
153+
let Some(duplicated_event_position) =
154+
self.revents().find_map(|(position, event)| {
155+
(event.event_id().as_ref() == Some(&duplicated_event_id))
156+
.then_some(position)
157+
})
158+
else {
159+
error!(
160+
?duplicated_event_id,
161+
"A duplicated event has been detected, but it's position seems unknown"
162+
);
163+
164+
continue;
165+
};
166+
167+
self.chunks
168+
.remove_item_at(duplicated_event_position)
169+
.expect("Failed to remove an event we have just found");
170+
171+
// A `Position` is composed of a `ChunkIdentifier` and an index.
172+
// The `ChunkIdentifier` is stable, i.e. it won't change if an
173+
// event is removed in another chunk. It means we only need to
174+
// update `position` if the removal happened in **the same
175+
// chunk**.
176+
if duplicated_event_position.chunk_identifier() == position.chunk_identifier() {
177+
// Now we can compare the position indices.
178+
match duplicated_event_position.index().cmp(&position.index()) {
179+
// `duplicated_event_position`'s index < `position`'s index
180+
Ordering::Less => {
181+
// An event has been removed _before_ the new
182+
// events: `position` needs to be shifted to the
183+
// left by 1.
184+
position.move_index_to_the_left();
185+
}
186+
187+
// `duplicated_event_position`'s index == `position`'s index
188+
Ordering::Equal => {
189+
// An event has been removed at the same position of
190+
// the new events: `position` does _NOT_ need to be
191+
// modified.
192+
}
193+
194+
// `duplicated_event_position`'s index > `position`'s index
195+
Ordering::Greater => {
196+
// An event has been removed _after_ the new events:
197+
// `position` does _NOT_ need to be modified.
198+
}
199+
}
200+
}
201+
}
202+
}
111203

112204
self.chunks.insert_items_at(events, position)
113205
}
@@ -132,7 +224,33 @@ impl RoomEvents {
132224
where
133225
I: IntoIterator<Item = Event>,
134226
{
135-
let events = self.deduplicate(events.into_iter()).collect::<Vec<_>>();
227+
let (events, duplicated_event_ids) = self.deduplicate(events.into_iter());
228+
229+
// Remove the _old_ duplicated events!
230+
{
231+
// We don't have to worry the removals can change the position of the existing
232+
// events, because we are replacing a gap: its identifier will not change
233+
// because of the removals.
234+
for duplicated_event_id in duplicated_event_ids {
235+
let Some(duplicated_event_position) =
236+
self.revents().find_map(|(position, event)| {
237+
(event.event_id().as_ref() == Some(&duplicated_event_id))
238+
.then_some(position)
239+
})
240+
else {
241+
error!(
242+
?duplicated_event_id,
243+
"A duplicated event has been detected, but it's position seems unknown"
244+
);
245+
246+
continue;
247+
};
248+
249+
self.chunks
250+
.remove_item_at(duplicated_event_position)
251+
.expect("Failed to remove an event we have just found");
252+
}
253+
}
136254

137255
self.chunks.replace_gap_at(events, gap_identifier)
138256
}
@@ -240,11 +358,11 @@ mod tests {
240358
let event_builder = EventBuilder::new();
241359

242360
let (event_id_0, event_0) = new_event(&event_builder, "$ev0");
361+
let (event_id_1, event_1) = new_event(&event_builder, "$ev1");
243362

244363
let mut room_events = RoomEvents::new();
245364

246-
room_events.push_events([event_0.clone()]);
247-
room_events.push_events([event_0]);
365+
room_events.push_events([event_0.clone(), event_1]);
248366

249367
{
250368
let mut events = room_events.events();
@@ -254,6 +372,26 @@ mod tests {
254372
assert_eq!(position.index(), 0);
255373
assert_eq!(event.event_id().unwrap(), event_id_0);
256374

375+
assert_let!(Some((position, event)) = events.next());
376+
assert_eq!(position.chunk_identifier(), 0);
377+
assert_eq!(position.index(), 1);
378+
assert_eq!(event.event_id().unwrap(), event_id_1);
379+
380+
assert!(events.next().is_none());
381+
}
382+
383+
// Everything is alright. Now let's push a duplicated event.
384+
room_events.push_events([event_0]);
385+
386+
{
387+
let mut events = room_events.events();
388+
389+
// The first `event_id_0` has been removed.
390+
assert_let!(Some((position, event)) = events.next());
391+
assert_eq!(position.chunk_identifier(), 0);
392+
assert_eq!(position.index(), 0);
393+
assert_eq!(event.event_id().unwrap(), event_id_1);
394+
257395
assert_let!(Some((position, event)) = events.next());
258396
assert_eq!(position.chunk_identifier(), 0);
259397
assert_eq!(position.index(), 1);
@@ -352,25 +490,25 @@ mod tests {
352490
}
353491

354492
#[test]
355-
fn test_insert_events_at_with_dupicates() {
493+
fn test_insert_events_at_with_duplicates() {
356494
let event_builder = EventBuilder::new();
357495

358496
let (event_id_0, event_0) = new_event(&event_builder, "$ev0");
359497
let (event_id_1, event_1) = new_event(&event_builder, "$ev1");
498+
let (event_id_2, event_2) = new_event(&event_builder, "$ev2");
499+
let (event_id_3, event_3) = new_event(&event_builder, "$ev3");
360500

361501
let mut room_events = RoomEvents::new();
362502

363-
room_events.push_events([event_0, event_1.clone()]);
503+
room_events.push_events([event_0.clone(), event_1, event_2]);
364504

365-
let position_of_event_1 = room_events
505+
let position_of_event_2 = room_events
366506
.events()
367507
.find_map(|(position, event)| {
368-
(event.event_id().unwrap() == event_id_1).then_some(position)
508+
(event.event_id().unwrap() == event_id_2).then_some(position)
369509
})
370510
.unwrap();
371511

372-
room_events.insert_events_at([event_1], position_of_event_1).unwrap();
373-
374512
{
375513
let mut events = room_events.events();
376514

@@ -387,8 +525,38 @@ mod tests {
387525
assert_let!(Some((position, event)) = events.next());
388526
assert_eq!(position.chunk_identifier(), 0);
389527
assert_eq!(position.index(), 2);
528+
assert_eq!(event.event_id().unwrap(), event_id_2);
529+
530+
assert!(events.next().is_none());
531+
}
532+
533+
// Everything is alright. Now let's insert a duplicated event!
534+
room_events.insert_events_at([event_0, event_3], position_of_event_2).unwrap();
535+
536+
{
537+
let mut events = room_events.events();
538+
539+
// The first `event_id_0` has been removed.
540+
assert_let!(Some((position, event)) = events.next());
541+
assert_eq!(position.chunk_identifier(), 0);
542+
assert_eq!(position.index(), 0);
390543
assert_eq!(event.event_id().unwrap(), event_id_1);
391544

545+
assert_let!(Some((position, event)) = events.next());
546+
assert_eq!(position.chunk_identifier(), 0);
547+
assert_eq!(position.index(), 1);
548+
assert_eq!(event.event_id().unwrap(), event_id_0);
549+
550+
assert_let!(Some((position, event)) = events.next());
551+
assert_eq!(position.chunk_identifier(), 0);
552+
assert_eq!(position.index(), 2);
553+
assert_eq!(event.event_id().unwrap(), event_id_3);
554+
555+
assert_let!(Some((position, event)) = events.next());
556+
assert_eq!(position.chunk_identifier(), 0);
557+
assert_eq!(position.index(), 3);
558+
assert_eq!(event.event_id().unwrap(), event_id_2);
559+
392560
assert!(events.next().is_none());
393561
}
394562
}
@@ -507,10 +675,11 @@ mod tests {
507675

508676
let (event_id_0, event_0) = new_event(&event_builder, "$ev0");
509677
let (event_id_1, event_1) = new_event(&event_builder, "$ev1");
678+
let (event_id_2, event_2) = new_event(&event_builder, "$ev2");
510679

511680
let mut room_events = RoomEvents::new();
512681

513-
room_events.push_events([event_0.clone()]);
682+
room_events.push_events([event_0.clone(), event_1]);
514683
room_events.push_gap(Gap { prev_token: "hello".to_owned() });
515684

516685
let chunk_identifier_of_gap = room_events
@@ -519,8 +688,6 @@ mod tests {
519688
.unwrap()
520689
.chunk_identifier();
521690

522-
room_events.replace_gap_at([event_0, event_1], chunk_identifier_of_gap).unwrap();
523-
524691
{
525692
let mut events = room_events.events();
526693

@@ -529,6 +696,26 @@ mod tests {
529696
assert_eq!(position.index(), 0);
530697
assert_eq!(event.event_id().unwrap(), event_id_0);
531698

699+
assert_let!(Some((position, event)) = events.next());
700+
assert_eq!(position.chunk_identifier(), 0);
701+
assert_eq!(position.index(), 1);
702+
assert_eq!(event.event_id().unwrap(), event_id_1);
703+
704+
assert!(events.next().is_none());
705+
}
706+
707+
// Everything is alright. Now let's replace a gap with a duplicated event.
708+
room_events.replace_gap_at([event_0, event_2], chunk_identifier_of_gap).unwrap();
709+
710+
{
711+
let mut events = room_events.events();
712+
713+
// The first `event_id_0` has been removed.
714+
assert_let!(Some((position, event)) = events.next());
715+
assert_eq!(position.chunk_identifier(), 0);
716+
assert_eq!(position.index(), 0);
717+
assert_eq!(event.event_id().unwrap(), event_id_1);
718+
532719
assert_let!(Some((position, event)) = events.next());
533720
assert_eq!(position.chunk_identifier(), 2);
534721
assert_eq!(position.index(), 0);
@@ -537,7 +724,7 @@ mod tests {
537724
assert_let!(Some((position, event)) = events.next());
538725
assert_eq!(position.chunk_identifier(), 2);
539726
assert_eq!(position.index(), 1);
540-
assert_eq!(event.event_id().unwrap(), event_id_1);
727+
assert_eq!(event.event_id().unwrap(), event_id_2);
541728

542729
assert!(events.next().is_none());
543730
}

0 commit comments

Comments
 (0)