Skip to content

Proposed changes for more flexible user defined Aggregate and window functions #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 106 additions & 20 deletions datafusion-examples/examples/simple_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, UInt64Array};
use arrow::{
array::{AsArray, Float64Array},
datatypes::Float64Type,
Expand All @@ -26,9 +28,10 @@ use datafusion::datasource::file_format::options::CsvReadOptions;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::DataFusionError;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::{
partition_evaluator::PartitionEvaluator, Signature, Volatility, WindowUDF,
partition_evaluator::PartitionEvaluator, Accumulator, Signature, Volatility,
WindowUDF,
};

// create local execution context with `cars.csv` registered as a table named `cars`
Expand All @@ -52,6 +55,8 @@ async fn main() -> Result<()> {

// register the window function with DataFusion so we can call it
ctx.register_udwf(my_average());
ctx.register_udwf(my_first_value());
ctx.register_udwf(my_odd_row_number());

// Use SQL to run the new window function
let df = ctx.sql("SELECT * from cars").await?;
Expand All @@ -67,6 +72,22 @@ async fn main() -> Result<()> {
speed, \
lag(speed, 1) OVER (PARTITION BY car ORDER BY time),\
my_average(speed) OVER (PARTITION BY car ORDER BY time),\
my_first_value(speed) OVER (PARTITION BY car ORDER BY time),\
my_odd_row_number(speed) OVER (PARTITION BY car ORDER BY time), \
time \
from cars",
)
.await?;
// print the results
df.show().await?;

// When all of the window functions support bounded execution, BoundedWindowAggExec will be used.
// In this case the `evaluate` method will be called
let df = ctx
.sql(
"SELECT car, \
speed, \
my_odd_row_number(speed) OVER (PARTITION BY car ORDER BY time), \
time \
from cars",
)
Expand Down Expand Up @@ -132,15 +153,9 @@ impl MyPartitionEvaluator {

/// These different evaluation methods are called depending on the various settings of WindowUDF
impl PartitionEvaluator for MyPartitionEvaluator {
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<std::ops::Range<usize>> {
Err(DataFusionError::NotImplemented(
"get_range is not implemented for this window function".to_string(),
))
}

/// This function is given the values of each partition
fn evaluate(
&self,
fn evaluate_all(
&mut self,
values: &[arrow::array::ArrayRef],
_num_rows: usize,
) -> Result<arrow::array::ArrayRef> {
Expand Down Expand Up @@ -177,12 +192,13 @@ impl PartitionEvaluator for MyPartitionEvaluator {
Ok(Arc::new(new_values))
}

fn evaluate_stateful(
fn evaluate(
&mut self,
_values: &[arrow::array::ArrayRef],
_range: &std::ops::Range<usize>,
) -> Result<datafusion_common::ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_stateful is not implemented by default".into(),
"evaluate is not implemented by default".into(),
))
}

Expand All @@ -196,15 +212,85 @@ impl PartitionEvaluator for MyPartitionEvaluator {
))
}

fn evaluate_inside_range(
&self,
_values: &[arrow::array::ArrayRef],
_range: &std::ops::Range<usize>,
) -> Result<datafusion_common::ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_inside_range is not implemented by default".into(),
))
}

/// Builds the `my_first_value` window UDF.
///
/// The signature accepts exactly one `Int32` argument, the return type is
/// resolved by `return_type`, and evaluators are produced by
/// `make_partition_evaluator_first_value`.
fn my_first_value() -> WindowUDF {
    let name = "my_first_value".to_string();
    // exactly one argument: the column whose value is read
    let signature = Signature::exact(vec![DataType::Int32], Volatility::Immutable);
    WindowUDF {
        name,
        signature,
        return_type: Arc::new(return_type),
        partition_evaluator: Arc::new(make_partition_evaluator_first_value),
    }
}

/// Factory handed to the `my_first_value` UDF: returns a fresh, boxed
/// `MyFirstValue` evaluator on every call.
fn make_partition_evaluator_first_value() -> Result<Box<dyn PartitionEvaluator>> {
    let evaluator: Box<dyn PartitionEvaluator> = Box::new(MyFirstValue::new());
    Ok(evaluator)
}

/// Builds the `my_odd_row_number` window UDF.
///
/// The signature accepts exactly one `Int32` argument, the declared return
/// type is always `UInt64`, and evaluators are produced by
/// `make_partition_evaluator_odd_row_number`.
fn my_odd_row_number() -> WindowUDF {
    let name = "my_odd_row_number".to_string();
    // exactly one argument: a column (the evaluator itself ignores its values)
    let signature = Signature::exact(vec![DataType::Int32], Volatility::Immutable);
    WindowUDF {
        name,
        signature,
        return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))),
        partition_evaluator: Arc::new(make_partition_evaluator_odd_row_number),
    }
}

/// Factory handed to the `my_odd_row_number` UDF: returns a fresh, boxed
/// `OddRowNumber` evaluator on every call.
fn make_partition_evaluator_odd_row_number() -> Result<Box<dyn PartitionEvaluator>> {
    let evaluator: Box<dyn PartitionEvaluator> = Box::new(OddRowNumber::new());
    Ok(evaluator)
}

/// Evaluator type used by the `my_first_value` window UDF; it holds no state.
#[derive(Clone, Debug)]
struct MyFirstValue {}

impl MyFirstValue {
    /// Creates a new (stateless) evaluator.
    fn new() -> Self {
        MyFirstValue {}
    }
}

// TODO show how to use other evaluate methods
/// `PartitionEvaluator` implementation that works on window frames: it
/// reports `uses_window_frame() == true` and answers each `evaluate` call
/// from the supplied range.
impl PartitionEvaluator for MyFirstValue {
    /// Returns the element of the first argument column located at
    /// `range.start`, converted to a `ScalarValue`.
    fn evaluate(
        &mut self,
        values: &[arrow::array::ArrayRef],
        range: &std::ops::Range<usize>,
    ) -> Result<ScalarValue> {
        let column = &values[0];
        let frame_start = range.start;
        ScalarValue::try_from_array(column, frame_start)
    }

    /// This evaluator reads values from its window frame.
    fn uses_window_frame(&self) -> bool {
        true
    }
}

/// Evaluator state for the `my_odd_row_number` window UDF.
#[derive(Clone, Debug)]
struct OddRowNumber {
    /// Number that the next `evaluate` call will return.
    row_idx: usize,
}

impl OddRowNumber {
    /// Creates an evaluator whose counter starts at 1.
    fn new() -> Self {
        let row_idx = 1;
        Self { row_idx }
    }
}

// TODO show how to use other evaluate methods
/// These different evaluation methods are called depending on the various settings of WindowUDF
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

impl PartitionEvaluator for OddRowNumber {
    /// Returns the current counter as a `UInt64` scalar and then advances the
    /// counter by 2, so successive calls yield 1, 3, 5, ...
    ///
    /// Both `_values` and `_range` are ignored.
    fn evaluate(
        &mut self,
        _values: &[ArrayRef],
        _range: &Range<usize>,
    ) -> Result<ScalarValue> {
        let current = self.row_idx as u64;
        self.row_idx += 2;
        Ok(ScalarValue::UInt64(Some(current)))
    }

    /// This evaluator opts in to bounded execution.
    fn supports_bounded_execution(&self) -> bool {
        true
    }
}
14 changes: 13 additions & 1 deletion datafusion/core/src/physical_plan/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::{

use super::{expressions::format_state_name, Accumulator, AggregateExpr};
use crate::physical_plan::PhysicalExpr;
use datafusion_common::Result;
use datafusion_common::{DataFusionError, Result};
pub use datafusion_expr::AggregateUDF;

use datafusion_physical_expr::aggregate::utils::down_cast_any_ref;
Expand Down Expand Up @@ -106,6 +106,18 @@ impl AggregateExpr for AggregateFunctionExpr {
(self.fun.accumulator)(&self.data_type)
}

/// Creates an accumulator through the UDAF's factory and returns it only if
/// it reports support for bounded execution.
///
/// # Errors
/// Propagates any error from the accumulator factory, and returns
/// `DataFusionError::Execution` when the created accumulator's
/// `supports_bounded_execution()` is `false`.
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
    let accumulator = (self.fun.accumulator)(&self.data_type)?;
    if !accumulator.supports_bounded_execution() {
        return Err(DataFusionError::Execution(format!(
            "Accumulator: {:?} doesn't support bounded execution",
            self.fun.name
        )));
    }
    Ok(accumulator)
}

fn name(&self) -> &str {
&self.name
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ pub trait Accumulator: Send + Sync + Debug {
/// Returns the final aggregate value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;

/// Specifies whether this aggregate function can run using bounded memory.
/// Any accumulator returning "true" needs to implement `retract_batch`.
///
/// The default implementation returns `false`; an accumulator has to
/// override this method to opt in.
fn supports_bounded_execution(&self) -> bool {
false
}

/// Allocated size required for this accumulator, in bytes, including `Self`.
/// Allocated means that for internal containers such as `Vec`, the `capacity` should be used
/// not the `len`
Expand Down
71 changes: 46 additions & 25 deletions datafusion/expr/src/partition_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ pub trait PartitionState {
///
/// # Stateless `PartitionEvaluator`
///
/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or
/// In this case, [`Self::evaluate_all`], [`Self::evaluate_with_rank`] or
/// [`Self::evaluate_inside_range`] is called with values for the
/// entire partition.
///
/// # Stateful `PartitionEvaluator`
///
/// In this case, [`Self::evaluate_stateful`] is called to calculate
/// In this case, [`Self::evaluate`] is called to calculate
/// the results of the window function incrementally for each new
/// batch, saving and restoring any state needed to do so as
/// [`BuiltinWindowState`].
///
/// For example, when computing `ROW_NUMBER` incrementally,
/// [`Self::evaluate_stateful`] will be called multiple times with
/// [`Self::evaluate`] will be called multiple times with
/// different batches. For all batches after the first, the output
/// `row_number` must start from last `row_number` produced for the
/// previous batch. The previous row number is saved and restored as
Expand Down Expand Up @@ -147,27 +147,46 @@ pub trait PartitionEvaluator: Debug + Send {
///
/// `idx`: is the index of last row for which result is calculated.
/// `n_rows`: is the number of rows of the input record batch (Used during bounds check)
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
Err(DataFusionError::NotImplemented(
"get_range is not implemented for this window function".to_string(),
))
///
/// This default implementation only applies when `uses_window_frame` returns
/// `false`. In that case it reports the smallest possible range, the current
/// row alone (`idx..idx + 1`), because seeing the current row is enough to
/// compute the result for functions such as `row_number` or `rank`.
///
/// When `uses_window_frame` returns `true` an `Execution` error is returned,
/// since the range is expected to be derived from the window frame instead.
fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
    if !self.uses_window_frame() {
        return Ok(idx..idx + 1);
    }
    Err(DataFusionError::Execution(
        "Range should be calculated from window frame".to_string(),
    ))
}

/// Called for window functions that *do not use* values from the
/// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`,
/// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`).
fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
Err(DataFusionError::NotImplemented(
"evaluate is not implemented by default".into(),
))
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
if !self.uses_window_frame() && self.supports_bounded_execution(){
let res = (0..num_rows).into_iter().map(|idx| self.evaluate(values, &Range{start: 0, end: 1})).collect::<Result<Vec<_>>>()?;
ScalarValue::iter_to_array(res.into_iter())
}else {
Err(DataFusionError::NotImplemented(
"evaluate_all is not implemented by default".into(),
))
}
}

/// Evaluate window function result inside given range.
///
/// Only used for stateful evaluation
fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
fn evaluate(
&mut self,
_values: &[ArrayRef],
_range: &Range<usize>,
) -> Result<ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_stateful is not implemented by default".into(),
"evaluate is not implemented by default".into(),
))
}

Expand Down Expand Up @@ -210,18 +229,20 @@ pub trait PartitionEvaluator: Debug + Send {
))
}

/// Called for window functions that use values from window frame,
/// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a
/// single value for every row in the partition.
/// Does the window function use the values from its window frame?
///
/// Returns a [`ScalarValue`] that is the value of the window function for the entire partition
fn evaluate_inside_range(
&self,
_values: &[ArrayRef],
_range: &Range<usize>,
) -> Result<ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_inside_range is not implemented by default".into(),
))
/// If this function returns true, [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate_inside_range`]
///
/// The default implementation returns `false`.
// NOTE(review): the range-based evaluation method appears to be named
// `evaluate` elsewhere in this change — confirm that the
// `evaluate_inside_range` reference above is still accurate.
fn uses_window_frame(&self) -> bool {
false
}

/// Can the window function be incrementally computed using
/// bounded memory?
///
/// If this function returns true, [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate`]
///
/// The default implementation returns `false`.
fn supports_bounded_execution(&self) -> bool {
false
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the include_rank flag is true, evaluate_with_rank_all will be called. This method is essentially the same as evaluate_all in spirit (it takes all the data and produces all the output in a single pass). However, since evaluate_with_rank_all requires additional arguments (such as rank boundaries), we do not unify their APIs, so that rank boundaries are not recalculated each time (even when they are not used).
Certainly, we could also move this trait to PartitionEvaluator. However, I thought this would be confusing, hence I didn't move it. (I will think about how to combine evaluate_with_rank_all and evaluate_all without calculating rank boundaries unnecessarily.)

Maybe we can present to the user just a subset of the PartitionEvaluator methods. They wouldn't see evaluate_with_rank_all either (Just like your suggestion in option 2).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ impl AggregateExpr for Avg {
is_row_accumulator_support_dtype(&self.sum_data_type)
}

fn supports_bounded_execution(&self) -> bool {
true
}

fn create_row_accumulator(
&self,
start_index: usize,
Expand Down Expand Up @@ -257,6 +253,10 @@ impl Accumulator for AvgAccumulator {
}
}

/// This accumulator opts in to bounded execution.
// NOTE(review): accumulators returning `true` here are expected to provide
// `retract_batch` — confirm it is implemented for this accumulator.
fn supports_bounded_execution(&self) -> bool {
true
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ impl AggregateExpr for Count {
true
}

fn supports_bounded_execution(&self) -> bool {
true
}

fn create_row_accumulator(
&self,
start_index: usize,
Expand Down Expand Up @@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator {
Ok(ScalarValue::Int64(Some(self.count)))
}

/// This accumulator opts in to bounded execution.
// NOTE(review): accumulators returning `true` here are expected to provide
// `retract_batch` — confirm it is implemented for this accumulator.
fn supports_bounded_execution(&self) -> bool {
true
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
Expand Down
Loading