fix: Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR hack #1736

Closed · wants to merge 55 commits

Commits
b3cf832
Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR config
andygrove May 13, 2025
cdf0999
upmerge
andygrove May 30, 2025
81b3384
format
andygrove May 30, 2025
8626f51
Specify -Dsbt.log.noformat=true in sbt CI runs
andygrove May 30, 2025
dfe2d47
Merge branch 'sbt-log-noformat' into remove-shuffle-fallback-hack
andygrove May 30, 2025
a570f58
Merge remote-tracking branch 'apache/main' into remove-shuffle-fallba…
andygrove Jun 2, 2025
628a623
Merge remote-tracking branch 'apache/main' into remove-shuffle-fallba…
andygrove Jun 3, 2025
dab5fbf
update test
andygrove Jun 3, 2025
8dc4e9b
Fix shuffle writing rows containing null struct fields
Kontinuation Jun 4, 2025
0fcd7eb
Ignore miri for test_append_null_struct_field_to_struct_builder
Kontinuation Jun 4, 2025
f69a4c6
Merge remote-tracking branch 'kontinuation/fix-appending-null-rows' i…
andygrove Jun 4, 2025
b867d3e
Merge remote-tracking branch 'apache/main' into remove-shuffle-fallba…
andygrove Jun 4, 2025
2cc00c2
ignore test for 3.5.5
andygrove Jun 4, 2025
0fda09b
update docs
andygrove Jun 4, 2025
c4219cd
3.4.3
andygrove Jun 4, 2025
eb69d50
3.5.4
andygrove Jun 4, 2025
6f90a2b
4.0.0-preview1
andygrove Jun 4, 2025
25fcccf
fall back to Spark for window ranges
andygrove Jun 5, 2025
baad6df
Merge branch 'window-int-long' into remove-shuffle-fallback-hack
andygrove Jun 5, 2025
1f25095
Merge branch 'count-distinct-nan-in-aggregates' into remove-shuffle-f…
andygrove Jun 5, 2025
d7b09ed
Merge remote-tracking branch 'apache/main' into remove-shuffle-fallba…
andygrove Jun 5, 2025
cd03a1c
ignore DPP test with 3.5.5
andygrove Jun 5, 2025
a79b445
fix
andygrove Jun 5, 2025
3bf0e72
fix
andygrove Jun 5, 2025
4d7919c
fix
andygrove Jun 5, 2025
5aca868
disable some tests temporarily
andygrove Jun 5, 2025
a0f70f1
ignore some tests
andygrove Jun 5, 2025
c5a1153
update tests
andygrove Jun 5, 2025
f17601c
update tests
andygrove Jun 5, 2025
18a993e
trigger ci
andygrove Jun 5, 2025
f91b06e
upmerge
andygrove Jun 5, 2025
57b5ffb
debug
andygrove Jun 5, 2025
09519e7
debug
andygrove Jun 5, 2025
019bcd4
fmt
andygrove Jun 6, 2025
d054f6b
fix?
andygrove Jun 6, 2025
ccd710c
format
andygrove Jun 6, 2025
01c43aa
fix?
andygrove Jun 6, 2025
6d7f893
Merge remote-tracking branch 'apache/main' into remove-shuffle-fallba…
andygrove Jun 6, 2025
28787ed
upmerge and remove one change
andygrove Jun 6, 2025
ab127fa
format
andygrove Jun 6, 2025
940a046
Update 3.5.5 diff
andygrove Jun 6, 2025
89ce244
Update 3.4.3 diff
andygrove Jun 6, 2025
e4d57ce
Update 3.5.4 diff
andygrove Jun 6, 2025
980d175
Update 3.4.3 diff
andygrove Jun 6, 2025
97a19b9
Update 4.0.0-preview1 diff
andygrove Jun 6, 2025
469b78b
ignore wholestagecodegen tests
andygrove Jun 6, 2025
99f6cb9
ignore wholestagecodegen tests
andygrove Jun 6, 2025
fe8b887
enable tests for 3.4.3
andygrove Jun 6, 2025
eaf2768
enable tests for 3.4.3
andygrove Jun 6, 2025
3c2d06b
3.5.4
andygrove Jun 6, 2025
30c962c
4.0.0
andygrove Jun 6, 2025
2b416d3
prep for review
andygrove Jun 6, 2025
0968c29
fix 3.4.3 diff
andygrove Jun 6, 2025
3f6d03d
fix 3.5.4 diff
andygrove Jun 7, 2025
c105513
Merge remote-tracking branch 'apache/main' into remove-shuffle-fallba…
andygrove Jun 7, 2025
7 changes: 0 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -289,13 +289,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("auto")

-  val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
-    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
-      .doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
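For context, the deleted entry resolved to the Spark conf key spark.comet.exec.shuffle.fallbackToColumnar (assuming COMET_EXEC_CONFIG_PREFIX expands to "spark.comet.exec"). A minimal sketch of how the old opt-in looked from a user's session; after this PR the key no longer exists and setting it has no effect:

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch of the old opt-in, assuming COMET_EXEC_CONFIG_PREFIX expands
// to "spark.comet.exec". The flag is deleted by this PR, so setting it now
// has no effect; shown only to illustrate what is being removed.
object ShuffleFallbackDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("comet-shuffle-fallback-demo")
      .config("spark.comet.exec.shuffle.fallbackToColumnar", "true") // removed flag
      .getOrCreate()

    spark.range(100).repartition(4).count() // forces a shuffle exchange
    spark.stop()
  }
}
```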
107 changes: 94 additions & 13 deletions dev/diffs/3.4.3.diff
@@ -363,6 +363,44 @@ index a9f69ab28a1..5d9d4f2cb83 100644
withTable("tbl") {
sql(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 433b4741979..07148eee480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}

def isShuffleExecByRequirement(
- plan: ShuffleExchangeExec,
+ plan: ShuffleExchangeLike,
desiredClusterColumns: Seq[String]): Boolean = plan match {
case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) =>
partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
case _ => false
}

@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
case w: WindowExec =>
w.child.exists {
- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
case _ => false
}
case _ => false
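The hunk above shows the pattern that recurs throughout these test diffs: a match on Spark's concrete ShuffleExchangeExec is widened to the ShuffleExchangeLike trait, which CometShuffleExchangeExec also implements (note the Comet extractor carries more fields, hence the extra wildcards). A self-contained sketch of the idea, using stand-in types rather than the real Spark classes:

```scala
object ShuffleLikeSketch extends App {
  // Stand-ins for ShuffleExchangeLike and its Spark/Comet implementations;
  // the real types live in org.apache.spark.sql.execution.exchange and
  // org.apache.spark.sql.comet.execution.shuffle.
  sealed trait PlanNode { def children: Seq[PlanNode] = Nil }
  trait ShuffleLike extends PlanNode
  case class SparkShuffle() extends ShuffleLike
  case class CometShuffle() extends ShuffleLike
  case class Window(child: PlanNode) extends PlanNode {
    override def children: Seq[PlanNode] = Seq(child)
  }

  def planExists(p: PlanNode)(f: PlanNode => Boolean): Boolean =
    f(p) || p.children.exists(planExists(_)(f))

  // One case over the trait matches both engines' exchange nodes.
  val found = planExists(Window(CometShuffle())) {
    case _: ShuffleLike => true
    case _              => false
  }
  assert(found)
}
```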
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index daef11ae4d6..9f3cc9181f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -386,7 +424,7 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..cc5224af735 100644
+index f33432ddb6f..1925aac8d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -417,17 +455,37 @@ index f33432ddb6f..cc5224af735 100644
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("avoid reordering broadcast join keys to match input hash partitioning") {
+ test("avoid reordering broadcast join keys to match input hash partitioning",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
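The IgnoreComet(...) tags added above come from Comet's patched Spark test sources. A hedged sketch of the mechanism; only the IgnoreComet name and its call sites appear in these diffs, so the Tag id string below is an assumption:

```scala
import org.scalatest.Tag

// Hedged sketch: an extra-args test tag that the runner can exclude when the
// Spark test suites execute with Comet enabled. The id passed to Tag is an
// assumption, not the project's actual value.
case class IgnoreComet(reason: String) extends Tag("org.apache.spark.sql.IgnoreComet")

// Usage, mirroring the diffs above:
//
//   test("SPARK-34637: DPP side broadcast query stage is created firstly",
//     IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
//     ...
//   }
```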
@@ -611,7 +669,7 @@ index 1792b4c32eb..1616e6f39bd 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 7f062bfb899..b347ef905d2 100644
+index 7f062bfb899..0ed85486e80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@@ -707,7 +765,7 @@ index 7f062bfb899..b347ef905d2 100644
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
}
-@@ -1282,18 +1292,25 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1282,18 +1292,26 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}

// Test shuffled hash join
@@ -722,6 +780,7 @@ index 7f062bfb899..b347ef905d2 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true
+ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)

@@ -735,7 +794,7 @@ index 7f062bfb899..b347ef905d2 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
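Several JoinSuite assertions count operators with a partial function over the executed plan; adding Comet cases (for example, case _: CometHashJoinExec => true above) widens the match without changing the count being asserted. A self-contained sketch of that collect pattern over a toy plan tree; the node types are stand-ins:

```scala
object CollectPatternSketch extends App {
  // Stand-in plan nodes; in the real suite the traversal is
  // AdaptiveSparkPlanHelper.collect over SparkPlan.
  sealed trait Plan { def children: Seq[Plan] = Nil }
  case class ShuffledHashJoin(left: Plan, right: Plan) extends Plan {
    override def children: Seq[Plan] = Seq(left, right)
  }
  case class CometHashJoin(left: Plan, right: Plan) extends Plan {
    override def children: Seq[Plan] = Seq(left, right)
  }
  case class Scan(name: String) extends Plan

  def collect[A](p: Plan)(pf: PartialFunction[Plan, A]): Seq[A] =
    pf.lift(p).toSeq ++ p.children.flatMap(collect(_)(pf))

  val plan: Plan = CometHashJoin(Scan("t1"), Scan("t2"))

  // The extra Comet case keeps the count identical whichever engine ran the join.
  val joins = collect(plan) {
    case _: ShuffledHashJoin => true
    case _: CometHashJoin    => true
  }
  assert(joins.size == 1)
}
```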
-@@ -1341,7 +1358,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1341,7 +1359,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
@@ -745,7 +804,7 @@ index 7f062bfb899..b347ef905d2 100644
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1370,9 +1388,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1370,9 +1389,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
@@ -760,7 +819,7 @@ index 7f062bfb899..b347ef905d2 100644
}

// Test output ordering is not preserved
-@@ -1381,9 +1402,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1381,9 +1403,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
@@ -775,7 +834,7 @@ index 7f062bfb899..b347ef905d2 100644
}

// Test singe partition
-@@ -1393,7 +1417,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1393,7 +1418,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -785,7 +844,7 @@ index 7f062bfb899..b347ef905d2 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1438,6 +1463,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1438,6 +1464,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
@@ -795,7 +854,7 @@ index 7f062bfb899..b347ef905d2 100644
}.size == 1)
}
}
-@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1482,14 +1511,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -818,7 +877,7 @@ index 7f062bfb899..b347ef905d2 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
-@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1605,7 +1640,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

@@ -1280,6 +1339,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.TestUtils.assertSpilled
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
import org.apache.spark.sql.test.SharedSparkSession

@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession {
Row(1, 3, null) :: Row(2, null, 4) :: Nil)
}

- test("test with low buffer spill threshold") {
+ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) {
val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
nums.createOrReplaceTempView("nums")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala