Skip to content

Commit a50aeef

Browse files
authored
Fix wildcard expansion for HAVING clause (#12046)
* fix the wildcard expand for filter plan * expand the wildcard for the error message * add the tests * fix recompute_schema * fix clippy * cargo fmt * change the check for having clause * rename the function and moving the tests * fix check * expand the schema for aggregate plan * reduce the time to expand wildcard * clean the testing table after tested * fmt and address review * stop expand wildcard and add more check for group-by and selects * simplify the having check
1 parent b2ac83f commit a50aeef

File tree

8 files changed

+157
-15
lines changed

8 files changed

+157
-15
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -385,6 +385,14 @@ impl LogicalPlanBuilder {
385385
.map(Self::from)
386386
}
387387

388+
/// Apply a filter which is used for a having clause
389+
pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
390+
let expr = normalize_col(expr.into(), &self.plan)?;
391+
Filter::try_new_with_having(expr, Arc::new(self.plan))
392+
.map(LogicalPlan::Filter)
393+
.map(Self::from)
394+
}
395+
388396
/// Make a builder for a prepare logical plan from the builder's plan
389397
pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
390398
Ok(Self::from(LogicalPlan::Prepare(Prepare {

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 23 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -643,9 +643,12 @@ impl LogicalPlan {
643643
// todo it isn't clear why the schema is not recomputed here
644644
Ok(LogicalPlan::Values(Values { schema, values }))
645645
}
646-
LogicalPlan::Filter(Filter { predicate, input }) => {
647-
Filter::try_new(predicate, input).map(LogicalPlan::Filter)
648-
}
646+
LogicalPlan::Filter(Filter {
647+
predicate,
648+
input,
649+
having,
650+
}) => Filter::try_new_internal(predicate, input, having)
651+
.map(LogicalPlan::Filter),
649652
LogicalPlan::Repartition(_) => Ok(self),
650653
LogicalPlan::Window(Window {
651654
input,
@@ -2080,6 +2083,8 @@ pub struct Filter {
20802083
pub predicate: Expr,
20812084
/// The incoming logical plan
20822085
pub input: Arc<LogicalPlan>,
2086+
/// The flag to indicate if the filter is a having clause
2087+
pub having: bool,
20832088
}
20842089

20852090
impl Filter {
@@ -2088,6 +2093,20 @@ impl Filter {
20882093
/// Notes: as Aliases have no effect on the output of a filter operator,
20892094
/// they are removed from the predicate expression.
20902095
pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2096+
Self::try_new_internal(predicate, input, false)
2097+
}
2098+
2099+
/// Create a new filter operator for a having clause.
2100+
/// This is similar to a filter, but its having flag is set to true.
2101+
pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2102+
Self::try_new_internal(predicate, input, true)
2103+
}
2104+
2105+
fn try_new_internal(
2106+
predicate: Expr,
2107+
input: Arc<LogicalPlan>,
2108+
having: bool,
2109+
) -> Result<Self> {
20912110
// Filter predicates must return a boolean value so we try and validate that here.
20922111
// Note that it is not always possible to resolve the predicate expression during plan
20932112
// construction (such as with correlated subqueries) so we make a best effort here and
@@ -2104,6 +2123,7 @@ impl Filter {
21042123
Ok(Self {
21052124
predicate: predicate.unalias_nested().data,
21062125
input,
2126+
having,
21072127
})
21082128
}
21092129

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 22 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -87,8 +87,17 @@ impl TreeNode for LogicalPlan {
8787
schema,
8888
})
8989
}),
90-
LogicalPlan::Filter(Filter { predicate, input }) => rewrite_arc(input, f)?
91-
.update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
90+
LogicalPlan::Filter(Filter {
91+
predicate,
92+
input,
93+
having,
94+
}) => rewrite_arc(input, f)?.update_data(|input| {
95+
LogicalPlan::Filter(Filter {
96+
predicate,
97+
input,
98+
having,
99+
})
100+
}),
92101
LogicalPlan::Repartition(Repartition {
93102
input,
94103
partitioning_scheme,
@@ -561,10 +570,17 @@ impl LogicalPlan {
561570
value.into_iter().map_until_stop_and_collect(&mut f)
562571
})?
563572
.update_data(|values| LogicalPlan::Values(Values { schema, values })),
564-
LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
565-
.update_data(|predicate| {
566-
LogicalPlan::Filter(Filter { predicate, input })
567-
}),
573+
LogicalPlan::Filter(Filter {
574+
predicate,
575+
input,
576+
having,
577+
}) => f(predicate)?.update_data(|predicate| {
578+
LogicalPlan::Filter(Filter {
579+
predicate,
580+
input,
581+
having,
582+
})
583+
}),
568584
LogicalPlan::Repartition(Repartition {
569585
input,
570586
partitioning_scheme,

datafusion/expr/src/utils.rs

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -804,6 +804,15 @@ pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan {
804804
match input {
805805
LogicalPlan::Window(window) => find_base_plan(&window.input),
806806
LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input),
807+
LogicalPlan::Filter(filter) => {
808+
if filter.having {
809+
// If a filter is used for a having clause, its input plan is an aggregation.
810+
// We should expand the wildcard expression based on the aggregation's input plan.
811+
find_base_plan(&filter.input)
812+
} else {
813+
input
814+
}
815+
}
807816
_ => input,
808817
}
809818
}

datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -160,14 +160,13 @@ fn replace_columns(
160160
mod tests {
161161
use arrow::datatypes::{DataType, Field, Schema};
162162

163+
use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
164+
use crate::Analyzer;
163165
use datafusion_common::{JoinType, TableReference};
164166
use datafusion_expr::{
165167
col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder,
166168
};
167169

168-
use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
169-
use crate::Analyzer;
170-
171170
use super::*;
172171

173172
fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {

datafusion/sql/src/select.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -215,7 +215,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
215215

216216
let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr {
217217
LogicalPlanBuilder::from(plan)
218-
.filter(having_expr_post_aggr)?
218+
.having(having_expr_post_aggr)?
219219
.build()?
220220
} else {
221221
plan

datafusion/sql/src/utils.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717

1818
//! SQL Utility Functions
1919
20-
use std::collections::HashMap;
21-
2220
use arrow_schema::{
2321
DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
2422
};
@@ -33,6 +31,7 @@ use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
3331
use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
3432
use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan};
3533
use sqlparser::ast::{Ident, Value};
34+
use std::collections::HashMap;
3635

3736
/// Make a best-effort attempt at resolving all columns in the expression tree
3837
pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 91 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5655,6 +5655,97 @@ select count(null), min(null), max(null), bit_and(NULL), bit_or(NULL), bit_xor(N
56555655
----
56565656
0 NULL NULL NULL NULL NULL NULL NULL
56575657

5658+
statement ok
5659+
create table having_test(v1 int, v2 int)
5660+
5661+
statement ok
5662+
create table join_table(v1 int, v2 int)
5663+
5664+
statement ok
5665+
insert into having_test values (1, 2), (2, 3), (3, 4)
5666+
5667+
statement ok
5668+
insert into join_table values (1, 2), (2, 3), (3, 4)
5669+
5670+
5671+
query II
5672+
select * from having_test group by v1, v2 having max(v1) = 3
5673+
----
5674+
3 4
5675+
5676+
query TT
5677+
EXPLAIN select * from having_test group by v1, v2 having max(v1) = 3
5678+
----
5679+
logical_plan
5680+
01)Projection: having_test.v1, having_test.v2
5681+
02)--Filter: max(having_test.v1) = Int32(3)
5682+
03)----Aggregate: groupBy=[[having_test.v1, having_test.v2]], aggr=[[max(having_test.v1)]]
5683+
04)------TableScan: having_test projection=[v1, v2]
5684+
physical_plan
5685+
01)ProjectionExec: expr=[v1@0 as v1, v2@1 as v2]
5686+
02)--CoalesceBatchesExec: target_batch_size=8192
5687+
03)----FilterExec: max(having_test.v1)@2 = 3
5688+
04)------AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
5689+
05)--------CoalesceBatchesExec: target_batch_size=8192
5690+
06)----------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=4
5691+
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
5692+
08)--------------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
5693+
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
5694+
5695+
5696+
query error
5697+
select * from having_test having max(v1) = 3
5698+
5699+
query I
5700+
select max(v1) from having_test having max(v1) = 3
5701+
----
5702+
3
5703+
5704+
query I
5705+
select max(v1), * exclude (v1, v2) from having_test having max(v1) = 3
5706+
----
5707+
3
5708+
5709+
# because v1, v2 is not in the group by clause, the sql is invalid
5710+
query III
5711+
select max(v1), * replace ('v1' as v3) from having_test group by v1, v2 having max(v1) = 3
5712+
----
5713+
3 3 4
5714+
5715+
query III
5716+
select max(v1), t.* from having_test t group by v1, v2 having max(v1) = 3
5717+
----
5718+
3 3 4
5719+
5720+
# j.* should also be included in the group-by clause
5721+
query error
5722+
select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by t.v1, t.v2 having max(t.v1) = 3
5723+
5724+
query III
5725+
select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by j.v1, j.v2 having max(t.v1) = 3
5726+
----
5727+
3 3 4
5728+
5729+
# If the select items only contain scalar expressions, the having clause is valid.
5730+
query P
5731+
select now() from having_test having max(v1) = 4
5732+
----
5733+
5734+
# If the select items only contain scalar expressions, the having clause is valid.
5735+
query I
5736+
select 0 from having_test having max(v1) = 4
5737+
----
5738+
5739+
# v2 should also be included in group-by clause
5740+
query error
5741+
select * from having_test group by v1 having max(v1) = 3
5742+
5743+
statement ok
5744+
drop table having_test
5745+
5746+
statement ok
5747+
drop table join_table
5748+
56585749
# test min/max Float16 without group expression
56595750
query RRTT
56605751
WITH data AS (

0 commit comments

Comments (0)