Skip to content

Commit d45adde

Browse files
Hywanbnjbvr
authored and committed
feat(base): Add EventCacheStore::filter_duplicated_events.
This patch adds and implements the `EventCacheStore::filter_duplicated_events` method. It is implemented on the `MemoryStore` and the `SqliteEventCacheStore`. This method removes the unique events and returns the duplicated events.
1 parent ed16e91 commit d45adde

File tree

5 files changed

+246
-5
lines changed

5 files changed

+246
-5
lines changed

crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ pub trait EventCacheStoreIntegrationTests {
123123

124124
/// Test that removing a room from storage empties all associated data.
125125
async fn test_remove_room(&self);
126+
127+
/// Test that filtering duplicated events works as expected.
128+
async fn test_filter_duplicated_events(&self);
126129
}
127130

128131
fn rebuild_linked_chunk(raws: Vec<RawChunk<Event, Gap>>) -> Option<LinkedChunk<3, Event, Gap>> {
@@ -502,6 +505,79 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
502505
let r1_linked_chunk = self.reload_linked_chunk(r1).await.unwrap();
503506
assert!(!r1_linked_chunk.is_empty());
504507
}
508+
509+
async fn test_filter_duplicated_events(&self) {
510+
let room_id = room_id!("!r0:matrix.org");
511+
let another_room_id = room_id!("!r1:matrix.org");
512+
let event = |msg: &str| make_test_event(room_id, msg);
513+
514+
let event_comte = event("comté");
515+
let event_brigand = event("brigand du jorat");
516+
let event_raclette = event("raclette");
517+
let event_morbier = event("morbier");
518+
let event_gruyere = event("gruyère");
519+
let event_tome = event("tome");
520+
let event_mont_dor = event("mont d'or");
521+
522+
self.handle_linked_chunk_updates(
523+
room_id,
524+
vec![
525+
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
526+
Update::PushItems {
527+
at: Position::new(CId::new(0), 0),
528+
items: vec![event_comte.clone(), event_brigand.clone()],
529+
},
530+
Update::NewGapChunk {
531+
previous: Some(CId::new(0)),
532+
new: CId::new(1),
533+
next: None,
534+
gap: Gap { prev_token: "brillat-savarin".to_owned() },
535+
},
536+
Update::NewItemsChunk { previous: Some(CId::new(1)), new: CId::new(2), next: None },
537+
Update::PushItems {
538+
at: Position::new(CId::new(2), 0),
539+
items: vec![event_morbier.clone(), event_mont_dor.clone()],
540+
},
541+
],
542+
)
543+
.await
544+
.unwrap();
545+
546+
// Add other events in another room, to ensure filtering take the `room_id` into
547+
// account.
548+
self.handle_linked_chunk_updates(
549+
another_room_id,
550+
vec![
551+
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
552+
Update::PushItems {
553+
at: Position::new(CId::new(0), 0),
554+
items: vec![event_tome.clone()],
555+
},
556+
],
557+
)
558+
.await
559+
.unwrap();
560+
561+
let duplicated_events = self
562+
.filter_duplicated_events(
563+
room_id,
564+
vec![
565+
event_comte.event_id().unwrap().to_owned(),
566+
event_raclette.event_id().unwrap().to_owned(),
567+
event_morbier.event_id().unwrap().to_owned(),
568+
event_gruyere.event_id().unwrap().to_owned(),
569+
event_tome.event_id().unwrap().to_owned(),
570+
event_mont_dor.event_id().unwrap().to_owned(),
571+
],
572+
)
573+
.await
574+
.unwrap();
575+
576+
assert_eq!(duplicated_events.len(), 3);
577+
assert_eq!(duplicated_events[0], event_comte.event_id().unwrap());
578+
assert_eq!(duplicated_events[1], event_morbier.event_id().unwrap());
579+
assert_eq!(duplicated_events[2], event_mont_dor.event_id().unwrap());
580+
}
505581
}
506582

507583
/// Macro building to allow your `EventCacheStore` implementation to run the
@@ -584,6 +660,13 @@ macro_rules! event_cache_store_integration_tests {
584660
get_event_cache_store().await.unwrap().into_event_cache_store();
585661
event_cache_store.test_remove_room().await;
586662
}
663+
664+
#[async_test]
665+
async fn test_filter_duplicated_events() {
666+
let event_cache_store =
667+
get_event_cache_store().await.unwrap().into_event_cache_store();
668+
event_cache_store.test_filter_duplicated_events().await;
669+
}
587670
}
588671
};
589672
}

crates/matrix-sdk-base/src/event_cache/store/memory_store.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use matrix_sdk_common::{
2626
};
2727
use ruma::{
2828
time::{Instant, SystemTime},
29-
MxcUri, OwnedMxcUri, RoomId,
29+
MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
3030
};
3131

3232
use super::{
@@ -148,6 +148,35 @@ impl EventCacheStore for MemoryStore {
148148
Ok(())
149149
}
150150

151+
async fn filter_duplicated_events(
152+
&self,
153+
room_id: &RoomId,
154+
mut events: Vec<OwnedEventId>,
155+
) -> Result<Vec<OwnedEventId>, Self::Error> {
156+
// Collect all duplicated events.
157+
let inner = self.inner.read().unwrap();
158+
159+
let mut duplicated_events = Vec::new();
160+
161+
for event in inner.events.unordered_events(room_id) {
162+
// If `events` is empty, we can short-circuit.
163+
if events.is_empty() {
164+
break;
165+
}
166+
167+
if let Some(event_id_a) = event.event_id() {
168+
// This event exists in the store event!
169+
if let Some(position) =
170+
events.iter().position(|event_id_b| &event_id_a == event_id_b)
171+
{
172+
duplicated_events.push(events.remove(position));
173+
}
174+
}
175+
}
176+
177+
Ok(duplicated_events)
178+
}
179+
151180
async fn add_media_content(
152181
&self,
153182
request: &MediaRequestParameters,

crates/matrix-sdk-base/src/event_cache/store/traits.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use matrix_sdk_common::{
1919
linked_chunk::{RawChunk, Update},
2020
AsyncTraitDeps,
2121
};
22-
use ruma::{MxcUri, RoomId};
22+
use ruma::{MxcUri, OwnedEventId, RoomId};
2323

2424
use super::{
2525
media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy},
@@ -80,6 +80,14 @@ pub trait EventCacheStore: AsyncTraitDeps {
8080
/// using the above [`Self::handle_linked_chunk_updates`] methods.
8181
async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error>;
8282

83+
/// Given a set of event IDs, remove the unique events and return the
84+
/// duplicated events.
85+
async fn filter_duplicated_events(
86+
&self,
87+
room_id: &RoomId,
88+
events: Vec<OwnedEventId>,
89+
) -> Result<Vec<OwnedEventId>, Self::Error>;
90+
8391
/// Add a media file's content in the media store.
8492
///
8593
/// # Arguments
@@ -247,6 +255,14 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
247255
self.0.clear_all_rooms_chunks().await.map_err(Into::into)
248256
}
249257

258+
async fn filter_duplicated_events(
259+
&self,
260+
room_id: &RoomId,
261+
events: Vec<OwnedEventId>,
262+
) -> Result<Vec<OwnedEventId>, Self::Error> {
263+
self.0.filter_duplicated_events(room_id, events).await.map_err(Into::into)
264+
}
265+
250266
async fn add_media_content(
251267
&self,
252268
request: &MediaRequestParameters,

crates/matrix-sdk-common/src/linked_chunk/relational.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,21 @@ impl<Item, Gap> RelationalLinkedChunk<Item, Gap> {
292292
}
293293
}
294294
}
295+
296+
/// Return an iterator that yields events of a particular room with no
297+
/// particular order.
298+
pub fn unordered_events<'a>(&'a self, room_id: &'a RoomId) -> impl Iterator<Item = &'a Item> {
299+
self.items.iter().filter_map(move |item_row| {
300+
if item_row.room_id == room_id {
301+
match &item_row.item {
302+
Either::Item(item) => Some(item),
303+
Either::Gap(..) => None,
304+
}
305+
} else {
306+
None
307+
}
308+
})
309+
}
295310
}
296311

297312
impl<Item, Gap> RelationalLinkedChunk<Item, Gap>
@@ -1002,4 +1017,39 @@ mod tests {
10021017
],
10031018
);
10041019
}
1020+
1021+
#[test]
fn test_unordered_events() {
    let room_id = room_id!("!r0:matrix.org");
    let other_room_id = room_id!("!r1:matrix.org");
    let mut relational_linked_chunk = RelationalLinkedChunk::<char, ()>::new();

    relational_linked_chunk.apply_updates(
        room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['a', 'b', 'c'] },
            Update::NewItemsChunk { previous: Some(CId::new(0)), new: CId::new(1), next: None },
            // Fill the chunk that has just been created (chunk 1) instead of
            // pushing a second time at the start of chunk 0.
            Update::PushItems { at: Position::new(CId::new(1), 0), items: vec!['d', 'e', 'f'] },
        ],
    );

    // Items of another room must never be yielded for `room_id`.
    relational_linked_chunk.apply_updates(
        other_room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['x', 'y', 'z'] },
        ],
    );

    let mut events = relational_linked_chunk.unordered_events(room_id);

    assert_eq!(*events.next().unwrap(), 'a');
    assert_eq!(*events.next().unwrap(), 'b');
    assert_eq!(*events.next().unwrap(), 'c');
    assert_eq!(*events.next().unwrap(), 'd');
    assert_eq!(*events.next().unwrap(), 'e');
    assert_eq!(*events.next().unwrap(), 'f');
    assert!(events.next().is_none());
}
10051055
}

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
//! A sqlite-based backend for the [`EventCacheStore`].
1616
17-
use std::{borrow::Cow, fmt, path::Path, sync::Arc};
17+
use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
1818

1919
use async_trait::async_trait;
2020
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
@@ -33,8 +33,8 @@ use matrix_sdk_base::{
3333
media::{MediaRequestParameters, UniqueKey},
3434
};
3535
use matrix_sdk_store_encryption::StoreCipher;
36-
use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId};
37-
use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior};
36+
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
37+
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
3838
use tokio::fs;
3939
use tracing::{debug, trace};
4040

@@ -622,6 +622,69 @@ impl EventCacheStore for SqliteEventCacheStore {
622622
Ok(())
623623
}
624624

625+
async fn filter_duplicated_events(
626+
&self,
627+
room_id: &RoomId,
628+
events: Vec<OwnedEventId>,
629+
) -> Result<Vec<OwnedEventId>, Self::Error> {
630+
// Select all events that exist in the store, i.e. the duplicates.
631+
let room_id = room_id.to_owned();
632+
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
633+
634+
self.acquire()
635+
.await?
636+
.with_transaction(move |txn| -> Result<_> {
637+
txn.chunk_large_query_over(events, None, move |txn, events| {
638+
let query = format!(
639+
"SELECT event_id FROM events WHERE room_id = ? AND event_id IN ({})",
640+
repeat_vars(events.len()),
641+
);
642+
let parameters = params_from_iter(
643+
// parameter for `room_id = ?`
644+
once(
645+
hashed_room_id
646+
.to_sql()
647+
// SAFETY: it cannot fail since `Key::to_sql` never fails
648+
.unwrap(),
649+
)
650+
// parameters for `event_id IN (…)`
651+
.chain(events.iter().map(|event| {
652+
event
653+
.as_str()
654+
.to_sql()
655+
// SAFETY: it cannot fail since `str::to_sql` never fails
656+
.unwrap()
657+
})),
658+
);
659+
660+
let mut duplicated_events = Vec::new();
661+
662+
for duplicated_event in txn
663+
.prepare(&query)?
664+
.query_map(parameters, |row| row.get::<_, Option<String>>(0))?
665+
{
666+
let duplicated_event = duplicated_event?;
667+
668+
let Some(duplicated_event) = duplicated_event else {
669+
// Event ID is malformed, let's skip it.
670+
continue;
671+
};
672+
673+
let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
674+
// Normally unreachable, but the event ID has been stored even if it is
675+
// malformed, let's skip it.
676+
continue;
677+
};
678+
679+
duplicated_events.push(duplicated_event);
680+
}
681+
682+
Ok(duplicated_events)
683+
})
684+
})
685+
.await
686+
}
687+
625688
async fn add_media_content(
626689
&self,
627690
request: &MediaRequestParameters,

0 commit comments

Comments
 (0)