Skip to content

Commit 26b406d

Browse files
Feat: support array_compact function (#1321)
## Which issue does this PR close? Related to Epic: #1042. array_compact: SELECT array_compact(array(1, 2, 3, null)) => array(1, 2, 3). DataFusion's array_remove_all function has the same behavior as Spark's array_compact function. Spark: https://docs.databricks.com/en/sql/language-manual/functions/array_compact.html DataFusion: https://datafusion.apache.org/user-guide/sql/scalar_functions.html#array-remove-all ## Rationale for this change Defined under Epic: #1042. ## What changes are included in this PR? planner.rs: maps Spark's array_compact function to DataFusion's array_remove_all_udf physical expression; expr.proto: an array_compact message has been added; QueryPlanSerde.scala: an array_compact pattern-matching case has been added; CometExpressionSuite.scala: a new unit test has been added for the array_compact function. ## How are these changes tested? A new unit test has been added.
1 parent 496dbea commit 26b406d

File tree

3 files changed

+50
-3
lines changed

3 files changed

+50
-3
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1992,6 +1992,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
19921992
case _: ArrayIntersect => convert(CometArrayIntersect)
19931993
case _: ArrayJoin => convert(CometArrayJoin)
19941994
case _: ArraysOverlap => convert(CometArraysOverlap)
1995+
case _ @ArrayFilter(_, func) if func.children.head.isInstanceOf[IsNotNull] =>
1996+
convert(CometArrayCompact)
19951997
case _ =>
19961998
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
19971999
None

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
package org.apache.comet.serde
2121

22-
import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression}
23-
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType}
22+
import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression, Literal}
23+
import org.apache.spark.sql.types._
2424

2525
import org.apache.comet.CometSparkSessionExtensions.withInfo
26-
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto}
26+
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto, scalarExprToProtoWithReturnType}
2727
import org.apache.comet.shims.CometExprShim
2828

2929
object CometArrayRemove extends CometExpressionSerde with CometExprShim {
@@ -126,6 +126,31 @@ object CometArraysOverlap extends CometExpressionSerde with IncompatExpr {
126126
}
127127
}
128128

129+
object CometArrayCompact extends CometExpressionSerde with IncompatExpr {

  /**
   * Converts Spark's `array_compact` (matched upstream as `ArrayFilter` whose predicate is
   * `IsNotNull`) into DataFusion's `array_remove_all(array, null)` scalar function, which
   * has the same semantics: every null element is removed from the array.
   *
   * @param expr the Spark expression to convert; its first child is the array argument
   * @param inputs attributes available for binding input references
   * @param binding whether input references should be bound against `inputs`
   * @return the serialized protobuf expression, or None (with diagnostic info attached
   *         to `expr` via `withInfo`) when the arguments cannot be converted
   */
  override def convert(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val child = expr.children.head
    // The element type is needed both for the typed null literal and the return type.
    val elementType = child.dataType.asInstanceOf[ArrayType].elementType

    val arrayExprProto = exprToProto(child, inputs, binding)
    // A typed null literal: array_remove_all(arr, null) drops all null elements.
    val nullLiteralProto = exprToProto(Literal(null, elementType), Seq.empty)

    val arrayCompactScalarExpr = scalarExprToProtoWithReturnType(
      "array_remove_all",
      ArrayType(elementType = elementType),
      arrayExprProto,
      nullLiteralProto)

    arrayCompactScalarExpr match {
      case None =>
        // Record why the conversion failed so it shows up in plan diagnostics.
        withInfo(expr, "unsupported arguments for ArrayCompact", expr.children: _*)
        None
      // Bound as `converted` rather than `expr` to avoid shadowing the method parameter,
      // which the original code did (`case expr => expr`).
      case converted => converted
    }
  }
}
153+
129154
object CometArrayJoin extends CometExpressionSerde with IncompatExpr {
130155
override def convert(
131156
expr: Expression,

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,4 +292,24 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
292292
}
293293
}
294294

295+
test("array_compact") {
  // array_compact only exists (and is matched) on Spark 3.4+.
  assume(isSpark34Plus)
  // The expression is registered as incompatible, so it must be explicitly enabled.
  withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
    for (dictionaryEnabled <- Seq(true, false)) {
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "test.parquet")
        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, n = 10000)
        spark.read.parquet(path.toString).createOrReplaceTempView("t1")

        // Cover three cases: element known to be null, element known to be non-null,
        // and a mixed array containing an explicit null literal.
        checkSparkAnswerAndOperator(
          sql("SELECT array_compact(array(_2)) FROM t1 WHERE _2 IS NULL"))
        checkSparkAnswerAndOperator(
          sql("SELECT array_compact(array(_2)) FROM t1 WHERE _2 IS NOT NULL"))
        checkSparkAnswerAndOperator(
          sql("SELECT array_compact(array(_2, _3, null)) FROM t1 WHERE _2 IS NOT NULL"))
      }
    }
  }
}
314+
295315
}

0 commit comments

Comments
 (0)