Skip to content

Commit 0b9f653

Browse files
committed
refactor: move aggregate statistics to datafusion-physical-optimizer
1 parent f4e519f commit 0b9f653

File tree

6 files changed

+316
-286
lines changed

6 files changed

+316
-286
lines changed

datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,10 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
193193
mod tests {
194194

195195
use super::*;
196-
use crate::physical_optimizer::aggregate_statistics::tests::TestAggregate;
197196
use crate::physical_optimizer::enforce_distribution::tests::{
198197
parquet_exec_with_sort, schema, trim_plan_display,
199198
};
199+
use crate::physical_optimizer::tests::aggregate_statistics::tests::TestAggregate;
200200
use crate::physical_plan::aggregates::PhysicalGroupBy;
201201
use crate::physical_plan::collect;
202202
use crate::physical_plan::memory::MemoryExec;

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
//! "Repartition" or "Sortedness"
2222
//!
2323
//! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
24-
pub mod aggregate_statistics;
24+
mod tests {
25+
pub mod aggregate_statistics;
26+
}
2527
pub mod coalesce_batches;
2628
pub mod combine_partial_final_agg;
2729
pub mod enforce_distribution;

datafusion/core/src/physical_optimizer/aggregate_statistics.rs renamed to datafusion/core/src/physical_optimizer/tests/aggregate_statistics.rs

Lines changed: 12 additions & 284 deletions
Original file line numberDiff line numberDiff line change
@@ -16,291 +16,19 @@
1616
// under the License.
1717

1818
//! Utilizing exact statistics from sources to avoid scanning data
19-
use std::sync::Arc;
20-
21-
use crate::config::ConfigOptions;
22-
use crate::error::Result;
23-
use crate::physical_plan::aggregates::AggregateExec;
24-
use crate::physical_plan::projection::ProjectionExec;
25-
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics};
26-
use crate::scalar::ScalarValue;
27-
28-
use datafusion_common::stats::Precision;
29-
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30-
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
31-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
32-
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
33-
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
34-
35-
/// Optimizer that uses available statistics for aggregate functions
36-
#[derive(Default)]
37-
pub struct AggregateStatistics {}
38-
39-
impl AggregateStatistics {
40-
#[allow(missing_docs)]
41-
pub fn new() -> Self {
42-
Self {}
43-
}
44-
}
45-
46-
impl PhysicalOptimizerRule for AggregateStatistics {
47-
fn optimize(
48-
&self,
49-
plan: Arc<dyn ExecutionPlan>,
50-
_config: &ConfigOptions,
51-
) -> Result<Arc<dyn ExecutionPlan>> {
52-
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
53-
let partial_agg_exec = partial_agg_exec
54-
.as_any()
55-
.downcast_ref::<AggregateExec>()
56-
.expect("take_optimizable() ensures that this is a AggregateExec");
57-
let stats = partial_agg_exec.input().statistics()?;
58-
let mut projections = vec![];
59-
for expr in partial_agg_exec.aggr_expr() {
60-
if let Some((non_null_rows, name)) =
61-
take_optimizable_column_and_table_count(&**expr, &stats)
62-
{
63-
projections.push((expressions::lit(non_null_rows), name.to_owned()));
64-
} else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) {
65-
projections.push((expressions::lit(min), name.to_owned()));
66-
} else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) {
67-
projections.push((expressions::lit(max), name.to_owned()));
68-
} else {
69-
// TODO: we need all aggr_expr to be resolved (cf TODO fullres)
70-
break;
71-
}
72-
}
73-
74-
// TODO fullres: use statistics even if not all aggr_expr could be resolved
75-
if projections.len() == partial_agg_exec.aggr_expr().len() {
76-
// input can be entirely removed
77-
Ok(Arc::new(ProjectionExec::try_new(
78-
projections,
79-
Arc::new(PlaceholderRowExec::new(plan.schema())),
80-
)?))
81-
} else {
82-
plan.map_children(|child| {
83-
self.optimize(child, _config).map(Transformed::yes)
84-
})
85-
.data()
86-
}
87-
} else {
88-
plan.map_children(|child| self.optimize(child, _config).map(Transformed::yes))
89-
.data()
90-
}
91-
}
92-
93-
fn name(&self) -> &str {
94-
"aggregate_statistics"
95-
}
96-
97-
/// This rule changes the nullability of fields in the schema, so the schema check is disabled.
98-
fn schema_check(&self) -> bool {
99-
false
100-
}
101-
}
102-
103-
/// Checks whether the node passed as argument is a final `AggregateExec` node that can be optimized:
104-
/// - its child (with possible intermediate layers) is a partial `AggregateExec` node
105-
/// - they both have no grouping expression
106-
///
107-
/// If this is the case, return a ref to the partial `AggregateExec`, else `None`.
108-
/// We would have preferred to return a cast ref to `AggregateExec`, but the recursion requires
109-
/// the `ExecutionPlan::children()` method that returns an owned reference.
110-
fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
111-
if let Some(final_agg_exec) = node.as_any().downcast_ref::<AggregateExec>() {
112-
if !final_agg_exec.mode().is_first_stage()
113-
&& final_agg_exec.group_expr().is_empty()
114-
{
115-
let mut child = Arc::clone(final_agg_exec.input());
116-
loop {
117-
if let Some(partial_agg_exec) =
118-
child.as_any().downcast_ref::<AggregateExec>()
119-
{
120-
if partial_agg_exec.mode().is_first_stage()
121-
&& partial_agg_exec.group_expr().is_empty()
122-
&& partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
123-
{
124-
return Some(child);
125-
}
126-
}
127-
if let [childrens_child] = child.children().as_slice() {
128-
child = Arc::clone(childrens_child);
129-
} else {
130-
break;
131-
}
132-
}
133-
}
134-
}
135-
None
136-
}
137-
138-
/// If this agg_expr is a count that can be exactly derived from the statistics, return it.
139-
fn take_optimizable_column_and_table_count(
140-
agg_expr: &dyn AggregateExpr,
141-
stats: &Statistics,
142-
) -> Option<(ScalarValue, String)> {
143-
let col_stats = &stats.column_statistics;
144-
if is_non_distinct_count(agg_expr) {
145-
if let Precision::Exact(num_rows) = stats.num_rows {
146-
let exprs = agg_expr.expressions();
147-
if exprs.len() == 1 {
148-
// TODO optimize with exprs other than Column
149-
if let Some(col_expr) =
150-
exprs[0].as_any().downcast_ref::<expressions::Column>()
151-
{
152-
let current_val = &col_stats[col_expr.index()].null_count;
153-
if let &Precision::Exact(val) = current_val {
154-
return Some((
155-
ScalarValue::Int64(Some((num_rows - val) as i64)),
156-
agg_expr.name().to_string(),
157-
));
158-
}
159-
} else if let Some(lit_expr) =
160-
exprs[0].as_any().downcast_ref::<expressions::Literal>()
161-
{
162-
if lit_expr.value() == &COUNT_STAR_EXPANSION {
163-
return Some((
164-
ScalarValue::Int64(Some(num_rows as i64)),
165-
agg_expr.name().to_string(),
166-
));
167-
}
168-
}
169-
}
170-
}
171-
}
172-
None
173-
}
174-
175-
/// If this agg_expr is a min that is exactly defined in the statistics, return it.
176-
fn take_optimizable_min(
177-
agg_expr: &dyn AggregateExpr,
178-
stats: &Statistics,
179-
) -> Option<(ScalarValue, String)> {
180-
if let Precision::Exact(num_rows) = &stats.num_rows {
181-
match *num_rows {
182-
0 => {
183-
// MIN/MAX with 0 rows is always null
184-
if is_min(agg_expr) {
185-
if let Ok(min_data_type) =
186-
ScalarValue::try_from(agg_expr.field().unwrap().data_type())
187-
{
188-
return Some((min_data_type, agg_expr.name().to_string()));
189-
}
190-
}
191-
}
192-
value if value > 0 => {
193-
let col_stats = &stats.column_statistics;
194-
if is_min(agg_expr) {
195-
let exprs = agg_expr.expressions();
196-
if exprs.len() == 1 {
197-
// TODO optimize with exprs other than Column
198-
if let Some(col_expr) =
199-
exprs[0].as_any().downcast_ref::<expressions::Column>()
200-
{
201-
if let Precision::Exact(val) =
202-
&col_stats[col_expr.index()].min_value
203-
{
204-
if !val.is_null() {
205-
return Some((
206-
val.clone(),
207-
agg_expr.name().to_string(),
208-
));
209-
}
210-
}
211-
}
212-
}
213-
}
214-
}
215-
_ => {}
216-
}
217-
}
218-
None
219-
}
220-
221-
/// If this agg_expr is a max that is exactly defined in the statistics, return it.
222-
fn take_optimizable_max(
223-
agg_expr: &dyn AggregateExpr,
224-
stats: &Statistics,
225-
) -> Option<(ScalarValue, String)> {
226-
if let Precision::Exact(num_rows) = &stats.num_rows {
227-
match *num_rows {
228-
0 => {
229-
// MIN/MAX with 0 rows is always null
230-
if is_max(agg_expr) {
231-
if let Ok(max_data_type) =
232-
ScalarValue::try_from(agg_expr.field().unwrap().data_type())
233-
{
234-
return Some((max_data_type, agg_expr.name().to_string()));
235-
}
236-
}
237-
}
238-
value if value > 0 => {
239-
let col_stats = &stats.column_statistics;
240-
if is_max(agg_expr) {
241-
let exprs = agg_expr.expressions();
242-
if exprs.len() == 1 {
243-
// TODO optimize with exprs other than Column
244-
if let Some(col_expr) =
245-
exprs[0].as_any().downcast_ref::<expressions::Column>()
246-
{
247-
if let Precision::Exact(val) =
248-
&col_stats[col_expr.index()].max_value
249-
{
250-
if !val.is_null() {
251-
return Some((
252-
val.clone(),
253-
agg_expr.name().to_string(),
254-
));
255-
}
256-
}
257-
}
258-
}
259-
}
260-
}
261-
_ => {}
262-
}
263-
}
264-
None
265-
}
266-
267-
// TODO: Move this check into AggregateUDFImpl
268-
// https://github.com/apache/datafusion/issues/11153
269-
fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool {
270-
if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
271-
if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {
272-
return true;
273-
}
274-
}
275-
false
276-
}
277-
278-
// TODO: Move this check into AggregateUDFImpl
279-
// https://github.com/apache/datafusion/issues/11153
280-
fn is_min(agg_expr: &dyn AggregateExpr) -> bool {
281-
if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
282-
if agg_expr.fun().name().to_lowercase() == "min" {
283-
return true;
284-
}
285-
}
286-
false
287-
}
288-
289-
// TODO: Move this check into AggregateUDFImpl
290-
// https://github.com/apache/datafusion/issues/11153
291-
fn is_max(agg_expr: &dyn AggregateExpr) -> bool {
292-
if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
293-
if agg_expr.fun().name().to_lowercase() == "max" {
294-
return true;
295-
}
296-
}
297-
false
298-
}
299-
30019
#[cfg(test)]
30120
pub(crate) mod tests {
302-
use super::*;
30321

22+
use datafusion_common::config::ConfigOptions;
23+
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
24+
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
25+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
26+
use datafusion_physical_plan::aggregates::AggregateExec;
27+
use datafusion_physical_plan::projection::ProjectionExec;
28+
use datafusion_physical_plan::ExecutionPlan;
29+
use std::sync::Arc;
30+
31+
use crate::error::Result;
30432
use crate::logical_expr::Operator;
30533
use crate::physical_plan::aggregates::PhysicalGroupBy;
30634
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -314,8 +42,8 @@ pub(crate) mod tests {
31442
use arrow::record_batch::RecordBatch;
31543
use datafusion_common::cast::as_int64_array;
31644
use datafusion_functions_aggregate::count::count_udaf;
317-
use datafusion_physical_expr::expressions::cast;
318-
use datafusion_physical_expr::PhysicalExpr;
45+
use datafusion_physical_expr::expressions::{self, cast};
46+
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
31947
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
32048
use datafusion_physical_plan::aggregates::AggregateMode;
32149

datafusion/physical-optimizer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ workspace = true
3434
[dependencies]
3535
datafusion-common = { workspace = true, default-features = true }
3636
datafusion-execution = { workspace = true }
37+
datafusion-expr = { workspace = true }
3738
datafusion-physical-expr = { workspace = true }
3839
datafusion-physical-plan = { workspace = true }

0 commit comments

Comments
 (0)