1
1
//! A general purpose `Batcher` implementation based on radix sort.
2
2
3
+ use std:: collections:: VecDeque ;
4
+
3
5
use timely:: communication:: message:: RefOrMut ;
4
6
use timely:: progress:: frontier:: Antichain ;
5
7
@@ -120,60 +122,6 @@ where
120
122
}
121
123
122
124
123
- use std:: slice:: { from_raw_parts} ;
124
-
125
- pub struct VecQueue < T > {
126
- list : Vec < T > ,
127
- head : usize ,
128
- tail : usize ,
129
- }
130
-
131
- impl < T > VecQueue < T > {
132
- #[ inline]
133
- pub fn new ( ) -> Self { VecQueue :: from ( Vec :: new ( ) ) }
134
- #[ inline]
135
- pub fn pop ( & mut self ) -> T {
136
- debug_assert ! ( self . head < self . tail) ;
137
- self . head += 1 ;
138
- unsafe { :: std:: ptr:: read ( self . list . as_mut_ptr ( ) . offset ( ( self . head as isize ) - 1 ) ) }
139
- }
140
- #[ inline]
141
- pub fn peek ( & self ) -> & T {
142
- debug_assert ! ( self . head < self . tail) ;
143
- unsafe { self . list . get_unchecked ( self . head ) }
144
- }
145
- #[ inline]
146
- pub fn _peek_tail ( & self ) -> & T {
147
- debug_assert ! ( self . head < self . tail) ;
148
- unsafe { self . list . get_unchecked ( self . tail -1 ) }
149
- }
150
- #[ inline]
151
- pub fn _slice ( & self ) -> & [ T ] {
152
- debug_assert ! ( self . head < self . tail) ;
153
- unsafe { from_raw_parts ( self . list . get_unchecked ( self . head ) , self . tail - self . head ) }
154
- }
155
- #[ inline]
156
- pub fn from ( mut list : Vec < T > ) -> Self {
157
- let tail = list. len ( ) ;
158
- unsafe { list. set_len ( 0 ) ; }
159
- VecQueue {
160
- list : list,
161
- head : 0 ,
162
- tail : tail,
163
- }
164
- }
165
- // could leak, if self.head != self.tail.
166
- #[ inline]
167
- pub fn done ( self ) -> Vec < T > {
168
- debug_assert ! ( self . head == self . tail) ;
169
- self . list
170
- }
171
- #[ inline]
172
- pub fn len ( & self ) -> usize { self . tail - self . head }
173
- #[ inline]
174
- pub fn is_empty ( & self ) -> bool { self . head == self . tail }
175
- }
176
-
177
125
#[ inline]
178
126
unsafe fn push_unchecked < T > ( vec : & mut Vec < T > , element : T ) {
179
127
debug_assert ! ( vec. len( ) < vec. capacity( ) ) ;
@@ -277,28 +225,28 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
277
225
let mut output = Vec :: with_capacity ( list1. len ( ) + list2. len ( ) ) ;
278
226
let mut result = self . empty ( ) ;
279
227
280
- let mut list1 = VecQueue :: from ( list1) ;
281
- let mut list2 = VecQueue :: from ( list2) ;
228
+ let mut list1 = list1. into_iter ( ) ;
229
+ let mut list2 = list2. into_iter ( ) ;
282
230
283
- let mut head1 = if !list1 . is_empty ( ) { VecQueue :: from ( list1. pop ( ) ) } else { VecQueue :: new ( ) } ;
284
- let mut head2 = if !list2 . is_empty ( ) { VecQueue :: from ( list2. pop ( ) ) } else { VecQueue :: new ( ) } ;
231
+ let mut head1 = VecDeque :: from ( list1. next ( ) . unwrap_or_default ( ) ) ;
232
+ let mut head2 = VecDeque :: from ( list2. next ( ) . unwrap_or_default ( ) ) ;
285
233
286
234
// while we have valid data in each input, merge.
287
235
while !head1. is_empty ( ) && !head2. is_empty ( ) {
288
236
289
237
while ( result. capacity ( ) - result. len ( ) ) > 0 && head1. len ( ) > 0 && head2. len ( ) > 0 {
290
238
291
239
let cmp = {
292
- let x = head1. peek ( ) ;
293
- let y = head2. peek ( ) ;
240
+ let x = head1. front ( ) . unwrap ( ) ;
241
+ let y = head2. front ( ) . unwrap ( ) ;
294
242
( & x. 0 , & x. 1 ) . cmp ( & ( & y. 0 , & y. 1 ) )
295
243
} ;
296
244
match cmp {
297
- Ordering :: Less => { unsafe { push_unchecked ( & mut result, head1. pop ( ) ) ; } }
298
- Ordering :: Greater => { unsafe { push_unchecked ( & mut result, head2. pop ( ) ) ; } }
245
+ Ordering :: Less => { unsafe { push_unchecked ( & mut result, head1. pop_front ( ) . unwrap ( ) ) ; } }
246
+ Ordering :: Greater => { unsafe { push_unchecked ( & mut result, head2. pop_front ( ) . unwrap ( ) ) ; } }
299
247
Ordering :: Equal => {
300
- let ( data1, time1, mut diff1) = head1. pop ( ) ;
301
- let ( _data2, _time2, diff2) = head2. pop ( ) ;
248
+ let ( data1, time1, mut diff1) = head1. pop_front ( ) . unwrap ( ) ;
249
+ let ( _data2, _time2, diff2) = head2. pop_front ( ) . unwrap ( ) ;
302
250
diff1. plus_equals ( & diff2) ;
303
251
if !diff1. is_zero ( ) {
304
252
unsafe { push_unchecked ( & mut result, ( data1, time1, diff1) ) ; }
@@ -313,14 +261,14 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
313
261
}
314
262
315
263
if head1. is_empty ( ) {
316
- let done1 = head1 . done ( ) ;
264
+ let done1 = Vec :: from ( head1 ) ;
317
265
if done1. capacity ( ) == Self :: buffer_size ( ) { self . stash . push ( done1) ; }
318
- head1 = if !list1 . is_empty ( ) { VecQueue :: from ( list1. pop ( ) ) } else { VecQueue :: new ( ) } ;
266
+ head1 = VecDeque :: from ( list1. next ( ) . unwrap_or_default ( ) ) ;
319
267
}
320
268
if head2. is_empty ( ) {
321
- let done2 = head2 . done ( ) ;
269
+ let done2 = Vec :: from ( head2 ) ;
322
270
if done2. capacity ( ) == Self :: buffer_size ( ) { self . stash . push ( done2) ; }
323
- head2 = if !list2 . is_empty ( ) { VecQueue :: from ( list2. pop ( ) ) } else { VecQueue :: new ( ) } ;
271
+ head2 = VecDeque :: from ( list2. next ( ) . unwrap_or_default ( ) ) ;
324
272
}
325
273
}
326
274
@@ -329,21 +277,17 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
329
277
330
278
if !head1. is_empty ( ) {
331
279
let mut result = self . empty ( ) ;
332
- for _ in 0 .. head1. len ( ) { result. push ( head1 . pop ( ) ) ; }
280
+ for item1 in head1 { result. push ( item1 ) ; }
333
281
output. push ( result) ;
334
282
}
335
- while !list1. is_empty ( ) {
336
- output. push ( list1. pop ( ) ) ;
337
- }
283
+ output. extend ( list1) ;
338
284
339
285
if !head2. is_empty ( ) {
340
286
let mut result = self . empty ( ) ;
341
- for _ in 0 .. head2. len ( ) { result. push ( head2 . pop ( ) ) ; }
287
+ for item2 in head2 { result. push ( item2 ) ; }
342
288
output. push ( result) ;
343
289
}
344
- while !list2. is_empty ( ) {
345
- output. push ( list2. pop ( ) ) ;
346
- }
290
+ output. extend ( list2) ;
347
291
348
292
output
349
293
}
0 commit comments