@@ -45,59 +45,102 @@ use crate::task::{Context, Poll};
45
45
/// # };
46
46
/// ```
47
47
#[ unstable( feature = "future_join" , issue = "91642" ) ]
48
- pub macro join {
49
- ( $( $fut: expr) , * $( , ) ?) => {
50
- join ! { @count: ( ) , @futures: { } , @rest: ( $( $fut, ) * ) }
51
- } ,
52
- // Recurse until we have the position of each future in the tuple
48
+ pub macro join ( $( $fut: expr) , + $( , ) ? ) {
49
+ // Funnel through an internal macro not to leak implementation details.
50
+ join_internal ! {
51
+ current_position[ ]
52
+ futures_and_positions[ ]
53
+ munching[ $( $fut) + ]
54
+ }
55
+ }
56
+
57
+ /// To be able to *name* the i-th future in the tuple (say we want the .4-th),
58
+ /// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;`
59
+ /// In order to do that, we need to generate a `i`-long repetition of `_`,
60
+ /// for each i-th fut. Hence the recursive muncher approach.
61
+ macro join_internal {
62
+ // Recursion step: map each future with its "position" (underscore count).
53
63
(
54
- // A token for each future that has been expanded: "_ _ _"
55
- @count : ( $( $count: tt) * ) ,
56
- // Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
57
- @futures : { $( $fut: tt) * } ,
58
- // Take a future from @rest to expand
59
- @rest : ( $current: expr, $( $rest: tt) * )
60
- ) => {
61
- join ! {
62
- @count: ( $( $count) * _) ,
63
- @futures: { $( $fut) * $current => ( $( $count) * ) , } ,
64
- @rest: ( $( $rest) * )
64
+ // Accumulate a token for each future that has been expanded: "_ _ _".
65
+ current_position[
66
+ $( $underscores: tt) *
67
+ ]
68
+ // Accumulate Futures and their positions in the tuple: `_0th () _1st ( _ ) …`.
69
+ futures_and_positions[
70
+ $( $acc: tt) *
71
+ ]
72
+ // Munch one future.
73
+ munching[
74
+ $current: tt
75
+ $( $rest: tt) *
76
+ ]
77
+ ) => (
78
+ join_internal ! {
79
+ current_position[
80
+ $( $underscores) *
81
+ _
82
+ ]
83
+ futures_and_positions[
84
+ $( $acc) *
85
+ $current ( $( $underscores) * )
86
+ ]
87
+ munching[
88
+ $( $rest) *
89
+ ]
65
90
}
66
- } ,
67
- // Now generate the output future
68
- (
69
- @count: ( $( $count: tt) * ) ,
70
- @futures : {
71
- $( $( @$f: tt) ? $fut: expr => ( $( $pos: tt) * ) , ) *
72
- } ,
73
- @rest : ( )
74
- ) => {
75
- async move {
76
- let mut futures = ( $( MaybeDone :: Future ( $fut) , ) * ) ;
91
+ ) ,
77
92
93
+ // End of recursion: generate the output future.
94
+ (
95
+ current_position $_: tt
96
+ futures_and_positions[
97
+ $(
98
+ $fut_expr: tt ( $( $pos: tt) * )
99
+ ) *
100
+ ]
101
+ // Nothing left to munch.
102
+ munching[ ]
103
+ ) => (
104
+ match ( $( MaybeDone :: Future ( $fut_expr) , ) * ) { futures => async {
105
+ let mut futures = futures;
106
+ // SAFETY: this is `pin_mut!`.
107
+ let futures = unsafe { Pin :: new_unchecked ( & mut futures) } ;
78
108
poll_fn ( move |cx| {
79
109
let mut done = true ;
80
-
110
+ // For each `fut`, pin-project to it, and poll it.
81
111
$(
82
- let ( $( $pos, ) * fut, .. ) = & mut futures;
83
-
84
- // SAFETY: The futures are never moved
85
- done &= unsafe { Pin :: new_unchecked ( fut) . poll ( cx) . is_ready ( ) } ;
112
+ // SAFETY: pinning projection
113
+ let fut = unsafe {
114
+ futures. as_mut ( ) . map_unchecked_mut ( |it| {
115
+ let ( $( $pos, ) * fut, .. ) = it;
116
+ fut
117
+ } )
118
+ } ;
119
+ // Despite how tempting it may be to `let () = fut.poll(cx).ready()?;`
120
+ // doing so would defeat the point of `join!`: to start polling eagerly all
121
+ // of the futures, to allow parallelizing the waits.
122
+ done &= fut. poll ( cx) . is_ready ( ) ;
86
123
) *
87
-
88
- if done {
89
- // Extract all the outputs
90
- Poll :: Ready ( ( $( {
91
- let ( $( $pos, ) * fut, .. ) = & mut futures;
92
-
93
- fut. take_output ( ) . unwrap ( )
94
- } ) , * ) )
95
- } else {
96
- Poll :: Pending
124
+ if !done {
125
+ return Poll :: Pending ;
97
126
}
127
+ // All ready; time to extract all the outputs.
128
+
129
+ // SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`.
130
+ let futures = unsafe {
131
+ futures. as_mut ( ) . get_unchecked_mut ( )
132
+ } ;
133
+ Poll :: Ready (
134
+ ( $(
135
+ {
136
+ let ( $( $pos, ) * fut, .. ) = & mut * futures;
137
+ fut. take_output ( ) . unwrap ( )
138
+ }
139
+ ) , * ) // <- no trailing comma since we don't want 1-tuples.
140
+ )
98
141
} ) . await
99
- }
100
- }
142
+ } }
143
+ ) ,
101
144
}
102
145
103
146
/// Future used by `join!` that stores it's output to
@@ -109,14 +152,14 @@ pub macro join {
109
152
pub enum MaybeDone <F : Future > {
110
153
Future ( F ) ,
111
154
Done ( F :: Output ) ,
112
- Took ,
155
+ Taken ,
113
156
}
114
157
115
158
#[ unstable( feature = "future_join" , issue = "91642" ) ]
116
159
impl < F : Future > MaybeDone < F > {
117
160
pub fn take_output ( & mut self ) -> Option < F :: Output > {
118
- match & * self {
119
- MaybeDone :: Done ( _) => match mem:: replace ( self , Self :: Took ) {
161
+ match * self {
162
+ MaybeDone :: Done ( _) => match mem:: replace ( self , Self :: Taken ) {
120
163
MaybeDone :: Done ( val) => Some ( val) ,
121
164
_ => unreachable ! ( ) ,
122
165
} ,
@@ -132,13 +175,14 @@ impl<F: Future> Future for MaybeDone<F> {
132
175
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
133
176
// SAFETY: pinning in structural for `f`
134
177
unsafe {
135
- match self . as_mut ( ) . get_unchecked_mut ( ) {
136
- MaybeDone :: Future ( f) => match Pin :: new_unchecked ( f) . poll ( cx) {
137
- Poll :: Ready ( val) => self . set ( Self :: Done ( val) ) ,
138
- Poll :: Pending => return Poll :: Pending ,
139
- } ,
178
+ // Do not mix match ergonomics with unsafe.
179
+ match * self . as_mut ( ) . get_unchecked_mut ( ) {
180
+ MaybeDone :: Future ( ref mut f) => {
181
+ let val = Pin :: new_unchecked ( f) . poll ( cx) . ready ( ) ?;
182
+ self . set ( Self :: Done ( val) ) ;
183
+ }
140
184
MaybeDone :: Done ( _) => { }
141
- MaybeDone :: Took => unreachable ! ( ) ,
185
+ MaybeDone :: Taken => unreachable ! ( ) ,
142
186
}
143
187
}
144
188
0 commit comments