Skip to content

Commit 28cd3c4

Browse files
committed
event cache: reuse the paginator internally
Fixes #3355.
1 parent a3f6e0f commit 28cd3c4

File tree

9 files changed

+304
-293
lines changed

9 files changed

+304
-293
lines changed

bindings/matrix-sdk-ffi/src/timeline/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ use anyhow::{Context, Result};
1818
use as_variant::as_variant;
1919
use eyeball_im::VectorDiff;
2020
use futures_util::{pin_mut, StreamExt};
21-
use matrix_sdk::attachment::{
22-
AttachmentConfig, AttachmentInfo, BaseAudioInfo, BaseFileInfo, BaseImageInfo,
23-
BaseThumbnailInfo, BaseVideoInfo, Thumbnail,
21+
use matrix_sdk::{
22+
attachment::{
23+
AttachmentConfig, AttachmentInfo, BaseAudioInfo, BaseFileInfo, BaseImageInfo,
24+
BaseThumbnailInfo, BaseVideoInfo, Thumbnail,
25+
},
26+
event_cache::paginator::PaginatorState,
2427
};
25-
use matrix_sdk_ui::timeline::{EventItemOrigin, PaginationStatus, Profile, TimelineDetails};
28+
use matrix_sdk_ui::timeline::{EventItemOrigin, Profile, TimelineDetails};
2629
use mime::Mime;
2730
use ruma::{
2831
events::{
@@ -160,7 +163,7 @@ impl Timeline {
160163
self.inner.fetch_members().await
161164
}
162165

163-
pub fn subscribe_to_back_pagination_status(
166+
pub async fn subscribe_to_back_pagination_status(
164167
&self,
165168
listener: Box<dyn PaginationStatusListener>,
166169
) -> Result<Arc<TaskHandle>, ClientError> {
@@ -588,7 +591,7 @@ pub trait TimelineListener: Sync + Send {
588591

589592
#[uniffi::export(callback_interface)]
590593
pub trait PaginationStatusListener: Sync + Send {
591-
fn on_update(&self, status: PaginationStatus);
594+
fn on_update(&self, status: PaginatorState);
592595
}
593596

594597
#[derive(Clone, uniffi::Object)]

crates/matrix-sdk-ui/src/timeline/builder.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use std::{collections::BTreeSet, sync::Arc};
1616

17-
use eyeball::SharedObservable;
1817
use futures_util::{pin_mut, StreamExt};
1918
use matrix_sdk::{event_cache::RoomEventCacheUpdate, executor::spawn, Room};
2019
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
@@ -28,10 +27,7 @@ use super::{
2827
queue::send_queued_messages,
2928
Error, Timeline, TimelineDropHandle, TimelineFocus,
3029
};
31-
use crate::{
32-
timeline::{event_item::RemoteEventOrigin, PaginationStatus},
33-
unable_to_decrypt_hook::UtdHookManager,
34-
};
30+
use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
3531

3632
/// Builder that allows creating and configuring various parts of a
3733
/// [`Timeline`].
@@ -314,7 +310,6 @@ impl TimelineBuilder {
314310

315311
let timeline = Timeline {
316312
inner,
317-
back_pagination_status: SharedObservable::new(PaginationStatus::Idle),
318313
msg_sender,
319314
event_cache: room_event_cache,
320315
drop_handle: Arc::new(TimelineDropHandle {

crates/matrix-sdk-ui/src/timeline/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
1919
use std::{pin::Pin, sync::Arc, task::Poll};
2020

21-
use eyeball::SharedObservable;
2221
use eyeball_im::VectorDiff;
2322
use futures_core::Stream;
2423
use imbl::Vector;
@@ -96,7 +95,7 @@ pub use self::{
9695
event_type_filter::TimelineEventTypeFilter,
9796
inner::default_event_filter,
9897
item::{TimelineItem, TimelineItemKind},
99-
pagination::{PaginationOptions, PaginationOutcome, PaginationStatus},
98+
pagination::{PaginationOptions, PaginationOutcome},
10099
polls::PollResult,
101100
reactions::ReactionSenderData,
102101
sliding_sync_ext::SlidingSyncRoomExt,
@@ -130,9 +129,6 @@ pub struct Timeline {
130129

131130
/// References to long-running tasks held by the timeline.
132131
drop_handle: Arc<TimelineDropHandle>,
133-
134-
/// Observable for whether a backward pagination is currently running.
135-
pub(crate) back_pagination_status: SharedObservable<PaginationStatus>,
136132
}
137133

138134
// Implements hash etc

crates/matrix-sdk-ui/src/timeline/pagination.rs

Lines changed: 31 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -12,38 +12,19 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{fmt, ops::ControlFlow, sync::Arc, time::Duration};
16-
17-
use eyeball::{SharedObservable, Subscriber};
18-
use matrix_sdk::event_cache::{self, BackPaginationOutcome};
15+
use std::{fmt, ops::ControlFlow, sync::Arc};
16+
17+
use eyeball::Subscriber;
18+
use matrix_sdk::event_cache::{
19+
self,
20+
paginator::{PaginatorError, PaginatorState},
21+
BackPaginationOutcome, EventCacheError,
22+
};
1923
use tracing::{instrument, trace, warn};
2024

2125
use super::Error;
2226
use crate::timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd};
2327

24-
struct ResetStatusGuard {
25-
status: SharedObservable<PaginationStatus>,
26-
target: Option<PaginationStatus>,
27-
}
28-
29-
impl ResetStatusGuard {
30-
fn new(status: SharedObservable<PaginationStatus>, target: PaginationStatus) -> Self {
31-
Self { status, target: Some(target) }
32-
}
33-
34-
fn disarm(mut self) {
35-
self.target = None;
36-
}
37-
}
38-
39-
impl Drop for ResetStatusGuard {
40-
fn drop(&mut self) {
41-
if let Some(target) = self.target.take() {
42-
self.status.set_if_not_eq(target);
43-
}
44-
}
45-
}
46-
4728
impl super::Timeline {
4829
/// Add more events to the start of the timeline.
4930
///
@@ -88,34 +69,27 @@ impl super::Timeline {
8869
&self,
8970
mut options: PaginationOptions<'_>,
9071
) -> event_cache::Result<bool> {
91-
let back_pagination_status = &self.back_pagination_status;
92-
93-
if back_pagination_status.get() == PaginationStatus::TimelineEndReached {
94-
warn!("Start of timeline reached, ignoring backwards-pagination request");
95-
return Ok(true);
96-
}
97-
98-
if back_pagination_status.set_if_not_eq(PaginationStatus::Paginating).is_none() {
99-
warn!("Another back-pagination is already running in the background");
100-
return Ok(false);
101-
}
102-
103-
let reset_status_guard =
104-
ResetStatusGuard::new(back_pagination_status.clone(), PaginationStatus::Idle);
105-
106-
// The first time, we allow to wait a bit for *a* back-pagination token to come
107-
// over via sync.
108-
const WAIT_FOR_TOKEN_TIMEOUT: Duration = Duration::from_secs(3);
109-
110-
let mut token =
111-
self.event_cache.oldest_backpagination_token(Some(WAIT_FOR_TOKEN_TIMEOUT)).await?;
112-
11372
let initial_options = options.clone();
11473
let mut outcome = PaginationOutcome::default();
11574

11675
while let Some(batch_size) = options.next_event_limit(outcome) {
11776
loop {
118-
match self.event_cache.backpaginate(batch_size, token).await? {
77+
let result = self.event_cache.paginate_backwards(batch_size).await;
78+
79+
let event_cache_outcome = match result {
80+
Ok(outcome) => outcome,
81+
82+
Err(EventCacheError::BackpaginationError(
83+
PaginatorError::InvalidPreviousState { actual, .. },
84+
)) if actual == PaginatorState::Paginating => {
85+
warn!("Another pagination request is already happening, returning early");
86+
return Ok(false);
87+
}
88+
89+
Err(err) => return Err(err),
90+
};
91+
92+
match event_cache_outcome {
11993
BackPaginationOutcome::Success { events, reached_start } => {
12094
let num_events = events.len();
12195
trace!("Back-pagination succeeded with {num_events} events");
@@ -130,11 +104,6 @@ impl super::Timeline {
130104
.await;
131105

132106
if reached_start {
133-
// Don't reset the status to `Idle`…
134-
reset_status_guard.disarm();
135-
// …and set it to `TimelineEndReached` instead.
136-
back_pagination_status
137-
.set_if_not_eq(PaginationStatus::TimelineEndReached);
138107
return Ok(true);
139108
}
140109

@@ -148,8 +117,8 @@ impl super::Timeline {
148117

149118
if num_events == 0 {
150119
// As an exceptional contract: if there were no events in the response,
151-
// see if we had another back-pagination token, and retry the request.
152-
token = self.event_cache.oldest_backpagination_token(None).await?;
120+
// and we've not hit the start of the timeline, so retry until we get
121+
// some events.
153122
continue;
154123
}
155124
}
@@ -163,22 +132,20 @@ impl super::Timeline {
163132
}
164133
}
165134

166-
// Retrieve the next earliest back-pagination token.
167-
token = self.event_cache.oldest_backpagination_token(None).await?;
168-
169135
// Exit the inner loop, and ask for another limit.
170136
break;
171137
}
172138
}
173139

174-
// The status is automatically reset to idle by `reset_status_guard`.
175-
176140
Ok(false)
177141
}
178142

179143
/// Subscribe to the back-pagination status of the timeline.
180-
pub fn back_pagination_status(&self) -> Subscriber<PaginationStatus> {
181-
self.back_pagination_status.subscribe()
144+
///
145+
/// Note: this may send multiple Paginating/Idle sequences during a single
146+
/// call to [`Self::paginate_backwards()`].
147+
pub fn back_pagination_status(&self) -> Subscriber<PaginatorState> {
148+
self.event_cache.pagination_status()
182149
}
183150
}
184151

@@ -330,19 +297,6 @@ pub struct PaginationOutcome {
330297
pub total_items_updated: u64,
331298
}
332299

333-
/// The status of a pagination.
334-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
335-
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
336-
pub enum PaginationStatus {
337-
/// No pagination happening.
338-
Idle,
339-
/// Timeline is paginating for this end.
340-
Paginating,
341-
/// An end of the timeline (front or back) has been reached by this
342-
/// pagination.
343-
TimelineEndReached,
344-
}
345-
346300
#[cfg(test)]
347301
mod tests {
348302
use std::{

crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@ use std::{sync::Arc, time::Duration};
1616

1717
use assert_matches2::assert_let;
1818
use eyeball_im::VectorDiff;
19-
use futures_util::{
20-
future::{join, join3},
21-
FutureExt,
19+
use futures_util::future::{join, join3};
20+
use matrix_sdk::{
21+
config::SyncSettings, event_cache::paginator::PaginatorState,
22+
test_utils::logged_in_client_with_server,
2223
};
23-
use matrix_sdk::{config::SyncSettings, test_utils::logged_in_client_with_server};
2424
use matrix_sdk_test::{
2525
async_test, EventBuilder, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, ALICE, BOB,
2626
};
2727
use matrix_sdk_ui::timeline::{
28-
AnyOtherFullStateEventContent, PaginationOptions, PaginationStatus, RoomExt,
29-
TimelineItemContent,
28+
AnyOtherFullStateEventContent, PaginationOptions, RoomExt, TimelineItemContent,
3029
};
3130
use once_cell::sync::Lazy;
3231
use ruma::{
@@ -81,7 +80,7 @@ async fn test_back_pagination() {
8180
server.reset().await;
8281
};
8382
let observe_paginating = async {
84-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating));
83+
assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating));
8584
};
8685
join(paginate, observe_paginating).await;
8786

@@ -136,8 +135,10 @@ async fn test_back_pagination() {
136135
.mount(&server)
137136
.await;
138137

139-
timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap();
140-
assert_next_eq!(back_pagination_status, PaginationStatus::TimelineEndReached);
138+
let hit_start =
139+
timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap();
140+
assert!(hit_start);
141+
assert_next_eq!(back_pagination_status, PaginatorState::Idle);
141142
}
142143

143144
#[async_test]
@@ -274,8 +275,8 @@ async fn test_wait_for_token() {
274275
.unwrap();
275276
};
276277
let observe_paginating = async {
277-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating));
278-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Idle));
278+
assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating));
279+
assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Idle));
279280
};
280281
let sync = async {
281282
// Make sure syncing starts a little bit later than pagination
@@ -430,18 +431,41 @@ async fn test_timeline_reset_while_paginating() {
430431
timeline
431432
.live_paginate_backwards(PaginationOptions::simple_request(10).wait_for_token())
432433
.await
433-
.unwrap();
434+
.unwrap()
434435
};
436+
435437
let observe_paginating = async {
436-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating));
437-
// timeline start reached because second pagination response contains
438-
// no end field
439-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::TimelineEndReached));
438+
let mut seen_paginating = false;
439+
440+
// Observe paginating updates: we want to make sure we see at least once
441+
// Paginating, and that it settles with Idle.
442+
while let Ok(update) =
443+
timeout(Duration::from_millis(500), back_pagination_status.next()).await
444+
{
445+
match update {
446+
Some(state) => {
447+
if state == PaginatorState::Paginating {
448+
seen_paginating = true;
449+
}
450+
}
451+
None => break,
452+
}
453+
}
454+
455+
assert!(seen_paginating);
456+
assert_eq!(back_pagination_status.next_now(), PaginatorState::Idle);
440457
};
458+
441459
let sync = async {
442460
client.sync_once(sync_settings.clone()).await.unwrap();
443461
};
444-
timeout(Duration::from_secs(2), join3(paginate, observe_paginating, sync)).await.unwrap();
462+
463+
let (hit_start, _, _) =
464+
timeout(Duration::from_secs(5), join3(paginate, observe_paginating, sync)).await.unwrap();
465+
466+
// timeline start reached because second pagination response contains no end
467+
// field.
468+
assert!(hit_start);
445469

446470
// No events in back-pagination responses, day divider + event from latest
447471
// sync is present
@@ -570,7 +594,7 @@ async fn test_empty_chunk() {
570594
server.reset().await;
571595
};
572596
let observe_paginating = async {
573-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating));
597+
assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating));
574598
};
575599
join(paginate, observe_paginating).await;
576600

@@ -669,7 +693,7 @@ async fn test_until_num_items_with_empty_chunk() {
669693
server.reset().await;
670694
};
671695
let observe_paginating = async {
672-
assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating));
696+
assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating));
673697
};
674698
join(paginate, observe_paginating).await;
675699

0 commit comments

Comments
 (0)