Skip to content

feat: Implement cross-process lock for the EventCache #4192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
18 changes: 8 additions & 10 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
// limitations under the License.

#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use std::sync::Arc;
use std::{
collections::{BTreeMap, BTreeSet},
fmt, iter,
sync::Arc,
ops::Deref,
};

use eyeball::{SharedObservable, Subscriber};
Expand Down Expand Up @@ -72,7 +72,7 @@ use crate::RoomMemberships;
use crate::{
deserialized_responses::{RawAnySyncOrStrippedTimelineEvent, SyncTimelineEvent},
error::{Error, Result},
event_cache_store::DynEventCacheStore,
event_cache_store::EventCacheStoreLock,
response_processors::AccountDataProcessor,
rooms::{
normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons},
Expand All @@ -96,7 +96,7 @@ pub struct BaseClient {
pub(crate) store: Store,

/// The store used by the event cache.
event_cache_store: Arc<DynEventCacheStore>,
event_cache_store: EventCacheStoreLock,

/// The store used for encryption.
///
Expand All @@ -115,8 +115,7 @@ pub struct BaseClient {
pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

/// A sender that is used to communicate changes to room information. Each
/// event contains the room and a boolean whether this event should
/// trigger a room list update.
/// tick contains the room ID and the reasons that have generated this tick.
pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

/// The strategy to use for picking recipient devices, when sending an
Expand Down Expand Up @@ -250,14 +249,13 @@ impl BaseClient {
}

/// Get a reference to the store.
#[allow(unknown_lints, clippy::explicit_auto_deref)]
pub fn store(&self) -> &DynStateStore {
&*self.store
self.store.deref()
}

/// Get a reference to the event cache store.
pub fn event_cache_store(&self) -> &DynEventCacheStore {
&*self.event_cache_store
pub fn event_cache_store(&self) -> &EventCacheStoreLock {
&self.event_cache_store
}

/// Is the client logged in.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,79 @@ macro_rules! event_cache_store_integration_tests {
}
};
}

/// Macro generating tests for the event cache store, related to time (mostly
/// for the cross-process lock).
#[allow(unused_macros)]
#[macro_export]
macro_rules! event_cache_store_integration_tests_time {
() => {
#[cfg(not(target_arch = "wasm32"))]
mod event_cache_store_integration_tests_time {
use std::time::Duration;

use matrix_sdk_test::async_test;
use $crate::event_cache_store::IntoEventCacheStore;

use super::get_event_cache_store;

#[async_test]
async fn test_lease_locks() {
let store = get_event_cache_store().await.unwrap().into_event_cache_store();

let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired0);

// Should extend the lease automatically (same holder).
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired2);

// Should extend the lease automatically (same holder + time is ok).
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired3);

// Another attempt at taking the lock should fail, because it's taken.
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired4);

// Even if we insist.
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired5);

// That's a nice test we got here, go take a little nap.
tokio::time::sleep(Duration::from_millis(50)).await;

// Still too early.
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired55);

// Ok you can take another nap then.
tokio::time::sleep(Duration::from_millis(250)).await;

// At some point, we do get the lock.
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
assert!(acquired6);

tokio::time::sleep(Duration::from_millis(1)).await;

// The other gets it almost immediately too.
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired7);

tokio::time::sleep(Duration::from_millis(1)).await;

// But when we take a longer lease...
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired8);

// It blocks the other user.
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(!acquired9);

// We can hold onto our lease.
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired10);
}
}
};
}
22 changes: 19 additions & 3 deletions crates/matrix-sdk-base/src/event_cache_store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{num::NonZeroUsize, sync::RwLock as StdRwLock};
use std::{collections::HashMap, num::NonZeroUsize, sync::RwLock as StdRwLock, time::Instant};

use async_trait::async_trait;
use matrix_sdk_common::ring_buffer::RingBuffer;
use matrix_sdk_common::{
ring_buffer::RingBuffer, store_locks::memory_store_helper::try_take_leased_lock,
};
use ruma::{MxcUri, OwnedMxcUri};

use super::{EventCacheStore, EventCacheStoreError, Result};
Expand All @@ -28,14 +30,18 @@ use crate::media::{MediaRequest, UniqueKey as _};
#[derive(Debug)]
pub struct MemoryStore {
media: StdRwLock<RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>>,
leases: StdRwLock<HashMap<String, (String, Instant)>>,
}

// SAFETY: `new_unchecked` is safe because 20 is not zero.
const NUMBER_OF_MEDIAS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) };

impl Default for MemoryStore {
fn default() -> Self {
Self { media: StdRwLock::new(RingBuffer::new(NUMBER_OF_MEDIAS)) }
Self {
media: StdRwLock::new(RingBuffer::new(NUMBER_OF_MEDIAS)),
leases: Default::default(),
}
}
}

Expand All @@ -51,6 +57,15 @@ impl MemoryStore {
impl EventCacheStore for MemoryStore {
type Error = EventCacheStoreError;

async fn try_take_leased_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
}

async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
// Avoid duplication. Let's try to remove it first.
self.remove_media_content(request).await?;
Expand Down Expand Up @@ -130,4 +145,5 @@ mod tests {
}

event_cache_store_integration_tests!();
event_cache_store_integration_tests_time!();
}
97 changes: 96 additions & 1 deletion crates/matrix-sdk-base/src/event_cache_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
//! into the event cache for the actual storage. By default this brings an
//! in-memory store.

use std::str::Utf8Error;
use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};

#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
mod memory_store;
mod traits;

use matrix_sdk_common::store_locks::{
BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError,
};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;

#[cfg(any(test, feature = "testing"))]
Expand All @@ -36,6 +39,78 @@ pub use self::{
traits::{DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
};

/// The high-level public type to represent an `EventCacheStore` lock.
#[derive(Clone)]
pub struct EventCacheStoreLock {
/// The inner cross process lock that is used to lock the `EventCacheStore`.
cross_process_lock: CrossProcessStoreLock<LockableEventCacheStore>,

/// The store itself.
///
/// That's the only place where the store exists.
store: Arc<DynEventCacheStore>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for EventCacheStoreLock {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
}
}

impl EventCacheStoreLock {
/// Create a new lock around the [`EventCacheStore`].
pub fn new<S>(store: S, key: String, holder: String) -> Self
where
S: IntoEventCacheStore,
{
let store = store.into_event_cache_store();

Self {
cross_process_lock: CrossProcessStoreLock::new(
LockableEventCacheStore(store.clone()),
key,
holder,
),
store,
}
}

/// Acquire a spin lock (see [`CrossProcessStoreLock::spin_lock`]).
pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, LockStoreError> {
let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;

Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
}
}

/// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`].
/// When this structure is dropped (falls out of scope), the lock will be
/// unlocked.
pub struct EventCacheStoreLockGuard<'a> {
/// The cross process lock guard.
#[allow(unused)]
cross_process_lock_guard: CrossProcessStoreLockGuard,

/// A reference to the store.
store: &'a DynEventCacheStore,
}

#[cfg(not(tarpaulin_include))]
impl<'a> fmt::Debug for EventCacheStoreLockGuard<'a> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
}
}

impl<'a> Deref for EventCacheStoreLockGuard<'a> {
type Target = DynEventCacheStore;

fn deref(&self) -> &Self::Target {
self.store
}
}

/// Event cache store specific error type.
#[derive(Debug, thiserror::Error)]
pub enum EventCacheStoreError {
Expand Down Expand Up @@ -83,3 +158,23 @@ impl EventCacheStoreError {

/// An `EventCacheStore` specific result type.
pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;

/// A type that wraps the [`EventCacheStore`] but implements [`BackingStore`] to
/// make it usable inside the cross process lock.
#[derive(Clone, Debug)]
struct LockableEventCacheStore(Arc<DynEventCacheStore>);

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl BackingStore for LockableEventCacheStore {
type LockError = EventCacheStoreError;

async fn try_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}
17 changes: 17 additions & 0 deletions crates/matrix-sdk-base/src/event_cache_store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ pub trait EventCacheStore: AsyncTraitDeps {
/// The error type used by this event cache store.
type Error: fmt::Debug + Into<EventCacheStoreError>;

/// Try to take a lock using the given store.
async fn try_take_leased_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;

/// Add a media file's content in the media store.
///
/// # Arguments
Expand Down Expand Up @@ -105,6 +113,15 @@ impl<T: fmt::Debug> fmt::Debug for EraseEventCacheStoreError<T> {
impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
type Error = EventCacheStoreError;

async fn try_take_leased_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
}

async fn add_media_content(
&self,
request: &MediaRequest,
Expand Down
22 changes: 16 additions & 6 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::warn;

use crate::{
event_cache_store::{DynEventCacheStore, IntoEventCacheStore},
event_cache_store,
rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};
Expand Down Expand Up @@ -489,7 +489,7 @@ pub struct StoreConfig {
#[cfg(feature = "e2e-encryption")]
pub(crate) crypto_store: Arc<DynCryptoStore>,
pub(crate) state_store: Arc<DynStateStore>,
pub(crate) event_cache_store: Arc<DynEventCacheStore>,
pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
}

#[cfg(not(tarpaulin_include))]
Expand All @@ -507,8 +507,11 @@ impl StoreConfig {
#[cfg(feature = "e2e-encryption")]
crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
state_store: Arc::new(MemoryStore::new()),
event_cache_store: crate::event_cache_store::MemoryStore::new()
.into_event_cache_store(),
event_cache_store: event_cache_store::EventCacheStoreLock::new(
event_cache_store::MemoryStore::new(),
"default-key".to_owned(),
"matrix-sdk-base".to_owned(),
Comment on lines +512 to +513
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The values provided here are scary, and should be controllable by the callers, otherwise it's not possible to distinguish a process from another. The key could be a constant used everywhere in the codebase (maybe buried into a single place, and then passed around — or buried a single time when creating an EventCacheStoreLock from an event cache store impl), but the holder's value has to be provided by the embedder.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to address that as a follow-up PR: I want to introduce a "process holder name" inside the client that can be used by all cross-process lock, otherwise things start to be really clumsy. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnjbvr and I have agreed to remove these values with a configuration in the Client. I'm already working on this as a follow-up PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it is, #4224

),
}
}

Expand All @@ -528,8 +531,15 @@ impl StoreConfig {
}

/// Set a custom implementation of an `EventCacheStore`.
pub fn event_cache_store(mut self, event_cache_store: impl IntoEventCacheStore) -> Self {
self.event_cache_store = event_cache_store.into_event_cache_store();
///
/// The `key` and `holder` arguments represent the key and holder inside the
/// [`CrossProcessStoreLock::new`][matrix_sdk_common::store_locks::CrossProcessStoreLock::new].
pub fn event_cache_store<S>(mut self, event_cache_store: S, key: String, holder: String) -> Self
where
S: event_cache_store::IntoEventCacheStore,
{
self.event_cache_store =
event_cache_store::EventCacheStoreLock::new(event_cache_store, key, holder);
self
}
}
Expand Down
Loading
Loading