Skip to content

Commit bb0f598

Browse files
committed
feat(sdk): Add RoomPagination::run_backwards(…, until).
This patch adds a new argument to `RoomPagination::run_backwards`: `until`. It becomes: pub async fn run_backwards<F, B, Fut>(&self, batch_size: u16, mut until: F ) -> Result<B> where F: FnMut(BackPaginationOutcome) -> Fut, Fut: Future<Output = ControlFlow<B, ()>>, The idea behind `until` is to run pagination _until_ `until` returns `ControlFlow::Break`, otherwise it continues paginating. This is useful is many scenearii (cf. the documentation). This is also and primarily the first step to stop adding events directly from the pagination, and starts adding events only and strictly only from `event_cache::RoomEventCacheUpdate` (again, see the `TODO` in the documentation). This is not done in this patch for the sake of ease of review.
1 parent 1dc3709 commit bb0f598

File tree

3 files changed

+95
-50
lines changed

3 files changed

+95
-50
lines changed

crates/matrix-sdk-ui/src/timeline/pagination.rs

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::ops::ControlFlow;
16+
1517
use async_rx::StreamExt as _;
1618
use async_stream::stream;
1719
use futures_core::Stream;
@@ -67,49 +69,45 @@ impl super::Timeline {
6769
pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
6870
let pagination = self.event_cache.pagination();
6971

70-
loop {
71-
let result = pagination.run_backwards(batch_size).await;
72-
73-
let event_cache_outcome = match result {
74-
Ok(outcome) => outcome,
75-
76-
Err(EventCacheError::BackpaginationError(
77-
PaginatorError::InvalidPreviousState {
78-
actual: PaginatorState::Paginating, ..
79-
},
80-
)) => {
81-
warn!("Another pagination request is already happening, returning early");
82-
return Ok(false);
83-
}
84-
85-
Err(err) => return Err(err),
86-
};
87-
88-
let BackPaginationOutcome { events, reached_start } = event_cache_outcome;
89-
90-
let num_events = events.len();
91-
trace!("Back-pagination succeeded with {num_events} events");
92-
93-
self.inner
94-
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
95-
.await;
96-
97-
if reached_start {
98-
return Ok(true);
72+
let reached_start = pagination
73+
.run_backwards(
74+
batch_size,
75+
|BackPaginationOutcome { events, reached_start }| async move {
76+
let num_events = events.len();
77+
trace!("Back-pagination succeeded with {num_events} events");
78+
79+
// TODO(hywan): Remove, and let spread events via
80+
// `matrix_sdk::event_cache::RoomEventCacheUpdate` from
81+
// `matrix_sdk::event_cache::RoomPagination::run_backwards`.
82+
self.inner
83+
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
84+
.await;
85+
86+
if num_events == 0 && !reached_start {
87+
// As an exceptional contract: if there were no events in the response,
88+
// and we've not hit the start of the timeline, retry until we get
89+
// some events or reach the start of the timeline.
90+
return ControlFlow::Continue(());
91+
}
92+
93+
ControlFlow::Break(reached_start)
94+
},
95+
)
96+
.await;
97+
98+
match reached_start {
99+
Ok(reached_start) => Ok(reached_start),
100+
101+
Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState {
102+
actual: PaginatorState::Paginating,
103+
..
104+
})) => {
105+
warn!("Another pagination request is already happening, returning early");
106+
Ok(false)
99107
}
100108

101-
if num_events == 0 {
102-
// As an exceptional contract: if there were no events in the response,
103-
// and we've not hit the start of the timeline, retry until we get
104-
// some events or reach the start of the timeline.
105-
continue;
106-
}
107-
108-
// Exit the inner loop, and ask for another limit.
109-
break;
109+
Err(err) => Err(err),
110110
}
111-
112-
Ok(false)
113111
}
114112

115113
/// Subscribe to the back-pagination status of a live timeline.

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
//! A sub-object for running pagination tasks on a given room.
1616
17-
use std::{sync::Arc, time::Duration};
17+
use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
1818

1919
use eyeball::Subscriber;
2020
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
@@ -59,17 +59,60 @@ impl RoomPagination {
5959
/// This automatically takes care of waiting for a pagination token from
6060
/// sync, if we haven't done that before.
6161
///
62+
/// The `until` argument is an async closure that returns a [`ControlFlow`]
63+
/// to decide whether a new pagination must be run or not. It's helpful when
64+
/// the server replies with e.g. a certain set of events, but we would like
65+
/// more, or the event we are looking for isn't part of this set: in this
66+
/// case, `until` returns [`Control::Continue`], otherwise it returns
67+
/// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as
68+
/// its sole argument.
69+
///
6270
/// # Errors
6371
///
6472
/// It may return an error if the pagination token used during
6573
/// back-pagination has disappeared while we started the pagination. In
6674
/// that case, it's desirable to call the method again.
67-
#[instrument(skip(self))]
68-
pub async fn run_backwards(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
75+
///
76+
/// # Example
77+
///
78+
/// To do a single run:
79+
///
80+
/// ```rust
81+
/// use std::ops::ControlFlow;
82+
///
83+
/// use matrix_sdk::event_cache::{BackPaginationOutcome, RoomPagination};
84+
///
85+
/// # async fn foo(room_pagination: RoomPagination) {
86+
/// let result = room_pagination.run_backwards(
87+
/// 42,
88+
/// |BackPaginationOutcome { events, reached_start }| async move {
89+
/// // Do something with `events` and `reached_start` maybe?
90+
/// let _ = events;
91+
/// let _ = reached_start;
92+
///
93+
/// ControlFlow::Break(())
94+
/// }
95+
/// ).await;
96+
/// # }
97+
#[instrument(skip(self, until))]
98+
pub async fn run_backwards<F, B, Fut>(&self, batch_size: u16, mut until: F) -> Result<B>
99+
where
100+
F: FnMut(BackPaginationOutcome) -> Fut,
101+
Fut: Future<Output = ControlFlow<B, ()>>,
102+
{
69103
loop {
70-
if let Some(result) = self.run_backwards_impl(batch_size).await? {
71-
return Ok(result);
104+
if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
105+
match until(outcome).await {
106+
ControlFlow::Continue(()) => {
107+
debug!("back-pagination continues");
108+
109+
continue;
110+
}
111+
112+
ControlFlow::Break(value) => return Ok(value),
113+
}
72114
}
115+
73116
debug!("back-pagination has been internally restarted because of a timeline reset.");
74117
}
75118
}

crates/matrix-sdk/tests/integration/event_cache.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use std::{ops::ControlFlow, time::Duration};
22

33
use assert_matches2::{assert_let, assert_matches};
44
use matrix_sdk::{
@@ -24,6 +24,10 @@ use wiremock::{
2424

2525
use crate::mock_sync;
2626

27+
pub async fn once(outcome: BackPaginationOutcome) -> ControlFlow<BackPaginationOutcome, ()> {
28+
ControlFlow::Break(outcome)
29+
}
30+
2731
#[async_test]
2832
async fn test_must_explicitly_subscribe() {
2933
let (client, server) = logged_in_client_with_server().await;
@@ -362,7 +366,7 @@ async fn test_backpaginate_once() {
362366

363367
assert!(pagination.get_or_wait_for_token().await.is_some());
364368

365-
pagination.run_backwards(20).await.unwrap()
369+
pagination.run_backwards(20, once).await.unwrap()
366370
};
367371

368372
// I'll get all the previous events, in "reverse" order (same as the response).
@@ -450,7 +454,7 @@ async fn test_backpaginate_multiple_iterations() {
450454
let pagination = room_event_cache.pagination();
451455
while pagination.get_or_wait_for_token().await.is_some() {
452456
let BackPaginationOutcome { reached_start, events } =
453-
pagination.run_backwards(20).await.unwrap();
457+
pagination.run_backwards(20, once).await.unwrap();
454458

455459
if !global_reached_start {
456460
global_reached_start = reached_start;
@@ -586,7 +590,7 @@ async fn test_reset_while_backpaginating() {
586590

587591
let backpagination = spawn({
588592
let pagination = room_event_cache.pagination();
589-
async move { pagination.run_backwards(20).await }
593+
async move { pagination.run_backwards(20, once).await }
590594
});
591595

592596
// Receive the sync response (which clears the timeline).
@@ -656,7 +660,7 @@ async fn test_backpaginating_without_token() {
656660
// If we try to back-paginate with a token, it will hit the end of the timeline
657661
// and give us the resulting event.
658662
let BackPaginationOutcome { events, reached_start } =
659-
pagination.run_backwards(20).await.unwrap();
663+
pagination.run_backwards(20, once).await.unwrap();
660664

661665
assert!(reached_start);
662666

0 commit comments

Comments
 (0)