@@ -66,6 +66,7 @@ impl AggregateExpr for Median {
66
66
fn create_accumulator ( & self ) -> Result < Box < dyn Accumulator > > {
67
67
Ok ( Box :: new ( MedianAccumulator {
68
68
data_type : self . data_type . clone ( ) ,
69
+ arrays : vec ! [ ] ,
69
70
all_values : vec ! [ ] ,
70
71
} ) )
71
72
}
@@ -108,29 +109,31 @@ impl PartialEq<dyn Any> for Median {
108
109
/// The median accumulator accumulates the raw input values
109
110
/// as `ScalarValue`s
110
111
///
111
- /// The intermediate state is represented as a List of those scalars
112
+ /// The intermediate state is represented as a List of scalar values updated by
113
+ /// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
114
+ /// in the final evaluation step so that we avoid expensive conversions and
115
+ /// allocations during `update_batch`.
112
116
struct MedianAccumulator {
113
117
data_type : DataType ,
118
+ arrays : Vec < ArrayRef > ,
114
119
all_values : Vec < ScalarValue > ,
115
120
}
116
121
117
122
impl Accumulator for MedianAccumulator {
118
123
fn state ( & self ) -> Result < Vec < ScalarValue > > {
119
- let state =
120
- ScalarValue :: new_list ( Some ( self . all_values . clone ( ) ) , self . data_type . clone ( ) ) ;
124
+ let all_values = to_scalar_values ( & self . arrays ) ?;
125
+ let state = ScalarValue :: new_list ( Some ( all_values) , self . data_type . clone ( ) ) ;
126
+
121
127
Ok ( vec ! [ state] )
122
128
}
123
129
124
130
fn update_batch ( & mut self , values : & [ ArrayRef ] ) -> Result < ( ) > {
125
131
assert_eq ! ( values. len( ) , 1 ) ;
126
132
let array = & values[ 0 ] ;
127
133
134
+ // Defer conversions to scalar values to final evaluation.
128
135
assert_eq ! ( array. data_type( ) , & self . data_type) ;
129
- self . all_values . reserve ( array. len ( ) ) ;
130
- for index in 0 ..array. len ( ) {
131
- self . all_values
132
- . push ( ScalarValue :: try_from_array ( array, index) ?) ;
133
- }
136
+ self . arrays . push ( array. clone ( ) ) ;
134
137
135
138
Ok ( ( ) )
136
139
}
@@ -157,7 +160,14 @@ impl Accumulator for MedianAccumulator {
157
160
}
158
161
159
162
fn evaluate ( & self ) -> Result < ScalarValue > {
160
- if !self . all_values . iter ( ) . any ( |v| !v. is_null ( ) ) {
163
+ let batch_values = to_scalar_values ( & self . arrays ) ?;
164
+
165
+ if !self
166
+ . all_values
167
+ . iter ( )
168
+ . chain ( batch_values. iter ( ) )
169
+ . any ( |v| !v. is_null ( ) )
170
+ {
161
171
return ScalarValue :: try_from ( & self . data_type ) ;
162
172
}
163
173
@@ -166,6 +176,7 @@ impl Accumulator for MedianAccumulator {
166
176
let array = ScalarValue :: iter_to_array (
167
177
self . all_values
168
178
. iter ( )
179
+ . chain ( batch_values. iter ( ) )
169
180
// ignore null values
170
181
. filter ( |v| !v. is_null ( ) )
171
182
. cloned ( ) ,
@@ -214,13 +225,30 @@ impl Accumulator for MedianAccumulator {
214
225
}
215
226
216
227
fn size ( & self ) -> usize {
217
- std:: mem:: size_of_val ( self ) + ScalarValue :: size_of_vec ( & self . all_values )
228
+ let arrays_size: usize = self . arrays . iter ( ) . map ( |a| a. len ( ) ) . sum ( ) ;
229
+
230
+ std:: mem:: size_of_val ( self )
231
+ + ScalarValue :: size_of_vec ( & self . all_values )
232
+ + arrays_size
218
233
- std:: mem:: size_of_val ( & self . all_values )
219
234
+ self . data_type . size ( )
220
235
- std:: mem:: size_of_val ( & self . data_type )
221
236
}
222
237
}
223
238
239
+ fn to_scalar_values ( arrays : & [ ArrayRef ] ) -> Result < Vec < ScalarValue > > {
240
+ let num_values: usize = arrays. iter ( ) . map ( |a| a. len ( ) ) . sum ( ) ;
241
+ let mut all_values = Vec :: with_capacity ( num_values) ;
242
+
243
+ for array in arrays {
244
+ for index in 0 ..array. len ( ) {
245
+ all_values. push ( ScalarValue :: try_from_array ( & array, index) ?) ;
246
+ }
247
+ }
248
+
249
+ Ok ( all_values)
250
+ }
251
+
224
252
/// Given a returns `array[indicies[indicie_index]]` as a `ScalarValue`
225
253
fn scalar_at_index (
226
254
array : & dyn Array ,
0 commit comments