Skip to content

Commit 5e10ccc

Browse files
authored
Add more logging for crypto store generation counter (#3207)
It's a bit unclear whether the crypto-store generation counter is doing the right thing in terms of causing us to reload the OlmMachine. There is a suspicion that things might be keeping hold of references to the old OlmMachine. This PR attempts to add the generation number to the logging for any operations that hold the cross-process lock. It's obviously not bulletproof: for example, it is possible for the OlmMachine to be replaced without holding the lock; but hopefully this will at least help us understand what's going on.
1 parent 6f9147d commit 5e10ccc

File tree

5 files changed

+101
-63
lines changed

5 files changed

+101
-63
lines changed

crates/matrix-sdk-crypto/src/machine.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1927,6 +1927,8 @@ impl OlmMachine {
19271927
None => 0,
19281928
};
19291929

1930+
tracing::debug!("Initialising crypto store generation at {}", gen);
1931+
19301932
self.inner
19311933
.store
19321934
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
@@ -1939,19 +1941,32 @@ impl OlmMachine {
19391941

19401942
/// If needs be, update the local and on-disk crypto store generation.
19411943
///
1942-
/// Returns true whether another user has modified the internal generation
1943-
/// counter, and as such we've incremented and updated it in the
1944-
/// database.
1945-
///
19461944
/// ## Requirements
19471945
///
19481946
/// - This assumes that `initialize_crypto_store_generation` has been called
19491947
/// beforehand.
19501948
/// - This requires that the crypto store lock has been acquired.
1951-
pub async fn maintain_crypto_store_generation(
1952-
&self,
1949+
///
1950+
/// # Arguments
1951+
///
1952+
/// * `generation` - The in-memory generation counter (or rather, the
1953+
/// `Mutex` wrapping it). This defines the "expected" generation on entry,
1954+
/// and, if we determine an update is needed, is updated to hold the "new"
1955+
/// generation.
1956+
///
1957+
/// # Returns
1958+
///
1959+
/// A tuple containing:
1960+
///
1961+
/// * A `bool`, set to `true` if another process has updated the generation
1962+
/// number in the `Store` since our expected value, and as such we've
1963+
/// incremented and updated it in the database. Otherwise, `false`.
1964+
///
1965+
/// * The (possibly updated) generation counter.
1966+
pub async fn maintain_crypto_store_generation<'a>(
1967+
&'a self,
19531968
generation: &Mutex<Option<u64>>,
1954-
) -> StoreResult<bool> {
1969+
) -> StoreResult<(bool, u64)> {
19551970
let mut gen_guard = generation.lock().await;
19561971

19571972
// The database value must be there:
@@ -1973,10 +1988,10 @@ impl OlmMachine {
19731988
CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
19741989
})?);
19751990

1976-
let expected_gen = match gen_guard.as_ref() {
1991+
let new_gen = match gen_guard.as_ref() {
19771992
Some(expected_gen) => {
19781993
if actual_gen == *expected_gen {
1979-
return Ok(false);
1994+
return Ok((false, actual_gen));
19801995
}
19811996
// Increment the biggest, and store it everywhere.
19821997
actual_gen.max(*expected_gen).wrapping_add(1)
@@ -1992,22 +2007,19 @@ impl OlmMachine {
19922007
"Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
19932008
*gen_guard,
19942009
actual_gen,
1995-
expected_gen
2010+
new_gen
19962011
);
19972012

19982013
// Update known value.
1999-
*gen_guard = Some(expected_gen);
2014+
*gen_guard = Some(new_gen);
20002015

20012016
// Update value in database.
20022017
self.inner
20032018
.store
2004-
.set_custom_value(
2005-
Self::CURRENT_GENERATION_STORE_KEY,
2006-
expected_gen.to_le_bytes().to_vec(),
2007-
)
2019+
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, new_gen.to_le_bytes().to_vec())
20082020
.await?;
20092021

2010-
Ok(true)
2022+
Ok((true, new_gen))
20112023
}
20122024

20132025
/// Manage dehydrated devices.

crates/matrix-sdk-ui/src/encryption_sync_service.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626
//!
2727
//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension
2828
29-
use std::time::Duration;
29+
use std::{pin::Pin, time::Duration};
3030

3131
use async_stream::stream;
3232
use futures_core::stream::Stream;
3333
use futures_util::{pin_mut, StreamExt};
3434
use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS};
3535
use ruma::{api::client::sync::sync_events::v4, assign};
3636
use tokio::sync::OwnedMutexGuard;
37-
use tracing::{debug, trace};
37+
use tracing::{debug, instrument, trace, Span};
3838

3939
/// Unit type representing a permit to *use* an [`EncryptionSyncService`].
4040
///
@@ -143,6 +143,7 @@ impl EncryptionSyncService {
143143
/// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
144144
/// most one encryption sync running at any time. See its documentation
145145
/// for more details.
146+
#[instrument(skip_all, fields(store_generation))]
146147
pub async fn run_fixed_iterations(
147148
self,
148149
num_iterations: u8,
@@ -152,7 +153,7 @@ impl EncryptionSyncService {
152153

153154
pin_mut!(sync);
154155

155-
let _lock_guard = if self.with_locking {
156+
let lock_guard = if self.with_locking {
156157
let mut lock_guard =
157158
self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
158159

@@ -192,6 +193,8 @@ impl EncryptionSyncService {
192193
None
193194
};
194195

196+
Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
197+
195198
for _ in 0..num_iterations {
196199
match sync.next().await {
197200
Some(Ok(update_summary)) => {
@@ -241,17 +244,7 @@ impl EncryptionSyncService {
241244
pin_mut!(sync);
242245

243246
loop {
244-
let guard = if self.with_locking {
245-
self.client
246-
.encryption()
247-
.spin_lock_store(Some(60000))
248-
.await
249-
.map_err(Error::LockError)?
250-
} else {
251-
None
252-
};
253-
254-
match sync.next().await {
247+
match self.next_sync_with_lock(&mut sync).await? {
255248
Some(Ok(update_summary)) => {
256249
// This API is only concerned with the e2ee and to-device extensions.
257250
// Warn if anything weird has been received from the proxy.
@@ -264,18 +257,12 @@ impl EncryptionSyncService {
264257

265258
// Cool cool, let's do it again.
266259
trace!("Encryption sync received an update!");
267-
268-
drop(guard);
269-
270260
yield Ok(());
271261
continue;
272262
}
273263

274264
Some(Err(err)) => {
275265
trace!("Encryption sync stopped because of an error: {err:#}");
276-
277-
drop(guard);
278-
279266
yield Err(Error::SlidingSync(err));
280267
break;
281268
}
@@ -289,6 +276,24 @@ impl EncryptionSyncService {
289276
})
290277
}
291278

279+
/// Helper function for `sync`. Take the cross-process store lock, and call
280+
/// `sync.next()`
281+
#[instrument(skip_all, fields(store_generation))]
282+
async fn next_sync_with_lock<Item>(
283+
&self,
284+
sync: &mut Pin<&mut impl Stream<Item = Item>>,
285+
) -> Result<Option<Item>, Error> {
286+
let guard = if self.with_locking {
287+
self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
288+
} else {
289+
None
290+
};
291+
292+
Span::current().record("store_generation", guard.map(|guard| guard.generation()));
293+
294+
Ok(sync.next().await)
295+
}
296+
292297
/// Requests that the underlying sliding sync be stopped.
293298
///
294299
/// This will unlock the cross-process lock, if taken.

crates/matrix-sdk/src/encryption/mod.rs

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,20 @@ pub enum VerificationState {
201201
Unverified,
202202
}
203203

204+
/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
205+
#[derive(Debug)]
206+
pub struct CrossProcessLockStoreGuardWithGeneration {
207+
_guard: CrossProcessStoreLockGuard,
208+
generation: u64,
209+
}
210+
211+
impl CrossProcessLockStoreGuardWithGeneration {
212+
/// Return the Crypto Store generation associated with this store lock.
213+
pub fn generation(&self) -> u64 {
214+
self.generation
215+
}
216+
}
217+
204218
impl Client {
205219
pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
206220
self.base_client().olm_machine().await
@@ -1247,21 +1261,30 @@ impl Encryption {
12471261

12481262
/// Maybe reload the `OlmMachine` after acquiring the lock for the first
12491263
/// time.
1250-
async fn on_lock_newly_acquired(&self) -> Result<(), Error> {
1264+
///
1265+
/// Returns the current generation number.
1266+
async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
12511267
let olm_machine_guard = self.client.olm_machine().await;
12521268
if let Some(olm_machine) = olm_machine_guard.as_ref() {
1253-
// If the crypto store generation has changed,
1254-
if olm_machine
1269+
let (new_gen, generation_number) = olm_machine
12551270
.maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1256-
.await?
1257-
{
1271+
.await?;
1272+
// If the crypto store generation has changed,
1273+
if new_gen {
12581274
// (get rid of the reference to the current crypto store first)
12591275
drop(olm_machine_guard);
12601276
// Recreate the OlmMachine.
12611277
self.client.base_client().regenerate_olm().await?;
12621278
}
1279+
Ok(generation_number)
1280+
} else {
1281+
// XXX: not sure this is reachable. Seems like the OlmMachine should always have
1282+
// been initialised by the time we get here. Ideally we'd panic, or return an
1283+
// error, but for now I'm just adding some logging to check if it
1284+
// happens, and returning the magic number 0.
1285+
warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1286+
Ok(0)
12631287
}
1264-
Ok(())
12651288
}
12661289

12671290
/// If a lock was created with [`Self::enable_cross_process_store_lock`],
@@ -1272,13 +1295,13 @@ impl Encryption {
12721295
pub async fn spin_lock_store(
12731296
&self,
12741297
max_backoff: Option<u32>,
1275-
) -> Result<Option<CrossProcessStoreLockGuard>, Error> {
1298+
) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
12761299
if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
12771300
let guard = lock.spin_lock(max_backoff).await?;
12781301

1279-
self.on_lock_newly_acquired().await?;
1302+
let generation = self.on_lock_newly_acquired().await?;
12801303

1281-
Ok(Some(guard))
1304+
Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
12821305
} else {
12831306
Ok(None)
12841307
}
@@ -1288,15 +1311,19 @@ impl Encryption {
12881311
/// attempts to lock it once.
12891312
///
12901313
/// Returns a guard to the lock, if it was obtained.
1291-
pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessStoreLockGuard>, Error> {
1314+
pub async fn try_lock_store_once(
1315+
&self,
1316+
) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
12921317
if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
12931318
let maybe_guard = lock.try_lock_once().await?;
12941319

1295-
if maybe_guard.is_some() {
1296-
self.on_lock_newly_acquired().await?;
1297-
}
1320+
let Some(guard) = maybe_guard else {
1321+
return Ok(None);
1322+
};
1323+
1324+
let generation = self.on_lock_newly_acquired().await?;
12981325

1299-
Ok(maybe_guard)
1326+
Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
13001327
} else {
13011328
Ok(None)
13021329
}

crates/matrix-sdk/src/room/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,12 +1448,13 @@ impl Room {
14481448
// TODO: expose this publicly so people can pre-share a group session if
14491449
// e.g. a user starts to type a message for a room.
14501450
#[cfg(feature = "e2e-encryption")]
1451-
#[instrument(skip_all, fields(room_id = ?self.room_id()))]
1451+
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
14521452
async fn preshare_room_key(&self) -> Result<()> {
14531453
self.ensure_room_joined()?;
14541454

14551455
// Take and release the lock on the store, if needs be.
1456-
let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
1456+
let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
1457+
tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
14571458

14581459
self.client
14591460
.locks()

crates/matrix-sdk/src/sliding_sync/mod.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -722,24 +722,19 @@ impl SlidingSync {
722722
pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
723723
debug!("Starting sync stream");
724724

725-
let sync_span = Span::current();
726725
let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
727726

728727
stream! {
729728
loop {
730-
sync_span.in_scope(|| {
731-
debug!("Sync stream is running");
732-
});
729+
debug!("Sync stream is running");
733730

734731
select! {
735732
biased;
736733

737734
internal_message = internal_channel_receiver.recv() => {
738735
use SlidingSyncInternalMessage::*;
739736

740-
sync_span.in_scope(|| {
741-
debug!(?internal_message, "Sync stream has received an internal message");
742-
});
737+
debug!(?internal_message, "Sync stream has received an internal message");
743738

744739
match internal_message {
745740
Err(_) | Ok(SyncLoopStop) => {
@@ -752,7 +747,7 @@ impl SlidingSync {
752747
}
753748
}
754749

755-
update_summary = self.sync_once().instrument(sync_span.clone()) => {
750+
update_summary = self.sync_once() => {
756751
match update_summary {
757752
Ok(updates) => {
758753
yield Ok(updates);
@@ -767,9 +762,7 @@ impl SlidingSync {
767762
Err(error) => {
768763
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
769764
// The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
770-
sync_span.in_scope(|| async {
771-
self.expire_session().await;
772-
}).await;
765+
self.expire_session().await;
773766
}
774767

775768
yield Err(error);

0 commit comments

Comments
 (0)