@@ -8,11 +8,11 @@ use pin_utils::unsafe_pinned;
8
8
/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method.
9
9
#[ must_use = "streams do nothing unless polled" ]
10
10
pub struct FlattenStream < Fut : Future > {
11
- state : State < Fut >
11
+ state : State < Fut , Fut :: Output > ,
12
12
}
13
13
14
14
impl < Fut : Future > FlattenStream < Fut > {
15
- unsafe_pinned ! ( state: State <Fut >) ;
15
+ unsafe_pinned ! ( state: State <Fut , Fut :: Output >) ;
16
16
17
17
pub ( super ) fn new ( future : Fut ) -> FlattenStream < Fut > {
18
18
FlattenStream {
@@ -33,11 +33,25 @@ impl<Fut> fmt::Debug for FlattenStream<Fut>
33
33
}
34
34
35
35
#[ derive( Debug ) ]
36
- enum State < Fut : Future > {
36
+ enum State < Fut , St > {
37
37
// future is not yet called or called and not ready
38
38
Future ( Fut ) ,
39
39
// future resolved to Stream
40
- Stream ( Fut :: Output ) ,
40
+ Stream ( St ) ,
41
+ }
42
+
43
+ impl < Fut , St > State < Fut , St > {
44
+ fn get_pin_mut < ' a > ( self : Pin < & ' a mut Self > ) -> State < Pin < & ' a mut Fut > , Pin < & ' a mut St > > {
45
+ // safety: data is never moved via the resulting &mut reference
46
+ match unsafe { Pin :: get_unchecked_mut ( self ) } {
47
+ // safety: the future we're re-pinning here will never be moved;
48
+ // it will just be polled, then dropped in place
49
+ State :: Future ( f) => State :: Future ( unsafe { Pin :: new_unchecked ( f) } ) ,
50
+ // safety: the stream we're repinning here will never be moved;
51
+ // it will just be polled, then dropped in place
52
+ State :: Stream ( s) => State :: Stream ( unsafe { Pin :: new_unchecked ( s) } ) ,
53
+ }
54
+ }
41
55
}
42
56
43
57
impl < Fut > FusedStream for FlattenStream < Fut >
@@ -60,23 +74,15 @@ impl<Fut> Stream for FlattenStream<Fut>
60
74
61
75
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
62
76
loop {
63
- // safety: data is never moved via the resulting &mut reference
64
- match & mut unsafe { Pin :: get_unchecked_mut ( self . as_mut ( ) ) } . state {
77
+ match self . as_mut ( ) . state ( ) . get_pin_mut ( ) {
65
78
State :: Future ( f) => {
66
- // safety: the future we're re-pinning here will never be moved;
67
- // it will just be polled, then dropped in place
68
- let stream = ready ! ( unsafe { Pin :: new_unchecked( f) } . poll( cx) ) ;
69
-
79
+ let stream = ready ! ( f. poll( cx) ) ;
70
80
// Future resolved to stream.
71
81
// We do not return, but poll that
72
82
// stream in the next loop iteration.
73
83
self . as_mut ( ) . state ( ) . set ( State :: Stream ( stream) ) ;
74
84
}
75
- State :: Stream ( s) => {
76
- // safety: the stream we're repinning here will never be moved;
77
- // it will just be polled, then dropped in place
78
- return unsafe { Pin :: new_unchecked ( s) } . poll_next ( cx) ;
79
- }
85
+ State :: Stream ( s) => return s. poll_next ( cx) ,
80
86
}
81
87
}
82
88
}
0 commit comments