Skip to content

Change timeout/delay functions to non-async #910

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rtic-time/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ For each category, *Added*, *Changed*, *Fixed* add new entries at the top!

## Unreleased

### Changed

- Replace `async` implementations of `delay`/`delay_until`/`timeout`/`timeout_at` with structs to reduce memory usage.

## v2.0.0 - 2024-05-29

### Added
Expand Down
199 changes: 103 additions & 96 deletions rtic-time/src/timer_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@
use crate::linked_list::{self, Link, LinkedList};
use crate::TimeoutError;

use core::future::{poll_fn, Future};
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::task::{Poll, Waker};
use futures_util::{
future::{select, Either},
pin_mut,
};
use rtic_common::dropper::OnDrop;

mod backend;
mod tick_type;
Expand Down Expand Up @@ -67,26 +62,6 @@ pub struct TimerQueue<Backend: TimerQueueBackend> {
initialized: AtomicBool,
}

/// This is needed to make the async closure in `delay_until` accept that we "share"
/// the link possible between threads.
struct LinkPtr<Backend: TimerQueueBackend>(*mut Option<linked_list::Link<WaitingWaker<Backend>>>);

impl<Backend: TimerQueueBackend> Clone for LinkPtr<Backend> {
fn clone(&self) -> Self {
LinkPtr(self.0)
}
}

impl<Backend: TimerQueueBackend> LinkPtr<Backend> {
/// This will dereference the pointer stored within and give out an `&mut`.
unsafe fn get(&mut self) -> &mut Option<linked_list::Link<WaitingWaker<Backend>>> {
&mut *self.0
}
}

unsafe impl<Backend: TimerQueueBackend> Send for LinkPtr<Backend> {}
unsafe impl<Backend: TimerQueueBackend> Sync for LinkPtr<Backend> {}

impl<Backend: TimerQueueBackend> Default for TimerQueue<Backend> {
fn default() -> Self {
Self::new()
Expand All @@ -112,7 +87,7 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
pub fn initialize(&self, backend: Backend) {
self.initialized.store(true, Ordering::SeqCst);

// Don't run drop on `Mono`
// Don't run drop on `Backend`
core::mem::forget(backend);
}

Expand Down Expand Up @@ -164,29 +139,29 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
}

/// Timeout at a specific time.
pub async fn timeout_at<F: Future>(
pub fn timeout_at<F: Future>(
&self,
instant: Backend::Ticks,
future: F,
) -> Result<F::Output, TimeoutError> {
let delay = self.delay_until(instant);

pin_mut!(future);
pin_mut!(delay);

match select(future, delay).await {
Either::Left((r, _)) => Ok(r),
Either::Right(_) => Err(TimeoutError),
) -> Timeout<'_, Backend, F> {
Timeout {
delay: Delay::<Backend> {
instant,
queue: &self.queue,
link_ptr: None,
marker: AtomicUsize::new(0),
},
future,
}
}

/// Timeout after at least a specific duration.
#[inline]
pub async fn timeout_after<F: Future>(
pub fn timeout_after<F: Future>(
&self,
duration: Backend::Ticks,
future: F,
) -> Result<F::Output, TimeoutError> {
) -> Timeout<'_, Backend, F> {
let now = Backend::now();
let mut timeout = now.wrapping_add(duration);
if now != timeout {
Expand All @@ -195,12 +170,12 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {

// Wait for one period longer, because by definition timers have an uncertainty
// of one period, so waiting for 'at least' needs to compensate for that.
self.timeout_at(timeout, future).await
self.timeout_at(timeout, future)
}

/// Delay for at least some duration of time.
#[inline]
pub async fn delay(&self, duration: Backend::Ticks) {
pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> {
let now = Backend::now();
let mut timeout = now.wrapping_add(duration);
if now != timeout {
Expand All @@ -209,79 +184,111 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {

// Wait for one period longer, because by definition timers have an uncertainty
// of one period, so waiting for 'at least' needs to compensate for that.
self.delay_until(timeout).await;
self.delay_until(timeout)
}

/// Delay to some specific time instant.
pub async fn delay_until(&self, instant: Backend::Ticks) {
pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> {
if !self.initialized.load(Ordering::Relaxed) {
panic!(
"The timer queue is not initialized with a monotonic, you need to run `initialize`"
);
}
Delay::<Backend> {
instant,
queue: &self.queue,
link_ptr: None,
marker: AtomicUsize::new(0),
}
}
}

let mut link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>> = None;
/// Future returned by `delay` and `delay_until`.
pub struct Delay<'q, Backend: TimerQueueBackend> {
instant: Backend::Ticks,
queue: &'q LinkedList<WaitingWaker<Backend>>,
link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>>,
marker: AtomicUsize,
}

// Make this future `Drop`-safe
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
let mut link_ptr =
LinkPtr(&mut link_ptr as *mut Option<linked_list::Link<WaitingWaker<Backend>>>);
let mut link_ptr2 = link_ptr.clone();
impl<'q, Backend: TimerQueueBackend> Future for Delay<'q, Backend> {
type Output = ();

let queue = &self.queue;
let marker = &AtomicUsize::new(0);
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
// SAFETY: We ensure we never move anything out of this.
let this = unsafe { self.get_unchecked_mut() };

let dropper = OnDrop::new(|| {
queue.delete(marker.load(Ordering::Relaxed));
});
if Backend::now().is_at_least(this.instant) {
return Poll::Ready(());
}

poll_fn(|cx| {
if Backend::now().is_at_least(instant) {
return Poll::Ready(());
// SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
// in `drop` we can't do this access concurrently with queue removal.
let link = &mut this.link_ptr;
if link.is_none() {
let link_ref = link.insert(Link::new(WaitingWaker {
waker: cx.waker().clone(),
release_at: this.instant,
was_popped: AtomicBool::new(false),
}));

// SAFETY(new_unchecked): The address to the link is stable as it is defined
// outside this stack frame.
// SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
// the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
// queue on drop, which happens before the struct and thus `link_ptr` goes out of
// scope.
let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) };
this.marker.store(addr, Ordering::Relaxed);
if head_updated {
Backend::pend_interrupt()
}
}

// SAFETY: This pointer is only dereferenced here and on drop of the future
// which happens outside this `poll_fn`'s stack frame, so this mutable access cannot
// happen at the same time as `dropper` runs.
let link = unsafe { link_ptr2.get() };
if link.is_none() {
let link_ref = link.insert(Link::new(WaitingWaker {
waker: cx.waker().clone(),
release_at: instant,
was_popped: AtomicBool::new(false),
}));

// SAFETY(new_unchecked): The address to the link is stable as it is defined
//outside this stack frame.
// SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and
// we make sure in `dropper` that the link is removed from the queue before
// dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives
// until the end of the stack frame.
let (head_updated, addr) = unsafe { queue.insert(Pin::new_unchecked(link_ref)) };

marker.store(addr, Ordering::Relaxed);

if head_updated {
// Pend the monotonic handler if the queue head was updated.
Backend::pend_interrupt()
}
Poll::Pending
}
}

impl<'q, Backend: TimerQueueBackend> Drop for Delay<'q, Backend> {
fn drop(&mut self) {
// SAFETY: Drop cannot be run at the same time as poll, so we can't end up
// derefencing this concurrently to the one in `poll`.
match self.link_ptr.as_ref() {
None => return,
// If it was popped from the queue there is no need to run delete
Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return,
_ => {}
}
self.queue.delete(self.marker.load(Ordering::Relaxed));
}
}

/// Future returned by `timeout` and `timeout_at`.
pub struct Timeout<'q, Backend: TimerQueueBackend, F> {
delay: Delay<'q, Backend>,
future: F,
}

impl<'q, Backend: TimerQueueBackend, F: Future> Future for Timeout<'q, Backend, F> {
type Output = Result<F::Output, TimeoutError>;

fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
let inner = unsafe { self.get_unchecked_mut() };

{
let f = unsafe { Pin::new_unchecked(&mut inner.future) };
if let Poll::Ready(v) = f.poll(cx) {
return Poll::Ready(Ok(v));
}
}

Poll::Pending
})
.await;

// SAFETY: We only run this and dereference the pointer if we have
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
// of this pointer is in the `poll_fn`.
if let Some(link) = unsafe { link_ptr.get() } {
if link.val.was_popped.load(Ordering::Relaxed) {
// If it was popped from the queue there is no need to run delete
dropper.defuse();
{
let d = unsafe { Pin::new_unchecked(&mut inner.delay) };
if d.poll(cx).is_ready() {
return Poll::Ready(Err(TimeoutError));
}
} else {
// Make sure that our link is deleted from the list before we drop this stack
drop(dropper);
}

Poll::Pending
}
}
Loading