Skip to content

Commit 47edf4b

Browse files
committed
[SPARK-51213][SQL] Keep Expression class info when resolving hint parameters
### What changes were proposed in this pull request?
Currently, the expression class info is explicitly erased when resolving hint parameters. This PR undoes that erasure and keeps the class info, so that it can be used in error handling for a better and more consistent representation in error messages.

### Why are the changes needed?
Code refactoring and improved error messages.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New tests were added.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#49950 from yaooqinn/SPARK-51213.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
1 parent 7992a2f commit 47edf4b

File tree

5 files changed

+21
-20
lines changed

5 files changed

+21
-20
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.internal.{Logging, MDC}
2121
import org.apache.spark.internal.LogKeys.{QUERY_HINT, RELATION_NAME, UNSUPPORTED_HINT_REASON}
22+
import org.apache.spark.sql.catalyst.expressions.Expression
2223
import org.apache.spark.sql.catalyst.plans.logical.{HintErrorHandler, HintInfo}
2324

2425
/**
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{HintErrorHandler, HintInfo}
2728
object HintErrorLogger extends HintErrorHandler with Logging {
2829
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
2930

30-
override def hintNotRecognized(name: String, parameters: Seq[Any]): Unit = {
31+
override def hintNotRecognized(name: String, parameters: Seq[Expression]): Unit = {
3132
logWarning(log"Unrecognized hint: " +
3233
log"${MDC(QUERY_HINT, hintToPrettyString(name, parameters))}")
3334
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

+9-13
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ object ResolveHints {
174174
* COALESCE Hint accepts names "COALESCE", "REPARTITION", and "REPARTITION_BY_RANGE".
175175
*/
176176
object ResolveCoalesceHints extends Rule[LogicalPlan] {
177-
private def getNumOfPartitions(hint: UnresolvedHint): (Option[Int], Seq[Any]) = {
177+
private def getNumOfPartitions(hint: UnresolvedHint): (Option[Int], Seq[Expression]) = {
178178
hint.parameters match {
179179
case Seq(ByteLiteral(numPartitions), _*) =>
180180
(Some(numPartitions.toInt), hint.parameters.tail)
@@ -185,7 +185,7 @@ object ResolveHints {
185185
}
186186
}
187187

188-
private def validateParameters(hint: String, parms: Seq[Any]): Unit = {
188+
private def validateParameters(hint: String, parms: Seq[Expression]): Unit = {
189189
val invalidParams = parms.filter(!_.isInstanceOf[UnresolvedAttribute])
190190
if (invalidParams.nonEmpty) {
191191
val hintName = hint.toUpperCase(Locale.ROOT)
@@ -198,18 +198,16 @@ object ResolveHints {
198198
* The "COALESCE" hint only has a partition number as a parameter. The "REPARTITION" hint
199199
* has a partition number, columns, or both of them as parameters.
200200
*/
201-
private def createRepartition(
202-
shuffle: Boolean, hint: UnresolvedHint): LogicalPlan = {
201+
private def createRepartition(shuffle: Boolean, hint: UnresolvedHint): LogicalPlan = {
203202

204203
def createRepartitionByExpression(
205-
numPartitions: Option[Int], partitionExprs: Seq[Any]): RepartitionByExpression = {
204+
numPartitions: Option[Int], partitionExprs: Seq[Expression]): RepartitionByExpression = {
206205
val sortOrders = partitionExprs.filter(_.isInstanceOf[SortOrder])
207206
if (sortOrders.nonEmpty) {
208207
throw QueryCompilationErrors.invalidRepartitionExpressionsError(sortOrders)
209208
}
210209
validateParameters(hint.name, partitionExprs)
211-
RepartitionByExpression(
212-
partitionExprs.map(_.asInstanceOf[Expression]), hint.child, numPartitions)
210+
RepartitionByExpression(partitionExprs, hint.child, numPartitions)
213211
}
214212

215213
getNumOfPartitions(hint) match {
@@ -232,7 +230,7 @@ object ResolveHints {
232230
*/
233231
private def createRepartitionByRange(hint: UnresolvedHint): RepartitionByExpression = {
234232
def createRepartitionByExpression(
235-
numPartitions: Option[Int], partitionExprs: Seq[Any]): RepartitionByExpression = {
233+
numPartitions: Option[Int], partitionExprs: Seq[Expression]): RepartitionByExpression = {
236234
validateParameters(hint.name, partitionExprs)
237235
val sortOrder = partitionExprs.map {
238236
case expr: SortOrder => expr
@@ -251,12 +249,10 @@ object ResolveHints {
251249

252250
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
253251
def createRebalancePartitions(
254-
partitionExprs: Seq[Any], initialNumPartitions: Option[Int]): RebalancePartitions = {
252+
partitionExprs: Seq[Expression],
253+
initialNumPartitions: Option[Int]): RebalancePartitions = {
255254
validateParameters(hint.name, partitionExprs)
256-
RebalancePartitions(
257-
partitionExprs.map(_.asInstanceOf[Expression]),
258-
hint.child,
259-
initialNumPartitions)
255+
RebalancePartitions(partitionExprs, hint.child, initialNumPartitions)
260256
}
261257

262258
getNumOfPartitions(hint) match {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ trait HintErrorHandler {
213213
* @param name the unrecognized hint name
214214
* @param parameters the hint parameters
215215
*/
216-
def hintNotRecognized(name: String, parameters: Seq[Any]): Unit
216+
def hintNotRecognized(name: String, parameters: Seq[Expression]): Unit
217217

218218
/**
219219
* Callback for relation names specified in a hint that cannot be associated with any relation

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -949,21 +949,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
949949
messageParameters = Map.empty)
950950
}
951951

952-
def joinStrategyHintParameterNotSupportedError(unsupported: Any): Throwable = {
952+
def joinStrategyHintParameterNotSupportedError(unsupported: Expression): Throwable = {
953953
new AnalysisException(
954954
errorClass = "_LEGACY_ERROR_TEMP_1046",
955955
messageParameters = Map(
956-
"unsupported" -> unsupported.toString,
956+
"unsupported" -> toSQLExpr(unsupported),
957957
"class" -> unsupported.getClass.toString))
958958
}
959959

960960
def invalidHintParameterError(
961-
hintName: String, invalidParams: Seq[Any]): Throwable = {
961+
hintName: String, invalidParams: Seq[Expression]): Throwable = {
962962
new AnalysisException(
963963
errorClass = "_LEGACY_ERROR_TEMP_1047",
964964
messageParameters = Map(
965965
"hintName" -> hintName,
966-
"invalidParams" -> invalidParams.mkString(", ")))
966+
"invalidParams" -> invalidParams.map(toSQLExpr).mkString(", ")))
967967
}
968968

969969
def invalidCoalesceHintParameterError(hintName: String): Throwable = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -347,14 +347,18 @@ class ResolveHintsSuite extends AnalysisTest {
347347
}
348348

349349
val msg = "REBALANCE Hint parameters should include an optional integral partitionNum " +
350-
"and/or columns, but 1 can not be recognized as either partitionNum or columns."
350+
"and/or columns, but \"1\" can not be recognized as either partitionNum or columns."
351351
assertAnalysisError(
352352
UnresolvedHint("REBALANCE", Seq(Literal(1), Literal(1)), table("TaBlE")),
353353
Seq(msg))
354354

355355
assertAnalysisError(
356356
UnresolvedHint("REBALANCE", Seq(1, Literal(1)), table("TaBlE")),
357357
Seq(msg))
358+
359+
assertAnalysisError(
360+
UnresolvedHint("REBALANCE", Seq(1, Literal(Array[Byte](0, 1, 3))), table("TaBlE")),
361+
Seq("X'000103'"))
358362
}
359363

360364
test("SPARK-38410: Support specify initial partition number for rebalance") {

0 commit comments

Comments
 (0)