Skip to content

Commit a5537a8

Browse files
authored
fix(event cache): don't ditch a previous-batch token when we didn't have initial events (#4936)
See #4891 that shows a case where we should have saved the previous-batch token, and instead ditched it, in the previous version. Changes include: - moving the code deciding to keep or ditch the `previous-batch` token into `append_events_locked`. - tweaking the condition to ditch, so that the `previous-batch` token is ditched only if we already had events in the event cache in the first place, in addition to having storage + the timeline not being marked as limited explicitly. Credits to @zecakeh for the test case.
1 parent a27d6e2 commit a5537a8

File tree

2 files changed

+139
-23
lines changed

2 files changed

+139
-23
lines changed

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -380,21 +380,11 @@ impl RoomEventCacheInner {
380380
// Add all the events to the backend.
381381
trace!("adding new events");
382382

383-
// If we have storage, only keep the previous-batch token if we have a limited
384-
// timeline. Otherwise, we know about all the events, and we don't need to
385-
// back-paginate, so we wouldn't make use of the given previous-batch token.
386-
//
387-
// If we don't have storage, even if the timeline isn't limited, we may not have
388-
// saved the previous events in any cache, so we should always be
389-
// able to retrieve those.
390-
let prev_batch =
391-
if has_storage && !timeline.limited { None } else { timeline.prev_batch };
392-
393383
let mut state = self.state.write().await;
394384
self.append_events_locked(
385+
has_storage,
395386
&mut state,
396-
timeline.events,
397-
prev_batch,
387+
timeline,
398388
ephemeral_events,
399389
ambiguity_changes,
400390
)
@@ -438,10 +428,17 @@ impl RoomEventCacheInner {
438428
});
439429

440430
// Push the new events.
431+
432+
// This method is only used when we don't have storage, and
433+
// it's conservative to consider that this new timeline is "limited",
434+
// since we don't know if we have a gap or not.
435+
let has_storage = false;
436+
let limited = true;
437+
441438
self.append_events_locked(
439+
has_storage,
442440
&mut state,
443-
timeline_events,
444-
prev_batch.clone(),
441+
Timeline { limited, prev_batch, events: timeline_events },
445442
ephemeral_events,
446443
ambiguity_changes,
447444
)
@@ -456,28 +453,37 @@ impl RoomEventCacheInner {
456453
/// This is a private implementation. It must not be exposed publicly.
457454
async fn append_events_locked(
458455
&self,
456+
has_storage: bool,
459457
state: &mut RoomEventCacheState,
460-
timeline_events: Vec<TimelineEvent>,
461-
prev_batch: Option<String>,
458+
timeline: Timeline,
462459
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
463460
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
464461
) -> Result<()> {
465-
if timeline_events.is_empty()
462+
let mut prev_batch = timeline.prev_batch;
463+
if timeline.events.is_empty()
466464
&& prev_batch.is_none()
467465
&& ephemeral_events.is_empty()
468466
&& ambiguity_changes.is_empty()
469467
{
470468
return Ok(());
471469
}
472470

471+
// Ditch the previous-batch token if we have storage, the sync isn't limited and
472+
// we've seen at least one event in the past. In this case (and only this one),
473+
// we should definitely know what the head of the timeline is (either we
474+
// know about all the events, or we have a gap somewhere).
475+
if has_storage && !timeline.limited && state.events().events().next().is_some() {
476+
prev_batch = None;
477+
}
478+
473479
let (
474480
DeduplicationOutcome {
475481
all_events: events,
476482
in_memory_duplicated_event_ids,
477483
in_store_duplicated_event_ids,
478484
},
479485
all_duplicates,
480-
) = state.collect_valid_and_duplicated_events(timeline_events).await?;
486+
) = state.collect_valid_and_duplicated_events(timeline.events).await?;
481487

482488
// During a sync, when a duplicated event is found, the old event is removed and
483489
// the new event is added.
@@ -532,11 +538,11 @@ impl RoomEventCacheInner {
532538

533539
timeline_event_diffs.extend(new_timeline_event_diffs);
534540

535-
if prev_batch.is_some() && !all_duplicates {
536-
// If there was a previous batch token, and there's at least one non-duplicated
537-
// new event, unload the chunks so it only contains the last
538-
// one; otherwise, there might be a valid gap in between, and
539-
// observers may not render it (yet).
541+
if timeline.limited && prev_batch.is_some() && !all_duplicates {
542+
// If there was a previous batch token for a limited timeline, and there's at
543+
// least one non-duplicated new event, unload the chunks so it
544+
// only contains the last one; otherwise, there might be a valid
545+
// gap in between, and observers may not render it (yet).
540546
//
541547
// We must do this *after* the above call to `.with_events_mut`, so the new
542548
// events and gaps are properly persisted to storage.

crates/matrix-sdk/tests/integration/event_cache.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2627,3 +2627,113 @@ async fn test_clear_all_rooms() {
26272627
event_cache_store.load_last_chunk(sleeping_room_id).await.unwrap();
26282628
assert!(maybe_last_chunk.is_none());
26292629
}
2630+
2631+
#[async_test]
2632+
async fn test_sync_while_back_paginate() {
2633+
let server = MatrixMockServer::new().await;
2634+
2635+
let room_id = room_id!("!galette:saucisse.bzh");
2636+
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2637+
2638+
// Previous batch of events which will be received via /messages, in reverse
2639+
// chronological order.
2640+
let prev_events = vec![
2641+
f.text_msg("messages3").event_id(event_id!("$messages3")).into_raw_timeline(),
2642+
f.text_msg("messages2").event_id(event_id!("$messages2")).into_raw_timeline(),
2643+
f.text_msg("messages1").event_id(event_id!("$messages1")).into_raw_timeline(),
2644+
];
2645+
2646+
// Batch of events which will be received via /sync, in chronological
2647+
// order.
2648+
let sync_events = [
2649+
f.text_msg("sync1").event_id(event_id!("$sync1")).into_raw_timeline(),
2650+
f.text_msg("sync2").event_id(event_id!("$sync2")).into_raw_timeline(),
2651+
f.text_msg("sync3").event_id(event_id!("$sync3")).into_raw_timeline(),
2652+
];
2653+
2654+
let state_memory_store = matrix_sdk_base::store::MemoryStore::new();
2655+
let store_config = StoreConfig::new("le_store".to_owned())
2656+
.event_cache_store(Arc::new(MemoryStore::new()))
2657+
.state_store(state_memory_store);
2658+
2659+
{
2660+
// First, initialize the sync so the client is aware of the room, in the state
2661+
// store.
2662+
let client = server.client_builder().store_config(store_config.clone()).build().await;
2663+
server.sync_joined_room(&client, room_id).await;
2664+
}
2665+
2666+
// Then, use a new client that will restore the state from the state store, and
2667+
// with an empty event cache store.
2668+
let client = server.client_builder().store_config(store_config).build().await;
2669+
let room = client.get_room(room_id).unwrap();
2670+
2671+
client.event_cache().subscribe().unwrap();
2672+
client.event_cache().enable_storage().unwrap();
2673+
2674+
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2675+
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
2676+
assert!(initial_events.is_empty());
2677+
2678+
// Mock /messages in case we use the prev_batch token from sync.
2679+
server
2680+
.mock_room_messages()
2681+
.match_from("token-before-sync-from-sync")
2682+
.ok(RoomMessagesResponseTemplate::default()
2683+
.end_token("token-before-messages")
2684+
.events(prev_events))
2685+
.named("messages")
2686+
.mount()
2687+
.await;
2688+
// Mock /messages in case we use no token.
2689+
server
2690+
.mock_room_messages()
2691+
.ok(RoomMessagesResponseTemplate::default()
2692+
.end_token("token-before-sync-from-messages")
2693+
.events(sync_events.clone().into_iter().rev().collect()))
2694+
.named("messages")
2695+
.mount()
2696+
.await;
2697+
2698+
// Spawn back pagination.
2699+
let pagination = room_event_cache.pagination();
2700+
let back_pagination_handle =
2701+
spawn(async move { pagination.run_backwards_once(3).await.unwrap() });
2702+
2703+
// Receive a non-limited sync while back pagination is happening.
2704+
server
2705+
.sync_room(
2706+
&client,
2707+
JoinedRoomBuilder::new(room_id)
2708+
.set_timeline_prev_batch("token-before-sync-from-sync")
2709+
.add_timeline_bulk(sync_events.into_iter().map(ruma::serde::Raw::cast)),
2710+
)
2711+
.await;
2712+
2713+
assert_let_timeout!(
2714+
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
2715+
);
2716+
assert_eq!(diffs.len(), 1);
2717+
assert_let!(VectorDiff::Append { values } = &diffs[0]);
2718+
2719+
assert_eq!(values.len(), 3);
2720+
assert_event_matches_msg(&values[0], "sync1");
2721+
assert_event_matches_msg(&values[1], "sync2");
2722+
assert_event_matches_msg(&values[2], "sync3");
2723+
2724+
// Back pagination should succeed, and we haven't reached the start.
2725+
let outcome = back_pagination_handle.await.unwrap();
2726+
assert!(outcome.reached_start.not());
2727+
assert_eq!(outcome.events.len(), 3);
2728+
2729+
// And the back-paginated events come down from the subscriber too.
2730+
assert_let_timeout!(
2731+
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
2732+
);
2733+
assert_eq!(diffs.len(), 3);
2734+
assert_let!(VectorDiff::Insert { index: 0, value: _ } = &diffs[0]);
2735+
assert_let!(VectorDiff::Insert { index: 1, value: _ } = &diffs[1]);
2736+
assert_let!(VectorDiff::Insert { index: 2, value: _ } = &diffs[2]);
2737+
2738+
assert!(subscriber.is_empty());
2739+
}

0 commit comments

Comments
 (0)