From 753f65a2444439cddc9226fce78748a98362ee5e Mon Sep 17 00:00:00 2001 From: halio-g <82131353+halio-g@users.noreply.github.com> Date: Thu, 5 Oct 2023 01:44:39 -0700 Subject: [PATCH] Making the Bytes converted to the Binary type instead of an Array type. (#102) * Making the Bytes converted to the Binary type instead of an Array type. * Fixed the schema tests. * Fix the schema test on SpannerScanBuilderTest. * Fixed the SpannerInputPartitionReaderContextTest failure. * Changed the order of bytes. * Made the acceptance test run in parallel with the integration test. * Made a change on the DataBaseId + nanotime for Dataproc job. * Made the acceptance test name shorter. --- cloudbuild/cloudbuild.yaml | 6 +-- .../cloud/spark/spanner/SpannerTable.java | 4 +- .../cloud/spark/spanner/SpannerUtils.java | 12 ++++-- .../spanner/ReadIntegrationTestBase.java | 41 ++++++++++++++++--- ...pannerInputPartitionReaderContextTest.java | 37 +++++++++++++++-- .../spark/spanner/SpannerScanBuilderTest.java | 3 +- .../cloud/spark/spanner/SpannerTableTest.java | 3 +- .../cloud/spark/spanner/SpannerTestBase.java | 17 +++++++- .../DataprocAcceptanceTestBase.java | 6 ++- .../src/test/resources/db/insert_data.sql | 7 ++-- .../src/test/resources/db/populate_ddl.sql | 1 + 11 files changed, 108 insertions(+), 29 deletions(-) diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml index 55260184..728c3dfa 100644 --- a/cloudbuild/cloudbuild.yaml +++ b/cloudbuild/cloudbuild.yaml @@ -26,14 +26,14 @@ steps: # TODO: Make the acceptance test run in parallel with integration-real-spanner. - name: 'gcr.io/$PROJECT_ID/dataproc-spark-spanner-connector-presubmit' id: 'acceptance-test' - waitFor: ['integration-real-spanner'] + waitFor: ['init'] entrypoint: 'bash' args: ['/workspace/cloudbuild/presubmit.sh', 'acceptance-test'] env: - 'SPANNER_PROJECT_ID=$PROJECT_ID' - 'GOOGLE_CLOUD_PROJECT=$PROJECT_ID' - - 'SPANNER_INSTANCE_ID=acceptance-test-instance' - - 'SPANNER_DATABASE_ID=acceptance-test-database' + - 'SPANNER_INSTANCE_ID=accept-testins' + - 'SPANNER_DATABASE_ID=accept-testdb' - 'ACCEPTANCE_TEST_BUCKET=spark-spanner-connector-acceptance-test' diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java index db83d1e7..d173eec9 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java @@ -99,7 +99,7 @@ public static DataType ofSpannerStrType(String spannerStrType, boolean isNullabl return DataTypes.BooleanType; case "BYTES": - return DataTypes.createArrayType(DataTypes.ByteType); + return DataTypes.BinaryType; case "DATE": return DataTypes.DateType; @@ -131,7 +131,7 @@ public static DataType ofSpannerStrType(String spannerStrType, boolean isNullabl return DataTypes.StringType; } if (spannerStrType.indexOf("BYTES") == 0) { - return DataTypes.createArrayType(DataTypes.ByteType); + return DataTypes.BinaryType; } if (spannerStrType.indexOf("ARRAY") == 0) { diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java index 0ddcda37..3ef70c77 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java @@ -292,7 +292,7 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) { break; case BYTES: - sparkRow.update(i, new GenericArrayData(spannerRow.getBytes(i).toByteArray())); + sparkRow.update(i, spannerRow.getBytes(i).toByteArray()); break; case STRUCT: @@ -344,9 +344,13 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) { sparkRow.update(i, new GenericArrayData(dest.toArray(new UTF8String[0]))); } else if (fieldTypeName.indexOf("ARRAY src = value.getBytesArray(); - List dest = new ArrayList(src.size()); - src.forEach((s) -> dest.add(s == null ? null : new GenericArrayData(s.toByteArray()))); - sparkRow.update(i, new GenericArrayData(dest.toArray(new GenericArrayData[0]))); + byte[][] byteArray = new byte[src.size()][]; + List dest = new ArrayList(src.size()); + int it = 0; + for (ByteArray bytes : src) { + byteArray[it++] = bytes == null ? null : bytes.toByteArray(); + } + sparkRow.update(i, new GenericArrayData(byteArray)); } else if (fieldTypeName.indexOf("ARRAY dest = new ArrayList<>(); value.getStructArray().forEach((st) -> dest.add(spannerStructToInternalRow(st))); diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/ReadIntegrationTestBase.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/ReadIntegrationTestBase.java index e9fc6810..fba38624 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/ReadIntegrationTestBase.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/ReadIntegrationTestBase.java @@ -103,6 +103,14 @@ public void testReadFromCompositeTable() { Collections.sort(gotDs); Collections.sort(expectDs); assertThat(gotDs).containsExactlyElementsIn(expectDs); + + List gotJs = df.select("J").as(Encoders.BINARY()).collectAsList(); + List expectJs = Arrays.asList(stringToBytes("deadbeef"), stringToBytes("beefdead")); + List actualStringJs = + gotJs.stream().map(b -> bytesToString(b)).collect(Collectors.toList()); + List expectedStringJs = + expectJs.stream().map(b -> bytesToString(b)).collect(Collectors.toList()); + assertThat(actualStringJs).containsExactlyElementsIn(expectedStringJs); } Timestamp asSparkTimestamp(String s) { @@ -129,7 +137,8 @@ public void testDataset_schema() { new StructField( "H", DataTypes.createArrayType(DataTypes.DateType, true), true, null), new StructField( - "I", DataTypes.createArrayType(DataTypes.TimestampType, true), true, null)) + "I", DataTypes.createArrayType(DataTypes.TimestampType, true), true, null), + new StructField("J", DataTypes.BinaryType, true, null)) .toArray(new StructField[0])); // For easy debugging, let's firstly compare the .treeString() values. @@ -435,10 +444,8 @@ public void testDataNullsTable() { nullArrayCounts++; continue; } - - List expectedBytes = Arrays.asList((byte) -66, (byte) -17, (byte) -34, (byte) -83); - - assertThat(row.getList(j)).containsExactly(null, toSeq(expectedBytes)); + byte[] expectedBytes = stringToBytes("beefdead"); + assertArrayEquals(row.getList(j), Arrays.asList(null, expectedBytes)); } } assertEquals(3, nullArrayCounts); @@ -509,4 +516,28 @@ BigDecimal asSparkBigDecimal(String v) { private static Seq toSeq(List list) { return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); } + + private static byte[] stringToBytes(String str) { + byte[] val = new byte[str.length() / 2]; + for (int i = 0; i < val.length; i++) { + int index = i * 2; + int j = Integer.parseInt(str.substring(index, index + 2), 16); + val[i] = (byte) j; + } + return val; + } + + private static String bytesToString(byte[] bytes) { + return bytes == null ? "" : new String(bytes); + } + + private void assertArrayEquals(List bytesList1, List bytesList2) { + assertThat(bytesList1.size()).isEqualTo(bytesList2.size()); + + for (int i = 0; i < bytesList1.size(); i++) { + byte[] bytes1 = bytesList1.get(i); + byte[] bytes2 = bytesList2.get(i); + assertThat(bytesToString(bytes1)).isEqualTo(bytesToString(bytes2)); + } + } } diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java index 26f58d4f..ca367f0b 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java @@ -14,6 +14,7 @@ package com.google.cloud.spark.spanner; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import com.google.cloud.spanner.BatchClient; @@ -229,7 +230,8 @@ public void testArraysConversions() throws Exception { new ZonedDateTime[] { ZonedDateTime.parse("2023-08-26T12:11:10Z"), ZonedDateTime.parse("2023-08-27T12:11:09Z"), - }), + }, + "beefdead"), makeCompositeTableRow( "id2", new long[] {20, 200, 2991, 888885}, @@ -246,13 +248,42 @@ public void testArraysConversions() throws Exception { new ZonedDateTime[] { ZonedDateTime.parse("2023-09-22T12:11:10Z"), ZonedDateTime.parse("2023-09-23T12:11:09Z"), - })); + }, + "deadbeef")); Comparator cmp = new InternalRowComparator(); Collections.sort(expectRows, cmp); Collections.sort(gotRows, cmp); assertEquals(expectRows.size(), gotRows.size()); - assertEquals(expectRows, gotRows); + assertInternalRow(gotRows, expectRows); + } + + private static void assertInternalRow( + List actualRows, List expectedRows) { + assertEquals(expectedRows.size(), actualRows.size()); + for (int i = 0; i < actualRows.size(); i++) { + // We cannot use assertEqual for the whole List, since the byte[] will be + // compared with the object's address. + GenericInternalRow actualRow = (GenericInternalRow) actualRows.get(i); + GenericInternalRow expectedRow = (GenericInternalRow) expectedRows.get(i); + + assertThat(actualRow.getUTF8String(0)).isEqualTo(expectedRow.getUTF8String(0)); + assertThat(actualRow.getArray(1)).isEqualTo(expectedRow.getArray(1)); + assertThat(actualRow.getArray(2)).isEqualTo(expectedRow.getArray(2)); + assertThat(actualRow.getUTF8String(3)).isEqualTo(expectedRow.getUTF8String(3)); + assertThat(actualRow.getDecimal(4, 38, 9)).isEqualTo(expectedRow.getDecimal(4, 38, 9)); + assertThat(actualRow.getInt(5)).isEqualTo(expectedRow.getInt(5)); + assertThat(actualRow.getLong(6)).isEqualTo(expectedRow.getLong(6)); + assertThat(actualRow.getBoolean(7)).isEqualTo(expectedRow.getBoolean(7)); + assertThat(actualRow.getArray(8)).isEqualTo(expectedRow.getArray(8)); + assertThat(actualRow.getArray(9)).isEqualTo(expectedRow.getArray(9)); + assertThat(bytesToString(actualRow.getBinary(10))) + .isEqualTo(bytesToString(expectedRow.getBinary(10))); + } + } + + private static String bytesToString(byte[] bytes) { + return bytes == null ? "" : new String(bytes); } } diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java index 74a14d4a..9fb9c94a 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java @@ -52,8 +52,7 @@ public void readSchemaShouldWorkInSpannerScanBuilder() throws Exception { Arrays.asList( new StructField("A", DataTypes.LongType, false, null), new StructField("B", DataTypes.StringType, true, null), - new StructField( - "C", DataTypes.createArrayType(DataTypes.ByteType, true), true, null), + new StructField("C", DataTypes.BinaryType, true, null), new StructField("D", DataTypes.TimestampType, true, null), new StructField("E", DataTypes.createDecimalType(38, 9), true, null), new StructField( diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java index f0d41c47..395766d1 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java @@ -38,8 +38,7 @@ public void querySchemaShouldSuccessInSpannerTable() { Arrays.asList( new StructField("A", DataTypes.LongType, false, null), new StructField("B", DataTypes.StringType, true, null), - new StructField( - "C", DataTypes.createArrayType(DataTypes.ByteType, true), true, null), + new StructField("C", DataTypes.BinaryType, true, null), new StructField("D", DataTypes.TimestampType, true, null), new StructField("E", DataTypes.createDecimalType(38, 9), true, null), new StructField( diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTestBase.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTestBase.java index 4c46442e..6fcc4c4f 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTestBase.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTestBase.java @@ -229,8 +229,9 @@ public InternalRow makeCompositeTableRow( ZonedDateTime F, boolean G, ZonedDateTime[] H, - ZonedDateTime[] I) { - GenericInternalRow row = new GenericInternalRow(10); + ZonedDateTime[] I, + String J) { + GenericInternalRow row = new GenericInternalRow(11); row.update(0, UTF8String.fromString(id)); row.update(1, new GenericArrayData(A)); row.update(2, new GenericArrayData(toSparkStrList(B))); @@ -241,6 +242,8 @@ public InternalRow makeCompositeTableRow( row.setBoolean(7, G); row.update(8, SpannerUtils.zonedDateTimeIterToSparkDates(Arrays.asList(H))); row.update(9, SpannerUtils.zonedDateTimeIterToSparkTimestamps(Arrays.asList(I))); + row.update(10, stringToBytes(J)); + return row; } @@ -251,4 +254,14 @@ private UTF8String[] toSparkStrList(String[] strs) { } return dest.toArray(new UTF8String[0]); } + + private static byte[] stringToBytes(String str) { + byte[] val = new byte[str.length() / 2]; + for (int i = 0; i < val.length; i++) { + int index = i * 2; + int j = Integer.parseInt(str.substring(index, index + 2), 16); + val[i] = (byte) j; + } + return val; + } } diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/acceptance/DataprocAcceptanceTestBase.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/acceptance/DataprocAcceptanceTestBase.java index a4f7396b..e5986d5c 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/acceptance/DataprocAcceptanceTestBase.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/acceptance/DataprocAcceptanceTestBase.java @@ -56,8 +56,10 @@ public class DataprocAcceptanceTestBase { "Please set the 'GOOGLE_CLOUD_PROJECT' environment variable"); private static final String DATABASE_ID = Preconditions.checkNotNull( - System.getenv("SPANNER_DATABASE_ID"), - "Please set the 'SPANNER_DATABASE_ID' environment variable"); + System.getenv("SPANNER_DATABASE_ID"), + "Please set the 'SPANNER_DATABASE_ID' environment variable") + + "-" + + System.nanoTime(); private static final String INSTANCE_ID = Preconditions.checkNotNull( System.getenv("SPANNER_INSTANCE_ID"), diff --git a/spark-3.1-spanner-lib/src/test/resources/db/insert_data.sql b/spark-3.1-spanner-lib/src/test/resources/db/insert_data.sql index fc884a07..cf45d28c 100644 --- a/spark-3.1-spanner-lib/src/test/resources/db/insert_data.sql +++ b/spark-3.1-spanner-lib/src/test/resources/db/insert_data.sql @@ -46,17 +46,16 @@ VALUES DELETE FROM compositeTable WHERE 1=1; INSERT INTO - compositeTable(id, A, B, C, D, E, F, G, H, I) + compositeTable(id, A, B, C, D, E, F, G, H, I, J) VALUES ( "id1", [10, 100, 991, 567282], ["a", "b", "c"], "foobar", 2934, DATE("2023-01-01T00:00:00Z"), TIMESTAMP("2023-08-26T12:22:05Z"), true, [DATE("2023-01-02T00:00:00Z"), DATE("2023-12-31T00:00:00Z")], - [TIMESTAMP("2023-08-26T12:11:10Z"), TIMESTAMP("2023-08-27T12:11:09Z")]), + [TIMESTAMP("2023-08-26T12:11:10Z"), TIMESTAMP("2023-08-27T12:11:09Z")], FROM_HEX("beefdead")), ( "id2", [20, 200, 2991, 888885], ["A", "B", "C"], "this one", 93411, DATE("2023-09-23T00:00:00Z"), TIMESTAMP("2023-09-22T12:22:05Z"), false, [DATE("2023-09-02T00:00:00Z"), DATE("2023-12-31T00:00:00Z")], - [TIMESTAMP("2023-09-22T12:11:10Z"), TIMESTAMP("2023-09-23T12:11:09Z")]); - + [TIMESTAMP("2023-09-22T12:11:10Z"), TIMESTAMP("2023-09-23T12:11:09Z")], FROM_HEX("deadbeef")); DELETE FROM nullsTable WHERE 1=1; INSERT INTO diff --git a/spark-3.1-spanner-lib/src/test/resources/db/populate_ddl.sql b/spark-3.1-spanner-lib/src/test/resources/db/populate_ddl.sql index 3baae243..5c2df30d 100644 --- a/spark-3.1-spanner-lib/src/test/resources/db/populate_ddl.sql +++ b/spark-3.1-spanner-lib/src/test/resources/db/populate_ddl.sql @@ -58,6 +58,7 @@ CREATE TABLE compositeTable ( G BOOL, H ARRAY, I ARRAY, + J BYTES(20), ) PRIMARY KEY(id); CREATE TABLE nullsTable (