Skip to content

Commit 1a2b09e

Browse files
committed
refactor: Implement try_take_leased_lock on SqliteEventCacheStore
1 parent f705251 commit 1a2b09e

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE TABLE "lease_locks" (
2+
"key" TEXT PRIMARY KEY NOT NULL,
3+
"holder" TEXT NOT NULL,
4+
"expiration" REAL NOT NULL
5+
);

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use matrix_sdk_base::{
77
media::{MediaRequest, UniqueKey},
88
};
99
use matrix_sdk_store_encryption::StoreCipher;
10+
use ruma::MilliSecondsSinceUnixEpoch;
1011
use rusqlite::OptionalExtension;
1112
use tokio::fs;
1213
use tracing::debug;
@@ -26,8 +27,8 @@ mod keys {
2627
///
2728
/// This is used to figure whether the SQLite database requires a migration.
2829
/// Every new SQL migration should imply a bump of this number, and changes in
29-
/// the [`SqliteEventCacheStore::run_migrations`] function.
30-
const DATABASE_VERSION: u8 = 1;
30+
/// the [`run_migrations`] function.
31+
const DATABASE_VERSION: u8 = 2;
3132

3233
/// A SQLite-based event cache store.
3334
#[derive(Clone)]
@@ -133,6 +134,14 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
133134
.await?;
134135
}
135136

137+
if version < 2 {
138+
conn.with_transaction(|txn| {
139+
txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
140+
txn.set_db_version(2)
141+
})
142+
.await?;
143+
}
144+
136145
Ok(())
137146
}
138147

@@ -145,8 +154,32 @@ impl EventCacheStore for SqliteEventCacheStore {
145154
lease_duration_ms: u32,
146155
key: &str,
147156
holder: &str,
148-
) -> Result<bool, Self::Error> {
149-
todo!()
157+
) -> Result<bool> {
158+
let key = key.to_owned();
159+
let holder = holder.to_owned();
160+
161+
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
162+
let expiration = now + lease_duration_ms as u64;
163+
164+
let num_touched = self
165+
.acquire()
166+
.await?
167+
.with_transaction(move |txn| {
168+
txn.execute(
169+
"INSERT INTO lease_locks (key, holder, expiration)
170+
VALUES (?1, ?2, ?3)
171+
ON CONFLICT (key)
172+
DO
173+
UPDATE SET holder = ?2, expiration = ?3
174+
WHERE holder = ?2
175+
OR expiration < ?4
176+
",
177+
(key, holder, u64::from(expiration), u64::from(now)),
178+
)
179+
})
180+
.await?;
181+
182+
Ok(num_touched == 1)
150183
}
151184

152185
async fn add_media_content(&self, request: &MediaRequest, content: Vec<u8>) -> Result<()> {

0 commit comments

Comments
 (0)