1
1
//! Extensions for Futures types.
2
2
3
+ use pin_project:: pin_project;
3
4
use std:: future:: Future ;
4
5
use std:: io;
5
6
use std:: pin:: Pin ;
@@ -13,31 +14,26 @@ use super::Delay;
13
14
/// A future returned by methods in the [`FutureExt`] trait.
14
15
///
15
16
/// [`FutureExt.timeout`]: trait.FutureExt.html
17
+ #[ pin_project]
16
18
#[ derive( Debug ) ]
17
19
pub struct Timeout < F : Future > {
20
+ #[ pin]
18
21
future : F ,
22
+ #[ pin]
19
23
delay : Delay ,
20
24
}
21
25
22
26
impl < F : Future > Future for Timeout < F > {
23
27
type Output = Result < F :: Output , io:: Error > ;
24
28
25
- fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
26
- // This pinning projection is safe because:
27
- // 1. `Timeout` is only Unpin when `F` is Unpin. (Ok for default auto impl)
28
- // 2. `drop` never moves out of `F`. (No manual `Drop` impl and no `#[repr(packed)]`)
29
- // 3. `drop` on `F` must be called before overwritten or deallocated. (No manual `Drop` impl)
30
- // 4. No other operation provided for moving out `F`. (Ok)
31
- let ( future, delay) = unsafe {
32
- let Timeout { future, delay } = self . get_unchecked_mut ( ) ;
33
- ( Pin :: new_unchecked ( future) , Pin :: new ( delay) )
34
- } ;
29
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
30
+ let this = self . project ( ) ;
35
31
36
- if let Poll :: Ready ( t) = future. poll ( cx) {
32
+ if let Poll :: Ready ( t) = this . future . poll ( cx) {
37
33
return Poll :: Ready ( Ok ( t) ) ;
38
34
}
39
35
40
- delay
36
+ this . delay
41
37
. poll ( cx)
42
38
. map ( |_| Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , "future timed out" ) ) )
43
39
}
@@ -139,35 +135,29 @@ impl<T: Future> FutureExt for T {}
139
135
/// A stream returned by methods in the [`StreamExt`] trait.
140
136
///
141
137
/// [`StreamExt`]: trait.StreamExt.html
138
+ #[ pin_project]
142
139
#[ derive( Debug ) ]
143
140
pub struct TimeoutStream < S : Stream > {
141
+ #[ pin]
144
142
timeout : Delay ,
145
143
dur : Duration ,
144
+ #[ pin]
146
145
stream : S ,
147
146
}
148
147
149
148
impl < S : Stream > Stream for TimeoutStream < S > {
150
149
type Item = Result < S :: Item , io:: Error > ;
151
150
152
- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
153
- // This pinning projection is safe.
154
- // See detail in `Timeout::poll`.
155
- let ( mut timeout, dur, stream) = unsafe {
156
- let TimeoutStream {
157
- timeout,
158
- dur,
159
- stream,
160
- } = self . get_unchecked_mut ( ) ;
161
- ( Pin :: new ( timeout) , Pin :: new ( dur) , Pin :: new_unchecked ( stream) )
162
- } ;
151
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
152
+ let mut this = self . project ( ) ;
163
153
164
- if let Poll :: Ready ( s) = stream. poll_next ( cx) {
165
- timeout. set ( Delay :: new ( * dur) ) ;
154
+ if let Poll :: Ready ( s) = this . stream . as_mut ( ) . poll_next ( cx) {
155
+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
166
156
return Poll :: Ready ( Ok ( s) . transpose ( ) ) ;
167
157
}
168
158
169
- Pin :: new ( & mut * timeout) . poll ( cx) . map ( |_| {
170
- timeout. set ( Delay :: new ( * dur) ) ;
159
+ this . timeout . as_mut ( ) . poll ( cx) . map ( |_| {
160
+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
171
161
Some ( Err ( io:: Error :: new (
172
162
io:: ErrorKind :: TimedOut ,
173
163
"future timed out" ,
@@ -223,37 +213,31 @@ impl<S: Stream> StreamExt for S {}
223
213
/// A stream returned by methods in the [`StreamExt`] trait.
224
214
///
225
215
/// [`StreamExt`]: trait.StreamExt.html
216
+ #[ pin_project]
226
217
#[ derive( Debug ) ]
227
218
pub struct TimeoutAsyncRead < S : AsyncRead > {
219
+ #[ pin]
228
220
timeout : Delay ,
229
221
dur : Duration ,
222
+ #[ pin]
230
223
stream : S ,
231
224
}
232
225
233
226
impl < S : AsyncRead > AsyncRead for TimeoutAsyncRead < S > {
234
227
fn poll_read (
235
- self : Pin < & mut Self > ,
228
+ mut self : Pin < & mut Self > ,
236
229
cx : & mut Context < ' _ > ,
237
230
buf : & mut [ u8 ] ,
238
231
) -> Poll < Result < usize , io:: Error > > {
239
- // This pinning projection is safe.
240
- // See detail in `Timeout::poll`.
241
- let ( mut timeout, dur, stream) = unsafe {
242
- let TimeoutAsyncRead {
243
- timeout,
244
- dur,
245
- stream,
246
- } = self . get_unchecked_mut ( ) ;
247
- ( Pin :: new ( timeout) , Pin :: new ( dur) , Pin :: new_unchecked ( stream) )
248
- } ;
232
+ let mut this = self . project ( ) ;
249
233
250
- if let Poll :: Ready ( s) = stream. poll_read ( cx, buf) {
251
- timeout. set ( Delay :: new ( * dur) ) ;
234
+ if let Poll :: Ready ( s) = this . stream . as_mut ( ) . poll_read ( cx, buf) {
235
+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
252
236
return Poll :: Ready ( s) ;
253
237
}
254
238
255
- Pin :: new ( & mut * timeout) . poll ( cx) . map ( |_| {
256
- timeout. set ( Delay :: new ( * dur) ) ;
239
+ this . timeout . as_mut ( ) . poll ( cx) . map ( |_| {
240
+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
257
241
Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , "future timed out" ) )
258
242
} )
259
243
}
0 commit comments