Skip to content

feat(sdk): Dropping an UpdatesSubscriber release its reader token for the garbage collector #4102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 189 additions & 11 deletions crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,22 @@ where
}
}

impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
fn drop(&mut self) {
// Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
// This is important so that the garbage collector can do its jobs correctly
// without a dead dangling reader token.
if let Some(updates) = self.updates.upgrade() {
let mut updates = updates.write().unwrap();

// Remove the reader token from `UpdatesInner`.
// It's safe to ignore the result of `remove` here: `None` means the token was
// already removed (note: it should be unreachable).
let _ = updates.last_index_per_reader.remove(&self.token);
}
}
}

#[cfg(test)]
mod tests {
use std::{
Expand Down Expand Up @@ -495,7 +511,7 @@ mod tests {
// | d | e | f | g | h | i |
// +---+---+---+---+---+---+
//
// “main” will have its index updated from 3 to 0.
// “main” will have its index updated from 0 to 3.
// “other” will have its index updated from 6 to 3.
{
let updates = linked_chunk.updates().unwrap();
Expand Down Expand Up @@ -563,19 +579,19 @@ mod tests {
}
}

#[test]
fn test_updates_stream() {
use super::Update::*;
struct CounterWaker {
number_of_wakeup: Mutex<usize>,
}

struct CounterWaker {
number_of_wakeup: Mutex<usize>,
impl Wake for CounterWaker {
fn wake(self: Arc<Self>) {
*self.number_of_wakeup.lock().unwrap() += 1;
}
}

impl Wake for CounterWaker {
fn wake(self: Arc<Self>) {
*self.number_of_wakeup.lock().unwrap() += 1;
}
}
#[test]
fn test_updates_stream() {
use super::Update::*;

let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
let waker = counter_waker.clone().into();
Expand Down Expand Up @@ -646,4 +662,166 @@ mod tests {
// Wakers calls have not changed.
assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
}

#[test]
fn test_updates_multiple_streams() {
use super::Update::*;

let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });

let waker1 = counter_waker1.clone().into();
let waker2 = counter_waker2.clone().into();

let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);

let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
pin_mut!(updates_subscriber1);

// Scope for `updates_subscriber2`.
let updates_subscriber2_token = {
let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
pin_mut!(updates_subscriber2);

// No update, streams are pending.
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);
assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);

// Let's generate an update.
linked_chunk.push_items_back(['a']);

// The wakers must have been called.
assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);

// There is an update! Right after that, the streams are pending again.
assert_matches!(
updates_subscriber1.as_mut().poll_next(&mut context1),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
}
);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
assert_matches!(
updates_subscriber2.as_mut().poll_next(&mut context2),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
}
);
assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);

// Let's generate two other updates.
linked_chunk.push_items_back(['b']);
linked_chunk.push_items_back(['c']);

// A waker is consumed when called. The first call to `push_items_back` will
// call and consume the wakers. The second call to `push_items_back` will do
// nothing as the wakers have been consumed. New wakers will be registered on
// polling.
//
// So, the waker must have been called only once for the two updates.
assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);

// Let's poll `updates_subscriber1` only.
assert_matches!(
updates_subscriber1.as_mut().poll_next(&mut context1),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[
PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
]
);
}
);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

// For the sake of this test, we also need to advance the main reader token.
let _ = linked_chunk.updates().unwrap().take();
let _ = linked_chunk.updates().unwrap().take();
Comment on lines +752 to +754
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh? Why do we need to do it twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's something I want to change. The garbage collector is run before take_with_token is called (which is called by take). I think it's better to run the garbage collector after take_with_token is called: “I read new items, I clean read items” vs. “I clean previously read items, I read new ones”. Right now, we are in the second pattern, but I believe the first is preferable. I'll address that in the next pull request.


// If we inspect the garbage collector state, `a`, `b` and `c` should still be
// present because not all of them have been consumed by `updates_subscriber2`
// yet.
{
let updates = linked_chunk.updates().unwrap();

let inner = updates.inner.read().unwrap();

// Inspect number of updates in memory.
// We get 2 because the garbage collector runs before data are taken, not after:
// `updates_subscriber2` has read `a` only, so `b` and `c` remain.
assert_eq!(inner.len(), 2);

// Inspect the indices.
let indices = &inner.last_index_per_reader;

assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
}

// Poll `updates_subscriber1` again: there is no new update so it must be
// pending.
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

// The state of the garbage collector is unchanged: `a`, `b` and `c` are still
// in memory.
{
let updates = linked_chunk.updates().unwrap();

let inner = updates.inner.read().unwrap();

// Inspect number of updates in memory. Value is unchanged.
assert_eq!(inner.len(), 2);

// Inspect the indices. They are unchanged.
let indices = &inner.last_index_per_reader;

assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
}

updates_subscriber2.token
// Drop `updates_subscriber2`!
};

// `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
// still no new update, but it will run the garbage collector again, and this
// time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage
// collector must be empty.
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

// Inspect the garbage collector.
{
let updates = linked_chunk.updates().unwrap();

let inner = updates.inner.read().unwrap();

// Inspect number of updates in memory.
assert_eq!(inner.len(), 0);

// Inspect the indices.
let indices = &inner.last_index_per_reader;

assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
}

// When dropping the `LinkedChunk`, it closes the stream.
drop(linked_chunk);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
}
}
Loading