Skip to content

Commit

Permalink
Making the Bytes converted to the Binary type instead of an Array type. (#102)
Browse files Browse the repository at this point in the history

* Making the Bytes converted to the Binary type instead of an Array type.

* Fixed the schema tests.

* Fix the schema test on SpannerScanBuilderTest.

* Fixed the SpannerInputPartitionReaderContextTest failure.

* Changed the order of bytes.

* Made the acceptance test run in parallel with the integration test.

* Appended System.nanoTime() to the database ID used by the Dataproc job.

* Made the acceptance test name shorter.
  • Loading branch information
halio-g authored Oct 5, 2023
1 parent c099567 commit 753f65a
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 29 deletions.
6 changes: 3 additions & 3 deletions cloudbuild/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ steps:
# TODO: Make the acceptance test run in parallel with integration-real-spanner.
- name: 'gcr.io/$PROJECT_ID/dataproc-spark-spanner-connector-presubmit'
id: 'acceptance-test'
waitFor: ['integration-real-spanner']
waitFor: ['init']
entrypoint: 'bash'
args: ['/workspace/cloudbuild/presubmit.sh', 'acceptance-test']
env:
- 'SPANNER_PROJECT_ID=$PROJECT_ID'
- 'GOOGLE_CLOUD_PROJECT=$PROJECT_ID'
- 'SPANNER_INSTANCE_ID=acceptance-test-instance'
- 'SPANNER_DATABASE_ID=acceptance-test-database'
- 'SPANNER_INSTANCE_ID=accept-testins'
- 'SPANNER_DATABASE_ID=accept-testdb'
- 'ACCEPTANCE_TEST_BUCKET=spark-spanner-connector-acceptance-test'


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static DataType ofSpannerStrType(String spannerStrType, boolean isNullabl
return DataTypes.BooleanType;

case "BYTES":
return DataTypes.createArrayType(DataTypes.ByteType);
return DataTypes.BinaryType;

case "DATE":
return DataTypes.DateType;
Expand Down Expand Up @@ -131,7 +131,7 @@ public static DataType ofSpannerStrType(String spannerStrType, boolean isNullabl
return DataTypes.StringType;
}
if (spannerStrType.indexOf("BYTES") == 0) {
return DataTypes.createArrayType(DataTypes.ByteType);
return DataTypes.BinaryType;
}

if (spannerStrType.indexOf("ARRAY") == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) {
break;

case BYTES:
sparkRow.update(i, new GenericArrayData(spannerRow.getBytes(i).toByteArray()));
sparkRow.update(i, spannerRow.getBytes(i).toByteArray());
break;

case STRUCT:
Expand Down Expand Up @@ -344,9 +344,13 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) {
sparkRow.update(i, new GenericArrayData(dest.toArray(new UTF8String[0])));
} else if (fieldTypeName.indexOf("ARRAY<BYTES") == 0) {
List<ByteArray> src = value.getBytesArray();
List<GenericArrayData> dest = new ArrayList<GenericArrayData>(src.size());
src.forEach((s) -> dest.add(s == null ? null : new GenericArrayData(s.toByteArray())));
sparkRow.update(i, new GenericArrayData(dest.toArray(new GenericArrayData[0])));
byte[][] byteArray = new byte[src.size()][];
List<byte[]> dest = new ArrayList<byte[]>(src.size());
int it = 0;
for (ByteArray bytes : src) {
byteArray[it++] = bytes == null ? null : bytes.toByteArray();
}
sparkRow.update(i, new GenericArrayData(byteArray));
} else if (fieldTypeName.indexOf("ARRAY<STRUCT<") == 0) {
List<InternalRow> dest = new ArrayList<>();
value.getStructArray().forEach((st) -> dest.add(spannerStructToInternalRow(st)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public void testReadFromCompositeTable() {
Collections.sort(gotDs);
Collections.sort(expectDs);
assertThat(gotDs).containsExactlyElementsIn(expectDs);

List<byte[]> gotJs = df.select("J").as(Encoders.BINARY()).collectAsList();
List<byte[]> expectJs = Arrays.asList(stringToBytes("deadbeef"), stringToBytes("beefdead"));
List<String> actualStringJs =
gotJs.stream().map(b -> bytesToString(b)).collect(Collectors.toList());
List<String> expectedStringJs =
expectJs.stream().map(b -> bytesToString(b)).collect(Collectors.toList());
assertThat(actualStringJs).containsExactlyElementsIn(expectedStringJs);
}

Timestamp asSparkTimestamp(String s) {
Expand All @@ -129,7 +137,8 @@ public void testDataset_schema() {
new StructField(
"H", DataTypes.createArrayType(DataTypes.DateType, true), true, null),
new StructField(
"I", DataTypes.createArrayType(DataTypes.TimestampType, true), true, null))
"I", DataTypes.createArrayType(DataTypes.TimestampType, true), true, null),
new StructField("J", DataTypes.BinaryType, true, null))
.toArray(new StructField[0]));

// For easy debugging, let's firstly compare the .treeString() values.
Expand Down Expand Up @@ -435,10 +444,8 @@ public void testDataNullsTable() {
nullArrayCounts++;
continue;
}

List<Byte> expectedBytes = Arrays.asList((byte) -66, (byte) -17, (byte) -34, (byte) -83);

assertThat(row.getList(j)).containsExactly(null, toSeq(expectedBytes));
byte[] expectedBytes = stringToBytes("beefdead");
assertArrayEquals(row.getList(j), Arrays.asList(null, expectedBytes));
}
}
assertEquals(3, nullArrayCounts);
Expand Down Expand Up @@ -509,4 +516,28 @@ BigDecimal asSparkBigDecimal(String v) {
// Bridges a Java List into the Scala Seq that Spark's collection helpers expect.
private static <T> Seq<T> toSeq(List<T> list) {
  return JavaConverters
      .asScalaIteratorConverter(list.iterator())
      .asScala()
      .toSeq();
}

// Decodes a hex-encoded string (e.g. "beefdead") into its raw bytes.
// Expects an even-length input of hex digits: two characters per output byte.
private static byte[] stringToBytes(String str) {
  int numBytes = str.length() / 2;
  byte[] decoded = new byte[numBytes];
  for (int pos = 0; pos < numBytes; pos++) {
    int offset = pos * 2;
    decoded[pos] = (byte) Integer.parseInt(str.substring(offset, offset + 2), 16);
  }
  return decoded;
}

// Renders a byte array as a String for assertions; a null array maps to "".
// NOTE(review): new String(bytes) decodes with the platform default charset,
// which is lossy for arbitrary binary data (unmappable sequences collapse to
// U+FFFD) — prefer comparing raw byte[] contents where equality matters.
private static String bytesToString(byte[] bytes) {
  return bytes == null ? "" : new String(bytes);
}

/**
 * Asserts that two lists of byte arrays have the same size and element-wise
 * identical contents (null elements must match null elements).
 *
 * <p>Comparison is done on the raw byte[] contents. The previous approach
 * round-tripped each array through {@code new String(bytes)} with the platform
 * default charset, which is lossy: unmappable byte sequences all collapse to
 * U+FFFD, so two distinct byte arrays could spuriously compare equal.
 */
private void assertArrayEquals(List<byte[]> bytesList1, List<byte[]> bytesList2) {
  assertThat(bytesList1.size()).isEqualTo(bytesList2.size());

  for (int i = 0; i < bytesList1.size(); i++) {
    // Truth compares primitive arrays by contents; null-vs-null also passes.
    assertThat(bytesList1.get(i)).isEqualTo(bytesList2.get(i));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.cloud.spark.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;

import com.google.cloud.spanner.BatchClient;
Expand Down Expand Up @@ -229,7 +230,8 @@ public void testArraysConversions() throws Exception {
new ZonedDateTime[] {
ZonedDateTime.parse("2023-08-26T12:11:10Z"),
ZonedDateTime.parse("2023-08-27T12:11:09Z"),
}),
},
"beefdead"),
makeCompositeTableRow(
"id2",
new long[] {20, 200, 2991, 888885},
Expand All @@ -246,13 +248,42 @@ public void testArraysConversions() throws Exception {
new ZonedDateTime[] {
ZonedDateTime.parse("2023-09-22T12:11:10Z"),
ZonedDateTime.parse("2023-09-23T12:11:09Z"),
}));
},
"deadbeef"));

Comparator<InternalRow> cmp = new InternalRowComparator();
Collections.sort(expectRows, cmp);
Collections.sort(gotRows, cmp);

assertEquals(expectRows.size(), gotRows.size());
assertEquals(expectRows, gotRows);
assertInternalRow(gotRows, expectRows);
}

/**
 * Field-by-field comparison of each expected/actual row pair.
 *
 * <p>We cannot assertEquals the whole List: the BYTES column surfaces as a
 * byte[], whose equals() is reference identity, so identical contents held in
 * different array objects would still compare unequal.
 */
private static void assertInternalRow(
    List<InternalRow> actualRows, List<InternalRow> expectedRows) {
  assertEquals(expectedRows.size(), actualRows.size());
  for (int i = 0; i < actualRows.size(); i++) {
    GenericInternalRow actualRow = (GenericInternalRow) actualRows.get(i);
    GenericInternalRow expectedRow = (GenericInternalRow) expectedRows.get(i);

    assertThat(actualRow.getUTF8String(0)).isEqualTo(expectedRow.getUTF8String(0));
    assertThat(actualRow.getArray(1)).isEqualTo(expectedRow.getArray(1));
    assertThat(actualRow.getArray(2)).isEqualTo(expectedRow.getArray(2));
    assertThat(actualRow.getUTF8String(3)).isEqualTo(expectedRow.getUTF8String(3));
    assertThat(actualRow.getDecimal(4, 38, 9)).isEqualTo(expectedRow.getDecimal(4, 38, 9));
    assertThat(actualRow.getInt(5)).isEqualTo(expectedRow.getInt(5));
    assertThat(actualRow.getLong(6)).isEqualTo(expectedRow.getLong(6));
    assertThat(actualRow.getBoolean(7)).isEqualTo(expectedRow.getBoolean(7));
    assertThat(actualRow.getArray(8)).isEqualTo(expectedRow.getArray(8));
    assertThat(actualRow.getArray(9)).isEqualTo(expectedRow.getArray(9));
    // Compare the BYTES column by raw content: Truth's byte[] subject does an
    // element-wise comparison. The previous String round-trip through the
    // platform default charset was lossy (unmappable sequences collapse to
    // U+FFFD) and could mask real differences between the arrays.
    assertThat(actualRow.getBinary(10)).isEqualTo(expectedRow.getBinary(10));
  }
}

// Renders a byte array as a String for assertions; a null array maps to "".
// NOTE(review): new String(bytes) decodes with the platform default charset,
// which is lossy for arbitrary binary data (unmappable sequences collapse to
// U+FFFD) — prefer comparing raw byte[] contents where equality matters.
private static String bytesToString(byte[] bytes) {
  return bytes == null ? "" : new String(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public void readSchemaShouldWorkInSpannerScanBuilder() throws Exception {
Arrays.asList(
new StructField("A", DataTypes.LongType, false, null),
new StructField("B", DataTypes.StringType, true, null),
new StructField(
"C", DataTypes.createArrayType(DataTypes.ByteType, true), true, null),
new StructField("C", DataTypes.BinaryType, true, null),
new StructField("D", DataTypes.TimestampType, true, null),
new StructField("E", DataTypes.createDecimalType(38, 9), true, null),
new StructField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public void querySchemaShouldSuccessInSpannerTable() {
Arrays.asList(
new StructField("A", DataTypes.LongType, false, null),
new StructField("B", DataTypes.StringType, true, null),
new StructField(
"C", DataTypes.createArrayType(DataTypes.ByteType, true), true, null),
new StructField("C", DataTypes.BinaryType, true, null),
new StructField("D", DataTypes.TimestampType, true, null),
new StructField("E", DataTypes.createDecimalType(38, 9), true, null),
new StructField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ public InternalRow makeCompositeTableRow(
ZonedDateTime F,
boolean G,
ZonedDateTime[] H,
ZonedDateTime[] I) {
GenericInternalRow row = new GenericInternalRow(10);
ZonedDateTime[] I,
String J) {
GenericInternalRow row = new GenericInternalRow(11);
row.update(0, UTF8String.fromString(id));
row.update(1, new GenericArrayData(A));
row.update(2, new GenericArrayData(toSparkStrList(B)));
Expand All @@ -241,6 +242,8 @@ public InternalRow makeCompositeTableRow(
row.setBoolean(7, G);
row.update(8, SpannerUtils.zonedDateTimeIterToSparkDates(Arrays.asList(H)));
row.update(9, SpannerUtils.zonedDateTimeIterToSparkTimestamps(Arrays.asList(I)));
row.update(10, stringToBytes(J));

return row;
}

Expand All @@ -251,4 +254,14 @@ private UTF8String[] toSparkStrList(String[] strs) {
}
return dest.toArray(new UTF8String[0]);
}

// Decodes a hex string into raw bytes: two hex characters encode one byte,
// so the string is walked pairwise.
private static byte[] stringToBytes(String str) {
  byte[] result = new byte[str.length() / 2];
  for (int i = 0; i < result.length; i++) {
    String pair = str.substring(2 * i, 2 * i + 2);
    result[i] = (byte) Integer.parseInt(pair, 16);
  }
  return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public class DataprocAcceptanceTestBase {
"Please set the 'GOOGLE_CLOUD_PROJECT' environment variable");
private static final String DATABASE_ID =
Preconditions.checkNotNull(
System.getenv("SPANNER_DATABASE_ID"),
"Please set the 'SPANNER_DATABASE_ID' environment variable");
System.getenv("SPANNER_DATABASE_ID"),
"Please set the 'SPANNER_DATABASE_ID' environment variable")
+ "-"
+ System.nanoTime();
private static final String INSTANCE_ID =
Preconditions.checkNotNull(
System.getenv("SPANNER_INSTANCE_ID"),
Expand Down
7 changes: 3 additions & 4 deletions spark-3.1-spanner-lib/src/test/resources/db/insert_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,16 @@ VALUES

DELETE FROM compositeTable WHERE 1=1;
INSERT INTO
compositeTable(id, A, B, C, D, E, F, G, H, I)
compositeTable(id, A, B, C, D, E, F, G, H, I, J)
VALUES
(
"id1", [10, 100, 991, 567282], ["a", "b", "c"], "foobar", 2934, DATE("2023-01-01T00:00:00Z"),
TIMESTAMP("2023-08-26T12:22:05Z"), true, [DATE("2023-01-02T00:00:00Z"), DATE("2023-12-31T00:00:00Z")],
[TIMESTAMP("2023-08-26T12:11:10Z"), TIMESTAMP("2023-08-27T12:11:09Z")]),
[TIMESTAMP("2023-08-26T12:11:10Z"), TIMESTAMP("2023-08-27T12:11:09Z")], FROM_HEX("beefdead")),
(
"id2", [20, 200, 2991, 888885], ["A", "B", "C"], "this one", 93411, DATE("2023-09-23T00:00:00Z"),
TIMESTAMP("2023-09-22T12:22:05Z"), false, [DATE("2023-09-02T00:00:00Z"), DATE("2023-12-31T00:00:00Z")],
[TIMESTAMP("2023-09-22T12:11:10Z"), TIMESTAMP("2023-09-23T12:11:09Z")]);

[TIMESTAMP("2023-09-22T12:11:10Z"), TIMESTAMP("2023-09-23T12:11:09Z")], FROM_HEX("deadbeef"));

DELETE FROM nullsTable WHERE 1=1;
INSERT INTO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ CREATE TABLE compositeTable (
G BOOL,
H ARRAY<DATE>,
I ARRAY<TIMESTAMP>,
J BYTES(20),
) PRIMARY KEY(id);

CREATE TABLE nullsTable (
Expand Down

0 comments on commit 753f65a

Please sign in to comment.