[SPARK-40102][SQL] Use SparkException instead of IllegalStateException in SparkPlan

### What changes were proposed in this pull request?

This PR aims to use SparkException instead of IllegalStateException in SparkPlan. For details, see apache#37524.
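
As a rough sketch of the pattern this PR applies (a hypothetical, simplified helper, not the actual SparkPlan code; it only illustrates swapping a bare IllegalStateException for `SparkException.internalError`):

```scala
import org.apache.spark.SparkException

// Hypothetical, simplified guard illustrating the pattern this PR applies in
// SparkPlan: an internal invariant violation is raised as a SparkException
// (via SparkException.internalError) rather than a bare IllegalStateException.
object CanonicalizedPlanGuard {
  def assertExecutable(isCanonicalizedPlan: Boolean): Unit = {
    if (isCanonicalizedPlan) {
      // Before: throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
      throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
    }
  }
}
```

Callers that previously caught `IllegalStateException` (such as `SparkPlanSuite` below) now intercept `SparkException` instead.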

### Why are the changes needed?

Better error reporting: `SparkException.internalError` raises a `SparkException` carrying the `INTERNAL_ERROR` error class, consistent with Spark's error framework, instead of a bare `IllegalStateException`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests; the expected exception type in `SparkPlanSuite` is updated accordingly.
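
As a hedged aside (not part of this PR's diff): errors built via `SparkException.internalError` are expected to carry the `INTERNAL_ERROR` error class, which is why intercepting `SparkException` is sufficient in the updated suite. A minimal standalone check, assuming spark-core is on the classpath:

```scala
import org.apache.spark.SparkException

// Hypothetical standalone check (not from this PR's diff): an error built via
// SparkException.internalError is a SparkException tagged with the
// INTERNAL_ERROR error class.
object InternalErrorCheck {
  def main(args: Array[String]): Unit = {
    val e = SparkException.internalError("A canonicalized plan is not supposed to be executed.")
    // Expected (assumption): the error class is INTERNAL_ERROR.
    assert(e.getErrorClass == "INTERNAL_ERROR")
    println(s"error class: ${e.getErrorClass}, message: ${e.getMessage}")
  }
}
```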

Closes apache#37535 from yikf/sparkplan-IllegalStateException.

Authored-by: yikf <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
yikf authored and MaxGekk committed Aug 16, 2022
1 parent 8439d84 commit a28880f
Showing 2 changed files with 13 additions and 13 deletions.
SparkPlan.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
-import org.apache.spark.{broadcast, SparkEnv}
+import org.apache.spark.{broadcast, SparkEnv, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
@@ -189,7 +189,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
    */
   final def execute(): RDD[InternalRow] = executeQuery {
     if (isCanonicalizedPlan) {
-      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+      throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
     }
     doExecute()
   }
@@ -202,7 +202,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
     if (isCanonicalizedPlan) {
-      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+      throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
     }
     doExecuteBroadcast()
   }
@@ -216,7 +216,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
    */
   final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
     if (isCanonicalizedPlan) {
-      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+      throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
     }
     doExecuteColumnar()
   }
@@ -318,7 +318,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
    * when it is no longer needed. This allows input formats to be able to reuse batches if needed.
    */
   protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    throw new IllegalStateException(s"Internal Error ${this.getClass} has column support" +
+    throw SparkException.internalError(s"Internal Error ${this.getClass} has column support" +
       s" mismatch:\n${this}")
   }
 
SparkPlanSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
@@ -34,13 +34,13 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   test("SPARK-21619 execution of a canonicalized plan should fail") {
     val plan = spark.range(10).queryExecution.executedPlan.canonicalized
 
-    intercept[IllegalStateException] { plan.execute() }
-    intercept[IllegalStateException] { plan.executeCollect() }
-    intercept[IllegalStateException] { plan.executeCollectPublic() }
-    intercept[IllegalStateException] { plan.executeToIterator() }
-    intercept[IllegalStateException] { plan.executeBroadcast() }
-    intercept[IllegalStateException] { plan.executeTake(1) }
-    intercept[IllegalStateException] { plan.executeTail(1) }
+    intercept[SparkException] { plan.execute() }
+    intercept[SparkException] { plan.executeCollect() }
+    intercept[SparkException] { plan.executeCollectPublic() }
+    intercept[SparkException] { plan.executeToIterator() }
+    intercept[SparkException] { plan.executeBroadcast() }
+    intercept[SparkException] { plan.executeTake(1) }
+    intercept[SparkException] { plan.executeTail(1) }
   }
 
   test("SPARK-23731 plans should be canonicalizable after being (de)serialized") {
