Commit ef8a7ad
[HUDI-7918] Remove support of Spark 3.0, 3.1, and 3.2 (apache#11692)
* squash all commits

* fix azure

* only download the docker images once

* fix typo

* remove debug print lines

---------

Co-authored-by: Jonathan Vexler <=>
jonvex authored Aug 16, 2024
1 parent d4a4d9c commit ef8a7ad
Showing 175 changed files with 401 additions and 20,704 deletions.
32 changes: 8 additions & 24 deletions .github/workflows/release_candidate_validation.yml
@@ -25,39 +25,23 @@ jobs:
       - scalaProfile: 'scala-2.13'
         flinkProfile: 'flink1.18'
         sparkProfile: 'spark3.5'
-        sparkRuntime: 'spark3.5.0'
+        sparkRuntime: 'spark3.5.1'
-      - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.18'
-        sparkProfile: 'spark3'
-        sparkRuntime: 'spark3.5.0'
       - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.18'
+        flinkProfile: 'flink1.17'
         sparkProfile: 'spark3.5'
-        sparkRuntime: 'spark3.5.0'
+        sparkRuntime: 'spark3.5.1'
       - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.18'
+        flinkProfile: 'flink1.16'
         sparkProfile: 'spark3.4'
-        sparkRuntime: 'spark3.4.0'
+        sparkRuntime: 'spark3.4.3'
       - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.17'
+        flinkProfile: 'flink1.15'
         sparkProfile: 'spark3.3'
-        sparkRuntime: 'spark3.3.2'
+        sparkRuntime: 'spark3.3.4'
       - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.16'
+        flinkProfile: 'flink1.14'
         sparkProfile: 'spark3.3'
         sparkRuntime: 'spark3.3.1'
-      - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.15'
-        sparkProfile: 'spark3.2'
-        sparkRuntime: 'spark3.2.3'
-      - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.14'
-        sparkProfile: 'spark3.1'
-        sparkRuntime: 'spark3.1.3'
-      - scalaProfile: 'scala-2.12'
-        flinkProfile: 'flink1.14'
-        sparkProfile: 'spark3.0'
-        sparkRuntime: 'spark3.0.2'
       - scalaProfile: 'scala-2.11'
         flinkProfile: 'flink1.14'
         sparkProfile: 'spark'
7 changes: 2 additions & 5 deletions README.md
@@ -94,14 +94,11 @@ Refer to the table below for building with different Spark and Scala versions.
 
 | Maven build options       | Expected Spark bundle jar name              | Notes                                            |
 |:--------------------------|:--------------------------------------------|:-------------------------------------------------|
-| (empty)                   | hudi-spark3.2-bundle_2.12                   | For Spark 3.2.x and Scala 2.12 (default options) |
+| (empty)                   | hudi-spark3.5-bundle_2.12                   | For Spark 3.5.x and Scala 2.12 (default options) |
 | `-Dspark2.4 -Dscala-2.11` | hudi-spark2.4-bundle_2.11                   | For Spark 2.4.4 and Scala 2.11                   |
-| `-Dspark3.0`              | hudi-spark3.0-bundle_2.12                   | For Spark 3.0.x and Scala 2.12                   |
-| `-Dspark3.1`              | hudi-spark3.1-bundle_2.12                   | For Spark 3.1.x and Scala 2.12                   |
-| `-Dspark3.2`              | hudi-spark3.2-bundle_2.12                   | For Spark 3.2.x and Scala 2.12 (same as default) |
 | `-Dspark3.3`              | hudi-spark3.3-bundle_2.12                   | For Spark 3.3.x and Scala 2.12                   |
 | `-Dspark3.4`              | hudi-spark3.4-bundle_2.12                   | For Spark 3.4.x and Scala 2.12                   |
-| `-Dspark3.5 -Dscala-2.12` | hudi-spark3.5-bundle_2.12                   | For Spark 3.5.x and Scala 2.12                   |
+| `-Dspark3.5 -Dscala-2.12` | hudi-spark3.5-bundle_2.12                   | For Spark 3.5.x and Scala 2.12 (same as default) |
 | `-Dspark3.5 -Dscala-2.13` | hudi-spark3.5-bundle_2.13                   | For Spark 3.5.x and Scala 2.13                   |
 | `-Dspark2 -Dscala-2.11`   | hudi-spark-bundle_2.11 (legacy bundle name) | For Spark 2.4.4 and Scala 2.11                   |
 | `-Dspark2 -Dscala-2.12`   | hudi-spark-bundle_2.12 (legacy bundle name) | For Spark 2.4.4 and Scala 2.12                   |
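For example, building with `mvn clean package -DskipTests -Dspark3.5 -Dscala-2.13` should produce `hudi-spark3.5-bundle_2.13-<version>.jar`, conventionally under `packaging/hudi-spark-bundle/target/` (output path assumed from the usual repo layout, not shown in this diff).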
2 changes: 0 additions & 2 deletions azure-pipelines-20230430.yml
@@ -62,7 +62,6 @@ parameters:
       - 'hudi-spark-datasource'
       - 'hudi-spark-datasource/hudi-spark'
       - 'hudi-spark-datasource/hudi-spark3.5.x'
-      - 'hudi-spark-datasource/hudi-spark3.2plus-common'
       - 'hudi-spark-datasource/hudi-spark3-common'
       - 'hudi-spark-datasource/hudi-spark-common'
   - name: job6UTModules
@@ -89,7 +88,6 @@ parameters:
       - '!hudi-spark-datasource'
       - '!hudi-spark-datasource/hudi-spark'
       - '!hudi-spark-datasource/hudi-spark3.5.x'
-      - '!hudi-spark-datasource/hudi-spark3.2plus-common'
       - '!hudi-spark-datasource/hudi-spark3-common'
       - '!hudi-spark-datasource/hudi-spark-common'
   - name: job6FTModules
@@ -48,19 +48,10 @@ private[hudi] trait SparkVersionsSupport {
 
   def isSpark2: Boolean = getSparkVersion.startsWith("2.")
   def isSpark3: Boolean = getSparkVersion.startsWith("3.")
-  def isSpark3_0: Boolean = getSparkVersion.startsWith("3.0")
-  def isSpark3_1: Boolean = getSparkVersion.startsWith("3.1")
-  def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
   def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")
   def isSpark3_4: Boolean = getSparkVersion.startsWith("3.4")
   def isSpark3_5: Boolean = getSparkVersion.startsWith("3.5")
 
-  def gteqSpark3_0: Boolean = getSparkVersion >= "3.0"
-  def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
-  def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
-  def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
-  def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
-  def gteqSpark3_2_2: Boolean = getSparkVersion >= "3.2.2"
   def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
   def gteqSpark3_3_2: Boolean = getSparkVersion >= "3.3.2"
   def gteqSpark3_4: Boolean = getSparkVersion >= "3.4"
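With the 3.0–3.2 predicates gone, Spark 3.3 is the floor for version-specific 3.x checks. A minimal sketch of dispatching on the surviving predicates (the branching itself is illustrative, not from this commit); note these helpers compare version strings lexicographically, which is safe across 3.3–3.5 but would misorder a hypothetical "3.10":

    import org.apache.hudi.HoodieSparkUtils

    // Illustrative: dispatch on the predicates this commit keeps.
    val sparkLine: String =
      if (HoodieSparkUtils.isSpark3_5) "Spark 3.5.x"
      else if (HoodieSparkUtils.isSpark3_4) "Spark 3.4.x"
      else if (HoodieSparkUtils.isSpark3_3) "Spark 3.3.x"
      else if (HoodieSparkUtils.isSpark2) "Spark 2.x (legacy)"
      else "unsupported"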
@@ -39,12 +39,6 @@ object SparkAdapterSupport {
       "org.apache.spark.sql.adapter.Spark3_4Adapter"
     } else if (HoodieSparkUtils.isSpark3_3) {
       "org.apache.spark.sql.adapter.Spark3_3Adapter"
-    } else if (HoodieSparkUtils.isSpark3_2) {
-      "org.apache.spark.sql.adapter.Spark3_2Adapter"
-    } else if (HoodieSparkUtils.isSpark3_1) {
-      "org.apache.spark.sql.adapter.Spark3_1Adapter"
-    } else if (HoodieSparkUtils.isSpark3_0) {
-      "org.apache.spark.sql.adapter.Spark3_0Adapter"
     } else {
       "org.apache.spark.sql.adapter.Spark2Adapter"
     }
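The chosen class name is then instantiated reflectively, which is how one hudi-spark artifact can target several Spark lines. A rough sketch of that step (the no-argument constructor and the adapter trait's package are assumptions; the excerpt does not show the instantiation):

    import org.apache.hudi.SparkAdapter  // adapter trait; exact package assumed

    // Sketch: load the version-matched adapter selected above by name.
    val adapter = Class.forName("org.apache.spark.sql.adapter.Spark3_3Adapter")
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[SparkAdapter]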
@@ -104,7 +104,7 @@ public static SparkConf getSparkConfForTest(String appName) {
       sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
     }
 
-    if (canLoadClass("org.apache.spark.sql.hudi.catalog.HoodieCatalog") && HoodieSparkUtils.gteqSpark3_2()) {
+    if (canLoadClass("org.apache.spark.sql.hudi.catalog.HoodieCatalog") && HoodieSparkUtils.gteqSpark3_3()) {
       sparkConf.set("spark.sql.catalog.spark_catalog",
           "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
     }
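For reference, the test configuration this helper builds on Spark 3.3+ looks roughly like the sketch below (master URL and app name are illustrative; assumes HoodieCatalog is on the classpath):

    import org.apache.spark.SparkConf
    import org.apache.hudi.HoodieSparkUtils

    val conf = new SparkConf().setMaster("local[2]").setAppName("hudi-test")
    conf.set("spark.sql.extensions",
      "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    if (HoodieSparkUtils.gteqSpark3_3) {
      // HoodieCatalog only applies on Spark 3.3+ after this commit
      conf.set("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    }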
@@ -117,7 +117,7 @@ public static Map<String, String> getSparkSqlConf() {
     Map<String, String> sqlConf = new HashMap<>();
     sqlConf.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
 
-    if (HoodieSparkUtils.gteqSpark3_2()) {
+    if (HoodieSparkUtils.gteqSpark3_3()) {
       sqlConf.put("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
     }
 
10 changes: 5 additions & 5 deletions hudi-spark-datasource/README.md
@@ -21,19 +21,19 @@ This repo contains the code that integrate Hudi with Spark. The repo is split in
 
 `hudi-spark`
 `hudi-spark2`
-`hudi-spark3.1.x`
-`hudi-spark3.2.x`
 `hudi-spark3.3.x`
+`hudi-spark3.4.x`
+`hudi-spark3.5.x`
 `hudi-spark2-common`
 `hudi-spark3-common`
 `hudi-spark-common`
 
 * hudi-spark is the module that contains the code that both spark2 & spark3 version would share, also contains the antlr4
   file that supports spark sql on spark 2.x version.
 * hudi-spark2 is the module that contains the code that compatible with spark 2.x versions.
-* hudi-spark3.1.x is the module that contains the code that compatible with spark3.1.x and spark3.0.x version.
-* hudi-spark3.2.x is the module that contains the code that compatible with spark 3.2.x versions.
-* hudi-spark3.3.x is the module that contains the code that compatible with spark 3.3.x+ versions.
+* hudi-spark3.3.x is the module that contains the code that compatible with spark3.3.x versions.
+* hudi-spark3.4.x is the module that contains the code that compatible with spark 3.4.x versions.
+* hudi-spark3.5.x is the module that contains the code that compatible with spark 3.5.x versions.
 * hudi-spark2-common is the module that contains the code that would be reused between spark2.x versions, right now the module
   has no class since hudi only supports spark 2.4.4 version, and it acts as the placeholder when packaging hudi-spark-bundle module.
 * hudi-spark3-common is the module that contains the code that would be reused between spark3.x versions.
@@ -50,72 +50,45 @@ object HoodieAnalysis extends SparkAdapterSupport {
     // For more details please check out the scala-doc of the rule
     val adaptIngestionTargetLogicalRelations: RuleBuilder = session => AdaptIngestionTargetLogicalRelations(session)
 
-    if (!HoodieSparkUtils.gteqSpark3_2) {
+    if (HoodieSparkUtils.isSpark2) {
       //Add or correct resolution of MergeInto
       // the way we load the class via reflection is diff across spark2 and spark3 and hence had to split it out.
-      if (HoodieSparkUtils.isSpark2) {
-        val resolveReferencesClass = "org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
-        val sparkResolveReferences: RuleBuilder =
-          session => ReflectionUtils.loadClass(resolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]
-        // TODO elaborate on the ordering
-        rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
-      } else if (HoodieSparkUtils.isSpark3_0) {
-        val resolveReferencesClass = "org.apache.spark.sql.catalyst.analysis.HoodieSpark30Analysis$ResolveReferences"
-        val sparkResolveReferences: RuleBuilder = {
-          session => instantiateKlass(resolveReferencesClass, session)
-        }
-        // TODO elaborate on the ordering
-        rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
-      } else if (HoodieSparkUtils.isSpark3_1) {
-        val resolveReferencesClass = "org.apache.spark.sql.catalyst.analysis.HoodieSpark31Analysis$ResolveReferences"
-        val sparkResolveReferences: RuleBuilder =
-          session => instantiateKlass(resolveReferencesClass, session)
-        // TODO elaborate on the ordering
-        rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
-      } else {
-        throw new IllegalStateException("Impossible to be here")
-      }
+      val resolveReferencesClass = "org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
+      val sparkResolveReferences: RuleBuilder =
+        session => ReflectionUtils.loadClass(resolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]
+      // TODO elaborate on the ordering
+      rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
     } else {
       rules += adaptIngestionTargetLogicalRelations
       val dataSourceV2ToV1FallbackClass = if (HoodieSparkUtils.isSpark3_5)
         "org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback"
       else if (HoodieSparkUtils.isSpark3_4)
         "org.apache.spark.sql.hudi.analysis.HoodieSpark34DataSourceV2ToV1Fallback"
-      else if (HoodieSparkUtils.isSpark3_3)
-        "org.apache.spark.sql.hudi.analysis.HoodieSpark33DataSourceV2ToV1Fallback"
       else {
-        // Spark 3.2.x
-        "org.apache.spark.sql.hudi.analysis.HoodieSpark32DataSourceV2ToV1Fallback"
+        // Spark 3.3.x
+        "org.apache.spark.sql.hudi.analysis.HoodieSpark33DataSourceV2ToV1Fallback"
       }
       val dataSourceV2ToV1Fallback: RuleBuilder =
         session => instantiateKlass(dataSourceV2ToV1FallbackClass, session)
 
-      val spark32PlusResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusResolveReferences"
-      val spark32PlusResolveReferences: RuleBuilder =
-        session => instantiateKlass(spark32PlusResolveReferencesClass, session)
+      val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
+      val spark3ResolveReferences: RuleBuilder =
+        session => instantiateKlass(spark3ResolveReferencesClass, session)
 
       // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
       //
       // It's critical for this rules to follow in this order; re-ordering this rules might lead to changes in
       // behavior of Spark's analysis phase (for ex, DataSource V2 to V1 fallback might not kick in before other rules,
       // leading to all relations resolving as V2 instead of current expectation of them being resolved as V1)
-      rules ++= Seq(dataSourceV2ToV1Fallback, spark32PlusResolveReferences)
+      rules ++= Seq(dataSourceV2ToV1Fallback, spark3ResolveReferences)
     }
 
     if (HoodieSparkUtils.isSpark3) {
       val resolveAlterTableCommandsClass =
         if (HoodieSparkUtils.gteqSpark3_5) {
           "org.apache.spark.sql.hudi.Spark35ResolveHudiAlterTableCommand"
         } else if (HoodieSparkUtils.gteqSpark3_4) {
           "org.apache.spark.sql.hudi.Spark34ResolveHudiAlterTableCommand"
         } else if (HoodieSparkUtils.gteqSpark3_3) {
           "org.apache.spark.sql.hudi.Spark33ResolveHudiAlterTableCommand"
-        } else if (HoodieSparkUtils.gteqSpark3_2) {
-          "org.apache.spark.sql.hudi.Spark32ResolveHudiAlterTableCommand"
-        } else if (HoodieSparkUtils.gteqSpark3_1) {
-          "org.apache.spark.sql.hudi.Spark31ResolveHudiAlterTableCommand"
-        } else if (HoodieSparkUtils.gteqSpark3_0) {
-          "org.apache.spark.sql.hudi.Spark30ResolveHudiAlterTableCommand"
         } else {
           throw new IllegalStateException("Unsupported Spark version")
         }
@@ -142,8 +115,8 @@ object HoodieAnalysis extends SparkAdapterSupport {
       session => HoodiePostAnalysisRule(session)
     )
 
-    if (HoodieSparkUtils.gteqSpark3_2) {
-      val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusPostAnalysisRule"
+    if (HoodieSparkUtils.isSpark3) {
+      val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
       val spark3PostHocResolution: RuleBuilder =
         session => instantiateKlass(spark3PostHocResolutionClass, session)
 
@@ -158,22 +131,15 @@ object HoodieAnalysis extends SparkAdapterSupport {
       // Default rules
     )
 
-    if (HoodieSparkUtils.gteqSpark3_0) {
+    if (HoodieSparkUtils.isSpark3) {
       val nestedSchemaPruningClass =
         if (HoodieSparkUtils.gteqSpark3_5) {
           "org.apache.spark.sql.execution.datasources.Spark35NestedSchemaPruning"
         } else if (HoodieSparkUtils.gteqSpark3_4) {
           "org.apache.spark.sql.execution.datasources.Spark34NestedSchemaPruning"
-        } else if (HoodieSparkUtils.gteqSpark3_3) {
-          "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
-        } else if (HoodieSparkUtils.gteqSpark3_2) {
-          "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
-        } else if (HoodieSparkUtils.gteqSpark3_1) {
-          // spark 3.1
-          "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
         } else {
-          // spark 3.0
-          "org.apache.spark.sql.execution.datasources.Spark30NestedSchemaPruning"
+          // spark 3.3
+          "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
         }
 
     val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
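The rule class names above are resolved at runtime so that one source tree can serve several Spark lines. A sketch of the reflective loading step (it mirrors `instantiateKlass`, whose exact signature the excerpt does not show, so the single-`SparkSession`-argument constructor is an assumption):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Sketch: instantiate a version-specific analysis rule by class name.
    def loadRule(className: String, session: SparkSession): Rule[LogicalPlan] =
      Class.forName(className)
        .getDeclaredConstructor(classOf[SparkSession])
        .newInstance(session)
        .asInstanceOf[Rule[LogicalPlan]]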
@@ -92,7 +92,7 @@ private void initSparkContexts(String appName) {
   @ParameterizedTest
   @ValueSource(strings = {"cow", "mor"})
   public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
-    if (HoodieSparkUtils.gteqSpark3_1()) {
+    if (HoodieSparkUtils.gteqSpark3_3()) {
       String tableName = "hudi_test" + new Date().getTime();
       String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
 
@@ -191,7 +191,7 @@
   private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode, BootstrapMode modeForRegexMatch) throws Exception {
     // NOTE: Hudi doesn't support Orc in Spark < 3.0
     //       Please check HUDI-4496 for more details
-    if (!HoodieSparkUtils.gteqSpark3_0()) {
+    if (!HoodieSparkUtils.gteqSpark3_3()) {
       return;
     }
     String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
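One aside: the early `return` makes the test pass vacuously on unsupported versions. A JUnit 5 assumption (a sketch of an alternative, not what this commit does) would report the case as skipped instead:

    import org.junit.jupiter.api.Assumptions.assumeTrue
    import org.apache.hudi.HoodieSparkUtils

    // Sketch: abort-and-skip rather than silently passing.
    assumeTrue(HoodieSparkUtils.gteqSpark3_3, "requires Spark 3.3+")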