Skip to content

Commit 90dca51

Browse files
committed
Add an async implementation of Queue
1 parent 62b122e commit 90dca51

File tree

7 files changed

+465
-0
lines changed

7 files changed

+465
-0
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ optional = true
5959
[dev-dependencies.ufmt]
6060
version = "0.1"
6161

62+
[dev-dependencies.tokio]
63+
version = "1"
64+
default-features = false
65+
features = [ "macros", "rt", "time" ]
66+
6267
[dependencies.defmt]
6368
version = ">=0.2.0,<0.4"
6469
optional = true

src/async_impl/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//! This module contains an async variant of [`Queue`]
2+
//!
3+
//! [`Queue`]: crate::spsc::Queue
4+
5+
mod ssq;
6+
7+
pub mod spsc;

src/async_impl/spsc/consumer.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use core::{
2+
future::Future,
3+
task::{Poll, Waker},
4+
};
5+
6+
use crate::{
7+
async_impl::ssq::{WakerConsumer, WakerProducer},
8+
spsc::Consumer as HConsumer,
9+
};
10+
11+
/// An async consumer
12+
pub struct Consumer<'queue, T, const N: usize>
13+
where
14+
T: Unpin,
15+
{
16+
inner: HConsumer<'queue, T, N>,
17+
producer_waker: WakerConsumer<'queue>,
18+
consumer_waker: WakerProducer<'queue>,
19+
}
20+
21+
impl<'queue, T, const N: usize> Consumer<'queue, T, N>
22+
where
23+
T: Unpin,
24+
{
25+
pub(crate) fn new(
26+
consumer: HConsumer<'queue, T, N>,
27+
producer_waker: WakerConsumer<'queue>,
28+
consumer_waker: WakerProducer<'queue>,
29+
) -> Self {
30+
Self {
31+
inner: consumer,
32+
producer_waker,
33+
consumer_waker,
34+
}
35+
}
36+
37+
/// Check if there are any items to dequeue.
38+
///
39+
/// When this returns true, at least the first subsequent [`Self::dequeue`] will succeed immediately
40+
pub fn ready(&self) -> bool {
41+
self.inner.ready()
42+
}
43+
44+
/// Returns the maximum number of elements the queue can hold
45+
pub fn capacity(&self) -> usize {
46+
self.inner.capacity()
47+
}
48+
49+
/// Returns the amount of elements currently in the queue
50+
pub fn len(&self) -> usize {
51+
self.inner.len()
52+
}
53+
54+
/// Dequeue an item from the backing queue.
55+
///
56+
/// The returned future only resolves once an item was succesfully
57+
/// dequeued.
58+
pub fn dequeue<'me>(&'me mut self) -> ConsumerFuture<'me, 'queue, T, N> {
59+
ConsumerFuture {
60+
consumer: self,
61+
dequeued_value: None,
62+
}
63+
}
64+
65+
/// Attempt to dequeue an item from the backing queue.
66+
pub fn try_dequeue(&mut self) -> Option<T> {
67+
self.try_wake_producer();
68+
69+
self.inner.dequeue()
70+
}
71+
72+
/// Try to wake a [`Producer`](super::Producer) associated with the backing queue if
73+
/// it is waiting to be awoken.
74+
fn try_wake_producer(&mut self) {
75+
self.producer_waker.dequeue().map(|w| w.wake());
76+
}
77+
78+
/// Try to register `waker` as the waker for this [`Consumer`]
79+
///
80+
/// Will return
81+
fn try_register_waker<'v>(&mut self, waker: Waker) {
82+
// We can safely overwrite the old waker, as we can only ever have 1 instance
83+
// of `self` waiting to be awoken.
84+
self.consumer_waker.enqueue(waker);
85+
}
86+
}
87+
88+
pub struct ConsumerFuture<'consumer, 'queue, T, const N: usize>
89+
where
90+
T: Unpin,
91+
{
92+
consumer: &'consumer mut Consumer<'queue, T, N>,
93+
dequeued_value: Option<T>,
94+
}
95+
96+
impl<T, const N: usize> Future for ConsumerFuture<'_, '_, T, N>
97+
where
98+
T: Unpin,
99+
{
100+
type Output = T;
101+
102+
fn poll(
103+
self: core::pin::Pin<&mut Self>,
104+
cx: &mut core::task::Context<'_>,
105+
) -> Poll<Self::Output> {
106+
let try_wake_producer = |me: &mut Self, value| {
107+
me.consumer.try_wake_producer();
108+
return Poll::Ready(value);
109+
};
110+
111+
let me = self.get_mut();
112+
let con = &mut me.consumer;
113+
114+
if let Some(value) = me.dequeued_value.take() {
115+
// Try to wake the producer because we managed to
116+
// dequeue a value
117+
return try_wake_producer(me, value);
118+
}
119+
120+
me.dequeued_value = con.inner.dequeue();
121+
if let Some(value) = me.dequeued_value.take() {
122+
// Try to wake the producer because we managed to
123+
// dequeue a value
124+
try_wake_producer(me, value)
125+
} else {
126+
me.consumer.try_register_waker(cx.waker().clone());
127+
128+
Poll::Pending
129+
}
130+
}
131+
}

src/async_impl/spsc/mod.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
//! An async wrapper around [`Queue`]
2+
use crate::spsc::Queue as HQueue;
3+
4+
mod producer;
5+
pub use producer::Producer;
6+
7+
mod consumer;
8+
pub use consumer::Consumer;
9+
10+
use super::ssq::WakerQueue;
11+
12+
/// An async queue
13+
pub struct Queue<T, const N: usize>
14+
where
15+
T: Unpin,
16+
{
17+
inner: HQueue<T, N>,
18+
producer_waker: WakerQueue,
19+
consumer_waker: WakerQueue,
20+
}
21+
22+
impl<T, const N: usize> Queue<T, N>
23+
where
24+
T: Unpin,
25+
{
26+
/// Create a new Queue
27+
pub const fn new() -> Self {
28+
Self {
29+
inner: HQueue::new(),
30+
producer_waker: WakerQueue::new(),
31+
consumer_waker: WakerQueue::new(),
32+
}
33+
}
34+
35+
/// Split the queue into a producer and consumer
36+
pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) {
37+
let ((cwp, cwc), (pwp, pwc)) = (self.consumer_waker.split(), self.producer_waker.split());
38+
39+
let (producer, consumer) = self.inner.split();
40+
(
41+
Producer::new(producer, pwc, cwp),
42+
Consumer::new(consumer, pwp, cwc),
43+
)
44+
}
45+
}
46+
47+
#[cfg(test)]
48+
mod test {
49+
use std;
50+
use std::boxed::Box;
51+
use std::println;
52+
use std::time::Duration;
53+
use std::vec::Vec;
54+
55+
use super::Queue;
56+
57+
#[tokio::test]
58+
async fn spsc() {
59+
let queue: &'static mut Queue<u32, 8> = Box::leak(Box::new(Queue::new()));
60+
61+
let (mut tx, mut rx) = queue.split();
62+
const MAX: u32 = 100;
63+
let mut data = Vec::new();
64+
for i in 0..=MAX {
65+
data.push(i);
66+
}
67+
68+
let t1_data = data.clone();
69+
let t1 = tokio::task::spawn(async move {
70+
println!("Dequeueing...");
71+
let mut rx_data = Vec::new();
72+
loop {
73+
let value = rx.dequeue().await;
74+
println!("Succesfully dequeued {}", value);
75+
rx_data.push(value);
76+
if value == MAX {
77+
break;
78+
}
79+
}
80+
assert_eq!(t1_data, rx_data);
81+
});
82+
83+
let t2 = tokio::task::spawn(async move {
84+
let mut interval = tokio::time::interval(Duration::from_millis(1));
85+
println!("Enqueing...");
86+
for i in data {
87+
tx.enqueue(i).await;
88+
interval.tick().await;
89+
println!("Succesfully enqueued {}", i);
90+
}
91+
});
92+
93+
let (t1, t2) = tokio::join!(t1, t2);
94+
t1.unwrap();
95+
t2.unwrap();
96+
}
97+
}

src/async_impl/spsc/producer.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use core::{
2+
future::Future,
3+
task::{Poll, Waker},
4+
};
5+
6+
use crate::{
7+
async_impl::ssq::{WakerConsumer, WakerProducer},
8+
spsc::Producer as HProducer,
9+
};
10+
11+
/// An async producer
12+
pub struct Producer<'queue, T, const N: usize>
13+
where
14+
T: Unpin,
15+
{
16+
inner: HProducer<'queue, T, N>,
17+
producer_waker: WakerProducer<'queue>,
18+
consumer_waker: WakerConsumer<'queue>,
19+
}
20+
21+
impl<'queue, T, const N: usize> Producer<'queue, T, N>
22+
where
23+
T: Unpin,
24+
{
25+
pub(crate) fn new(
26+
producer: HProducer<'queue, T, N>,
27+
producer_waker: WakerProducer<'queue>,
28+
consumer_waker: WakerConsumer<'queue>,
29+
) -> Self {
30+
Self {
31+
inner: producer,
32+
producer_waker,
33+
consumer_waker,
34+
}
35+
}
36+
37+
/// Check if an item can be enqueued.
38+
///
39+
/// If this returns true, at least the first subsequent [`Self::enqueue`] will succeed
40+
/// immediately.
41+
pub fn ready(&self) -> bool {
42+
self.inner.ready()
43+
}
44+
45+
/// Returns the maximum number of elements the queue can hold.
46+
pub fn capacity(&self) -> usize {
47+
self.inner.capacity()
48+
}
49+
50+
/// Returns the amount of elements currently in the queue.
51+
pub fn len(&self) -> usize {
52+
self.inner.len()
53+
}
54+
55+
/// Enqueue `value` into the backing queue.
56+
///
57+
/// The returned Future only resolves once the value was
58+
/// succesfully enqueued.
59+
pub fn enqueue<'me>(&'me mut self, value: T) -> ProducerFuture<'me, 'queue, T, N> {
60+
let value = self.inner.enqueue(value).err();
61+
ProducerFuture {
62+
producer: self,
63+
value_to_enqueue: value,
64+
}
65+
}
66+
67+
/// Try to enqueue `value` into the backing queue.
68+
pub fn try_enqueue(&mut self, value: T) -> Result<(), T> {
69+
self.inner.enqueue(value)
70+
}
71+
72+
/// Try to wake the [`Consumer`](super::Consumer) associated with the backing queue if
73+
/// it is waiting to be awoken.
74+
fn wake_consumer(&mut self) {
75+
self.consumer_waker.dequeue().map(|v| v.wake());
76+
}
77+
78+
/// Register `waker` as the waker for this [`Producer`]
79+
fn register_waker<'v>(&mut self, waker: Waker) {
80+
// We can safely overwrite the old waker, as we can only ever have 1 instance
81+
// of `self` waiting to be awoken.
82+
self.producer_waker.enqueue(waker);
83+
}
84+
}
85+
86+
pub struct ProducerFuture<'producer, 'queue, T, const N: usize>
87+
where
88+
T: Unpin,
89+
{
90+
producer: &'producer mut Producer<'queue, T, N>,
91+
value_to_enqueue: Option<T>,
92+
}
93+
94+
impl<T, const N: usize> Future for ProducerFuture<'_, '_, T, N>
95+
where
96+
T: Unpin,
97+
{
98+
type Output = ();
99+
100+
fn poll(
101+
self: core::pin::Pin<&mut Self>,
102+
cx: &mut core::task::Context<'_>,
103+
) -> Poll<Self::Output> {
104+
let try_wake_consumer = |me: &mut Self| {
105+
me.producer.wake_consumer();
106+
Poll::Ready(())
107+
};
108+
109+
let me = self.get_mut();
110+
let prod = &mut me.producer;
111+
let val_to_enqueue = &mut me.value_to_enqueue;
112+
113+
let value = if let Some(value) = val_to_enqueue.take() {
114+
value
115+
} else {
116+
// Try to wake the consumer because we've enqueued our value
117+
return try_wake_consumer(me);
118+
};
119+
120+
let failed_enqueue_value = if let Some(value) = prod.inner.enqueue(value).err() {
121+
value
122+
} else {
123+
// Try to wake the consumer because we've enqueued our value
124+
return try_wake_consumer(me);
125+
};
126+
127+
me.value_to_enqueue = Some(failed_enqueue_value);
128+
129+
me.producer.register_waker(cx.waker().clone());
130+
131+
Poll::Pending
132+
}
133+
}

0 commit comments

Comments
 (0)