@@ -125,7 +125,11 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
125
125
"schema" ,
126
126
Schema .Field .of ("ID" , Schema .nullableOf (Schema .of (Schema .Type .LONG ))),
127
127
Schema .Field .of ("StringCol" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
128
- Schema .Field .of ("BOOL_COL" , Schema .nullableOf (Schema .of (Schema .Type .BOOLEAN )))
128
+ Schema .Field .of ("BoolCol" , Schema .nullableOf (Schema .of (Schema .Type .BOOLEAN ))),
129
+ Schema .Field .of ("TimestampCol" , Schema .nullableOf (Schema .of (Schema .LogicalType .TIMESTAMP_MICROS ))),
130
+ Schema .Field .of ("ArrayIntCol" , Schema .arrayOf (Schema .nullableOf (Schema .of (Schema .Type .LONG )))),
131
+ Schema .Field .of ("BytesCol" , Schema .nullableOf (Schema .of (Schema .Type .BYTES ))),
132
+ Schema .Field .of ("DateCol" , Schema .nullableOf (Schema .of (Schema .LogicalType .DATE )))
129
133
);
130
134
131
135
private static final ZonedDateTime NOW = ZonedDateTime .now ();
@@ -168,6 +172,10 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
168
172
.set ("ID" ).to (3 )
169
173
.set ("STRING_COL" ).to ("some string" )
170
174
.set ("BOOL_COL" ).to (false )
175
+ .set ("TIMESTAMP_COL" ).to (Timestamp .ofTimeSecondsAndNanos (NOW .toEpochSecond (), NOW .getNano ()))
176
+ .set ("ARRAY_INT_COL" ).toInt64Array (Arrays .asList (1L , 2L , null ))
177
+ .set ("BYTES_COL" ).to (ByteArray .copyFrom ("some value" .getBytes ()))
178
+ .set ("DATE_COL" ).to (Date .fromYearMonthDay (NOW .getYear (), NOW .getMonthValue (), NOW .getDayOfMonth ()))
171
179
.build ()
172
180
);
173
181
@@ -331,7 +339,7 @@ private void testReadWithImportQuery(Engine engine) throws Exception {
331
339
.put ("instance" , "${instance}" )
332
340
.put ("database" , "${database}" )
333
341
.put ("table" , "${srcTable}" )
334
- .put ("importQuery" ,"Select ID, STRING_COL as StringCol, BOOL_COL from " + SOURCE_TABLE_NAME )
342
+ .put ("importQuery" ,"${importQuery}" )
335
343
.build ();
336
344
337
345
Map <String , String > sinkProperties = new ImmutableMap .Builder <String , String >()
@@ -340,7 +348,6 @@ private void testReadWithImportQuery(Engine engine) throws Exception {
340
348
.put ("instance" , "${instance}" )
341
349
.put ("database" , "${database}" )
342
350
.put ("table" , "${dstTable}" )
343
- .put ("serviceAccountType" , "JSON" )
344
351
.put ("schema" , IMPORT_SCHEMA .toString ())
345
352
.put ("keys" , "${keys}" )
346
353
.build ();
@@ -356,6 +363,8 @@ private void testReadWithImportQuery(Engine engine) throws Exception {
356
363
args .put ("srcTable" , SOURCE_TABLE_NAME );
357
364
args .put ("dstTable" , nonExistentSinkTableName );
358
365
args .put ("keys" , "ID" );
366
+ args .put ("importQuery" ,"Select ID, STRING_COL as StringCol, BOOL_COL as BoolCol, TIMESTAMP_COL as TimestampCol, " +
367
+ "ARRAY_INT_COL as ArrayIntCol, BYTES_COL as BytesCol, DATE_COL as DateCol from " + SOURCE_TABLE_NAME );
359
368
startWorkFlow (applicationManager , ProgramRunStatus .COMPLETED , args );
360
369
361
370
ResultSet resultSet = spanner .getDatabaseClient (database .getId ())
@@ -372,10 +381,15 @@ private void testReadWithImportQuery(Engine engine) throws Exception {
372
381
Assert .assertEquals (secondRowExpected .keySet ().size (), resultSet .getColumnCount ());
373
382
Assert .assertEquals (secondRowExpected .get ("ID" ).getInt64 (), resultSet .getLong ("ID" ));
374
383
Assert .assertEquals (secondRowExpected .get ("STRING_COL" ).getString (), resultSet .getString ("StringCol" ));
375
- Assert .assertEquals (secondRowExpected .get ("BOOL_COL" ).getBool (), resultSet .getBoolean ("BOOL_COL" ));
384
+ Assert .assertEquals (secondRowExpected .get ("BOOL_COL" ).getBool (), resultSet .getBoolean ("BoolCol" ));
385
+ Assert .assertEquals (secondRowExpected .get ("TIMESTAMP_COL" ).getTimestamp (), resultSet .getTimestamp ("TimestampCol" ));
386
+ Assert .assertEquals (secondRowExpected .get ("ARRAY_INT_COL" ).getInt64Array (), resultSet .getLongList ("ArrayIntCol" ));
387
+ Assert .assertEquals (secondRowExpected .get ("BYTES_COL" ).getBytes (), resultSet .getBytes ("BytesCol" ));
388
+ Assert .assertEquals (secondRowExpected .get ("DATE_COL" ).getDate (), resultSet .getDate ("DateCol" ));
389
+ spanner .getDatabaseClient (database .getId ()).singleUse ()
390
+ .executeQuery (Statement .of (String .format ("drop table %s;" , nonExistentSinkTableName )));
376
391
}
377
392
378
-
379
393
//TODO:(CDAP-16040) re-enable once plugin is fixed
380
394
//@Test
381
395
public void testReadAndStoreInNewTableWithNoSourceSchema () throws Exception {
0 commit comments