7
7
#![ deny( missing_docs) ]
8
8
#![ allow( incomplete_features) ]
9
9
10
- use core:: future:: { poll_fn , Future } ;
10
+ use core:: future:: Future ;
11
11
use core:: pin:: Pin ;
12
12
use core:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
13
13
use core:: task:: { Poll , Waker } ;
14
- use futures_util:: {
15
- future:: { select, Either } ,
16
- pin_mut,
17
- } ;
18
14
use linked_list:: { Link , LinkedList } ;
19
15
pub use monotonic:: Monotonic ;
20
- use rtic_common:: dropper:: OnDrop ;
21
16
22
17
pub mod half_period_counter;
23
18
mod linked_list;
@@ -75,26 +70,6 @@ pub struct TimerQueue<Mono: Monotonic> {
75
70
/// This indicates that there was a timeout.
76
71
pub struct TimeoutError ;
77
72
78
- /// This is needed to make the async closure in `delay_until` accept that we "share"
79
- /// the link possible between threads.
80
- struct LinkPtr < Mono : Monotonic > ( * mut Option < linked_list:: Link < WaitingWaker < Mono > > > ) ;
81
-
82
- impl < Mono : Monotonic > Clone for LinkPtr < Mono > {
83
- fn clone ( & self ) -> Self {
84
- LinkPtr ( self . 0 )
85
- }
86
- }
87
-
88
- impl < Mono : Monotonic > LinkPtr < Mono > {
89
- /// This will dereference the pointer stored within and give out an `&mut`.
90
- unsafe fn get ( & mut self ) -> & mut Option < linked_list:: Link < WaitingWaker < Mono > > > {
91
- & mut * self . 0
92
- }
93
- }
94
-
95
- unsafe impl < Mono : Monotonic > Send for LinkPtr < Mono > { }
96
- unsafe impl < Mono : Monotonic > Sync for LinkPtr < Mono > { }
97
-
98
73
impl < Mono : Monotonic > TimerQueue < Mono > {
99
74
/// Make a new queue.
100
75
pub const fn new ( ) -> Self {
@@ -166,29 +141,25 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
166
141
}
167
142
168
143
/// Timeout at a specific time.
169
- pub async fn timeout_at < F : Future > (
170
- & self ,
171
- instant : Mono :: Instant ,
172
- future : F ,
173
- ) -> Result < F :: Output , TimeoutError > {
174
- let delay = self . delay_until ( instant) ;
175
-
176
- pin_mut ! ( future) ;
177
- pin_mut ! ( delay) ;
178
-
179
- match select ( future, delay) . await {
180
- Either :: Left ( ( r, _) ) => Ok ( r) ,
181
- Either :: Right ( _) => Err ( TimeoutError ) ,
144
+ pub fn timeout_at < F : Future > ( & self , instant : Mono :: Instant , future : F ) -> Timeout < ' _ , Mono , F > {
145
+ Timeout {
146
+ delay : Delay :: < Mono > {
147
+ instant,
148
+ queue : & self . queue ,
149
+ link_ptr : None ,
150
+ marker : AtomicUsize :: new ( 0 ) ,
151
+ } ,
152
+ future,
182
153
}
183
154
}
184
155
185
156
/// Timeout after at least a specific duration.
186
157
#[ inline]
187
- pub async fn timeout_after < F : Future > (
158
+ pub fn timeout_after < F : Future > (
188
159
& self ,
189
160
duration : Mono :: Duration ,
190
161
future : F ,
191
- ) -> Result < F :: Output , TimeoutError > {
162
+ ) -> Timeout < ' _ , Mono , F > {
192
163
let now = Mono :: now ( ) ;
193
164
let mut timeout = now + duration;
194
165
if now != timeout {
@@ -197,12 +168,12 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
197
168
198
169
// Wait for one period longer, because by definition timers have an uncertainty
199
170
// of one period, so waiting for 'at least' needs to compensate for that.
200
- self . timeout_at ( timeout, future) . await
171
+ self . timeout_at ( timeout, future)
201
172
}
202
173
203
174
/// Delay for at least some duration of time.
204
175
#[ inline]
205
- pub async fn delay ( & self , duration : Mono :: Duration ) {
176
+ pub fn delay ( & self , duration : Mono :: Duration ) -> Delay < ' _ , Mono > {
206
177
let now = Mono :: now ( ) ;
207
178
let mut timeout = now + duration;
208
179
if now != timeout {
@@ -211,79 +182,111 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
211
182
212
183
// Wait for one period longer, because by definition timers have an uncertainty
213
184
// of one period, so waiting for 'at least' needs to compensate for that.
214
- self . delay_until ( timeout) . await ;
185
+ self . delay_until ( timeout)
215
186
}
216
187
217
188
/// Delay to some specific time instant.
218
- pub async fn delay_until ( & self , instant : Mono :: Instant ) {
189
+ pub fn delay_until ( & self , instant : Mono :: Instant ) -> Delay < ' _ , Mono > {
219
190
if !self . initialized . load ( Ordering :: Relaxed ) {
220
191
panic ! (
221
192
"The timer queue is not initialized with a monotonic, you need to run `initialize`"
222
193
) ;
223
194
}
195
+ Delay :: < Mono > {
196
+ instant,
197
+ queue : & self . queue ,
198
+ link_ptr : None ,
199
+ marker : AtomicUsize :: new ( 0 ) ,
200
+ }
201
+ }
202
+ }
224
203
225
- let mut link_ptr: Option < linked_list:: Link < WaitingWaker < Mono > > > = None ;
204
+ /// Future returned by `delay` and `delay_until`.
205
+ pub struct Delay < ' q , Mono : Monotonic > {
206
+ instant : Mono :: Instant ,
207
+ queue : & ' q LinkedList < WaitingWaker < Mono > > ,
208
+ link_ptr : Option < linked_list:: Link < WaitingWaker < Mono > > > ,
209
+ marker : AtomicUsize ,
210
+ }
226
211
227
- // Make this future `Drop`-safe
228
- // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
229
- let mut link_ptr =
230
- LinkPtr ( & mut link_ptr as * mut Option < linked_list:: Link < WaitingWaker < Mono > > > ) ;
231
- let mut link_ptr2 = link_ptr. clone ( ) ;
212
+ impl < ' q , Mono : Monotonic > Future for Delay < ' q , Mono > {
213
+ type Output = ( ) ;
232
214
233
- let queue = & self . queue ;
234
- let marker = & AtomicUsize :: new ( 0 ) ;
215
+ fn poll ( self : Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
216
+ // SAFETY: We ensure we never move anything out of this.
217
+ let this = unsafe { self . get_unchecked_mut ( ) } ;
235
218
236
- let dropper = OnDrop :: new ( || {
237
- queue . delete ( marker . load ( Ordering :: Relaxed ) ) ;
238
- } ) ;
219
+ if Mono :: now ( ) >= this . instant {
220
+ return Poll :: Ready ( ( ) ) ;
221
+ }
239
222
240
- poll_fn ( |cx| {
241
- if Mono :: now ( ) >= instant {
242
- return Poll :: Ready ( ( ) ) ;
223
+ // SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
224
+ // in `drop` we can't do this access concurrently with queue removal.
225
+ let link = & mut this. link_ptr ;
226
+ if link. is_none ( ) {
227
+ let link_ref = link. insert ( Link :: new ( WaitingWaker {
228
+ waker : cx. waker ( ) . clone ( ) ,
229
+ release_at : this. instant ,
230
+ was_popped : AtomicBool :: new ( false ) ,
231
+ } ) ) ;
232
+
233
+ // SAFETY(new_unchecked): The address to the link is stable as it is defined
234
+ // outside this stack frame.
235
+ // SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
236
+ // the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
237
+ // queue on drop, which happens before the struct and thus `link_ptr` goes out of
238
+ // scope.
239
+ let ( head_updated, addr) = unsafe { this. queue . insert ( Pin :: new_unchecked ( link_ref) ) } ;
240
+ this. marker . store ( addr, Ordering :: Relaxed ) ;
241
+ if head_updated {
242
+ Mono :: pend_interrupt ( )
243
243
}
244
+ }
244
245
245
- // SAFETY: This pointer is only dereferenced here and on drop of the future
246
- // which happens outside this `poll_fn`'s stack frame, so this mutable access cannot
247
- // happen at the same time as `dropper` runs.
248
- let link = unsafe { link_ptr2. get ( ) } ;
249
- if link. is_none ( ) {
250
- let link_ref = link. insert ( Link :: new ( WaitingWaker {
251
- waker : cx. waker ( ) . clone ( ) ,
252
- release_at : instant,
253
- was_popped : AtomicBool :: new ( false ) ,
254
- } ) ) ;
255
-
256
- // SAFETY(new_unchecked): The address to the link is stable as it is defined
257
- //outside this stack frame.
258
- // SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and
259
- // we make sure in `dropper` that the link is removed from the queue before
260
- // dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives
261
- // until the end of the stack frame.
262
- let ( head_updated, addr) = unsafe { queue. insert ( Pin :: new_unchecked ( link_ref) ) } ;
263
-
264
- marker. store ( addr, Ordering :: Relaxed ) ;
265
-
266
- if head_updated {
267
- // Pend the monotonic handler if the queue head was updated.
268
- Mono :: pend_interrupt ( )
269
- }
246
+ Poll :: Pending
247
+ }
248
+ }
249
+
250
+ impl < ' q , Mono : Monotonic > Drop for Delay < ' q , Mono > {
251
+ fn drop ( & mut self ) {
252
+ // SAFETY: The dropper is cannot be run at the same time as poll, so we can't end up
253
+ // derefencing this concurrently to the one in `poll`.
254
+ match self . link_ptr . as_ref ( ) {
255
+ None => return ,
256
+ // If it was popped from the queue there is no need to run delete
257
+ Some ( link) if link. val . was_popped . load ( Ordering :: Relaxed ) => return ,
258
+ _ => { }
259
+ }
260
+ self . queue . delete ( self . marker . load ( Ordering :: Relaxed ) ) ;
261
+ }
262
+ }
263
+
264
+ /// Future returned by `timeout` and `timeout_at`.
265
+ pub struct Timeout < ' q , Mono : Monotonic , F > {
266
+ delay : Delay < ' q , Mono > ,
267
+ future : F ,
268
+ }
269
+
270
+ impl < ' q , Mono : Monotonic , F : Future > Future for Timeout < ' q , Mono , F > {
271
+ type Output = Result < F :: Output , TimeoutError > ;
272
+
273
+ fn poll ( self : Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
274
+ let inner = unsafe { self . get_unchecked_mut ( ) } ;
275
+
276
+ {
277
+ let f = unsafe { Pin :: new_unchecked ( & mut inner. future ) } ;
278
+ if let Poll :: Ready ( v) = f. poll ( cx) {
279
+ return Poll :: Ready ( Ok ( v) ) ;
270
280
}
281
+ }
271
282
272
- Poll :: Pending
273
- } )
274
- . await ;
275
-
276
- // SAFETY: We only run this and dereference the pointer if we have
277
- // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
278
- // of this pointer is in the `poll_fn`.
279
- if let Some ( link) = unsafe { link_ptr. get ( ) } {
280
- if link. val . was_popped . load ( Ordering :: Relaxed ) {
281
- // If it was popped from the queue there is no need to run delete
282
- dropper. defuse ( ) ;
283
+ {
284
+ let d = unsafe { Pin :: new_unchecked ( & mut inner. delay ) } ;
285
+ if d. poll ( cx) . is_ready ( ) {
286
+ return Poll :: Ready ( Err ( TimeoutError ) ) ;
283
287
}
284
- } else {
285
- // Make sure that our link is deleted from the list before we drop this stack
286
- drop ( dropper) ;
287
288
}
289
+
290
+ Poll :: Pending
288
291
}
289
292
}
0 commit comments