Skip to content

Commit 32b78fd

Browse files
Support arrays_overlap function
1 parent 517c255 commit 32b78fd

File tree

4 files changed

+40
-0
lines changed

4 files changed

+40
-0
lines changed

native/core/src/execution/planner.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use datafusion::{
6565
prelude::SessionContext,
6666
};
6767
use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
68+
use datafusion_functions_nested::array_has::array_has_any_udf;
6869
use datafusion_functions_nested::concat::ArrayAppend;
6970
use datafusion_functions_nested::remove::array_remove_all_udf;
7071
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
@@ -765,6 +766,21 @@ impl PhysicalPlanner {
765766

766767
Ok(Arc::new(case_expr))
767768
}
769+
// Plans Spark's `ArraysOverlap`: both operands are converted with `create_expr`
// against the input schema and then wrapped in a `ScalarFunctionExpr` named
// "array_has_any", backed by DataFusion's `array_has_any_udf()`, with a declared
// return type of `DataType::Boolean`.
// NOTE(review): `.unwrap()` on `expr.left` / `expr.right` will panic if either
// operand is missing from the message; presumably the serializer always sets both,
// TODO confirm.
// NOTE(review): `left_array_expr` is not used again in this arm after `args` is
// built, so the `Arc::clone` looks avoidable; confirm before changing.
// NOTE(review): whether `array_has_any` matches Spark's null semantics for
// `arrays_overlap` cannot be determined from this arm; verify against Spark docs.
ExprStruct::ArraysOverlap(expr) => {
770+
let left_array_expr =
771+
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
772+
let right_array_expr =
773+
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
774+
let args = vec![Arc::clone(&left_array_expr), right_array_expr];
775+
let datafusion_array_has_any = array_has_any_udf();
776+
let array_has_any_expr = Arc::new(ScalarFunctionExpr::new(
777+
"array_has_any",
778+
datafusion_array_has_any,
779+
args,
780+
DataType::Boolean,
781+
));
782+
Ok(array_has_any_expr)
783+
}
768784
expr => Err(ExecutionError::GeneralError(format!(
769785
"Not implemented: {:?}",
770786
expr

native/proto/src/proto/expr.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ message Expr {
8686
ArrayInsert array_insert = 59;
8787
BinaryExpr array_contains = 60;
8888
BinaryExpr array_remove = 61;
89+
// arrays_overlap expression; carried as a BinaryExpr (field number 62), the same message type used by array_contains and array_remove.
BinaryExpr arrays_overlap = 62;
8990
}
9091
}
9192

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2300,6 +2300,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
23002300
expr.children(1),
23012301
inputs,
23022302
(builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
2303+
// Serializes an ArraysOverlap expression: both child expressions are handed to
// createBinaryExpr together with `inputs`, and the resulting binary expression is
// stored on the builder through setArraysOverlap.
case ArraysOverlap(leftArrayExpr, rightArrayExpr) =>
2304+
createBinaryExpr(
2305+
leftArrayExpr,
2306+
rightArrayExpr,
2307+
inputs,
2308+
(builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
23032309
case _ =>
23042310
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
23052311
None

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2568,4 +2568,21 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
25682568
}
25692569
}
25702570
}
2571+
2572+
// Exercises arrays_overlap with dictionaryEnabled set to both true and false.
// For each setting a Parquet file is written with makeParquetFileAllTypes (third
// argument 10000, presumably the row count; TODO confirm), registered as temp
// view "t1", and three queries are passed to checkSparkAnswerAndOperator:
//   1. arrays built from columns _2, _3, _4 versus _3, _4 (rows where _2 is not null);
//   2. string arrays mixing literals, a null literal and columns cast to string;
//   3. literal string arrays in which both sides contain a null and the non-null
//      literals ('a' vs 'b') differ.
// NOTE(review): checkSparkAnswerAndOperator presumably compares results against
// Spark and checks the operator ran natively; its definition is not visible here.
test("arrays_overlap") {
2573+
Seq(true, false).foreach { dictionaryEnabled =>
2574+
withTempDir { dir =>
2575+
val path = new Path(dir.toURI.toString, "test.parquet")
2576+
makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
2577+
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
2578+
checkSparkAnswerAndOperator(sql(
2579+
"SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null"))
2580+
checkSparkAnswerAndOperator(sql(
2581+
"SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null"))
2582+
checkSparkAnswerAndOperator(sql(
2583+
"SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null"))
2584+
}
2585+
}
2586+
}
2587+
25712588
}

0 commit comments

Comments
 (0)