Skip to content

Minor: Move get_equal_orderings into BuiltInWindowFunctionExpr, remove BuiltInWindowFunctionExpr::as_any #6619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 4 additions & 23 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::{
udaf, ExecutionPlan, PhysicalExpr,
};
use arrow::datatypes::Schema;
use arrow_schema::{SchemaRef, SortOptions};
use arrow_schema::SchemaRef;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
Expand All @@ -47,7 +47,6 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs};
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
Expand Down Expand Up @@ -245,32 +244,14 @@ pub(crate) fn window_ordering_equivalence(
.with_equivalences(input.equivalence_properties())
.with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec()))
.extend(input.ordering_equivalence_properties());

for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
{
// Only the built-in `RowNumber` window function introduces a new
// ordering:
if builtin_window_expr
builtin_window_expr
.get_built_in_func_expr()
.as_any()
.is::<RowNumber>()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of the PR is to remove this special case for RowNumber (which I did by hoisting it into the trait)

{
if let Some((idx, field)) =
schema.column_with_name(builtin_window_expr.name())
{
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
let rhs = PhysicalSortExpr {
expr: Arc::new(column) as _,
options,
};
builder.add_equal_conditions(vec![rhs]);
}
}
.add_equal_orderings(&mut builder);
}
}
builder.build()
Expand Down
9 changes: 8 additions & 1 deletion datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,18 @@ pub struct OrderingEquivalenceBuilder {
eq_properties: EquivalenceProperties,
ordering_eq_properties: OrderingEquivalenceProperties,
existing_ordering: Vec<PhysicalSortExpr>,
schema: SchemaRef,
}

impl OrderingEquivalenceBuilder {
/// Creates a builder for `schema` with an empty existing ordering.
///
/// Fresh [`EquivalenceProperties`] and [`OrderingEquivalenceProperties`]
/// are created from clones of `schema`; the schema itself is kept on the
/// builder.
pub fn new(schema: SchemaRef) -> Self {
    let eq_properties = EquivalenceProperties::new(schema.clone());
    // `schema` is stored on the builder below, so pass a clone here instead
    // of moving it.
    let ordering_eq_properties = OrderingEquivalenceProperties::new(schema.clone());
    Self {
        eq_properties,
        ordering_eq_properties,
        existing_ordering: vec![],
        schema,
    }
}

Expand Down Expand Up @@ -358,6 +360,11 @@ impl OrderingEquivalenceBuilder {
}
}

/// Returns a reference to the schema with which this builder was constructed.
pub fn schema(&self) -> &SchemaRef {
&self.schema
}

/// Consumes the builder and returns the [`OrderingEquivalenceProperties`]
/// it holds.
pub fn build(self) -> OrderingEquivalenceProperties {
self.ordering_eq_properties
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use super::partition_evaluator::PartitionEvaluator;
use crate::equivalence::OrderingEquivalenceBuilder;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::Field;
Expand Down Expand Up @@ -79,6 +80,12 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
None
}

/// Adds any equivalent orderings generated by this expression
/// to `builder`.
///
/// The default implementation does nothing and leaves `builder` untouched;
/// implementors whose output introduces an ordering override this method.
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}

/// Can the window function be incrementally computed using
/// bounded memory?
///
Expand Down
23 changes: 22 additions & 1 deletion datafusion/physical-expr/src/window/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

//! Defines physical expression for `row_number` that can be evaluated at runtime during query execution

use crate::equivalence::OrderingEquivalenceBuilder;
use crate::expressions::Column;
use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{BuiltinWindowState, NumRowsState};
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow_schema::SortOptions;
use datafusion_common::{Result, ScalarValue};
use std::any::Any;
use std::ops::Range;
Expand Down Expand Up @@ -61,6 +64,24 @@ impl BuiltInWindowFunctionExpr for RowNumber {
&self.name
}

/// Registers the ordering introduced by the `RowNumber` output column.
///
/// Looks up the column named after this expression in the builder's
/// schema. If it is present, a sort expression on that column
/// (ASC, NULLS LAST) is added to `builder` as an equal ordering; if it is
/// absent, `builder` is left unchanged.
fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) {
    // Resolve this expression's output column in the builder's schema.
    let row_number_column = builder
        .schema()
        .column_with_name(self.name())
        .map(|(idx, field)| Column::new(field.name(), idx));

    if let Some(column) = row_number_column {
        let ordering = PhysicalSortExpr {
            expr: Arc::new(column) as _,
            // ASC, NULLS LAST
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        };
        builder.add_equal_conditions(vec![ordering]);
    }
}

/// Returns a boxed, default-initialized `NumRowsEvaluator` as the
/// [`PartitionEvaluator`] for this expression; this implementation always
/// returns `Ok`.
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<NumRowsEvaluator>::default())
}
Expand Down