19
19
use crate :: optimizer:: { ApplyOrder , ApplyOrder :: BottomUp } ;
20
20
use crate :: { OptimizerConfig , OptimizerRule } ;
21
21
22
- use datafusion_common:: { Column , Result } ;
22
+ use datafusion_common:: tree_node:: Transformed ;
23
+ use datafusion_common:: { internal_err, Column , Result } ;
24
+ use datafusion_expr:: expr_rewriter:: normalize_cols;
23
25
use datafusion_expr:: utils:: expand_wildcard;
24
26
use datafusion_expr:: {
25
27
aggregate_function:: AggregateFunction as AggregateFunctionFunc , col,
@@ -66,20 +68,24 @@ impl ReplaceDistinctWithAggregate {
66
68
}
67
69
68
70
impl OptimizerRule for ReplaceDistinctWithAggregate {
69
- fn try_optimize (
71
+ fn supports_rewrite ( & self ) -> bool {
72
+ true
73
+ }
74
+
75
+ fn rewrite (
70
76
& self ,
71
- plan : & LogicalPlan ,
77
+ plan : LogicalPlan ,
72
78
_config : & dyn OptimizerConfig ,
73
- ) -> Result < Option < LogicalPlan > > {
79
+ ) -> Result < Transformed < LogicalPlan > > {
74
80
match plan {
75
81
LogicalPlan :: Distinct ( Distinct :: All ( input) ) => {
76
- let group_expr = expand_wildcard ( input. schema ( ) , input, None ) ?;
77
- let aggregate = LogicalPlan :: Aggregate ( Aggregate :: try_new (
78
- input. clone ( ) ,
82
+ let group_expr = expand_wildcard ( input. schema ( ) , & input, None ) ?;
83
+ let aggr_plan = LogicalPlan :: Aggregate ( Aggregate :: try_new (
84
+ input,
79
85
group_expr,
80
86
vec ! [ ] ,
81
87
) ?) ;
82
- Ok ( Some ( aggregate ) )
88
+ Ok ( Transformed :: yes ( aggr_plan ) )
83
89
}
84
90
LogicalPlan :: Distinct ( Distinct :: On ( DistinctOn {
85
91
select_expr,
@@ -88,13 +94,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
88
94
input,
89
95
schema,
90
96
} ) ) => {
97
+ let expr_cnt = on_expr. len ( ) ;
98
+
91
99
// Construct the aggregation expression to be used to fetch the selected expressions.
92
100
let aggr_expr = select_expr
93
- . iter ( )
101
+ . into_iter ( )
94
102
. map ( |e| {
95
103
Expr :: AggregateFunction ( AggregateFunction :: new (
96
104
AggregateFunctionFunc :: FirstValue ,
97
- vec ! [ e. clone ( ) ] ,
105
+ vec ! [ e] ,
98
106
false ,
99
107
None ,
100
108
sort_expr. clone ( ) ,
@@ -103,45 +111,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
103
111
} )
104
112
. collect :: < Vec < Expr > > ( ) ;
105
113
114
+ let aggr_expr = normalize_cols ( aggr_expr, input. as_ref ( ) ) ?;
115
+ let group_expr = normalize_cols ( on_expr, input. as_ref ( ) ) ?;
116
+
106
117
// Build the aggregation plan
107
- let plan = LogicalPlanBuilder :: from ( input. as_ref ( ) . clone ( ) )
108
- . aggregate ( on_expr. clone ( ) , aggr_expr. to_vec ( ) ) ?
109
- . build ( ) ?;
118
+ let plan = LogicalPlan :: Aggregate ( Aggregate :: try_new (
119
+ input, group_expr, aggr_expr,
120
+ ) ?) ;
121
+ // TODO use LogicalPlanBuilder directly rather than recreating the Aggregate
122
+ // when https://github.com/apache/datafusion/issues/10485 is available
123
+ let lpb = LogicalPlanBuilder :: from ( plan) ;
110
124
111
- let plan = if let Some ( sort_expr) = sort_expr {
125
+ let plan = if let Some ( mut sort_expr) = sort_expr {
112
126
// While sort expressions were used in the `FIRST_VALUE` aggregation itself above,
113
127
// this on its own isn't enough to guarantee the proper output order of the grouping
114
128
// (`ON`) expression, so we need to sort those as well.
115
- LogicalPlanBuilder :: from ( plan)
116
- . sort ( sort_expr[ ..on_expr. len ( ) ] . to_vec ( ) ) ?
117
- . build ( ) ?
129
+
130
+ // truncate the sort_expr to the length of on_expr
131
+ sort_expr. truncate ( expr_cnt) ;
132
+
133
+ lpb. sort ( sort_expr) ?. build ( ) ?
118
134
} else {
119
- plan
135
+ lpb . build ( ) ?
120
136
} ;
121
137
122
138
// Whereas the aggregation plan by default outputs both the grouping and the aggregation
123
139
// expressions, for `DISTINCT ON` we only need to emit the original selection expressions.
140
+
124
141
let project_exprs = plan
125
142
. schema ( )
126
143
. iter ( )
127
- . skip ( on_expr . len ( ) )
144
+ . skip ( expr_cnt )
128
145
. zip ( schema. iter ( ) )
129
146
. map ( |( ( new_qualifier, new_field) , ( old_qualifier, old_field) ) | {
130
- Ok ( col ( Column :: from ( ( new_qualifier, new_field) ) )
131
- . alias_qualified ( old_qualifier. cloned ( ) , old_field. name ( ) ) )
147
+ col ( Column :: from ( ( new_qualifier, new_field) ) )
148
+ . alias_qualified ( old_qualifier. cloned ( ) , old_field. name ( ) )
132
149
} )
133
- . collect :: < Result < Vec < Expr > > > ( ) ? ;
150
+ . collect :: < Vec < Expr > > ( ) ;
134
151
135
152
let plan = LogicalPlanBuilder :: from ( plan)
136
153
. project ( project_exprs) ?
137
154
. build ( ) ?;
138
155
139
- Ok ( Some ( plan) )
156
+ Ok ( Transformed :: yes ( plan) )
140
157
}
141
- _ => Ok ( None ) ,
158
+ _ => Ok ( Transformed :: no ( plan ) ) ,
142
159
}
143
160
}
144
161
162
+ fn try_optimize (
163
+ & self ,
164
+ _plan : & LogicalPlan ,
165
+ _config : & dyn OptimizerConfig ,
166
+ ) -> Result < Option < LogicalPlan > > {
167
+ internal_err ! ( "Should have called ReplaceDistinctWithAggregate::rewrite" )
168
+ }
169
+
145
170
fn name ( & self ) -> & str {
146
171
"replace_distinct_aggregate"
147
172
}
0 commit comments