1
- //! A fixed capacity Multiple-Producer Multiple-Consumer (MPMC) lock-free queue.
1
+ //! A fixed capacity multiple-producer, multiple-consumer (MPMC) lock-free queue.
2
2
//!
3
- //! NOTE: This module requires atomic CAS operations. On targets where they're not natively available,
4
- //! they are emulated by the [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
3
+ //! **Note:** This module requires atomic compare-and-swap (CAS) instructions. On
4
+ //! targets where they're not natively available, they are emulated by the
5
+ //! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
5
6
//!
6
7
//! # Example
7
8
//!
8
- //! This queue can be constructed in " const context" . Placing it in a `static` variable lets *all*
9
- //! contexts (interrupts/threads/`main`) safely enqueue and dequeue items from it .
9
+ //! This queue can be constructed in ` const` context. Placing it in a `static` variable lets *all*
10
+ //! contexts (interrupts/threads/`main`) safely enqueue and dequeue items.
10
11
//!
11
- //! ``` ignore
12
- //! #![no_main]
13
- //! #![no_std]
14
- //!
15
- //! use panic_semihosting as _;
16
- //!
17
- //! use cortex_m::{asm, peripheral::syst::SystClkSource};
18
- //! use cortex_m_rt::{entry, exception};
19
- //! use cortex_m_semihosting::hprintln;
20
- //! use heapless::mpmc::Queue::<_, 2>;
12
+ //! ```
13
+ //! use core::sync::atomic::{AtomicU8, Ordering};
21
14
//!
22
- //! static Q: Queue::<_, 2><u8> = Q2::new() ;
15
+ //! use heapless::mpmc::Queue ;
23
16
//!
24
- //! #[entry]
25
- //! fn main() -> ! {
26
- //! if let Some(p) = cortex_m::Peripherals::take() {
27
- //! let mut syst = p.SYST;
17
+ //! static Q: Queue<u8, 2> = Queue::new();
28
18
//!
29
- //! // configures the system timer to trigger a SysTick exception every second
30
- //! syst.set_clock_source(SystClkSource::Core);
31
- //! syst.set_reload(12_000_000);
32
- //! syst.enable_counter();
33
- //! syst.enable_interrupt();
34
- //! }
19
+ //! fn main() {
20
+ //! // Configure systick interrupt.
35
21
//!
36
22
//! loop {
37
23
//! if let Some(x) = Q.dequeue() {
38
- //! hprintln !("{}", x).ok( );
24
+ //! println !("{}", x);
39
25
//! } else {
40
- //! asm::wfi();
26
+ //! // Wait for interrupt.
41
27
//! }
28
+ //! # break
42
29
//! }
43
30
//! }
44
31
//!
45
- //! #[exception]
46
- //! fn SysTick() {
47
- //! static mut COUNT: u8 = 0 ;
32
+ //! fn systick() {
33
+ //! static COUNT: AtomicU8 = AtomicU8::new(0);
34
+ //! let count = COUNT.fetch_add(1, Ordering::SeqCst) ;
48
35
//!
49
- //! Q.enqueue(*COUNT).ok();
50
- //! *COUNT += 1 ;
36
+ //! # let _ =
37
+ //! Q.enqueue(count) ;
51
38
//! }
52
39
//! ```
53
40
//!
54
41
//! # Benchmark
55
42
//!
56
- //! Measured on a ARM Cortex-M3 core running at 8 MHz and with zero Flash wait cycles
43
+ //! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled with `-C opt-level=z`:
57
44
//!
58
- //! N| `Q8::<u8>::enqueue().ok()` (`z`) | `Q8::<u8>::dequeue()` (`z`) |
59
- //! -|----------------------------------|-----------------------------|
60
- //! 0|34 |35 |
61
- //! 1|52 |53 |
62
- //! 2|69 |71 |
45
+ //! | Method | Time | N |
46
+ //! |:----------------------------|-----:|---:|
47
+ //! | `Queue::<u8, 8>::enqueue()` | 34 | 0 |
48
+ //! | `Queue::<u8, 8>::enqueue()` | 52 | 1 |
49
+ //! | `Queue::<u8, 8>::enqueue()` | 69 | 2 |
50
+ //! | `Queue::<u8, 8>::dequeue()` | 35 | 0 |
51
+ //! | `Queue::<u8, 8>::dequeue()` | 53 | 1 |
52
+ //! | `Queue::<u8, 8>::dequeue()` | 71 | 2 |
63
53
//!
64
- //! - `N` denotes the number of * interruptions* . On Cortex-M, an interruption consists of an
54
+ //! - N denotes the number of interruptions. On Cortex-M, an interruption consists of an
65
55
//! interrupt handler preempting the would-be atomic section of the `enqueue`/`dequeue`
66
56
//! operation. Note that it does *not* matter if the higher priority handler uses the queue or
67
57
//! not.
68
- //! - All execution times are in clock cycles. 1 clock cycle = 125 ns.
69
- //! - Execution time is *dependent* of `mem::size_of::<T>()`. Both operations include one
70
- //! `memcpy(T)` in their successful path.
71
- //! - The optimization level is indicated in parentheses.
72
- //! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue`
73
- //! and `Ok` is returned by `enqueue`).
74
- //!
75
- //! # Portability
76
- //!
77
- //! This module requires CAS atomic instructions which are not available on all architectures
78
- //! (e.g. ARMv6-M (`thumbv6m-none-eabi`) and MSP430 (`msp430-none-elf`)). These atomics can be
79
- //! emulated however with [`portable-atomic`](https://crates.io/crates/portable-atomic), which is
80
- //! enabled with the `cas` feature and is enabled by default for `thumbv6m-none-eabi` and `riscv32`
81
- //! targets.
58
+ //! - All execution times are in clock cycles (1 clock cycle = 125 ns).
59
+ //! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
60
+ //! `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
61
+ //! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
62
+ //! and `enqueue` returning `Ok`.
82
63
//!
83
64
//! # References
84
65
//!
85
- //! This is an implementation of Dmitry Vyukov's ["Bounded MPMC queue"][0] minus the cache padding.
66
+ //! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
67
+ //! cache padding.
86
68
//!
87
- //! [0 ]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
69
+ //! [bounded MPMC queue ]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
88
70
89
71
use core:: { cell:: UnsafeCell , mem:: MaybeUninit } ;
90
72
@@ -122,18 +104,24 @@ pub struct QueueInner<T, S: Storage> {
122
104
buffer : UnsafeCell < S :: Buffer < Cell < T > > > ,
123
105
}
124
106
125
- /// MPMC queue with a capacity for N elements
126
- /// N must be a power of 2
127
- /// The max value of N is `u8::MAX` - 1 if `mpmc_large` feature is not enabled.
107
+ /// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
108
+ ///
109
+ /// <div class="warning">
110
+ ///
111
+ /// `N` must be a power of 2.
112
+ ///
113
+ /// </div>
114
+ ///
115
+ /// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
128
116
pub type Queue < T , const N : usize > = QueueInner < T , OwnedStorage < N > > ;
129
117
130
- /// MPMC queue with a capacity for N elements
131
- /// N must be a power of 2
132
- /// The max value of N is `u8::MAX` - 1 if `mpmc_large` feature is not enabled .
118
+ /// A [`Queue`] with dynamic capacity.
119
+ ///
120
+ /// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference .
133
121
pub type QueueView < T > = QueueInner < T , ViewStorage > ;
134
122
135
123
impl < T , const N : usize > Queue < T , N > {
136
- /// Creates an empty queue
124
+ /// Creates an empty queue.
137
125
pub const fn new ( ) -> Self {
138
126
const {
139
127
assert ! ( N > 1 ) ;
@@ -156,17 +144,23 @@ impl<T, const N: usize> Queue<T, N> {
156
144
}
157
145
}
158
146
159
- /// Used in `Storage` implementation
147
+ /// Used in `Storage` implementation.
160
148
pub ( crate ) fn as_view_private ( & self ) -> & QueueView < T > {
161
149
self
162
150
}
163
- /// Used in `Storage` implementation
151
+ /// Used in `Storage` implementation.
164
152
pub ( crate ) fn as_view_mut_private ( & mut self ) -> & mut QueueView < T > {
165
153
self
166
154
}
167
155
}
168
156
169
157
impl < T , S : Storage > QueueInner < T , S > {
158
+ /// Returns the maximum number of elements the queue can hold.
159
+ #[ inline]
160
+ pub fn capacity ( & self ) -> usize {
161
+ S :: len ( self . buffer . get ( ) )
162
+ }
163
+
170
164
/// Get a reference to the `Queue`, erasing the `N` const-generic.
171
165
///
172
166
///
@@ -212,14 +206,14 @@ impl<T, S: Storage> QueueInner<T, S> {
212
206
( S :: len ( self . buffer . get ( ) ) - 1 ) as _
213
207
}
214
208
215
- /// Returns the item in the front of the queue, or `None` if the queue is empty
209
+ /// Returns the item in the front of the queue, or `None` if the queue is empty.
216
210
pub fn dequeue ( & self ) -> Option < T > {
217
211
unsafe { dequeue ( S :: as_ptr ( self . buffer . get ( ) ) , & self . dequeue_pos , self . mask ( ) ) }
218
212
}
219
213
220
- /// Adds an `item` to the end of the queue
214
+ /// Adds an `item` to the end of the queue.
221
215
///
222
- /// Returns back the `item` if the queue is full
216
+ /// Returns back the `item` if the queue is full.
223
217
pub fn enqueue ( & self , item : T ) -> Result < ( ) , T > {
224
218
unsafe {
225
219
enqueue (
@@ -240,7 +234,7 @@ impl<T, const N: usize> Default for Queue<T, N> {
240
234
241
235
impl < T , S : Storage > Drop for QueueInner < T , S > {
242
236
fn drop ( & mut self ) {
243
- // drop all contents currently in the queue
237
+ // Drop all elements currently in the queue.
244
238
while self . dequeue ( ) . is_some ( ) { }
245
239
}
246
240
}
@@ -416,6 +410,8 @@ mod tests {
416
410
417
411
let q: Queue < u8 , CAPACITY > = Queue :: new ( ) ;
418
412
413
+ assert_eq ! ( q. capacity( ) , CAPACITY ) ;
414
+
419
415
for _ in 0 ..CAPACITY {
420
416
q. enqueue ( 0xAA ) . unwrap ( ) ;
421
417
}
0 commit comments