Skip to content

Commit 1fc9d7d

Browse files
vladimirg-dbMaxGekk
authored andcommitted
[SPARK-50982][SQL] Support more SQL/DataFrame read path functionality in single-pass Analyzer
### What changes were proposed in this pull request? Support more SQL/DataFrame read path functionality in single-pass Analyzer: - Most of name resolution - Views - CTEs - UNIONs - Global aggregates - Most of the functions - LCAs in Project - LIMIT - Subtree resolution in extensions - Expression ID assignment - Generic type coercion Also, remove `TracksResolvedNodes`, because it's based on comparing object addresses, which doesn't always make sense in Catalyst, because Catalyst reuses objects (e.g. literals or local/global limit expression trees). ### Why are the changes needed? To replace fixed-point Analyzer in Spark with a single-pass one. ### Does this PR introduce _any_ user-facing change? No, single-pass Analyzer is still disabled. ### How was this patch tested? - New test suites - Dual running two Analyzers and comparing logical plans with `ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER`. ### Was this patch authored or co-authored using generative AI tooling? Yes, copilot. Closes apache#49658 from vladimirg-db/vladimirg-db/single-pass-analyzer/more-functionality. Authored-by: Vladimir Golubev <[email protected]> Signed-off-by: Max Gekk <[email protected]>
1 parent 16d007f commit 1fc9d7d

File tree

62 files changed

+5427
-1768
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+5427
-1768
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
21+
22+
package object resolver {
23+
24+
type LogicalPlanResolver = TreeNodeResolver[LogicalPlan, LogicalPlan]
25+
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis.resolver
19+
20+
import org.apache.spark.sql.catalyst.analysis.{
21+
AnalysisErrorAt,
22+
AnsiTypeCoercion,
23+
CollationTypeCoercion,
24+
TypeCoercion
25+
}
26+
import org.apache.spark.sql.catalyst.expressions.Expression
27+
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
28+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Project}
29+
30+
/**
31+
* A resolver for [[AggregateExpression]]s which are introduced while resolving an
32+
* [[UnresolvedFunction]]. It is responsible for the following:
33+
* - Handling of the exceptions related to [[AggregateExpressions]].
34+
* - Updating the [[ExpressionResolver.expressionResolutionContextStack]].
35+
* - Applying type coercion rules to the [[AggregateExpressions]]s children. This is the only
36+
* resolution that we apply here as we already resolved the children of [[AggregateExpression]]
37+
* in the [[FunctionResolver]].
38+
*/
39+
class AggregateExpressionResolver(
40+
expressionResolver: ExpressionResolver,
41+
timezoneAwareExpressionResolver: TimezoneAwareExpressionResolver)
42+
extends TreeNodeResolver[AggregateExpression, Expression]
43+
with ResolvesExpressionChildren {
44+
private val typeCoercionTransformations: Seq[Expression => Expression] =
45+
if (conf.ansiEnabled) {
46+
AggregateExpressionResolver.ANSI_TYPE_COERCION_TRANSFORMATIONS
47+
} else {
48+
AggregateExpressionResolver.TYPE_COERCION_TRANSFORMATIONS
49+
}
50+
51+
private val typeCoercionResolver: TypeCoercionResolver =
52+
new TypeCoercionResolver(timezoneAwareExpressionResolver, typeCoercionTransformations)
53+
54+
private val expressionResolutionContextStack =
55+
expressionResolver.getExpressionResolutionContextStack
56+
57+
/**
58+
* Resolves the given [[AggregateExpression]] by applying:
59+
* - Type coercion rules
60+
* - Validity checks. Those include:
61+
* - Whether the [[AggregateExpression]] is under a valid operator.
62+
* - Whether there is a nested [[AggregateExpression]].
63+
* - Whether there is a nondeterministic child.
64+
* - Updates to the [[ExpressionResolver.expressionResolutionContextStack]]
65+
*/
66+
override def resolve(aggregateExpression: AggregateExpression): Expression = {
67+
val aggregateExpressionWithTypeCoercion =
68+
withResolvedChildren(aggregateExpression, typeCoercionResolver.resolve)
69+
70+
throwIfNotUnderValidOperator(aggregateExpression)
71+
throwIfNestedAggregateExists(aggregateExpressionWithTypeCoercion)
72+
throwIfHasNondeterministicChildren(aggregateExpressionWithTypeCoercion)
73+
74+
expressionResolutionContextStack
75+
.peek()
76+
.hasAggregateExpressionsInASubtree = true
77+
78+
// There are two different cases that we handle regarding the value of the flag:
79+
//
80+
// - We have an attribute under an `AggregateExpression`:
81+
// {{{ SELECT COUNT(col1) FROM VALUES (1); }}}
82+
// In this case, value of the `hasAttributeInASubtree` flag should be `false` as it
83+
// indicates whether there is an attribute in the subtree that's not `AggregateExpression`
84+
// so we can throw the `MISSING_GROUP_BY` exception appropriately.
85+
//
86+
// - In the following example:
87+
// {{{ SELECT COUNT(*), col1 + 1 FROM VALUES (1); }}}
88+
// It would be `true` as described above.
89+
expressionResolutionContextStack.peek().hasAttributeInASubtree = false
90+
91+
aggregateExpressionWithTypeCoercion
92+
}
93+
94+
private def throwIfNotUnderValidOperator(aggregateExpression: AggregateExpression): Unit = {
95+
expressionResolver.getParentOperator.get match {
96+
case _: Aggregate | _: Project =>
97+
case filter: Filter =>
98+
filter.failAnalysis(
99+
errorClass = "INVALID_WHERE_CONDITION",
100+
messageParameters = Map(
101+
"condition" -> toSQLExpr(filter.condition),
102+
"expressionList" -> Seq(aggregateExpression).mkString(", ")
103+
)
104+
)
105+
case other =>
106+
other.failAnalysis(
107+
errorClass = "UNSUPPORTED_EXPR_FOR_OPERATOR",
108+
messageParameters = Map(
109+
"invalidExprSqls" -> Seq(aggregateExpression).mkString(", ")
110+
)
111+
)
112+
}
113+
}
114+
115+
private def throwIfNestedAggregateExists(aggregateExpression: AggregateExpression): Unit = {
116+
if (expressionResolutionContextStack
117+
.peek()
118+
.hasAggregateExpressionsInASubtree) {
119+
aggregateExpression.failAnalysis(
120+
errorClass = "NESTED_AGGREGATE_FUNCTION",
121+
messageParameters = Map.empty
122+
)
123+
}
124+
}
125+
126+
private def throwIfHasNondeterministicChildren(aggregateExpression: AggregateExpression): Unit = {
127+
aggregateExpression.aggregateFunction.children.foreach(child => {
128+
if (!child.deterministic) {
129+
child.failAnalysis(
130+
errorClass = "AGGREGATE_FUNCTION_WITH_NONDETERMINISTIC_EXPRESSION",
131+
messageParameters = Map("sqlExpr" -> toSQLExpr(aggregateExpression))
132+
)
133+
}
134+
})
135+
}
136+
}
137+
138+
object AggregateExpressionResolver {
139+
// Ordering in the list of type coercions should be in sync with the list in [[TypeCoercion]].
140+
private val TYPE_COERCION_TRANSFORMATIONS: Seq[Expression => Expression] = Seq(
141+
CollationTypeCoercion.apply,
142+
TypeCoercion.InTypeCoercion.apply,
143+
TypeCoercion.FunctionArgumentTypeCoercion.apply,
144+
TypeCoercion.IfTypeCoercion.apply,
145+
TypeCoercion.ImplicitTypeCoercion.apply
146+
)
147+
148+
// Ordering in the list of type coercions should be in sync with the list in [[AnsiTypeCoercion]].
149+
private val ANSI_TYPE_COERCION_TRANSFORMATIONS: Seq[Expression => Expression] = Seq(
150+
CollationTypeCoercion.apply,
151+
AnsiTypeCoercion.InTypeCoercion.apply,
152+
AnsiTypeCoercion.FunctionArgumentTypeCoercion.apply,
153+
AnsiTypeCoercion.IfTypeCoercion.apply,
154+
AnsiTypeCoercion.ImplicitTypeCoercion.apply
155+
)
156+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala

+33-106
Original file line numberDiff line numberDiff line change
@@ -17,126 +17,53 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis.resolver
1919

20-
import org.apache.spark.sql.catalyst.analysis.{AliasResolution, UnresolvedAlias}
21-
import org.apache.spark.sql.catalyst.expressions.{
22-
Alias,
23-
Cast,
24-
CreateNamedStruct,
25-
Expression,
26-
NamedExpression
27-
}
20+
import org.apache.spark.sql.catalyst.analysis.{AliasResolution, MultiAlias, UnresolvedAlias}
21+
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
2822

2923
/**
3024
* Resolver class that resolves unresolved aliases and handles user-specified aliases.
3125
*/
32-
class AliasResolver(expressionResolver: ExpressionResolver, scopes: NameScopeStack)
26+
class AliasResolver(expressionResolver: ExpressionResolver)
3327
extends TreeNodeResolver[UnresolvedAlias, Expression]
3428
with ResolvesExpressionChildren {
29+
private val scopes = expressionResolver.getNameScopes
3530

3631
/**
37-
* Resolves [[UnresolvedAlias]] by handling two specific cases:
38-
* - Alias(CreateNamedStruct(...)) - instead of calling [[CreateNamedStructResolver]] which will
39-
* clean up its inner aliases, we manually resolve [[CreateNamedStruct]]'s children, because we
40-
* need to preserve inner aliases until after the alias name is computed. This is a hack because
41-
* fixed-point analyzer computes [[Alias]] name before removing inner aliases.
42-
* - Alias(...) - recursively call [[ExpressionResolver]] to resolve the child expression.
43-
*
44-
* After the children are resolved, call [[AliasResolution]] to compute the alias name. Finally,
45-
* clean up inner aliases from [[CreateNamedStruct]].
32+
* Resolves [[UnresolvedAlias]] by resolving its child and computing the alias name by calling
33+
* [[AliasResolution]] on the result. After resolving it, we assign a correct exprId to the
34+
* resulting [[Alias]]. Here we allow inner aliases to persist until the end of single-pass
35+
* resolution, after which they will be removed in the post-processing phase.
4636
*/
47-
override def resolve(unresolvedAlias: UnresolvedAlias): NamedExpression = {
48-
val aliasWithResolvedChildren = withResolvedChildren(
49-
unresolvedAlias, {
50-
case createNamedStruct: CreateNamedStruct =>
51-
withResolvedChildren(createNamedStruct, expressionResolver.resolve)
52-
case other => expressionResolver.resolve(other)
53-
}
54-
)
37+
override def resolve(unresolvedAlias: UnresolvedAlias): NamedExpression =
38+
scopes.top.lcaRegistry.withNewLcaScope {
39+
val aliasWithResolvedChildren =
40+
withResolvedChildren(unresolvedAlias, expressionResolver.resolve)
5541

56-
val resolvedAlias =
57-
AliasResolution.resolve(aliasWithResolvedChildren).asInstanceOf[NamedExpression]
42+
val resolvedAlias =
43+
AliasResolution.resolve(aliasWithResolvedChildren).asInstanceOf[NamedExpression]
5844

59-
scopes.top.addAlias(resolvedAlias.name)
60-
AliasResolver.cleanupAliases(resolvedAlias)
61-
}
45+
resolvedAlias match {
46+
case multiAlias: MultiAlias =>
47+
throw new ExplicitlyUnsupportedResolverFeature(
48+
s"unsupported expression: ${multiAlias.getClass.getName}"
49+
)
50+
case alias: Alias =>
51+
expressionResolver.getExpressionIdAssigner
52+
.mapExpression(alias)
53+
.asInstanceOf[Alias]
54+
}
55+
}
6256

6357
/**
64-
* Handle already resolved [[Alias]] nodes, i.e. user-specified aliases. We disallow stacking
65-
* of [[Alias]] nodes by collapsing them so that only the top node remains.
66-
*
67-
* For an example query like:
68-
*
69-
* {{{ SELECT 1 AS a }}}
70-
*
71-
* parsed plan will be:
72-
*
73-
* Project [Alias(1, a)]
74-
* +- OneRowRelation
75-
*
58+
* Handle already resolved [[Alias]] nodes, i.e. user-specified aliases. Here we only need to
59+
* resolve its children and afterwards reassign exprId to the resulting [[Alias]].
7660
*/
7761
def handleResolvedAlias(alias: Alias): Alias = {
78-
val aliasWithResolvedChildren = withResolvedChildren(alias, expressionResolver.resolve)
79-
scopes.top.addAlias(aliasWithResolvedChildren.name)
80-
AliasResolver.collapseAlias(aliasWithResolvedChildren)
81-
}
82-
}
83-
84-
object AliasResolver {
85-
86-
/**
87-
* For a query like:
88-
*
89-
* {{{ SELECT STRUCT(1 AS a, 2 AS b) AS st }}}
90-
*
91-
* After resolving [[CreateNamedStruct]] the plan will be:
92-
* CreateNamedStruct(Seq("a", Alias(1, "a"), "b", Alias(2, "b")))
93-
*
94-
* For a query like:
95-
*
96-
* {{{ df.select($"col1".cast("int").cast("double")) }}}
97-
*
98-
* After resolving top-most [[Alias]] the plan will be:
99-
* Alias(Cast(Alias(Cast(col1, int), col1)), double), col1)
100-
*
101-
* Both examples contain inner aliases that are not expected in the analyzed logical plan,
102-
* therefore need to be removed. However, in both examples inner aliases are necessary in order
103-
* for the outer alias to compute its name. To achieve this, we delay removal of inner aliases
104-
* until after the outer alias name is computed.
105-
*
106-
* For cases where there are no dependencies on inner alias, inner alias should be removed by the
107-
* resolver that produces it.
108-
*/
109-
private def cleanupAliases(namedExpression: NamedExpression): NamedExpression =
110-
namedExpression
111-
.withNewChildren(namedExpression.children.map {
112-
case cast @ Cast(alias: Alias, _, _, _) =>
113-
cast.copy(child = alias.child)
114-
case createNamedStruct: CreateNamedStruct =>
115-
CreateNamedStructResolver.cleanupAliases(createNamedStruct)
116-
case other => other
117-
})
118-
.asInstanceOf[NamedExpression]
119-
120-
/**
121-
* If an [[Alias]] node appears on top of another [[Alias]], remove the bottom one. Here we don't
122-
* handle a case where a node of different type appears between two [[Alias]] nodes: in this
123-
* case, removal of inner alias (if it is unnecessary) should be handled by respective node's
124-
* resolver, in order to preserve the bottom-up contract.
125-
*/
126-
private def collapseAlias(alias: Alias): Alias =
127-
alias.child match {
128-
case innerAlias: Alias =>
129-
val metadata = if (alias.metadata.isEmpty) {
130-
None
131-
} else {
132-
Some(alias.metadata)
133-
}
134-
alias.copy(child = innerAlias.child)(
135-
exprId = alias.exprId,
136-
qualifier = alias.qualifier,
137-
explicitMetadata = metadata,
138-
nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
139-
)
140-
case _ => alias
62+
scopes.top.lcaRegistry.withNewLcaScope {
63+
val aliasWithResolvedChildren = withResolvedChildren(alias, expressionResolver.resolve)
64+
expressionResolver.getExpressionIdAssigner
65+
.mapExpression(aliasWithResolvedChildren)
66+
.asInstanceOf[Alias]
14167
}
68+
}
14269
}

0 commit comments

Comments
 (0)