Spark 3.5 connector #1115

Merged: 4 commits merged on Dec 8, 2023
1 change: 1 addition & 0 deletions CHANGES.md
@@ -2,6 +2,7 @@

## Next

* PR #1115: Added a new connector, `spark-3.5-bigquery`, aimed at use with Spark 3.5. This connector implements new APIs and capabilities provided by the Spark Data Source V2 API.
* PR #1117: Make read session caching duration configurable

## 0.34.0 - 2023-10-31
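For context on the changelog entry above: the new `spark-3.5-bigquery` artifact is used like the existing Spark 3.x DSv2 connectors, through the `bigquery` data source. A minimal read sketch (not part of this PR; the table name is a placeholder, and the option names mirror the integration tests further down in this diff):

    // Minimal sketch of reading a table through the new spark-3.5-bigquery connector.
    // The table name below is a placeholder.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class Spark35BigQueryReadSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().appName("spark-3.5-bigquery-read").getOrCreate();
        // The connector is selected via the "bigquery" format, as in the tests below.
        Dataset<Row> df = spark.read()
            .format("bigquery")
            .option("table", "bigquery-public-data.samples.shakespeare")
            .load();
        df.show(10);
        spark.stop();
      }
    }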
23 changes: 13 additions & 10 deletions README-template.md
@@ -63,6 +63,7 @@ The latest version of the connector is publicly available at the following links

| version | Link |
|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Spark 3.5 | `gs://spark-lib/bigquery/spark-3.5-bigquery-${next-release-tag}.jar`([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-3.5-bigquery-${next-release-tag}.jar)) |
| Spark 3.4 | `gs://spark-lib/bigquery/spark-3.4-bigquery-${next-release-tag}.jar`([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-3.4-bigquery-${next-release-tag}.jar)) |
| Spark 3.3 | `gs://spark-lib/bigquery/spark-3.3-bigquery-${next-release-tag}.jar`([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-3.3-bigquery-${next-release-tag}.jar)) |
| Spark 3.2 | `gs://spark-lib/bigquery/spark-3.2-bigquery-${next-release-tag}.jar`([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-3.2-bigquery-${next-release-tag}.jar)) |
@@ -79,16 +80,17 @@ The final two connectors are Scala-based connectors; please use the relevant jar
below.

### Connector to Spark Compatibility Matrix
| Connector \ Spark | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 |3.4 |
|---------------------------------------|---------|---------|---------|---------|---------|---------|---------|
| spark-3.4-bigquery | | | | | | | ✓ |
| spark-3.3-bigquery | | | | | | ✓ | ✓ |
| spark-3.2-bigquery | | | | | ✓ | ✓ | ✓ |
| spark-3.1-bigquery | | | | ✓ | ✓ | ✓ | ✓ |
| spark-2.4-bigquery | | ✓ | | | | | |
| spark-bigquery-with-dependencies_2.13 | | | | | ✓ | ✓ | ✓ |
| spark-bigquery-with-dependencies_2.12 | | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| spark-bigquery-with-dependencies_2.11 | ✓ | ✓ | | | | | |
| Connector \ Spark | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
|---------------------------------------|---------|---------|---------|---------|---------|---------|---------|---------|
| spark-3.5-bigquery | | | | | | | | ✓ |
| spark-3.4-bigquery | | | | | | | ✓ | ✓ |
| spark-3.3-bigquery | | | | | | ✓ | ✓ | ✓ |
| spark-3.2-bigquery | | | | | ✓ | ✓ | ✓ | ✓ |
| spark-3.1-bigquery | | | | ✓ | ✓ | ✓ | ✓ | ✓ |
| spark-2.4-bigquery | | ✓ | | | | | | |
| spark-bigquery-with-dependencies_2.13 | | | | | ✓ | ✓ | ✓ | ✓ |
| spark-bigquery-with-dependencies_2.12 | | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| spark-bigquery-with-dependencies_2.11 | ✓ | ✓ | | | | | | |

### Connector to Dataproc Image Compatibility Matrix
| Connector \ Dataproc Image | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | Serverless<br>Image 1.0 | Serverless<br>Image 2.0 | Serverless<br>Image 2.1 |
@@ -110,6 +112,7 @@ repository. It can be used with the `--packages` option or the

| version | Connector Artifact |
|------------|------------------------------------------------------------------------------------|
| Spark 3.5 | `com.google.cloud.spark:spark-3.5-bigquery:${next-release-tag}` |
| Spark 3.4 | `com.google.cloud.spark:spark-3.4-bigquery:${next-release-tag}` |
| Spark 3.3 | `com.google.cloud.spark:spark-3.3-bigquery:${next-release-tag}` |
| Spark 3.2 | `com.google.cloud.spark:spark-3.2-bigquery:${next-release-tag}` |
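The README now documents both a GCS jar and a Maven Central coordinate for Spark 3.5. A hedged sketch of attaching the connector via the `--packages` option mentioned above; `${next-release-tag}` is the README's placeholder for the released version, and `my-spark-app.jar` is a hypothetical application jar:

    # Sketch only: substitute a real released version for ${next-release-tag}.
    spark-submit \
      --packages com.google.cloud.spark:spark-3.5-bigquery:${next-release-tag} \
      my-spark-app.jar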
14 changes: 13 additions & 1 deletion cloudbuild/cloudbuild.yaml
@@ -106,10 +106,22 @@ steps:
- 'BIGLAKE_CONNECTION_ID=${_BIGLAKE_CONNECTION_ID}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# 4h. Run integration tests concurrently with unit tests (DSv2, Spark 3.5)
- name: 'gcr.io/$PROJECT_ID/dataproc-spark-bigquery-connector-presubmit'
id: 'integration-tests-3.5'
waitFor: ['integration-tests-3.2']
entrypoint: 'bash'
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest-3.5']
env:
- 'GOOGLE_CLOUD_PROJECT=${_GOOGLE_CLOUD_PROJECT}'
- 'TEMPORARY_GCS_BUCKET=${_TEMPORARY_GCS_BUCKET}'
- 'BIGLAKE_CONNECTION_ID=${_BIGLAKE_CONNECTION_ID}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# 5. Upload coverage to CodeCov
- name: 'gcr.io/$PROJECT_ID/dataproc-spark-bigquery-connector-presubmit'
id: 'upload-it-to-codecov'
waitFor: ['integration-tests-2.12','integration-tests-2.13','integration-tests-2.4','integration-tests-3.1','integration-tests-3.2','integration-tests-3.3', 'integration-tests-3.4']
waitFor: ['integration-tests-2.12','integration-tests-2.13','integration-tests-2.4','integration-tests-3.1','integration-tests-3.2','integration-tests-3.3', 'integration-tests-3.4', 'integration-tests-3.5']
entrypoint: 'bash'
args: ['/workspace/cloudbuild/presubmit.sh', 'upload-it-to-codecov']
env:
7 changes: 5 additions & 2 deletions cloudbuild/nightly.sh
@@ -42,9 +42,9 @@ case $STEP in
#coverage report
$MVN test jacoco:report jacoco:report-aggregate -Pcoverage,dsv1_2.12,dsv1_2.13,dsv2
# Run integration tests
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,integration,dsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,integration,dsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4,dsv2_3.5
# Run acceptance tests
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,acceptance,dsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,acceptance,dsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4,dsv2_3.5
# Upload test coverage report to Codecov
bash <(curl -s https://codecov.io/bash) -K -F "nightly"

@@ -79,6 +79,9 @@ case $STEP in
gsutil cp "${M2REPO}/com/google/cloud/spark/spark-3.4-bigquery/${BUILD_REVISION}/spark-3.4-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}"
gsutil cp "gs://${BUCKET}/spark-3.4-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}/spark-3.4-bigquery-nightly-snapshot.jar"

gsutil cp "${M2REPO}/com/google/cloud/spark/spark-3.5-bigquery/${BUILD_REVISION}/spark-3.5-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}"
gsutil cp "gs://${BUCKET}/spark-3.5-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}/spark-3.5-bigquery-nightly-snapshot.jar"

gsutil cp "${M2REPO}/com/google/cloud/spark/spark-bigquery-metrics/${BUILD_REVISION}/spark-bigquery-metrics-${BUILD_REVISION}.jar" "gs://${BUCKET}"
gsutil cp "gs://${BUCKET}/spark-bigquery-metrics-${BUILD_REVISION}.jar" "gs://${BUCKET}/spark-bigquery-metrics-nightly-snapshot.jar"

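The nightly script now publishes the Spark 3.5 jar twice: once under its build revision and once under the stable `spark-3.5-bigquery-nightly-snapshot.jar` name, so downstream jobs can pick up the latest nightly build without knowing the revision. A hedged sketch of consuming the stable alias, where `${BUCKET}` stands for the nightly publishing bucket configured for this script and the application jar is hypothetical:

    # Sketch only: ${BUCKET} is the nightly publishing bucket used by nightly.sh.
    spark-submit \
      --jars gs://${BUCKET}/spark-3.5-bigquery-nightly-snapshot.jar \
      my-spark-app.jar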
8 changes: 6 additions & 2 deletions cloudbuild/presubmit.sh
@@ -33,13 +33,13 @@ case $STEP in
# Download maven and all the dependencies
init)
checkenv
$MVN install -DskipTests -Pdsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4
$MVN install -DskipTests -Pdsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4,dsv2_3.5
exit
;;

# Run unit tests
unittest)
$MVN test jacoco:report jacoco:report-aggregate -Pcoverage,dsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4
$MVN test jacoco:report jacoco:report-aggregate -Pcoverage,dsv1_2.12,dsv1_2.13,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4,dsv2_3.5
# Upload test coverage report to Codecov
bash <(curl -s https://codecov.io/bash) -K -F "${STEP}"
;;
@@ -79,6 +79,10 @@ case $STEP in
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,integration,dsv2_3.4
;;

integrationtest-3.5)
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,integration,dsv2_3.5
;;

upload-it-to-codecov)
checkenv
# Upload test coverage report to Codecov
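The new `integrationtest-3.5` case can also be run outside Cloud Build. A rough sketch, assuming the same variables the `integration-tests-3.5` step passes in cloudbuild.yaml are exported first (all values below are placeholders):

    # Sketch only: all exported values are placeholders.
    export GOOGLE_CLOUD_PROJECT=my-project
    export TEMPORARY_GCS_BUCKET=my-temp-bucket
    export BIGLAKE_CONNECTION_ID=my-biglake-connection
    export BIGQUERY_KMS_KEY_NAME=projects/my-project/locations/us/keyRings/my-kr/cryptoKeys/my-key
    # Run the new Spark 3.5 integration-test step from the repository root.
    bash cloudbuild/presubmit.sh integrationtest-3.5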
22 changes: 22 additions & 0 deletions pom.xml
@@ -254,6 +254,28 @@
<module>spark-bigquery-pushdown/spark-3.3-bigquery-pushdown_2.13</module>
</modules>
</profile>
<profile>
<id>dsv2_3.5</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-common</module>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-parent</module>
<module>spark-bigquery-dsv2/spark-3.1-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-bigquery-metrics</module>
<module>spark-bigquery-dsv2/spark-3.2-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.3-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.4-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.5-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.5-bigquery</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-parent</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-common_2.12</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-common_2.13</module>
<module>spark-bigquery-pushdown/spark-3.3-bigquery-pushdown_2.12</module>
<module>spark-bigquery-pushdown/spark-3.3-bigquery-pushdown_2.13</module>
</modules>
</profile>
<profile>
<id>coverage</id>
<activation>
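Because the `dsv2_3.5` profile is not active by default, the Spark 3.5 modules are only built when the profile is selected explicitly, as the updated presubmit and nightly scripts do. A minimal local sketch (plain `mvn` shown here; the CI scripts use their own Maven invocation, and the coverage profiles are omitted for brevity):

    # Build the Spark 3.5 connector modules by activating the new profile.
    mvn install -DskipTests -Pdsv2_3.5

    # Run only the Spark 3.5 integration tests, mirroring the new presubmit step.
    mvn failsafe:integration-test failsafe:verify -Pintegration,dsv2_3.5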
ReadByFormatIntegrationTestBase.java
@@ -16,6 +16,8 @@
package com.google.cloud.spark.bigquery.integration;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assume.assumeThat;
import static org.junit.Assume.assumeTrue;

import com.google.cloud.bigquery.BigQuery;
@@ -30,14 +32,17 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -48,15 +53,27 @@ public class ReadByFormatIntegrationTestBase extends SparkBigQueryIntegrationTestBase {
private static final int LARGE_TABLE_NUMBER_OF_PARTITIONS = 138;
protected final String dataFormat;
protected final boolean userProvidedSchemaAllowed;
protected Optional<DataType> timeStampNTZType;

public ReadByFormatIntegrationTestBase(String dataFormat) {
this(dataFormat, true);
this(dataFormat, true, Optional.empty());
}

public ReadByFormatIntegrationTestBase(String dataFormat, boolean userProvidedSchemaAllowed) {
this(dataFormat, userProvidedSchemaAllowed, Optional.empty());
}

public ReadByFormatIntegrationTestBase(
String dataFormat, boolean userProvidedSchemaAllowed, DataType timestampNTZType) {
this(dataFormat, userProvidedSchemaAllowed, Optional.of(timestampNTZType));
}

public ReadByFormatIntegrationTestBase(
String dataFormat, boolean userProvidedSchemaAllowed, Optional<DataType> timestampNTZType) {
super();
this.dataFormat = dataFormat;
this.userProvidedSchemaAllowed = userProvidedSchemaAllowed;
this.timeStampNTZType = timestampNTZType;
}

@Test
@@ -272,6 +289,33 @@ public void testConvertBigQueryMapToSparkMap() throws Exception {
assertThat(result).contains(ImmutableMap.of("c", Long.valueOf(3)));
}

@Test
public void testTimestampNTZReadFromBigQuery() {
assumeThat(timeStampNTZType.isPresent(), is(true));
BigQuery bigQuery = IntegrationTestUtils.getBigquery();
LocalDateTime dateTime = LocalDateTime.of(2023, 9, 18, 14, 30, 15, 234 * 1_000_000);
bigQuery.create(
TableInfo.newBuilder(
TableId.of(testDataset.toString(), testTable),
StandardTableDefinition.of(Schema.of(Field.of("foo", LegacySQLTypeName.DATETIME))))
.build());
IntegrationTestUtils.runQuery(
String.format(
"INSERT INTO %s.%s (foo) VALUES " + "('%s')", testDataset, testTable, dateTime));

Dataset<Row> df =
spark
.read()
.format("bigquery")
.option("dataset", testDataset.toString())
.option("table", testTable)
.load();
StructType schema = df.schema();
assertThat(schema.apply("foo").dataType()).isEqualTo(timeStampNTZType.get());
Row row = df.head();
assertThat(row.get(0)).isEqualTo(dateTime);
}

static <K, V> Map<K, V> scalaMapToJavaMap(scala.collection.Map<K, V> map) {
ImmutableMap.Builder<K, V> result = ImmutableMap.<K, V>builder();
map.foreach(entry -> result.put(entry._1(), entry._2()));
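The new constructor overloads let each Spark-version-specific suite declare which Spark type a BigQuery DATETIME column should surface as; when no type is supplied, the new `testTimestampNTZReadFromBigQuery` test is skipped via `assumeThat`. A hedged sketch of how a Spark 3.5 suite might wire this up; the subclass name and data format are illustrative only, and the actual suites are not shown in this excerpt:

    // Hypothetical Spark 3.5 suite passing DataTypes.TimestampNTZType to the new
    // three-argument constructor; class name and data format are illustrative only.
    package com.google.cloud.spark.bigquery.integration;

    import org.apache.spark.sql.types.DataTypes;

    public class Spark35AvroReadByFormatIntegrationTest extends ReadByFormatIntegrationTestBase {
      public Spark35AvroReadByFormatIntegrationTest() {
        // "AVRO" selects the read format; the third argument is the type this suite
        // expects BigQuery DATETIME columns to map to on Spark 3.5.
        super("AVRO", true, DataTypes.TimestampNTZType);
      }
    }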
ReadIntegrationTestBase.java
@@ -31,12 +31,14 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
@@ -103,18 +105,35 @@ public class ReadIntegrationTestBase extends SparkBigQueryIntegrationTestBase {
protected List<String> gcsObjectsToClean = new ArrayList<>();

public ReadIntegrationTestBase() {
this(true);
this(true, Optional.empty());
}

public ReadIntegrationTestBase(boolean userProvidedSchemaAllowed) {
this(userProvidedSchemaAllowed, ALL_TYPES_TABLE_SCHEMA);
this(userProvidedSchemaAllowed, Optional.empty());
}

public ReadIntegrationTestBase(boolean userProvidedSchemaAllowed, DataType timestampNTZType) {
this(userProvidedSchemaAllowed, Optional.of(timestampNTZType));
}

public ReadIntegrationTestBase(
boolean userProvidedSchemaAllowed, StructType allTypesTableSchema) {
boolean userProvidedSchemaAllowed, Optional<DataType> timeStampNTZType) {
super();
this.userProvidedSchemaAllowed = userProvidedSchemaAllowed;
this.allTypesTableSchema = allTypesTableSchema;
intializeSchema(timeStampNTZType);
}

private void intializeSchema(Optional<DataType> timeStampNTZType) {
if (!timeStampNTZType.isPresent()) {
allTypesTableSchema = ALL_TYPES_TABLE_SCHEMA;
return;
}
allTypesTableSchema = new StructType();
for (StructField field : ALL_TYPES_TABLE_SCHEMA.fields()) {
DataType dateTimeType = field.name().equals("dt") ? timeStampNTZType.get() : field.dataType();
allTypesTableSchema =
allTypesTableSchema.add(field.name(), dateTimeType, field.nullable(), field.metadata());
}
}

@Before
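`ReadIntegrationTestBase` now rebuilds `ALL_TYPES_TABLE_SCHEMA` so that the `dt` (DATETIME) column is expected to read back as the supplied timestamp-without-time-zone type. A matching hypothetical Spark 3.5 suite, mirroring the sketch above (class name illustrative only):

    // Hypothetical companion suite for ReadIntegrationTestBase on Spark 3.5.
    package com.google.cloud.spark.bigquery.integration;

    import org.apache.spark.sql.types.DataTypes;

    public class Spark35ReadIntegrationTest extends ReadIntegrationTestBase {
      public Spark35ReadIntegrationTest() {
        // true: user-provided schemas are allowed; the second argument swaps the
        // expected type of the "dt" column to TimestampNTZType via intializeSchema.
        super(true, DataTypes.TimestampNTZType);
      }
    }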