Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,13 @@ jobs:
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Prepare
run: |
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools==77.0.3 && \
pip3 install pyspark==3.5.5 cython && \
pip3 install pandas==2.2.3 pyarrow==20.0.0
- name: Prepare Spark Resources for Spark 4.0.1 #TODO remove after image update
run: |
rm -rf /opt/shims/spark40
Expand Down Expand Up @@ -1414,6 +1421,15 @@ jobs:
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Prepare
run: |
dnf install -y python3.11 python3.11-pip python3.11-devel && \
ls -la /usr/bin/python3.11 && \
alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1 && \
alternatives --set python3 /usr/bin/python3.11 && \
pip3 install setuptools==77.0.3 && \
pip3 install pyspark==3.5.5 cython && \
pip3 install pandas==2.2.3 pyarrow==20.0.0
- name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update
run: |
rm -rf /opt/shims/spark41
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
.set("spark.executor.cores", "1")
}

// TODO: fix on spark-4.1
testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") {
test("arrow_udf test: without projection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand All @@ -65,8 +64,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df2, expected)
}

// TODO: fix on spark-4.1
testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") {
test("arrow_udf test: with unrelated projection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand All @@ -87,8 +85,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df, expected)
}

// TODO: fix on spark-4.1
testWithMaxSparkVersion("arrow_udf test: with preprojection", "4.0") {
test("arrow_udf test: with preprojection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,9 +1138,11 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("remove redundant WindowGroupLimits")
enableSuite[GlutenSQLCollectLimitExecSuite]
// Generated suites for org.apache.spark.sql.execution.python
// TODO: 4.x enableSuite[GlutenPythonDataSourceSuite] // 1 failure
// TODO: 4.x enableSuite[GlutenPythonUDFSuite] // 1 failure
// TODO: 4.x enableSuite[GlutenPythonUDTFSuite]
enableSuite[GlutenPythonDataSourceSuite]
.exclude("SPARK-50426: should not trigger static Python data source lookup")
enableSuite[GlutenPythonUDFSuite]
.exclude("SPARK-48706: Negative test case for Python UDF in higher order functions")
enableSuite[GlutenPythonUDTFSuite]
enableSuite[GlutenRowQueueSuite]
enableSuite[GlutenBatchEvalPythonExecSuite]
// Replaced with other tests that check for native operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
*/
package org.apache.spark.sql.execution.python

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.{AnalysisException, GlutenSQLTestsTrait, IntegratedUDFTestUtils}
import org.apache.spark.sql.functions.{array, transform}

class GlutenPythonUDFSuite extends PythonUDFSuite with GlutenSQLTestsTrait {}
/**
 * Gluten variant of Spark's PythonUDFSuite, run with the Gluten test harness
 * mixed in via GlutenSQLTestsTrait. All inherited tests run unchanged except
 * the SPARK-48706 case re-implemented below.
 */
class GlutenPythonUDFSuite extends PythonUDFSuite with GlutenSQLTestsTrait {

  // Override: the original test uses this.getClass.getSimpleName in ExpectedContext pattern,
  // which returns "GlutenPythonUDFSuite" but the actual callSite records "PythonUDFSuite".
  testGluten("SPARK-48706: Negative test case for Python UDF in higher order functions") {
    // Skip unless the Python-UDF test prerequisites (per IntegratedUDFTestUtils) are present
    // in the CI environment — presumably a python3 executable plus pyspark; confirm there.
    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
    checkError(
      // Python UDFs are not allowed inside higher-order-function lambdas, so the
      // analyzer must reject this query with an AnalysisException.
      exception = intercept[AnalysisException] {
        spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect()
      },
      condition = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF",
      parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""),
      // Match the suite name recorded at the real call site, not this subclass's name.
      context = ExpectedContext("transform", s".*PythonUDFSuite.*")
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,4 @@ package org.apache.spark.sql.execution.python

import org.apache.spark.sql.GlutenSQLTestsTrait

// TODO: 4.x extends PythonUDTFSuite, currently PythonUDTFSuite requires Python executable
// [python3] and pyspark to be available, which are not present in the 4.0 CI environment.
class GlutenPythonUDTFSuite extends GlutenSQLTestsTrait {}
class GlutenPythonUDTFSuite extends PythonUDTFSuite with GlutenSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -1144,10 +1144,12 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("remove redundant WindowGroupLimits")
enableSuite[GlutenSQLCollectLimitExecSuite]
// Generated suites for org.apache.spark.sql.execution.python
// TODO: 4.x enableSuite[GlutenPythonDataSourceSuite]
// TODO: 4.x enableSuite[GlutenPythonUDFSuite]
// TODO: 4.x enableSuite[GlutenPythonUDTFSuite]
// TODO: 4.x enableSuite[GlutenRowQueueSuite]
enableSuite[GlutenPythonDataSourceSuite]
.exclude("data source reader with filter pushdown")
enableSuite[GlutenPythonUDFSuite]
.exclude("SPARK-48706: Negative test case for Python UDF in higher order functions")
enableSuite[GlutenPythonUDTFSuite]
enableSuite[GlutenRowQueueSuite]
enableSuite[GlutenBatchEvalPythonExecSuite]
// Replaced with other tests that check for native operations
.exclude("Python UDF: push down deterministic FilterExec predicates")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,82 @@
*/
package org.apache.spark.sql.execution.python

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.gluten.execution.FilterExecTransformerBase

class GlutenPythonDataSourceSuite extends PythonDataSourceSuite with GlutenSQLTestsTrait {}
import org.apache.spark.sql.{GlutenSQLTestsTrait, IntegratedUDFTestUtils, Row}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.python.PythonScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
 * Gluten variant of Spark's PythonDataSourceSuite. One inherited test is
 * re-implemented below because Gluten rewrites the physical plan, so the
 * vanilla plan-shape assertions no longer hold.
 */
class GlutenPythonDataSourceSuite extends PythonDataSourceSuite with GlutenSQLTestsTrait {

  import IntegratedUDFTestUtils._

  // Gluten replaces FilterExec with FilterExecTransformer and
  // BatchScanExec with BatchScanExecTransformer
  testGluten("data source reader with filter pushdown") {
    // Skip unless the pandas-UDF prerequisites (python3/pyspark/pandas) are available.
    assume(shouldTestPandasUDFs)
    // Python data source whose reader accepts (i.e. does NOT yield back) the
    // EqualTo(partition, 0) filter, leaving every other predicate for Spark to evaluate.
    val dataSourceScript =
      s"""
         |from pyspark.sql.datasource import (
         |    DataSource,
         |    DataSourceReader,
         |    EqualTo,
         |    InputPartition,
         |)
         |
         |class SimpleDataSourceReader(DataSourceReader):
         |    def partitions(self):
         |        return [InputPartition(i) for i in range(2)]
         |
         |    def pushFilters(self, filters):
         |        for filter in filters:
         |            if filter != EqualTo(("partition",), 0):
         |                yield filter
         |
         |    def read(self, partition):
         |        yield (0, partition.value)
         |        yield (1, partition.value)
         |        yield (2, partition.value)
         |
         |class SimpleDataSource(DataSource):
         |    def schema(self):
         |        return "id int, partition int"
         |
         |    def reader(self, schema):
         |        return SimpleDataSourceReader()
         |""".stripMargin
    val schema = StructType.fromDDL("id INT, partition INT")
    val dataSource =
      createUserDefinedPythonDataSource(name = dataSourceName, pythonScript = dataSourceScript)
    withSQLConf(SQLConf.PYTHON_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      spark.dataSource.registerPython(dataSourceName, dataSource)
      val df =
        spark.read.format(dataSourceName).schema(schema).load().filter("id = 1 and partition = 0")
      val plan = df.queryExecution.executedPlan

      // The residual predicate (id = 1) must remain in Gluten's filter transformer,
      // while the pushed-down predicate (partition = 0) must be absent from it.
      val filter = collectFirst(plan) {
        case s: FilterExecTransformerBase =>
          val condition = s.cond.toString
          assert(!condition.contains("= 0"))
          assert(condition.contains("= 1"))
          s
      }.getOrElse(
        fail(s"FilterExecTransformerBase not found in the plan. Actual plan:\n$plan")
      )

      // Gluten does not replace PythonScan's BatchScanExec - it stays as vanilla
      // BatchScanExec with RowToVeloxColumnar transition
      collectFirst(filter) {
        case s: BatchScanExec if s.scan.isInstanceOf[PythonScan] =>
          val p = s.scan.asInstanceOf[PythonScan]
          // The scan's metadata must record the filter the Python reader accepted.
          assert(p.getMetaData().get("PushedFilters").contains("[EqualTo(partition,0)]"))
      }.getOrElse(
        fail(s"BatchScanExec with PythonScan not found. Actual plan:\n$plan")
      )

      // id = 1 survives; partition = 0 was pushed down but the reader emits both
      // partitions' rows, so (1, 0) and (1, 1) both appear in the result.
      checkAnswer(df, Seq(Row(1, 0), Row(1, 1)))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
*/
package org.apache.spark.sql.execution.python

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.{AnalysisException, GlutenSQLTestsTrait, IntegratedUDFTestUtils}
import org.apache.spark.sql.functions.{array, transform}

class GlutenPythonUDFSuite extends PythonUDFSuite with GlutenSQLTestsTrait {}
/**
 * Gluten variant of Spark's PythonUDFSuite, run with the Gluten test harness
 * mixed in via GlutenSQLTestsTrait. All inherited tests run unchanged except
 * the SPARK-48706 case re-implemented below.
 */
class GlutenPythonUDFSuite extends PythonUDFSuite with GlutenSQLTestsTrait {

  // Override: the original test uses this.getClass.getSimpleName in ExpectedContext pattern,
  // which returns "GlutenPythonUDFSuite" but the actual callSite records "PythonUDFSuite".
  testGluten("SPARK-48706: Negative test case for Python UDF in higher order functions") {
    // Skip unless the Python-UDF test prerequisites (per IntegratedUDFTestUtils) are present
    // in the CI environment — presumably a python3 executable plus pyspark; confirm there.
    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
    checkError(
      // Python UDFs are not allowed inside higher-order-function lambdas, so the
      // analyzer must reject this query with an AnalysisException.
      exception = intercept[AnalysisException] {
        spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect()
      },
      condition = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF",
      parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""),
      // Match the suite name recorded at the real call site, not this subclass's name.
      context = ExpectedContext("transform", s".*PythonUDFSuite.*")
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,4 @@ package org.apache.spark.sql.execution.python

import org.apache.spark.sql.GlutenSQLTestsTrait

// TODO: 4.x extends PythonUDTFSuite, currently PythonUDTFSuite requires Python executable
// [python3] and pyspark to be available, which are not present in the 4.1 CI environment.
class GlutenPythonUDTFSuite extends GlutenSQLTestsTrait {}
class GlutenPythonUDTFSuite extends PythonUDTFSuite with GlutenSQLTestsTrait {}
Loading