Skip to content

Commit 4184a7f

Browse files
authored
Combine evaluate_stateful and evaluate_inside_range (#6665)
* Combine evaluate_stateful and evaluate_inside_range
* Move flags to partition_evaluator trait
* Update comments
* Update PartitionEvaluator comment
* move include_rank to partition_evaluator
* Default implement get_range when window frame is not used.
1 parent 6194d58 commit 4184a7f

File tree

9 files changed

+179
-153
lines changed

9 files changed

+179
-153
lines changed

datafusion/physical-expr/src/window/built_in.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ impl WindowExpr for BuiltInWindowExpr {
9595
}
9696

9797
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
98-
let evaluator = self.expr.create_evaluator()?;
98+
let mut evaluator = self.expr.create_evaluator()?;
9999
let num_rows = batch.num_rows();
100-
if self.expr.uses_window_frame() {
100+
if evaluator.uses_window_frame() {
101101
let sort_options: Vec<SortOptions> =
102102
self.order_by.iter().map(|o| o.options).collect();
103103
let mut row_wise_results = vec![];
@@ -114,18 +114,18 @@ impl WindowExpr for BuiltInWindowExpr {
114114
num_rows,
115115
idx,
116116
)?;
117-
let value = evaluator.evaluate_inside_range(&values, &range)?;
117+
let value = evaluator.evaluate(&values, &range)?;
118118
row_wise_results.push(value);
119119
last_range = range;
120120
}
121121
ScalarValue::iter_to_array(row_wise_results.into_iter())
122-
} else if self.expr.include_rank() {
122+
} else if evaluator.include_rank() {
123123
let columns = self.sort_columns(batch)?;
124124
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
125-
evaluator.evaluate_with_rank(num_rows, &sort_partition_points)
125+
evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
126126
} else {
127127
let (values, _) = self.get_values_orderbys(batch)?;
128-
evaluator.evaluate(&values, num_rows)
128+
evaluator.evaluate_all(&values, num_rows)
129129
}
130130
}
131131

@@ -164,15 +164,15 @@ impl WindowExpr for BuiltInWindowExpr {
164164
// We iterate on each row to perform a running calculation.
165165
let record_batch = &partition_batch_state.record_batch;
166166
let num_rows = record_batch.num_rows();
167-
let sort_partition_points = if self.expr.include_rank() {
167+
let sort_partition_points = if evaluator.include_rank() {
168168
let columns = self.sort_columns(record_batch)?;
169169
evaluate_partition_ranges(num_rows, &columns)?
170170
} else {
171171
vec![]
172172
};
173173
let mut row_wise_results: Vec<ScalarValue> = vec![];
174174
for idx in state.last_calculated_index..num_rows {
175-
let frame_range = if self.expr.uses_window_frame() {
175+
let frame_range = if evaluator.uses_window_frame() {
176176
state
177177
.window_frame_ctx
178178
.get_or_insert_with(|| {
@@ -199,7 +199,8 @@ impl WindowExpr for BuiltInWindowExpr {
199199
// Update last range
200200
state.window_frame_range = frame_range;
201201
evaluator.update_state(state, idx, &order_bys, &sort_partition_points)?;
202-
row_wise_results.push(evaluator.evaluate_stateful(&values)?);
202+
row_wise_results
203+
.push(evaluator.evaluate(&values, &state.window_frame_range)?);
203204
}
204205
let out_col = if row_wise_results.is_empty() {
205206
new_empty_array(out_type)
@@ -231,8 +232,12 @@ impl WindowExpr for BuiltInWindowExpr {
231232
}
232233

233234
fn uses_bounded_memory(&self) -> bool {
234-
self.expr.supports_bounded_execution()
235-
&& (!self.expr.uses_window_frame()
236-
|| !self.window_frame.end_bound.is_unbounded())
235+
if let Ok(evaluator) = self.expr.create_evaluator() {
236+
evaluator.supports_bounded_execution()
237+
&& (!evaluator.uses_window_frame()
238+
|| !self.window_frame.end_bound.is_unbounded())
239+
} else {
240+
false
241+
}
237242
}
238243
}

datafusion/physical-expr/src/window/built_in_window_function_expr.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -85,29 +85,4 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
8585
///
8686
/// The default implementation does nothing
8787
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
88-
89-
/// Can the window function be incrementally computed using
90-
/// bounded memory?
91-
///
92-
/// If this function returns true, [`Self::create_evaluator`] must
93-
/// implement [`PartitionEvaluator::evaluate_stateful`]
94-
fn supports_bounded_execution(&self) -> bool {
95-
false
96-
}
97-
98-
/// Does the window function use the values from its window frame?
99-
///
100-
/// If this function returns true, [`Self::create_evaluator`] must
101-
/// implement [`PartitionEvaluator::evaluate_inside_range`]
102-
fn uses_window_frame(&self) -> bool {
103-
false
104-
}
105-
106-
/// Can this function be evaluated with (only) rank
107-
///
108-
/// If `include_rank` is true, then [`Self::create_evaluator`] must
109-
/// implement [`PartitionEvaluator::evaluate_with_rank`]
110-
fn include_rank(&self) -> bool {
111-
false
112-
}
11388
}

datafusion/physical-expr/src/window/cume_dist.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,13 @@ impl BuiltInWindowFunctionExpr for CumeDist {
6464
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
6565
Ok(Box::new(CumeDistEvaluator {}))
6666
}
67-
68-
fn include_rank(&self) -> bool {
69-
true
70-
}
7167
}
7268

7369
#[derive(Debug)]
7470
pub(crate) struct CumeDistEvaluator;
7571

7672
impl PartitionEvaluator for CumeDistEvaluator {
77-
fn evaluate_with_rank(
73+
fn evaluate_with_rank_all(
7874
&self,
7975
num_rows: usize,
8076
ranks_in_partition: &[Range<usize>],
@@ -94,6 +90,10 @@ impl PartitionEvaluator for CumeDistEvaluator {
9490
);
9591
Ok(Arc::new(result))
9692
}
93+
94+
fn include_rank(&self) -> bool {
95+
true
96+
}
9797
}
9898

9999
#[cfg(test)]
@@ -109,7 +109,7 @@ mod tests {
109109
) -> Result<()> {
110110
let result = expr
111111
.create_evaluator()?
112-
.evaluate_with_rank(num_rows, &ranks)?;
112+
.evaluate_with_rank_all(num_rows, &ranks)?;
113113
let result = as_float64_array(&result)?;
114114
let result = result.values();
115115
assert_eq!(expected, *result);

datafusion/physical-expr/src/window/lead_lag.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
110110
}))
111111
}
112112

113-
fn supports_bounded_execution(&self) -> bool {
114-
true
115-
}
116-
117113
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
118114
Some(Arc::new(Self {
119115
name: self.name.clone(),
@@ -206,7 +202,11 @@ impl PartitionEvaluator for WindowShiftEvaluator {
206202
}
207203
}
208204

209-
fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
205+
fn evaluate(
206+
&mut self,
207+
values: &[ArrayRef],
208+
_range: &Range<usize>,
209+
) -> Result<ScalarValue> {
210210
let array = &values[0];
211211
let dtype = array.data_type();
212212
let idx = self.state.idx as i64 - self.shift_offset;
@@ -217,11 +217,19 @@ impl PartitionEvaluator for WindowShiftEvaluator {
217217
}
218218
}
219219

220-
fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
220+
fn evaluate_all(
221+
&mut self,
222+
values: &[ArrayRef],
223+
_num_rows: usize,
224+
) -> Result<ArrayRef> {
221225
// LEAD, LAG window functions take single column, values will have size 1
222226
let value = &values[0];
223227
shift_with_default_value(value, self.shift_offset, self.default_value.as_ref())
224228
}
229+
230+
fn supports_bounded_execution(&self) -> bool {
231+
true
232+
}
225233
}
226234

227235
fn get_default_value(
@@ -258,7 +266,7 @@ mod tests {
258266
let values = expr.evaluate_args(&batch)?;
259267
let result = expr
260268
.create_evaluator()?
261-
.evaluate(&values, batch.num_rows())?;
269+
.evaluate_all(&values, batch.num_rows())?;
262270
let result = as_int32_array(&result)?;
263271
assert_eq!(expected, *result);
264272
Ok(())

datafusion/physical-expr/src/window/nth_value.rs

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,6 @@ impl BuiltInWindowFunctionExpr for NthValue {
122122
Ok(Box::new(NthValueEvaluator { state }))
123123
}
124124

125-
fn supports_bounded_execution(&self) -> bool {
126-
true
127-
}
128-
129-
fn uses_window_frame(&self) -> bool {
130-
true
131-
}
132-
133125
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
134126
let reversed_kind = match self.kind {
135127
NthValueKind::First => NthValueKind::Last,
@@ -197,40 +189,44 @@ impl PartitionEvaluator for NthValueEvaluator {
197189
Ok(())
198190
}
199191

200-
fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
201-
if let Some(ref result) = self.state.finalized_result {
202-
Ok(result.clone())
203-
} else {
204-
self.evaluate_inside_range(values, &self.state.range)
205-
}
206-
}
207-
208-
fn evaluate_inside_range(
209-
&self,
192+
fn evaluate(
193+
&mut self,
210194
values: &[ArrayRef],
211195
range: &Range<usize>,
212196
) -> Result<ScalarValue> {
213-
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
214-
let arr = &values[0];
215-
let n_range = range.end - range.start;
216-
if n_range == 0 {
217-
// We produce None if the window is empty.
218-
return ScalarValue::try_from(arr.data_type());
219-
}
220-
match self.state.kind {
221-
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
222-
NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
223-
NthValueKind::Nth(n) => {
224-
// We are certain that n > 0.
225-
let index = (n as usize) - 1;
226-
if index >= n_range {
227-
ScalarValue::try_from(arr.data_type())
228-
} else {
229-
ScalarValue::try_from_array(arr, range.start + index)
197+
if let Some(ref result) = self.state.finalized_result {
198+
Ok(result.clone())
199+
} else {
200+
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
201+
let arr = &values[0];
202+
let n_range = range.end - range.start;
203+
if n_range == 0 {
204+
// We produce None if the window is empty.
205+
return ScalarValue::try_from(arr.data_type());
206+
}
207+
match self.state.kind {
208+
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
209+
NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
210+
NthValueKind::Nth(n) => {
211+
// We are certain that n > 0.
212+
let index = (n as usize) - 1;
213+
if index >= n_range {
214+
ScalarValue::try_from(arr.data_type())
215+
} else {
216+
ScalarValue::try_from_array(arr, range.start + index)
217+
}
230218
}
231219
}
232220
}
233221
}
222+
223+
fn supports_bounded_execution(&self) -> bool {
224+
true
225+
}
226+
227+
fn uses_window_frame(&self) -> bool {
228+
true
229+
}
234230
}
235231

236232
#[cfg(test)]
@@ -254,11 +250,11 @@ mod tests {
254250
end: i + 1,
255251
})
256252
}
257-
let evaluator = expr.create_evaluator()?;
253+
let mut evaluator = expr.create_evaluator()?;
258254
let values = expr.evaluate_args(&batch)?;
259255
let result = ranges
260256
.iter()
261-
.map(|range| evaluator.evaluate_inside_range(&values, range))
257+
.map(|range| evaluator.evaluate(&values, range))
262258
.collect::<Result<Vec<ScalarValue>>>()?;
263259
let result = ScalarValue::iter_to_array(result.into_iter())?;
264260
let result = as_int32_array(&result)?;

datafusion/physical-expr/src/window/ntile.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ pub(crate) struct NtileEvaluator {
7070
}
7171

7272
impl PartitionEvaluator for NtileEvaluator {
73-
fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
73+
fn evaluate_all(
74+
&mut self,
75+
_values: &[ArrayRef],
76+
num_rows: usize,
77+
) -> Result<ArrayRef> {
7478
let num_rows = num_rows as u64;
7579
let mut vec: Vec<u64> = Vec::new();
7680
for i in 0..num_rows {

0 commit comments

Comments
 (0)