Skip to content

rtic-sync: deal with dropping & re-splitting channels. #1040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rtic-sync/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ For each category, _Added_, _Changed_, _Fixed_ add new entries at the top!

## [Unreleased]

- Avoid a critical section when a `send`-link is popped and when returning `free_slot`.
### Changed

- Actually drop items left over in `Channel` on drop of `Receiver`.
- Allow for `split()`-ing a channel more than once without immediately panicking.
- Add `loom` support.
- Avoid a critical section when a `send`-link is popped and when returning `free_slot`.
- Don't force `Signal` import when using `make_signal` macro
- Update `make_signal`'s documentation to match `make_channel`'s

Expand Down
92 changes: 78 additions & 14 deletions rtic-sync/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,19 @@ impl<T, const N: usize> Channel<T, N> {

/// Split the queue into a `Sender`/`Receiver` pair.
pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
// NOTE(assert): queue is cleared by dropping the corresponding `Receiver`.
debug_assert!(self.readyq.as_mut().is_empty(),);

let freeq = self.freeq.as_mut();

freeq.clear();

// Fill free queue
for idx in 0..N as u8 {
// NOTE(assert): `split`-ing does not put `freeq` into a known-empty
// state, so `debug_assert` is not good enough.
assert!(!freeq.is_full());
debug_assert!(!freeq.is_full());

// SAFETY: This safe as the loop goes from 0 to the capacity of the underlying queue.
// SAFETY: This safe as the loop goes from 0 to the capacity of the underlying queue,
// and the queue is cleared beforehand.
unsafe {
freeq.push_back_unchecked(idx);
}
Expand All @@ -137,7 +141,8 @@ impl<T, const N: usize> Channel<T, N> {
/// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
/// 2. else, insert `slot` into `self.freeq`.
///
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`], and that `slot`
/// is returned at most once.
unsafe fn return_free_slot(&self, slot: u8) {
critical_section::with(|cs| {
fence(Ordering::SeqCst);
Expand All @@ -152,14 +157,22 @@ impl<T, const N: usize> Channel<T, N> {
// SAFETY: `self.freeq` is not called recursively.
unsafe {
self.freeq(cs, |freeq| {
assert!(!freeq.is_full());
debug_assert!(!freeq.is_full());
// SAFETY: `freeq` is not full.
freeq.push_back_unchecked(slot);
});
}
}
})
}

/// SAFETY: the caller must guarantee that `slot` is an `u8` obtained by dequeueing from [`Self::readyq`],
/// and is read at most once.
unsafe fn read_slot(&self, slot: u8) -> T {
let first_element = self.slots.get_unchecked(slot as usize).get_mut();
let ptr = first_element.deref().as_ptr();
ptr::read(ptr)
}
}

/// Creates a split channel with `'static` lifetime.
Expand Down Expand Up @@ -324,7 +337,7 @@ impl<T, const N: usize> Sender<'_, T, N> {
// SAFETY: `self.0.readyq` is not called recursively.
unsafe {
self.0.readyq(cs, |readyq| {
assert!(!readyq.is_full());
debug_assert!(!readyq.is_full());
// SAFETY: ready is not full.
readyq.push_back_unchecked(idx);
});
Expand Down Expand Up @@ -458,7 +471,7 @@ impl<T, const N: usize> Sender<'_, T, N> {
// SAFETY: `self.0.freeq` is not called recursively.
unsafe {
self.0.freeq(cs, |freeq| {
assert!(!freeq.is_empty());
debug_assert!(!freeq.is_empty());
// SAFETY: `freeq` is non-empty
let slot = freeq.pop_back_unchecked();
Poll::Ready(Ok(slot))
Expand Down Expand Up @@ -579,14 +592,13 @@ impl<T, const N: usize> Receiver<'_, T, N> {

if let Some(rs) = ready_slot {
// Read the value from the slots, note; this memcpy is not under a critical section.
let r = unsafe {
let first_element = self.0.slots.get_unchecked(rs as usize).get_mut();
let ptr = first_element.deref().as_ptr();
ptr::read(ptr)
};
// SAFETY: `rs` is directly obtained from `self.0.readyq` and is read exactly
// once.
let r = unsafe { self.0.read_slot(rs) };

// Return the index to the free queue after we've read the value.
// SAFETY: `rs` comes directly from `readyq`.
// SAFETY: `rs` comes directly from `readyq` and is only returned
// once.
unsafe { self.0.return_free_slot(rs) };

Ok(r)
Expand Down Expand Up @@ -655,6 +667,19 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
self.0.receiver_dropped(cs, |v| *v = true);
});

let ready_slot = || {
critical_section::with(|cs| unsafe {
// SAFETY: `self.0.readyq` is not called recursively.
self.0.readyq(cs, |q| q.pop_back())
})
};

while let Some(slot) = ready_slot() {
// SAFETY: `slot` comes from `readyq` and is
// read exactly once.
drop(unsafe { self.0.read_slot(slot) })
}

while let Some((waker, _)) = self.0.wait_queue.pop() {
waker.wake();
}
Expand All @@ -664,6 +689,9 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
#[cfg(test)]
#[cfg(not(loom))]
mod tests {
use core::sync::atomic::AtomicBool;
use std::sync::Arc;

use cassette::Cassette;

use super::*;
Expand Down Expand Up @@ -801,6 +829,42 @@ mod tests {
// Make sure that rx & tx are alive until here for good measure.
drop((tx, rx));
}

#[derive(Debug)]
struct SetToTrueOnDrop(Arc<AtomicBool>);

impl Drop for SetToTrueOnDrop {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}

#[test]
fn non_popped_item_is_dropped() {
let mut channel: Channel<SetToTrueOnDrop, 1> = Channel::new();

let (mut tx, rx) = channel.split();

let value = Arc::new(AtomicBool::new(false));
tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();

drop((tx, rx));

assert!(value.load(Ordering::SeqCst));
}

#[test]
pub fn splitting_empty_channel_works() {
let mut channel: Channel<(), 1> = Channel::new();

let (mut tx, rx) = channel.split();

tx.try_send(()).unwrap();

drop((tx, rx));

channel.split();
}
}

#[cfg(not(loom))]
Expand Down