Skip to content

Commit 87a6037

Browse files
committed
refactor(event cache): consolidate logic around returning the previous gap token
1 parent ee710e3 commit 87a6037

File tree

4 files changed

+138
-79
lines changed

4 files changed

+138
-79
lines changed

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -178,38 +178,70 @@ impl RoomPagination {
178178
// to load from storage first, then from network if storage indicated
179179
// there's no previous events chunk to load.
180180

181-
match self.inner.state.write().await.load_more_events_backwards().await? {
182-
LoadMoreEventsBackwardsOutcome::Gap => {
183-
// We have a gap, so resolve it with a network back-pagination.
184-
}
181+
loop {
182+
let mut state_guard = self.inner.state.write().await;
183+
184+
match state_guard.load_more_events_backwards().await? {
185+
LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
186+
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
187+
188+
// Release the state guard while waiting, to not deadlock the sync task.
189+
drop(state_guard);
190+
191+
// Otherwise, wait for a notification that we received a previous-batch token.
192+
trace!("waiting for a pagination token…");
193+
let _ = timeout(
194+
self.inner.pagination_batch_token_notifier.notified(),
195+
DEFAULT_WAIT_FOR_TOKEN_DURATION,
196+
)
197+
.await;
198+
trace!("done waiting");
199+
200+
self.inner.state.write().await.waited_for_initial_prev_token = true;
201+
202+
// Retry!
203+
//
204+
// Note: the next call to `load_more_events_backwards` can't return
205+
// `WaitForInitialPrevToken` because we've just set to
206+
// `waited_for_initial_prev_token`, so this is not an infinite loop.
207+
//
208+
// Note 2: not a recursive call, because recursive and async have a bad time
209+
// together.
210+
continue;
211+
}
185212

186-
LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
187-
return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }))
188-
}
213+
LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
214+
// We have a gap, so resolve it with a network back-pagination.
215+
drop(state_guard);
216+
return self.paginate_backwards_with_network(batch_size, prev_token).await;
217+
}
189218

190-
LoadMoreEventsBackwardsOutcome::Events {
191-
events,
192-
timeline_event_diffs,
193-
reached_start,
194-
} => {
195-
if !timeline_event_diffs.is_empty() {
196-
let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
197-
diffs: timeline_event_diffs,
198-
origin: EventsOrigin::Pagination,
199-
});
219+
LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
220+
return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
200221
}
201222

202-
return Ok(Some(BackPaginationOutcome {
223+
LoadMoreEventsBackwardsOutcome::Events {
224+
events,
225+
timeline_event_diffs,
203226
reached_start,
204-
// This is a backwards pagination. `BackPaginationOutcome` expects events to
205-
// be in “reverse order”.
206-
events: events.into_iter().rev().collect(),
207-
}));
227+
} => {
228+
if !timeline_event_diffs.is_empty() {
229+
let _ =
230+
self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
231+
diffs: timeline_event_diffs,
232+
origin: EventsOrigin::Pagination,
233+
});
234+
}
235+
236+
return Ok(Some(BackPaginationOutcome {
237+
reached_start,
238+
// This is a backwards pagination. `BackPaginationOutcome` expects events to
239+
// be in “reverse order”.
240+
events: events.into_iter().rev().collect(),
241+
}));
242+
}
208243
}
209244
}
210-
211-
// Alright, try network.
212-
self.paginate_backwards_with_network(batch_size).await
213245
}
214246

215247
/// Run a single pagination request (/messages) to the server.
@@ -221,20 +253,8 @@ impl RoomPagination {
221253
async fn paginate_backwards_with_network(
222254
&self,
223255
batch_size: u16,
256+
prev_token: Option<String>,
224257
) -> Result<Option<BackPaginationOutcome>> {
225-
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
226-
227-
let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
228-
229-
let prev_token = match prev_token {
230-
PaginationToken::HasMore(token) => Some(token),
231-
PaginationToken::None => None,
232-
PaginationToken::HitEnd => {
233-
debug!("Not back-paginating since we've reached the start of the timeline.");
234-
return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
235-
}
236-
};
237-
238258
let (events, new_gap) = {
239259
let Some(room) = self.inner.weak_room.get() else {
240260
// The client is shutting down, return an empty default response.
@@ -264,12 +284,12 @@ impl RoomPagination {
264284

265285
// Check that the previous token still exists; otherwise it's a sign that the
266286
// room's timeline has been cleared.
267-
let prev_gap_id = if let Some(token) = prev_token {
268-
let gap_id = state.events().chunk_identifier(|chunk| {
287+
let prev_gap_chunk_id = if let Some(token) = prev_token {
288+
let gap_chunk_id = state.events().chunk_identifier(|chunk| {
269289
matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
270290
});
271291

272-
if gap_id.is_none() {
292+
if gap_chunk_id.is_none() {
273293
// We got a previous-batch token from the linked chunk *before* running the
274294
// request, but it is missing *after* completing the
275295
// request.
@@ -279,12 +299,14 @@ impl RoomPagination {
279299
return Ok(None);
280300
}
281301

282-
gap_id
302+
gap_chunk_id
283303
} else {
284304
None
285305
};
286306

287-
self.handle_network_pagination_result(state, events, new_gap, prev_gap_id).await.map(Some)
307+
self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
308+
.await
309+
.map(Some)
288310
}
289311

290312
/// Handle the result of a successful network back-pagination.

crates/matrix-sdk/src/event_cache/room/events.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use as_variant::as_variant;
1516
use eyeball_im::VectorDiff;
1617
pub use matrix_sdk_base::event_cache::{Event, Gap};
1718
use matrix_sdk_base::{
@@ -345,6 +346,15 @@ impl RoomEvents {
345346

346347
result
347348
}
349+
350+
/// Return the latest gap, if any.
351+
///
352+
/// Latest means "closest to the end", or, since events are ordered
353+
/// according to the sync ordering, this means "the most recent one".
354+
pub fn rgap(&self) -> Option<Gap> {
355+
self.rchunks()
356+
.find_map(|chunk| as_variant!(chunk.content(), ChunkContent::Gap(gap) => gap.clone()))
357+
}
348358
}
349359

350360
// Private implementations, implementation specific.

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -631,11 +631,15 @@ impl RoomEventCacheInner {
631631
}
632632

633633
/// Internal type to represent the output of
634-
/// `RoomEventCacheState::load_more_events_backwards`.
634+
/// [`RoomEventCacheState::load_more_events_backwards`].
635635
#[derive(Debug)]
636636
pub(super) enum LoadMoreEventsBackwardsOutcome {
637637
/// A gap has been inserted.
638-
Gap,
638+
Gap {
639+
/// The previous batch token to be used as the "end" parameter in the
640+
/// back-pagination request.
641+
prev_token: Option<String>,
642+
},
639643

640644
/// The start of the timeline has been reached.
641645
StartOfTimeline,
@@ -646,6 +650,9 @@ pub(super) enum LoadMoreEventsBackwardsOutcome {
646650
timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
647651
reached_start: bool,
648652
},
653+
654+
/// The caller must wait for the initial previous-batch token, and retry.
655+
WaitForInitialPrevToken,
649656
}
650657

651658
// Use a private module to hide `events` to this parent module.
@@ -811,79 +818,96 @@ mod private {
811818
Ok((deduplication_outcome, all_duplicates))
812819
}
813820

821+
/// Given a fully-loaded linked chunk with no gaps, return the
822+
/// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
823+
fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
824+
// If we never received events for this room, this means we've never
825+
// received a sync for that room, because every room must have at least a
826+
// room creation event. Otherwise, we have reached the start of the
827+
// timeline.
828+
if self.events.events().next().is_some() {
829+
// If there's at least one event, this means we've reached the start of the
830+
// timeline, since the chunk is fully loaded.
831+
LoadMoreEventsBackwardsOutcome::StartOfTimeline
832+
} else if !self.waited_for_initial_prev_token {
833+
// There's no events. Since we haven't yet, wait for an initial previous-token.
834+
LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
835+
} else {
836+
// Otherwise, we've already waited, *and* received no previous-batch token from
837+
// the sync, *and* there are still no events in the fully-loaded
838+
// chunk: start back-pagination from the end of the room.
839+
LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
840+
}
841+
}
842+
814843
/// Load more events backwards if the last chunk is **not** a gap.
815-
#[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
816844
pub(in super::super) async fn load_more_events_backwards(
817845
&mut self,
818846
) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
819847
let Some(store) = self.store.get() else {
820-
// No store: no events to insert. Pretend the caller has to act as if a gap was
821-
// present.
822-
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
848+
// No store to reload events from. Pretend the caller has to act as if a gap was
849+
// present. Limited syncs will always clear and push a gap, in this mode.
850+
// There's no lazy-loading.
851+
852+
// Look for a gap in the in-memory chunk, iterating in reverse so as to get the
853+
// most recent one.
854+
if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
855+
return Ok(LoadMoreEventsBackwardsOutcome::Gap {
856+
prev_token: Some(prev_token),
857+
});
858+
}
859+
860+
return Ok(self.conclude_load_more_for_fully_loaded_chunk());
823861
};
824862

825863
// If any in-memory chunk is a gap, don't load more events, and let the caller
826864
// resolve the gap.
827-
if self.events.chunks().any(|chunk| chunk.is_gap()) {
828-
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
865+
if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
866+
return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
829867
}
830868

831869
// Because `first_chunk` is `not `Send`, get this information before the
832870
// `.await` point, so that this `Future` can implement `Send`.
833871
let first_chunk_identifier =
834872
self.events.chunks().next().expect("a linked chunk is never empty").identifier();
835873

836-
let room_id = &self.room;
837874
let store = store.lock().await?;
838875

839876
// The first chunk is not a gap, we can load its previous chunk.
840877
let new_first_chunk =
841-
match store.load_previous_chunk(room_id, first_chunk_identifier).await {
878+
match store.load_previous_chunk(&self.room, first_chunk_identifier).await {
842879
Ok(Some(new_first_chunk)) => {
843880
// All good, let's continue with this chunk.
844881
new_first_chunk
845882
}
846883

847884
Ok(None) => {
848-
// No previous chunk: no events to insert. This means one of two things:
849-
// - either the linked chunk is at the start of the timeline,
850-
// - or we haven't received any back-pagination token yet, and we should
851-
// wait for one.
852-
if self.waited_for_initial_prev_token {
853-
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
854-
}
855-
// If we haven't waited yet, we request to resolve the gap, once we get the
856-
// previous-batch token from sync.
857-
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
885+
// There's no previous chunk. The chunk is now fully-loaded. Conclude.
886+
return Ok(self.conclude_load_more_for_fully_loaded_chunk());
858887
}
859888

860889
Err(err) => {
861890
error!("error when loading the previous chunk of a linked chunk: {err}");
862891

863892
// Clear storage for this room.
864-
store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
893+
store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
865894

866895
// Return the error.
867896
return Err(err.into());
868897
}
869898
};
870899

871-
let events = match &new_first_chunk.content {
872-
ChunkContent::Gap(_) => None,
873-
ChunkContent::Items(events) => {
874-
// We've reached the start on disk, if and only if, there was no chunk prior to
875-
// the one we just loaded.
876-
let reached_start = new_first_chunk.previous.is_none();
900+
let chunk_content = new_first_chunk.content.clone();
877901

878-
Some((events.clone(), reached_start))
879-
}
880-
};
902+
// We've reached the start on disk, if and only if, there was no chunk prior to
903+
// the one we just loaded.
904+
let reached_start = new_first_chunk.previous.is_none();
881905

882906
if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
883907
error!("error when inserting the previous chunk into its linked chunk: {err}");
884908

885909
// Clear storage for this room.
886-
store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
910+
store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
887911

888912
// Return the error.
889913
return Err(err.into());
@@ -896,9 +920,12 @@ mod private {
896920
// However, we want to get updates as `VectorDiff`s.
897921
let timeline_event_diffs = self.events.updates_as_vector_diffs();
898922

899-
Ok(match events {
900-
None => LoadMoreEventsBackwardsOutcome::Gap,
901-
Some((events, reached_start)) => LoadMoreEventsBackwardsOutcome::Events {
923+
Ok(match chunk_content {
924+
ChunkContent::Gap(gap) => {
925+
LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
926+
}
927+
928+
ChunkContent::Items(events) => LoadMoreEventsBackwardsOutcome::Events {
902929
events,
903930
timeline_event_diffs,
904931
reached_start,
@@ -2027,7 +2054,7 @@ mod tests {
20272054
// But if I manually reload more of the chunk, the gap will be present.
20282055
assert_matches!(
20292056
state.load_more_events_backwards().await.unwrap(),
2030-
LoadMoreEventsBackwardsOutcome::Gap
2057+
LoadMoreEventsBackwardsOutcome::Gap { .. }
20312058
);
20322059

20332060
num_gaps = 0;

crates/matrix-sdk/tests/integration/event_cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,8 +738,8 @@ async fn test_backpaginating_without_token() {
738738
assert!(reached_start);
739739

740740
// And we get notified about the new event.
741-
assert_event_matches_msg(&events[0], "hi");
742741
assert_eq!(events.len(), 1);
742+
assert_event_matches_msg(&events[0], "hi");
743743

744744
assert_let_timeout!(
745745
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()

0 commit comments

Comments
 (0)