Skip to content

Commit 72314f1

Browse files
authored
feat(sdk): Add RoomPagination::run_backwards(…, until)
feat(sdk): Add `RoomPagination::run_backwards(…, until)`
2 parents 7dd08c3 + 9a1de1d commit 72314f1

File tree

4 files changed

+276
-59
lines changed

4 files changed

+276
-59
lines changed

crates/matrix-sdk-ui/src/timeline/pagination.rs

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::ops::ControlFlow;
16+
1517
use async_rx::StreamExt as _;
1618
use async_stream::stream;
1719
use futures_core::Stream;
@@ -67,49 +69,44 @@ impl super::Timeline {
6769
pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
6870
let pagination = self.event_cache.pagination();
6971

70-
loop {
71-
let result = pagination.run_backwards(batch_size).await;
72-
73-
let event_cache_outcome = match result {
74-
Ok(outcome) => outcome,
75-
76-
Err(EventCacheError::BackpaginationError(
77-
PaginatorError::InvalidPreviousState {
78-
actual: PaginatorState::Paginating, ..
79-
},
80-
)) => {
81-
warn!("Another pagination request is already happening, returning early");
82-
return Ok(false);
83-
}
84-
85-
Err(err) => return Err(err),
86-
};
87-
88-
let BackPaginationOutcome { events, reached_start } = event_cache_outcome;
89-
90-
let num_events = events.len();
91-
trace!("Back-pagination succeeded with {num_events} events");
92-
93-
self.inner
94-
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
95-
.await;
96-
97-
if reached_start {
98-
return Ok(true);
72+
let result = pagination
73+
.run_backwards(
74+
batch_size,
75+
|BackPaginationOutcome { events, reached_start },
76+
_timeline_has_been_reset| async move {
77+
let num_events = events.len();
78+
trace!("Back-pagination succeeded with {num_events} events");
79+
80+
// TODO(hywan): Remove, and let spread events via
81+
// `matrix_sdk::event_cache::RoomEventCacheUpdate` from
82+
// `matrix_sdk::event_cache::RoomPagination::run_backwards`.
83+
self.inner
84+
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
85+
.await;
86+
87+
if num_events == 0 && !reached_start {
88+
// As an exceptional contract: if there were no events in the response,
89+
// and we've not hit the start of the timeline, retry until we get
90+
// some events or reach the start of the timeline.
91+
return ControlFlow::Continue(());
92+
}
93+
94+
ControlFlow::Break(reached_start)
95+
},
96+
)
97+
.await;
98+
99+
match result {
100+
Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState {
101+
actual: PaginatorState::Paginating,
102+
..
103+
})) => {
104+
warn!("Another pagination request is already happening, returning early");
105+
Ok(false)
99106
}
100107

101-
if num_events == 0 {
102-
// As an exceptional contract: if there were no events in the response,
103-
// and we've not hit the start of the timeline, retry until we get
104-
// some events or reach the start of the timeline.
105-
continue;
106-
}
107-
108-
// Exit the inner loop, and ask for another limit.
109-
break;
108+
result => result,
110109
}
111-
112-
Ok(false)
113110
}
114111

115112
/// Subscribe to the back-pagination status of a live timeline.

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ mod pagination;
7676
mod store;
7777

7878
pub mod paginator;
79-
pub use pagination::RoomPagination;
79+
pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating};
8080

8181
/// An error observed in the [`EventCache`].
8282
#[derive(thiserror::Error, Debug)]

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
//! A sub-object for running pagination tasks on a given room.
1616
17-
use std::{sync::Arc, time::Duration};
17+
use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
1818

1919
use eyeball::Subscriber;
2020
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
@@ -59,17 +59,75 @@ impl RoomPagination {
5959
/// This automatically takes care of waiting for a pagination token from
6060
/// sync, if we haven't done that before.
6161
///
62+
/// The `until` argument is an async closure that returns a [`ControlFlow`]
63+
/// to decide whether a new pagination must be run or not. It's helpful when
64+
/// the server replies with e.g. a certain set of events, but we would like
65+
/// more, or the event we are looking for isn't part of this set: in this
66+
/// case, `until` returns [`Control::Continue`], otherwise it returns
67+
/// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as
68+
/// its sole argument.
69+
///
6270
/// # Errors
6371
///
6472
/// It may return an error if the pagination token used during
6573
/// back-pagination has disappeared while we started the pagination. In
6674
/// that case, it's desirable to call the method again.
67-
#[instrument(skip(self))]
68-
pub async fn run_backwards(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
75+
///
76+
/// # Example
77+
///
78+
/// To do a single run:
79+
///
80+
/// ```rust
81+
/// use std::ops::ControlFlow;
82+
///
83+
/// use matrix_sdk::event_cache::{
84+
/// BackPaginationOutcome,
85+
/// RoomPagination,
86+
/// TimelineHasBeenResetWhilePaginating
87+
/// };
88+
///
89+
/// # async fn foo(room_pagination: RoomPagination) {
90+
/// let result = room_pagination.run_backwards(
91+
/// 42,
92+
/// |BackPaginationOutcome { events, reached_start },
93+
/// _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move {
94+
/// // Do something with `events` and `reached_start` maybe?
95+
/// let _ = events;
96+
/// let _ = reached_start;
97+
///
98+
/// ControlFlow::Break(())
99+
/// }
100+
/// ).await;
101+
/// # }
102+
#[instrument(skip(self, until))]
103+
pub async fn run_backwards<Until, Break, UntilFuture>(
104+
&self,
105+
batch_size: u16,
106+
mut until: Until,
107+
) -> Result<Break>
108+
where
109+
Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture,
110+
UntilFuture: Future<Output = ControlFlow<Break, ()>>,
111+
{
112+
let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
113+
69114
loop {
70-
if let Some(result) = self.run_backwards_impl(batch_size).await? {
71-
return Ok(result);
115+
if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
116+
match until(outcome, timeline_has_been_reset).await {
117+
ControlFlow::Continue(()) => {
118+
trace!("back-pagination continues");
119+
120+
timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
121+
122+
continue;
123+
}
124+
125+
ControlFlow::Break(value) => return Ok(value),
126+
}
72127
}
128+
129+
timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes;
130+
73131
debug!("back-pagination has been internally restarted because of a timeline reset.");
74132
}
75133
}
@@ -259,6 +317,16 @@ impl RoomPagination {
259317
}
260318
}
261319

320+
/// A type representing whether the timeline has been reset.
321+
#[derive(Debug)]
322+
pub enum TimelineHasBeenResetWhilePaginating {
323+
/// The timeline has been reset.
324+
Yes,
325+
326+
/// The timeline has not been reset.
327+
No,
328+
}
329+
262330
#[cfg(test)]
263331
mod tests {
264332
// Those tests require time to work, and it does not on wasm32.

0 commit comments

Comments
 (0)