Skip to content

Commit 3d653d3

Browse files
committed
fix(sqlite): Design a new schema to get faster insertions.
This patch is twofold. First, it provides a new schema that improves the performance of `SqliteEventCacheStore` for 100_000 events from 6.7k events/sec to 284k events/sec on my machine. Second, it now assumes that `EventCacheStore` does NOT store invalid events. This was already the case in practice, but the SQLite schema did not reject invalid events if any were handed to it. It is now explicitly forbidden.
1 parent b22bb3e commit 3d653d3

File tree

3 files changed

+111
-31
lines changed

3 files changed

+111
-31
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
-- Migration 006: rebuild the event-cache schema for fast insertions.
-- The previous tables and their indices are dropped and recreated with
-- composite primary keys (WITHOUT ROWID), which double as the lookup indices.

DROP INDEX "linked_chunks_id_and_room_id";
DROP INDEX "linked_chunks_event_id_and_room_id";
DROP TABLE "events";
DROP TABLE "gaps";
DROP TABLE "linked_chunks";

CREATE TABLE "linked_chunks" (
    -- Which room does this chunk belong to? (hashed key shared with the two other tables)
    "room_id" BLOB NOT NULL,
    -- Identifier of the chunk, unique per room. Corresponds to a `ChunkIdentifier`.
    "id" INTEGER NOT NULL,

    -- Previous chunk in the linked list. Corresponds to a `ChunkIdentifier`.
    "previous" INTEGER,
    -- Next chunk in the linked list. Corresponds to a `ChunkIdentifier`.
    "next" INTEGER,
    -- Type of underlying entries: E for events, G for gaps
    "type" TEXT CHECK("type" IN ('E', 'G')) NOT NULL,

    -- Primary key is composed of the room ID and the chunk identifier.
    -- Such pairs must be unique.
    PRIMARY KEY (room_id, id)
)
WITHOUT ROWID;

CREATE TABLE "gaps" (
    -- Which room does this gap belong to? (hashed key shared with linked_chunks)
    "room_id" BLOB NOT NULL,
    -- Which chunk does this gap refer to? Corresponds to a `ChunkIdentifier`.
    "chunk_id" INTEGER NOT NULL,

    -- The previous batch token of a gap (encrypted value).
    "prev_token" BLOB NOT NULL,

    -- Primary key is composed of the room ID and the chunk identifier.
    -- Such pairs must be unique.
    PRIMARY KEY (room_id, chunk_id),

    -- If the owning chunk gets deleted, delete the entry too.
    FOREIGN KEY (chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
)
WITHOUT ROWID;

-- Items for an event chunk.
CREATE TABLE "events" (
    -- Which room does this event belong to? (hashed key shared with linked_chunks)
    "room_id" BLOB NOT NULL,
    -- Which chunk does this event refer to? Corresponds to a `ChunkIdentifier`.
    "chunk_id" INTEGER NOT NULL,

    -- `OwnedEventId` for events.
    "event_id" BLOB NOT NULL,
    -- JSON serialized `TimelineEvent` (encrypted value).
    "content" BLOB NOT NULL,
    -- Position (index) in the chunk.
    "position" INTEGER NOT NULL,

    -- Primary key is the event ID.
    PRIMARY KEY (event_id),

    -- We need a uniqueness constraint over the `room_id`, `chunk_id` and
    -- `position` tuple because (i) they must be unique, (ii) it dramatically
    -- improves the performance.
    UNIQUE (room_id, chunk_id, position),

    -- If the owning chunk gets deleted, delete the entry too.
    FOREIGN KEY (room_id, chunk_id) REFERENCES linked_chunks(room_id, id) ON DELETE CASCADE
)
WITHOUT ROWID;

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
1919
use async_trait::async_trait;
2020
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
2121
use matrix_sdk_base::{
22+
deserialized_responses::TimelineEvent,
2223
event_cache::{
2324
store::{
2425
media::{
@@ -38,7 +39,7 @@ use matrix_sdk_store_encryption::StoreCipher;
3839
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
3940
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
4041
use tokio::fs;
41-
use tracing::{debug, trace};
42+
use tracing::{debug, error, trace};
4243

4344
use crate::{
4445
error::{Error, Result},
@@ -64,7 +65,7 @@ mod keys {
6465
/// This is used to figure whether the SQLite database requires a migration.
6566
/// Every new SQL migration should imply a bump of this number, and changes in
6667
/// the [`run_migrations`] function.
67-
const DATABASE_VERSION: u8 = 5;
68+
const DATABASE_VERSION: u8 = 6;
6869

6970
/// The string used to identify a chunk of type events, in the `type` field in
7071
/// the database.
@@ -351,6 +352,14 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
351352
.await?;
352353
}
353354

355+
if version < 6 {
356+
conn.with_transaction(|txn| {
357+
txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
358+
txn.set_db_version(6)
359+
})
360+
.await?;
361+
}
362+
354363
Ok(())
355364
}
356365

@@ -499,11 +508,19 @@ impl EventCacheStore for SqliteEventCacheStore {
499508
"INSERT INTO events(chunk_id, room_id, event_id, content, position) VALUES (?, ?, ?, ?, ?)"
500509
)?;
501510

502-
for (i, event) in items.into_iter().enumerate() {
511+
let invalid_event = |event: TimelineEvent| {
512+
let Some(event_id) = event.event_id() else {
513+
error!(%room_id, "Trying to push an event with no ID");
514+
return None;
515+
};
516+
517+
Some((event_id.to_string(), event))
518+
};
519+
520+
for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
503521
let serialized = serde_json::to_vec(&event)?;
504522
let content = this.encode_value(serialized)?;
505523

506-
let event_id = event.event_id().map(|event_id| event_id.to_string());
507524
let index = at.index() + i;
508525

509526
statement.execute((chunk_id, &hashed_room_id, event_id, content, index))?;
@@ -520,7 +537,10 @@ impl EventCacheStore for SqliteEventCacheStore {
520537
let content = this.encode_value(serialized)?;
521538

522539
// The event id should be the same, but just in case it changed…
523-
let event_id = event.event_id().map(|event_id| event_id.to_string());
540+
let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
541+
error!(%room_id, "Trying to replace an event with a new one that has no ID");
542+
continue;
543+
};
524544

525545
txn.execute(
526546
r#"
@@ -829,22 +849,18 @@ impl EventCacheStore for SqliteEventCacheStore {
829849
.prepare(&query)?
830850
.query_map(parameters, |row| {
831851
Ok((
832-
row.get::<_, Option<String>>(0)?,
852+
row.get::<_, String>(0)?,
833853
row.get::<_, u64>(1)?,
834854
row.get::<_, usize>(2)?
835855
))
836856
})?
837857
{
838858
let (duplicated_event, chunk_identifier, index) = duplicated_event?;
839859

840-
let Some(duplicated_event) = duplicated_event else {
841-
// Event ID is malformed, let's skip it.
842-
continue;
843-
};
844-
845-
let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
860+
let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
846861
// Normally unreachable, but the event ID has been stored even if it is
847862
// malformed, let's skip it.
863+
error!(%duplicated_event, %room_id, "Reading a malformed event ID");
848864
continue;
849865
};
850866

crates/matrix-sdk/src/event_cache/deduplicator.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{collections::BTreeSet, fmt, sync::Mutex};
2020
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
2121
use matrix_sdk_base::{event_cache::store::EventCacheStoreLock, linked_chunk::Position};
2222
use ruma::{OwnedEventId, OwnedRoomId};
23-
use tracing::{debug, warn};
23+
use tracing::{debug, error};
2424

2525
use super::{
2626
room::events::{Event, RoomEvents},
@@ -204,26 +204,22 @@ impl BloomFilterDeduplicator {
204204

205205
let events = self
206206
.scan_and_learn(events.into_iter(), room_events)
207-
.filter_map(|decorated_event| match decorated_event {
208-
Decoration::Unique(event) => Some(event),
207+
.map(|decorated_event| match decorated_event {
208+
Decoration::Unique(event) => event,
209209
Decoration::Duplicated((event, position)) => {
210210
debug!(event_id = ?event.event_id(), "Found a duplicated event");
211211

212212
let event_id = event
213213
.event_id()
214-
// SAFETY: An event with no ID is decorated as
215-
// `Decoration::Invalid`. Thus, it's
216-
// safe to unwrap the `Option<OwnedEventId>` here.
214+
// SAFETY: An event with no ID is not possible, as invalid events are
215+
// already filtered out. Thus, it's safe to unwrap the
216+
// `Option<OwnedEventId>` here.
217217
.expect("The event has no ID");
218218

219219
duplicated_event_ids.push((event_id, position));
220220

221221
// Keep the new event!
222-
Some(event)
223-
}
224-
Decoration::Invalid(event) => {
225-
warn!(?event, "Found an event with no ID");
226-
None
222+
event
227223
}
228224
})
229225
.collect::<Vec<_>>();
@@ -253,13 +249,15 @@ impl BloomFilterDeduplicator {
253249
where
254250
I: Iterator<Item = Event> + 'a,
255251
{
256-
new_events_to_scan.map(move |event| {
252+
new_events_to_scan.filter_map(move |event| {
257253
let Some(event_id) = event.event_id() else {
258-
// The event has no `event_id`.
259-
return Decoration::Invalid(event);
254+
// The event has no `event_id`. This is normally unreachable as event with no ID
255+
// are already filtered out.
256+
error!(?event, "Found an event with no ID");
257+
return None;
260258
};
261259

262-
if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
260+
Some(if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
263261
// Oh oh, it looks like we have found a duplicate!
264262
//
265263
// However, bloom filters have false positives. We are NOT sure the event is NOT
@@ -282,7 +280,7 @@ impl BloomFilterDeduplicator {
282280
// Bloom filter has no false negatives. We are sure the event is NOT present: we
283281
// can keep it in the iterator.
284282
Decoration::Unique(event)
285-
}
283+
})
286284
})
287285
}
288286
}
@@ -295,9 +293,6 @@ enum Decoration<I> {
295293

296294
/// This event is duplicated.
297295
Duplicated((I, Position)),
298-
299-
/// This event is invalid (i.e. not well formed).
300-
Invalid(I),
301296
}
302297

303298
pub(super) struct DeduplicationOutcome {

0 commit comments

Comments
 (0)