Skip to content

Commit 0db7664

Browse files
committed
feat(sdk): Add LinkedChunkUpdatesSubscriber.
This patch implements `LinkedChunkUpdates::subscribe` and `LinkedChunkUpdateSubscriber`, which itself implements `Stream`. This patch splits `LinkedChunkUpdates` into `LinkedChunkUpdatesInner`, so that the latter can be shared with `LinkedChunkUpdatesSubscriber`.
1 parent 7621068 commit 0db7664

File tree

1 file changed

+187
-43
lines changed

1 file changed

+187
-43
lines changed

crates/matrix-sdk/src/event_cache/linked_chunk.rs

Lines changed: 187 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,17 @@ use std::{
1919
fmt,
2020
marker::PhantomData,
2121
ops::Not,
22+
pin::Pin,
2223
ptr::NonNull,
23-
sync::atomic::{AtomicU64, Ordering},
24+
sync::{
25+
atomic::{AtomicU64, Ordering},
26+
Arc, RwLock, Weak,
27+
},
28+
task::{Context, Poll, Waker},
2429
};
2530

31+
use futures_core::Stream;
32+
2633
/// Errors of [`LinkedChunk`].
2734
#[derive(thiserror::Error, Debug)]
2835
pub enum LinkedChunkError {
@@ -129,6 +136,10 @@ where
129136
///
130137
/// Get a value for this type with [`LinkedChunk::updates`].
131138
pub struct LinkedChunkUpdates<Item, Gap> {
139+
inner: Arc<RwLock<LinkedChunkUpdatesInner<Item, Gap>>>,
140+
}
141+
142+
struct LinkedChunkUpdatesInner<Item, Gap> {
132143
/// All the updates that have not been peeked nor taken.
133144
updates: Vec<LinkedChunkUpdate<Item, Gap>>,
134145

@@ -137,23 +148,24 @@ pub struct LinkedChunkUpdates<Item, Gap> {
137148

138149
/// The last index used by the last call of [`Self::peek`].
139150
last_peeked_index: usize,
140-
}
141151

142-
impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
143-
/// Create a new [`Self`].
144-
fn new() -> Self {
145-
Self { updates: Vec::new(), last_taken_index: 0, last_peeked_index: 0 }
146-
}
152+
wakers: Vec<Waker>,
153+
}
147154

155+
impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
148156
/// Push a new update.
149157
fn push(&mut self, update: LinkedChunkUpdate<Item, Gap>) {
150158
self.updates.push(update);
159+
160+
for waker in self.wakers.drain(..) {
161+
waker.wake();
162+
}
151163
}
152164

153165
/// Take new updates.
154166
///
155167
/// Updates that have been taken will not be read again.
156-
pub fn take(&mut self) -> &[LinkedChunkUpdate<Item, Gap>] {
168+
fn take(&mut self) -> &[LinkedChunkUpdate<Item, Gap>] {
157169
// Let's garbage collect unused updates.
158170
self.garbage_collect();
159171

@@ -184,18 +196,17 @@ impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
184196
slice
185197
}
186198

187-
/// Return `true` if there is new updates that can be read with
188-
/// [`Self::take`].
189-
fn has_new_takable_updates(&self) -> bool {
190-
self.last_taken_index < self.updates.len()
191-
}
192-
193199
/// Return `true` if there is new update that can be read with
194200
/// [`Self::peek`].
195201
fn has_new_peekable_updates(&self) -> bool {
196202
self.last_peeked_index < self.updates.len()
197203
}
198204

205+
/// Return the number of updates in the buffer.
206+
fn len(&self) -> usize {
207+
self.updates.len()
208+
}
209+
199210
/// Garbage collect unused updates. An update is considered unused when it's
200211
/// been read by `Self::take` **and** by `Self::peek`.
201212
///
@@ -212,10 +223,91 @@ impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
212223
self.last_peeked_index -= min_index;
213224
}
214225
}
226+
}
215227

216-
/// Return the number of updates in the buffer.
217-
fn len(&self) -> usize {
218-
self.updates.len()
228+
impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
229+
/// Create a new [`Self`].
230+
fn new() -> Self {
231+
Self {
232+
inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner {
233+
updates: Vec::new(),
234+
last_taken_index: 0,
235+
last_peeked_index: 0,
236+
wakers: Vec::new(),
237+
})),
238+
}
239+
}
240+
241+
/// Push a new update.
242+
fn push(&mut self, update: LinkedChunkUpdate<Item, Gap>) {
243+
self.inner.write().unwrap().push(update);
244+
}
245+
246+
/// Take new updates.
247+
///
248+
/// Updates that have been taken will not be read again.
249+
pub fn take(&mut self) -> Vec<LinkedChunkUpdate<Item, Gap>>
250+
where
251+
Item: Clone,
252+
Gap: Clone,
253+
{
254+
self.inner.write().unwrap().take().to_owned()
255+
}
256+
257+
/// Return `true` if there is new updates that can be read with
258+
/// [`Self::take`].
259+
pub fn has_new_takable_updates(&self) -> bool {
260+
let inner = self.inner.read().unwrap();
261+
262+
inner.last_taken_index < inner.updates.len()
263+
}
264+
265+
/// Subscribe to updates by using a [`Stream`].
266+
///
267+
/// TODO: only one subscriber must exist so far because multiple concurrent
268+
/// subscriber would conflict on the garbage collector. It's not complex to
269+
/// fix, I will do it.
270+
fn subscribe(&self) -> LinkedChunkUpdatesSubscriber<Item, Gap> {
271+
LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner) }
272+
}
273+
}
274+
275+
/// A subscriber to [`LinkedChunkUpdates`]. It is helpful to receive updates via
276+
/// a [`Stream`].
277+
struct LinkedChunkUpdatesSubscriber<Item, Gap> {
278+
/// Weak reference to [`LinkedChunkUpdatesInner`].
279+
///
280+
/// Using a weak reference allows [`LinkedChunkUpdates`] to be dropped
281+
/// freely even if a subscriber exists.
282+
updates: Weak<RwLock<LinkedChunkUpdatesInner<Item, Gap>>>,
283+
}
284+
285+
impl<Item, Gap> Stream for LinkedChunkUpdatesSubscriber<Item, Gap>
286+
where
287+
Item: Clone,
288+
Gap: Clone,
289+
{
290+
type Item = Vec<LinkedChunkUpdate<Item, Gap>>;
291+
292+
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
293+
let Some(updates) = self.updates.upgrade() else {
294+
// The `LinkedChunkUpdates` has been dropped. It's time close this stream.
295+
return Poll::Ready(None);
296+
};
297+
298+
let mut updates = updates.write().unwrap();
299+
300+
// No updates to peek.
301+
if updates.has_new_peekable_updates().not() {
302+
// Let's register the waker.
303+
updates.wakers.push(context.waker().clone());
304+
305+
// The stream is pending.
306+
return Poll::Pending;
307+
}
308+
309+
// There is updates to peek! Let's forward them in this stream.
310+
return Poll::Ready(Some(updates.peek().to_owned()));
219311
}
220312
}
221313

@@ -1301,13 +1393,13 @@ where
13011393

13021394
#[cfg(test)]
13031395
mod tests {
1304-
use std::ops::Not;
1305-
13061396
use assert_matches::assert_matches;
1397+
use futures_util::pin_mut;
1398+
use stream_assert::{assert_closed, assert_next_eq, assert_pending};
13071399

13081400
use super::{
13091401
Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk,
1310-
LinkedChunkError, Position,
1402+
LinkedChunkError, Not, Position,
13111403
};
13121404

13131405
/// A macro to test the items and the gap of a `LinkedChunk`.
@@ -1426,9 +1518,9 @@ mod tests {
14261518
{
14271519
let updates = linked_chunk.updates().unwrap();
14281520

1429-
assert!(updates.has_new_peekable_updates().not());
1521+
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
14301522
assert!(updates.has_new_takable_updates().not());
1431-
assert!(updates.peek().is_empty());
1523+
assert!(updates.inner.write().unwrap().peek().is_empty());
14321524
assert!(updates.take().is_empty());
14331525
}
14341526

@@ -1440,25 +1532,25 @@ mod tests {
14401532

14411533
{
14421534
// Inspect number of updates in memory.
1443-
assert_eq!(updates.len(), 1);
1535+
assert_eq!(updates.inner.read().unwrap().len(), 1);
14441536
}
14451537

14461538
// Peek the update.
14471539
assert!(updates.has_new_takable_updates());
1448-
assert!(updates.has_new_peekable_updates());
1540+
assert!(updates.inner.read().unwrap().has_new_peekable_updates());
14491541
assert_eq!(
1450-
updates.peek(),
1542+
updates.inner.write().unwrap().peek(),
14511543
&[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
14521544
);
14531545

14541546
// No more update to peek.
14551547
assert!(updates.has_new_takable_updates());
1456-
assert!(updates.has_new_peekable_updates().not());
1457-
assert!(updates.peek().is_empty());
1548+
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
1549+
assert!(updates.inner.write().unwrap().peek().is_empty());
14581550

14591551
{
14601552
// Inspect number of updates in memory.
1461-
assert_eq!(updates.len(), 1);
1553+
assert_eq!(updates.inner.read().unwrap().len(), 1);
14621554
}
14631555
}
14641556

@@ -1469,19 +1561,19 @@ mod tests {
14691561
let updates = linked_chunk.updates().unwrap();
14701562

14711563
// Inspect number of updates in memory.
1472-
assert_eq!(updates.len(), 2);
1564+
assert_eq!(updates.inner.read().unwrap().len(), 2);
14731565

14741566
// Peek the update…
14751567
assert!(updates.has_new_takable_updates());
1476-
assert!(updates.has_new_peekable_updates());
1568+
assert!(updates.inner.read().unwrap().has_new_peekable_updates());
14771569
assert_eq!(
1478-
updates.peek(),
1570+
updates.inner.write().unwrap().peek(),
14791571
&[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },]
14801572
);
14811573

14821574
{
14831575
// Inspect number of updates in memory.
1484-
assert_eq!(updates.len(), 2);
1576+
assert_eq!(updates.inner.read().unwrap().len(), 2);
14851577
}
14861578

14871579
// … and take the update.
@@ -1495,19 +1587,19 @@ mod tests {
14951587

14961588
{
14971589
// Inspect number of updates in memory.
1498-
assert_eq!(updates.len(), 2);
1590+
assert_eq!(updates.inner.read().unwrap().len(), 2);
14991591
}
15001592

15011593
// No more update to peek or to take.
15021594
assert!(updates.has_new_takable_updates().not());
1503-
assert!(updates.has_new_peekable_updates().not());
1504-
assert!(updates.peek().is_empty());
1595+
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
1596+
assert!(updates.inner.write().unwrap().peek().is_empty());
15051597
assert!(updates.take().is_empty());
15061598

15071599
{
15081600
// Inspect number of updates in memory.
15091601
// The updates have been garbage collected.
1510-
assert_eq!(updates.len(), 0);
1602+
assert_eq!(updates.inner.read().unwrap().len(), 0);
15111603
}
15121604
}
15131605

@@ -1519,40 +1611,92 @@ mod tests {
15191611

15201612
{
15211613
// Inspect number of updates in memory.
1522-
assert_eq!(updates.len(), 1);
1614+
assert_eq!(updates.inner.read().unwrap().len(), 1);
15231615
}
15241616

15251617
// Take and peek the update.
15261618
assert!(updates.has_new_takable_updates());
1527-
assert!(updates.has_new_peekable_updates());
1619+
assert!(updates.inner.read().unwrap().has_new_peekable_updates());
15281620
assert_eq!(
15291621
updates.take(),
15301622
&[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },]
15311623
);
15321624
assert_eq!(
1533-
updates.peek(),
1625+
updates.inner.write().unwrap().peek(),
15341626
&[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },]
15351627
);
15361628

15371629
{
15381630
// Inspect number of updates in memory.
1539-
assert_eq!(updates.len(), 1);
1631+
assert_eq!(updates.inner.read().unwrap().len(), 1);
15401632
}
15411633

15421634
// No more update to peek or to take.
15431635
assert!(updates.has_new_takable_updates().not());
1544-
assert!(updates.has_new_peekable_updates().not());
1545-
assert!(updates.peek().is_empty());
1636+
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
1637+
assert!(updates.inner.write().unwrap().peek().is_empty());
15461638
assert!(updates.take().is_empty());
15471639

15481640
{
15491641
// Inspect number of updates in memory.
15501642
// The update has been garbage collected.
1551-
assert_eq!(updates.len(), 0);
1643+
assert_eq!(updates.inner.read().unwrap().len(), 0);
15521644
}
15531645
}
15541646
}
15551647

1648+
#[test]
1649+
fn test_updates_stream() {
1650+
use super::LinkedChunkUpdate::*;
1651+
1652+
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
1653+
1654+
let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
1655+
pin_mut!(updates_subscriber);
1656+
1657+
// No update, stream is pending.
1658+
assert_pending!(updates_subscriber);
1659+
1660+
// Let's generate an update.
1661+
linked_chunk.push_items_back(['a']);
1662+
1663+
// There is an update! Right after that, the stream is pending again.
1664+
assert_next_eq!(
1665+
updates_subscriber,
1666+
&[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
1667+
);
1668+
assert_pending!(updates_subscriber);
1669+
1670+
// Let's generate two other updates.
1671+
linked_chunk.push_items_back(['b']);
1672+
linked_chunk.push_items_back(['c']);
1673+
1674+
// We can consume the updates without the stream, but the stream continues to
1675+
// know it has updates.
1676+
assert_eq!(
1677+
linked_chunk.updates().unwrap().take(),
1678+
&[
1679+
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
1680+
InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
1681+
InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
1682+
]
1683+
);
1684+
assert_next_eq!(
1685+
updates_subscriber,
1686+
&[
1687+
InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
1688+
InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
1689+
]
1690+
);
1691+
assert_pending!(updates_subscriber);
1692+
1693+
// When dropping the `LinkedChunk`, it closes the stream.
1694+
drop(linked_chunk);
1695+
assert_closed!(updates_subscriber);
1696+
1697+
// TODO: ensure the wakers are called.
1698+
}
1699+
15561700
#[test]
15571701
fn test_push_items() {
15581702
use super::LinkedChunkUpdate::*;

0 commit comments

Comments
 (0)