3
3
use crate :: linked_list:: { self , Link , LinkedList } ;
4
4
use crate :: TimeoutError ;
5
5
6
- use core:: future:: { poll_fn , Future } ;
6
+ use core:: future:: Future ;
7
7
use core:: pin:: Pin ;
8
8
use core:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
9
9
use core:: task:: { Poll , Waker } ;
10
- use futures_util:: {
11
- future:: { select, Either } ,
12
- pin_mut,
13
- } ;
14
- use rtic_common:: dropper:: OnDrop ;
15
10
16
11
mod backend;
17
12
mod tick_type;
@@ -67,26 +62,6 @@ pub struct TimerQueue<Backend: TimerQueueBackend> {
67
62
initialized : AtomicBool ,
68
63
}
69
64
70
- /// This is needed to make the async closure in `delay_until` accept that we "share"
71
- /// the link possible between threads.
72
- struct LinkPtr < Backend : TimerQueueBackend > ( * mut Option < linked_list:: Link < WaitingWaker < Backend > > > ) ;
73
-
74
- impl < Backend : TimerQueueBackend > Clone for LinkPtr < Backend > {
75
- fn clone ( & self ) -> Self {
76
- LinkPtr ( self . 0 )
77
- }
78
- }
79
-
80
- impl < Backend : TimerQueueBackend > LinkPtr < Backend > {
81
- /// This will dereference the pointer stored within and give out an `&mut`.
82
- unsafe fn get ( & mut self ) -> & mut Option < linked_list:: Link < WaitingWaker < Backend > > > {
83
- & mut * self . 0
84
- }
85
- }
86
-
87
- unsafe impl < Backend : TimerQueueBackend > Send for LinkPtr < Backend > { }
88
- unsafe impl < Backend : TimerQueueBackend > Sync for LinkPtr < Backend > { }
89
-
90
65
impl < Backend : TimerQueueBackend > Default for TimerQueue < Backend > {
91
66
fn default ( ) -> Self {
92
67
Self :: new ( )
@@ -112,7 +87,7 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
112
87
pub fn initialize ( & self , backend : Backend ) {
113
88
self . initialized . store ( true , Ordering :: SeqCst ) ;
114
89
115
- // Don't run drop on `Mono `
90
+ // Don't run drop on `Backend `
116
91
core:: mem:: forget ( backend) ;
117
92
}
118
93
@@ -164,29 +139,29 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
164
139
}
165
140
166
141
/// Timeout at a specific time.
167
- pub async fn timeout_at < F : Future > (
142
+ pub fn timeout_at < F : Future > (
168
143
& self ,
169
144
instant : Backend :: Ticks ,
170
145
future : F ,
171
- ) -> Result < F :: Output , TimeoutError > {
172
- let delay = self . delay_until ( instant ) ;
173
-
174
- pin_mut ! ( future ) ;
175
- pin_mut ! ( delay ) ;
176
-
177
- match select ( future , delay ) . await {
178
- Either :: Left ( ( r , _ ) ) => Ok ( r ) ,
179
- Either :: Right ( _ ) => Err ( TimeoutError ) ,
146
+ ) -> Timeout < ' _ , Backend , F > {
147
+ Timeout {
148
+ delay : Delay :: < Backend > {
149
+ instant ,
150
+ queue : & self . queue ,
151
+ link_ptr : None ,
152
+ marker : AtomicUsize :: new ( 0 ) ,
153
+ } ,
154
+ future ,
180
155
}
181
156
}
182
157
183
158
/// Timeout after at least a specific duration.
184
159
#[ inline]
185
- pub async fn timeout_after < F : Future > (
160
+ pub fn timeout_after < F : Future > (
186
161
& self ,
187
162
duration : Backend :: Ticks ,
188
163
future : F ,
189
- ) -> Result < F :: Output , TimeoutError > {
164
+ ) -> Timeout < ' _ , Backend , F > {
190
165
let now = Backend :: now ( ) ;
191
166
let mut timeout = now. wrapping_add ( duration) ;
192
167
if now != timeout {
@@ -195,12 +170,12 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
195
170
196
171
// Wait for one period longer, because by definition timers have an uncertainty
197
172
// of one period, so waiting for 'at least' needs to compensate for that.
198
- self . timeout_at ( timeout, future) . await
173
+ self . timeout_at ( timeout, future)
199
174
}
200
175
201
176
/// Delay for at least some duration of time.
202
177
#[ inline]
203
- pub async fn delay ( & self , duration : Backend :: Ticks ) {
178
+ pub fn delay ( & self , duration : Backend :: Ticks ) -> Delay < ' _ , Backend > {
204
179
let now = Backend :: now ( ) ;
205
180
let mut timeout = now. wrapping_add ( duration) ;
206
181
if now != timeout {
@@ -209,79 +184,111 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
209
184
210
185
// Wait for one period longer, because by definition timers have an uncertainty
211
186
// of one period, so waiting for 'at least' needs to compensate for that.
212
- self . delay_until ( timeout) . await ;
187
+ self . delay_until ( timeout)
213
188
}
214
189
215
190
/// Delay to some specific time instant.
216
- pub async fn delay_until ( & self , instant : Backend :: Ticks ) {
191
+ pub fn delay_until ( & self , instant : Backend :: Ticks ) -> Delay < ' _ , Backend > {
217
192
if !self . initialized . load ( Ordering :: Relaxed ) {
218
193
panic ! (
219
194
"The timer queue is not initialized with a monotonic, you need to run `initialize`"
220
195
) ;
221
196
}
197
+ Delay :: < Backend > {
198
+ instant,
199
+ queue : & self . queue ,
200
+ link_ptr : None ,
201
+ marker : AtomicUsize :: new ( 0 ) ,
202
+ }
203
+ }
204
+ }
222
205
223
- let mut link_ptr: Option < linked_list:: Link < WaitingWaker < Backend > > > = None ;
206
+ /// Future returned by `delay` and `delay_until`.
207
+ pub struct Delay < ' q , Backend : TimerQueueBackend > {
208
+ instant : Backend :: Ticks ,
209
+ queue : & ' q LinkedList < WaitingWaker < Backend > > ,
210
+ link_ptr : Option < linked_list:: Link < WaitingWaker < Backend > > > ,
211
+ marker : AtomicUsize ,
212
+ }
224
213
225
- // Make this future `Drop`-safe
226
- // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
227
- let mut link_ptr =
228
- LinkPtr ( & mut link_ptr as * mut Option < linked_list:: Link < WaitingWaker < Backend > > > ) ;
229
- let mut link_ptr2 = link_ptr. clone ( ) ;
214
+ impl < ' q , Backend : TimerQueueBackend > Future for Delay < ' q , Backend > {
215
+ type Output = ( ) ;
230
216
231
- let queue = & self . queue ;
232
- let marker = & AtomicUsize :: new ( 0 ) ;
217
+ fn poll ( self : Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
218
+ // SAFETY: We ensure we never move anything out of this.
219
+ let this = unsafe { self . get_unchecked_mut ( ) } ;
233
220
234
- let dropper = OnDrop :: new ( || {
235
- queue . delete ( marker . load ( Ordering :: Relaxed ) ) ;
236
- } ) ;
221
+ if Backend :: now ( ) . is_at_least ( this . instant ) {
222
+ return Poll :: Ready ( ( ) ) ;
223
+ }
237
224
238
- poll_fn ( |cx| {
239
- if Backend :: now ( ) . is_at_least ( instant) {
240
- return Poll :: Ready ( ( ) ) ;
225
+ // SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
226
+ // in `drop` we can't do this access concurrently with queue removal.
227
+ let link = & mut this. link_ptr ;
228
+ if link. is_none ( ) {
229
+ let link_ref = link. insert ( Link :: new ( WaitingWaker {
230
+ waker : cx. waker ( ) . clone ( ) ,
231
+ release_at : this. instant ,
232
+ was_popped : AtomicBool :: new ( false ) ,
233
+ } ) ) ;
234
+
235
+ // SAFETY(new_unchecked): The address to the link is stable as it is defined
236
+ // outside this stack frame.
237
+ // SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
238
+ // the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
239
+ // queue on drop, which happens before the struct and thus `link_ptr` goes out of
240
+ // scope.
241
+ let ( head_updated, addr) = unsafe { this. queue . insert ( Pin :: new_unchecked ( link_ref) ) } ;
242
+ this. marker . store ( addr, Ordering :: Relaxed ) ;
243
+ if head_updated {
244
+ Backend :: pend_interrupt ( )
241
245
}
246
+ }
242
247
243
- // SAFETY: This pointer is only dereferenced here and on drop of the future
244
- // which happens outside this `poll_fn`'s stack frame, so this mutable access cannot
245
- // happen at the same time as `dropper` runs.
246
- let link = unsafe { link_ptr2. get ( ) } ;
247
- if link. is_none ( ) {
248
- let link_ref = link. insert ( Link :: new ( WaitingWaker {
249
- waker : cx. waker ( ) . clone ( ) ,
250
- release_at : instant,
251
- was_popped : AtomicBool :: new ( false ) ,
252
- } ) ) ;
253
-
254
- // SAFETY(new_unchecked): The address to the link is stable as it is defined
255
- //outside this stack frame.
256
- // SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and
257
- // we make sure in `dropper` that the link is removed from the queue before
258
- // dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives
259
- // until the end of the stack frame.
260
- let ( head_updated, addr) = unsafe { queue. insert ( Pin :: new_unchecked ( link_ref) ) } ;
261
-
262
- marker. store ( addr, Ordering :: Relaxed ) ;
263
-
264
- if head_updated {
265
- // Pend the monotonic handler if the queue head was updated.
266
- Backend :: pend_interrupt ( )
267
- }
248
+ Poll :: Pending
249
+ }
250
+ }
251
+
252
+ impl < ' q , Backend : TimerQueueBackend > Drop for Delay < ' q , Backend > {
253
+ fn drop ( & mut self ) {
254
+ // SAFETY: Drop cannot be run at the same time as poll, so we can't end up
255
+ // derefencing this concurrently to the one in `poll`.
256
+ match self . link_ptr . as_ref ( ) {
257
+ None => return ,
258
+ // If it was popped from the queue there is no need to run delete
259
+ Some ( link) if link. val . was_popped . load ( Ordering :: Relaxed ) => return ,
260
+ _ => { }
261
+ }
262
+ self . queue . delete ( self . marker . load ( Ordering :: Relaxed ) ) ;
263
+ }
264
+ }
265
+
266
+ /// Future returned by `timeout` and `timeout_at`.
267
+ pub struct Timeout < ' q , Backend : TimerQueueBackend , F > {
268
+ delay : Delay < ' q , Backend > ,
269
+ future : F ,
270
+ }
271
+
272
+ impl < ' q , Backend : TimerQueueBackend , F : Future > Future for Timeout < ' q , Backend , F > {
273
+ type Output = Result < F :: Output , TimeoutError > ;
274
+
275
+ fn poll ( self : Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
276
+ let inner = unsafe { self . get_unchecked_mut ( ) } ;
277
+
278
+ {
279
+ let f = unsafe { Pin :: new_unchecked ( & mut inner. future ) } ;
280
+ if let Poll :: Ready ( v) = f. poll ( cx) {
281
+ return Poll :: Ready ( Ok ( v) ) ;
268
282
}
283
+ }
269
284
270
- Poll :: Pending
271
- } )
272
- . await ;
273
-
274
- // SAFETY: We only run this and dereference the pointer if we have
275
- // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
276
- // of this pointer is in the `poll_fn`.
277
- if let Some ( link) = unsafe { link_ptr. get ( ) } {
278
- if link. val . was_popped . load ( Ordering :: Relaxed ) {
279
- // If it was popped from the queue there is no need to run delete
280
- dropper. defuse ( ) ;
285
+ {
286
+ let d = unsafe { Pin :: new_unchecked ( & mut inner. delay ) } ;
287
+ if d. poll ( cx) . is_ready ( ) {
288
+ return Poll :: Ready ( Err ( TimeoutError ) ) ;
281
289
}
282
- } else {
283
- // Make sure that our link is deleted from the list before we drop this stack
284
- drop ( dropper) ;
285
290
}
291
+
292
+ Poll :: Pending
286
293
}
287
294
}
0 commit comments