Skip to content

Commit 8da5f26

Browse files
mustafasrepostuartcarniealamb
authored
Replace supports_bounded_execution with supports_retract_batch (#6695)
* feat: support sliding window accumulators Rationale: The default implementation of the `Accumulator` trait returns an error for the `retract_batch` API. * Allow AggregateUDF to define retractable batch * Replace supports_bounded_execution with supports_retract_batch * simplifications * simplifications * Rename evalaute_with_rank_all --------- Co-authored-by: Stuart Carnie <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent 4a2c28c commit 8da5f26

File tree

11 files changed

+31
-40
lines changed

11 files changed

+31
-40
lines changed

datafusion/physical-expr/src/aggregate/average.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,6 @@ impl AggregateExpr for Avg {
134134
is_row_accumulator_support_dtype(&self.sum_data_type)
135135
}
136136

137-
fn supports_bounded_execution(&self) -> bool {
138-
true
139-
}
140-
141137
fn create_row_accumulator(
142138
&self,
143139
start_index: usize,
@@ -263,6 +259,9 @@ impl Accumulator for AvgAccumulator {
263259
)),
264260
}
265261
}
262+
fn supports_retract_batch(&self) -> bool {
263+
true
264+
}
266265

267266
fn size(&self) -> usize {
268267
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()

datafusion/physical-expr/src/aggregate/count.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,6 @@ impl AggregateExpr for Count {
133133
true
134134
}
135135

136-
fn supports_bounded_execution(&self) -> bool {
137-
true
138-
}
139-
140136
fn create_row_accumulator(
141137
&self,
142138
start_index: usize,
@@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator {
214210
Ok(ScalarValue::Int64(Some(self.count)))
215211
}
216212

213+
fn supports_retract_batch(&self) -> bool {
214+
true
215+
}
216+
217217
fn size(&self) -> usize {
218218
std::mem::size_of_val(self)
219219
}

datafusion/physical-expr/src/aggregate/min_max.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,6 @@ impl AggregateExpr for Max {
125125
is_row_accumulator_support_dtype(&self.data_type)
126126
}
127127

128-
fn supports_bounded_execution(&self) -> bool {
129-
true
130-
}
131-
132128
fn create_row_accumulator(
133129
&self,
134130
start_index: usize,
@@ -699,6 +695,10 @@ impl Accumulator for SlidingMaxAccumulator {
699695
Ok(self.max.clone())
700696
}
701697

698+
fn supports_retract_batch(&self) -> bool {
699+
true
700+
}
701+
702702
fn size(&self) -> usize {
703703
std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
704704
}
@@ -825,10 +825,6 @@ impl AggregateExpr for Min {
825825
is_row_accumulator_support_dtype(&self.data_type)
826826
}
827827

828-
fn supports_bounded_execution(&self) -> bool {
829-
true
830-
}
831-
832828
fn create_row_accumulator(
833829
&self,
834830
start_index: usize,
@@ -958,6 +954,10 @@ impl Accumulator for SlidingMinAccumulator {
958954
Ok(self.min.clone())
959955
}
960956

957+
fn supports_retract_batch(&self) -> bool {
958+
true
959+
}
960+
961961
fn size(&self) -> usize {
962962
std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
963963
}

datafusion/physical-expr/src/aggregate/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
9696
false
9797
}
9898

99-
/// Specifies whether this aggregate function can run using bounded memory.
100-
/// Any accumulator returning "true" needs to implement `retract_batch`.
101-
fn supports_bounded_execution(&self) -> bool {
102-
false
103-
}
104-
10599
/// RowAccumulator to access/update row-based aggregation state in-place.
106100
/// Currently, row accumulator only supports states of fixed-sized type.
107101
///

datafusion/physical-expr/src/aggregate/sum.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,6 @@ impl AggregateExpr for Sum {
131131
is_row_accumulator_support_dtype(&self.data_type)
132132
}
133133

134-
fn supports_bounded_execution(&self) -> bool {
135-
true
136-
}
137-
138134
fn create_row_accumulator(
139135
&self,
140136
start_index: usize,
@@ -361,6 +357,10 @@ impl Accumulator for SumAccumulator {
361357
}
362358
}
363359

360+
fn supports_retract_batch(&self) -> bool {
361+
true
362+
}
363+
364364
fn size(&self) -> usize {
365365
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
366366
}

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
155155
}
156156

157157
fn uses_bounded_memory(&self) -> bool {
158-
self.aggregate.supports_bounded_execution()
159-
&& !self.window_frame.end_bound.is_unbounded()
158+
!self.window_frame.end_bound.is_unbounded()
160159
}
161160
}
162161

datafusion/physical-expr/src/window/built_in.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl WindowExpr for BuiltInWindowExpr {
122122
} else if evaluator.include_rank() {
123123
let columns = self.sort_columns(batch)?;
124124
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
125-
evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
125+
evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
126126
} else {
127127
let (values, _) = self.get_values_orderbys(batch)?;
128128
evaluator.evaluate_all(&values, num_rows)

datafusion/physical-expr/src/window/cume_dist.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
7070
pub(crate) struct CumeDistEvaluator;
7171

7272
impl PartitionEvaluator for CumeDistEvaluator {
73-
fn evaluate_with_rank_all(
73+
fn evaluate_all_with_rank(
7474
&self,
7575
num_rows: usize,
7676
ranks_in_partition: &[Range<usize>],
@@ -109,7 +109,7 @@ mod tests {
109109
) -> Result<()> {
110110
let result = expr
111111
.create_evaluator()?
112-
.evaluate_with_rank_all(num_rows, &ranks)?;
112+
.evaluate_all_with_rank(num_rows, &ranks)?;
113113
let result = as_float64_array(&result)?;
114114
let result = result.values();
115115
assert_eq!(expected, *result);

datafusion/physical-expr/src/window/partition_evaluator.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use std::ops::Range;
6969
///
7070
/// # Stateless `PartitionEvaluator`
7171
///
72-
/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the
72+
/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_all_with_rank`] is called with values for the
7373
/// entire partition.
7474
///
7575
/// # Stateful `PartitionEvaluator`
@@ -221,7 +221,7 @@ pub trait PartitionEvaluator: Debug + Send {
221221
))
222222
}
223223

224-
/// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window
224+
/// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
225225
/// functions that only need the rank of a row within its window
226226
/// frame.
227227
///
@@ -248,7 +248,7 @@ pub trait PartitionEvaluator: Debug + Send {
248248
/// (3,4),
249249
/// ]
250250
/// ```
251-
fn evaluate_with_rank_all(
251+
fn evaluate_all_with_rank(
252252
&self,
253253
_num_rows: usize,
254254
_ranks_in_partition: &[Range<usize>],
@@ -278,7 +278,7 @@ pub trait PartitionEvaluator: Debug + Send {
278278

279279
/// Can this function be evaluated with (only) rank
280280
///
281-
/// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`]
281+
/// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_all_with_rank`]
282282
fn include_rank(&self) -> bool {
283283
false
284284
}

datafusion/physical-expr/src/window/rank.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
159159
}
160160
}
161161

162-
fn evaluate_with_rank_all(
162+
fn evaluate_all_with_rank(
163163
&self,
164164
num_rows: usize,
165165
ranks_in_partition: &[Range<usize>],
@@ -236,7 +236,7 @@ mod tests {
236236
) -> Result<()> {
237237
let result = expr
238238
.create_evaluator()?
239-
.evaluate_with_rank_all(num_rows, &ranks)?;
239+
.evaluate_all_with_rank(num_rows, &ranks)?;
240240
let result = as_float64_array(&result)?;
241241
let result = result.values();
242242
assert_eq!(expected, *result);
@@ -248,7 +248,7 @@ mod tests {
248248
ranks: Vec<Range<usize>>,
249249
expected: Vec<u64>,
250250
) -> Result<()> {
251-
let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?;
251+
let result = expr.create_evaluator()?.evaluate_all_with_rank(8, &ranks)?;
252252
let result = as_uint64_array(&result)?;
253253
let result = result.values();
254254
assert_eq!(expected, *result);

datafusion/physical-expr/src/window/sliding_aggregate.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ impl WindowExpr for SlidingAggregateWindowExpr {
139139
}
140140

141141
fn uses_bounded_memory(&self) -> bool {
142-
self.aggregate.supports_bounded_execution()
143-
&& !self.window_frame.end_bound.is_unbounded()
142+
!self.window_frame.end_bound.is_unbounded()
144143
}
145144
}
146145

0 commit comments

Comments
 (0)