Skip to content

Commit 9137a4a

Browse files
committed
Minor: Move get_equal_orderings into BuiltInWindowFunctionExpr
1 parent 1af846b commit 9137a4a

File tree

9 files changed

+43
-66
lines changed

9 files changed

+43
-66
lines changed

datafusion/core/src/physical_plan/windows/mod.rs

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::physical_plan::{
2626
udaf, ExecutionPlan, PhysicalExpr,
2727
};
2828
use arrow::datatypes::Schema;
29-
use arrow_schema::{SchemaRef, SortOptions};
29+
use arrow_schema::SchemaRef;
3030
use datafusion_common::ScalarValue;
3131
use datafusion_common::{DataFusionError, Result};
3232
use datafusion_expr::{
@@ -47,7 +47,6 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
4747
pub use bounded_window_agg_exec::PartitionSearchMode;
4848
use datafusion_common::utils::longest_consecutive_prefix;
4949
use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
50-
use datafusion_physical_expr::expressions::Column;
5150
use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs};
5251
pub use datafusion_physical_expr::window::{
5352
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
@@ -245,32 +244,14 @@ pub(crate) fn window_ordering_equivalence(
245244
.with_equivalences(input.equivalence_properties())
246245
.with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec()))
247246
.extend(input.ordering_equivalence_properties());
247+
248248
for expr in window_expr {
249249
if let Some(builtin_window_expr) =
250250
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
251251
{
252-
// Only the built-in `RowNumber` window function introduces a new
253-
// ordering:
254-
if builtin_window_expr
252+
builtin_window_expr
255253
.get_built_in_func_expr()
256-
.as_any()
257-
.is::<RowNumber>()
258-
{
259-
if let Some((idx, field)) =
260-
schema.column_with_name(builtin_window_expr.name())
261-
{
262-
let column = Column::new(field.name(), idx);
263-
let options = SortOptions {
264-
descending: false,
265-
nulls_first: false,
266-
}; // ASC, NULLS LAST
267-
let rhs = PhysicalSortExpr {
268-
expr: Arc::new(column) as _,
269-
options,
270-
};
271-
builder.add_equal_conditions(vec![rhs]);
272-
}
273-
}
254+
.add_equal_orderings(&mut builder);
274255
}
275256
}
276257
builder.build()

datafusion/physical-expr/src/equivalence.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,16 +296,18 @@ pub struct OrderingEquivalenceBuilder {
296296
eq_properties: EquivalenceProperties,
297297
ordering_eq_properties: OrderingEquivalenceProperties,
298298
existing_ordering: Vec<PhysicalSortExpr>,
299+
schema: SchemaRef,
299300
}
300301

301302
impl OrderingEquivalenceBuilder {
302303
pub fn new(schema: SchemaRef) -> Self {
303304
let eq_properties = EquivalenceProperties::new(schema.clone());
304-
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema);
305+
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema.clone());
305306
Self {
306307
eq_properties,
307308
ordering_eq_properties,
308309
existing_ordering: vec![],
310+
schema,
309311
}
310312
}
311313

@@ -358,6 +360,11 @@ impl OrderingEquivalenceBuilder {
358360
}
359361
}
360362

363+
/// Return a reference to the schema with which this builder was constructed with
364+
pub fn schema(&self) -> &SchemaRef {
365+
&self.schema
366+
}
367+
361368
pub fn build(self) -> OrderingEquivalenceProperties {
362369
self.ordering_eq_properties
363370
}

datafusion/physical-expr/src/window/built_in_window_function_expr.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
1820
use super::partition_evaluator::PartitionEvaluator;
21+
use crate::equivalence::OrderingEquivalenceBuilder;
1922
use crate::PhysicalExpr;
2023
use arrow::array::ArrayRef;
2124
use arrow::datatypes::Field;
2225
use arrow::record_batch::RecordBatch;
2326
use datafusion_common::Result;
24-
use std::any::Any;
25-
use std::sync::Arc;
2627

2728
/// Evaluates a window function by instantiating a
2829
/// `[PartitionEvaluator]` for calculating the function's output in
@@ -34,10 +35,6 @@ use std::sync::Arc;
3435
/// `nth_value` need the value.
3536
#[allow(rustdoc::private_intra_doc_links)]
3637
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
37-
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
38-
/// downcast to a specific implementation.
39-
fn as_any(&self) -> &dyn Any;
40-
4138
/// The field of the final result of evaluating this window function.
4239
fn field(&self) -> Result<Field>;
4340

@@ -79,6 +76,12 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
7976
None
8077
}
8178

79+
/// Adds any equivalent orderings generated by this expression
80+
/// to `builder`.
81+
///
82+
/// The default implementation does nothing
83+
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
84+
8285
/// Can the window function be incrementally computed using
8386
/// bounded memory?
8487
///

datafusion/physical-expr/src/window/cume_dist.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use arrow::array::ArrayRef;
2525
use arrow::array::Float64Array;
2626
use arrow::datatypes::{DataType, Field};
2727
use datafusion_common::Result;
28-
use std::any::Any;
2928
use std::iter;
3029
use std::ops::Range;
3130
use std::sync::Arc;
@@ -42,11 +41,6 @@ pub fn cume_dist(name: String) -> CumeDist {
4241
}
4342

4443
impl BuiltInWindowFunctionExpr for CumeDist {
45-
/// Return a reference to Any that can be used for downcasting
46-
fn as_any(&self) -> &dyn Any {
47-
self
48-
}
49-
5044
fn field(&self) -> Result<Field> {
5145
let nullable = false;
5246
let data_type = DataType::Float64;

datafusion/physical-expr/src/window/lead_lag.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use arrow::compute::cast;
2727
use arrow::datatypes::{DataType, Field};
2828
use datafusion_common::ScalarValue;
2929
use datafusion_common::{DataFusionError, Result};
30-
use std::any::Any;
3130
use std::cmp::min;
3231
use std::ops::{Neg, Range};
3332
use std::sync::Arc;
@@ -84,11 +83,6 @@ pub fn lag(
8483
}
8584

8685
impl BuiltInWindowFunctionExpr for WindowShift {
87-
/// Return a reference to Any that can be used for downcasting
88-
fn as_any(&self) -> &dyn Any {
89-
self
90-
}
91-
9286
fn field(&self) -> Result<Field> {
9387
let nullable = true;
9488
Ok(Field::new(&self.name, self.data_type.clone(), nullable))

datafusion/physical-expr/src/window/nth_value.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use arrow::array::{Array, ArrayRef};
2626
use arrow::datatypes::{DataType, Field};
2727
use datafusion_common::ScalarValue;
2828
use datafusion_common::{DataFusionError, Result};
29-
use std::any::Any;
3029
use std::ops::Range;
3130
use std::sync::Arc;
3231

@@ -95,11 +94,6 @@ impl NthValue {
9594
}
9695

9796
impl BuiltInWindowFunctionExpr for NthValue {
98-
/// Return a reference to Any that can be used for downcasting
99-
fn as_any(&self) -> &dyn Any {
100-
self
101-
}
102-
10397
fn field(&self) -> Result<Field> {
10498
let nullable = true;
10599
Ok(Field::new(&self.name, self.data_type.clone(), nullable))

datafusion/physical-expr/src/window/ntile.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use arrow::array::{ArrayRef, UInt64Array};
2525
use arrow::datatypes::Field;
2626
use arrow_schema::DataType;
2727
use datafusion_common::Result;
28-
use std::any::Any;
2928
use std::sync::Arc;
3029

3130
#[derive(Debug)]
@@ -41,10 +40,6 @@ impl Ntile {
4140
}
4241

4342
impl BuiltInWindowFunctionExpr for Ntile {
44-
fn as_any(&self) -> &dyn Any {
45-
self
46-
}
47-
4843
fn field(&self) -> Result<Field> {
4944
let nullable = false;
5045
let data_type = DataType::UInt64;

datafusion/physical-expr/src/window/rank.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use arrow::array::{Float64Array, UInt64Array};
2727
use arrow::datatypes::{DataType, Field};
2828
use datafusion_common::utils::get_row_at_idx;
2929
use datafusion_common::{DataFusionError, Result, ScalarValue};
30-
use std::any::Any;
3130
use std::iter;
3231
use std::ops::Range;
3332
use std::sync::Arc;
@@ -78,11 +77,6 @@ pub fn percent_rank(name: String) -> Rank {
7877
}
7978

8079
impl BuiltInWindowFunctionExpr for Rank {
81-
/// Return a reference to Any that can be used for downcasting
82-
fn as_any(&self) -> &dyn Any {
83-
self
84-
}
85-
8680
fn field(&self) -> Result<Field> {
8781
let nullable = false;
8882
let data_type = match self.rank_type {

datafusion/physical-expr/src/window/row_number.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
//! Defines physical expression for `row_number` that can evaluated at runtime during query execution
1919
20+
use crate::equivalence::OrderingEquivalenceBuilder;
21+
use crate::expressions::Column;
2022
use crate::window::partition_evaluator::PartitionEvaluator;
2123
use crate::window::window_expr::{BuiltinWindowState, NumRowsState};
2224
use crate::window::BuiltInWindowFunctionExpr;
23-
use crate::PhysicalExpr;
25+
use crate::{PhysicalExpr, PhysicalSortExpr};
2426
use arrow::array::{ArrayRef, UInt64Array};
2527
use arrow::datatypes::{DataType, Field};
28+
use arrow_schema::SortOptions;
2629
use datafusion_common::{Result, ScalarValue};
27-
use std::any::Any;
2830
use std::ops::Range;
2931
use std::sync::Arc;
3032

@@ -42,11 +44,6 @@ impl RowNumber {
4244
}
4345

4446
impl BuiltInWindowFunctionExpr for RowNumber {
45-
/// Return a reference to Any that can be used for downcasting
46-
fn as_any(&self) -> &dyn Any {
47-
self
48-
}
49-
5047
fn field(&self) -> Result<Field> {
5148
let nullable = false;
5249
let data_type = DataType::UInt64;
@@ -61,6 +58,24 @@ impl BuiltInWindowFunctionExpr for RowNumber {
6158
&self.name
6259
}
6360

61+
fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) {
62+
// Only the built-in `RowNumber` window function introduces a new
63+
// ordering:
64+
let schema = builder.schema();
65+
if let Some((idx, field)) = schema.column_with_name(self.name()) {
66+
let column = Column::new(field.name(), idx);
67+
let options = SortOptions {
68+
descending: false,
69+
nulls_first: false,
70+
}; // ASC, NULLS LAST
71+
let rhs = PhysicalSortExpr {
72+
expr: Arc::new(column) as _,
73+
options,
74+
};
75+
builder.add_equal_conditions(vec![rhs]);
76+
}
77+
}
78+
6479
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
6580
Ok(Box::<NumRowsEvaluator>::default())
6681
}

0 commit comments

Comments
 (0)