Skip to content

Minor: Move get_equal_orderings into BuiltInWindowFunctionExpr, remove BuiltInWindowFunctionExpr::as_any #6619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 4 additions & 23 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::{
udaf, ExecutionPlan, PhysicalExpr,
};
use arrow::datatypes::Schema;
use arrow_schema::{SchemaRef, SortOptions};
use arrow_schema::SchemaRef;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
Expand All @@ -47,7 +47,6 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs};
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
Expand Down Expand Up @@ -245,32 +244,14 @@ pub(crate) fn window_ordering_equivalence(
.with_equivalences(input.equivalence_properties())
.with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec()))
.extend(input.ordering_equivalence_properties());

for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
{
// Only the built-in `RowNumber` window function introduces a new
// ordering:
if builtin_window_expr
builtin_window_expr
.get_built_in_func_expr()
.as_any()
.is::<RowNumber>()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of the PR is to remove this special case for RowNumber (which I did by hoisting it into the trait)

{
if let Some((idx, field)) =
schema.column_with_name(builtin_window_expr.name())
{
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
let rhs = PhysicalSortExpr {
expr: Arc::new(column) as _,
options,
};
builder.add_equal_conditions(vec![rhs]);
}
}
.add_equal_orderings(&mut builder);
}
}
builder.build()
Expand Down
9 changes: 8 additions & 1 deletion datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,18 @@ pub struct OrderingEquivalenceBuilder {
eq_properties: EquivalenceProperties,
ordering_eq_properties: OrderingEquivalenceProperties,
existing_ordering: Vec<PhysicalSortExpr>,
schema: SchemaRef,
}

impl OrderingEquivalenceBuilder {
pub fn new(schema: SchemaRef) -> Self {
let eq_properties = EquivalenceProperties::new(schema.clone());
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema);
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema.clone());
Self {
eq_properties,
ordering_eq_properties,
existing_ordering: vec![],
schema,
}
}

Expand Down Expand Up @@ -358,6 +360,11 @@ impl OrderingEquivalenceBuilder {
}
}

/// Return a reference to the schema with which this builder was constructed
pub fn schema(&self) -> &SchemaRef {
&self.schema
}

/// Consumes the builder and returns the [`OrderingEquivalenceProperties`]
/// held in its `ordering_eq_properties` field.
pub fn build(self) -> OrderingEquivalenceProperties {
self.ordering_eq_properties
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use super::partition_evaluator::PartitionEvaluator;
use crate::equivalence::OrderingEquivalenceBuilder;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::sync::Arc;

/// Evaluates a window function by instantiating a
/// [`PartitionEvaluator`] for calculating the function's output in
Expand All @@ -34,10 +35,6 @@ use std::sync::Arc;
/// `nth_value` need the value.
#[allow(rustdoc::private_intra_doc_links)]
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// The field of the final result of evaluating this window function.
fn field(&self) -> Result<Field>;

Expand Down Expand Up @@ -79,6 +76,12 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
None
}

/// Adds any equivalent orderings generated by this expression
/// to `builder`.
///
/// The default implementation does nothing and leaves `builder`
/// untouched; implementations whose output introduces a new ordering
/// override it.
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}

/// Can the window function be incrementally computed using
/// bounded memory?
///
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/window/cume_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use std::any::Any;
use std::iter;
use std::ops::Range;
use std::sync::Arc;
Expand All @@ -42,11 +41,6 @@ pub fn cume_dist(name: String) -> CumeDist {
}

impl BuiltInWindowFunctionExpr for CumeDist {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = false;
let data_type = DataType::Float64;
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
use std::cmp::min;
use std::ops::{Neg, Range};
use std::sync::Arc;
Expand Down Expand Up @@ -84,11 +83,6 @@ pub fn lag(
}

impl BuiltInWindowFunctionExpr for WindowShift {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = true;
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -95,11 +94,6 @@ impl NthValue {
}

impl BuiltInWindowFunctionExpr for NthValue {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = true;
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
Expand Down
5 changes: 0 additions & 5 deletions datafusion/physical-expr/src/window/ntile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::Field;
use arrow_schema::DataType;
use datafusion_common::Result;
use std::any::Any;
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -41,10 +40,6 @@ impl Ntile {
}

impl BuiltInWindowFunctionExpr for Ntile {
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = false;
let data_type = DataType::UInt64;
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/window/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use std::any::Any;
use std::iter;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -78,11 +77,6 @@ pub fn percent_rank(name: String) -> Rank {
}

impl BuiltInWindowFunctionExpr for Rank {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = false;
let data_type = match self.rank_type {
Expand Down
29 changes: 22 additions & 7 deletions datafusion/physical-expr/src/window/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

//! Defines physical expression for `row_number` that can be evaluated at runtime during query execution

use crate::equivalence::OrderingEquivalenceBuilder;
use crate::expressions::Column;
use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{BuiltinWindowState, NumRowsState};
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow_schema::SortOptions;
use datafusion_common::{Result, ScalarValue};
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -42,11 +44,6 @@ impl RowNumber {
}

impl BuiltInWindowFunctionExpr for RowNumber {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = false;
let data_type = DataType::UInt64;
Expand All @@ -61,6 +58,24 @@ impl BuiltInWindowFunctionExpr for RowNumber {
&self.name
}

fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) {
// Only the built-in `RowNumber` window function introduces a new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can replace the comment
// Only the built-in RowNumber window function introduces a new ordering with
// The built-in RowNumber window function introduces a new ordering. Since the method implements it (and no special handling is done outside), we do not need to stress that this is the only window function with this property.

Copy link
Contributor Author

@alamb alamb Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 done in commit ad16502

// ordering:
let schema = builder.schema();
if let Some((idx, field)) = schema.column_with_name(self.name()) {
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
let rhs = PhysicalSortExpr {
expr: Arc::new(column) as _,
options,
};
builder.add_equal_conditions(vec![rhs]);
}
}

/// Creates the partition evaluator for this window function: a boxed,
/// default-constructed `NumRowsEvaluator`. Always returns `Ok`.
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<NumRowsEvaluator>::default())
}
Expand Down