|
53 | 53 | import org.junit.Assert;
|
54 | 54 | import org.junit.Before;
|
55 | 55 | import org.junit.BeforeClass;
|
56 |
| -import org.junit.Ignore; |
57 | 56 | import org.junit.Test;
|
58 | 57 | import org.slf4j.Logger;
|
59 | 58 | import org.slf4j.LoggerFactory;
|
@@ -122,6 +121,13 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
|
122 | 121 | Schema.Field.of("ARRAY_DATE_COL", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))))
|
123 | 122 | );
|
124 | 123 |
|
| 124 | + private static final Schema IMPORT_SCHEMA = Schema.recordOf( |
| 125 | + "schema", |
| 126 | + Schema.Field.of("ID", Schema.nullableOf(Schema.of(Schema.Type.LONG))), |
| 127 | + Schema.Field.of("StringCol", Schema.nullableOf(Schema.of(Schema.Type.STRING))), |
| 128 | + Schema.Field.of("BOOL_COL", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))) |
| 129 | + ); |
| 130 | + |
125 | 131 | private static final ZonedDateTime NOW = ZonedDateTime.now();
|
126 | 132 | private static final Function<String, List<Mutation>> TEST_MUTATIONS = (tableName) -> ImmutableList.of(
|
127 | 133 | Mutation.newInsertBuilder(tableName)
|
@@ -156,7 +162,13 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
|
156 | 162 | Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth()),
|
157 | 163 | Date.fromYearMonthDay(NOW.getYear() + 1, NOW.getMonthValue(), NOW.getDayOfMonth()),
|
158 | 164 | null))
|
159 |
| - .build() |
| 165 | + .build(), |
| 166 | + |
| 167 | + Mutation.newInsertBuilder(tableName) |
| 168 | + .set("ID").to(3) |
| 169 | + .set("STRING_COL").to("some string") |
| 170 | + .set("BOOL_COL").to(false) |
| 171 | + .build() |
160 | 172 | );
|
161 | 173 |
|
162 | 174 | private static final List<Mutation> SOURCE_TABLE_TEST_MUTATIONS = TEST_MUTATIONS.apply(SOURCE_TABLE_NAME);
|
@@ -306,6 +318,64 @@ private void testReadAndStore(Engine engine) throws Exception {
|
306 | 318 | Assert.assertTrue(resultSet.isNull("NOT_IN_THE_SCHEMA_COL"));
|
307 | 319 | }
|
308 | 320 |
|
| 321 | + @Test |
| 322 | + public void testReadWithImportQuery() throws Exception { |
| 323 | + testReadWithImportQuery(Engine.MAPREDUCE); |
| 324 | + testReadWithImportQuery(Engine.SPARK); |
| 325 | + } |
| 326 | + |
| 327 | + private void testReadWithImportQuery(Engine engine) throws Exception { |
| 328 | + Map<String, String> sourceProperties = new ImmutableMap.Builder<String, String>() |
| 329 | + .put("referenceName", "spanner_source") |
| 330 | + .put("project", "${project}") |
| 331 | + .put("instance", "${instance}") |
| 332 | + .put("database", "${database}") |
| 333 | + .put("table", "${srcTable}") |
| 334 | + .put("importQuery","Select ID, STRING_COL as StringCol, BOOL_COL from " + SOURCE_TABLE_NAME) |
| 335 | + .build(); |
| 336 | + |
| 337 | + Map<String, String> sinkProperties = new ImmutableMap.Builder<String, String>() |
| 338 | + .put("referenceName", "spanner_sink") |
| 339 | + .put("project", "${project}") |
| 340 | + .put("instance", "${instance}") |
| 341 | + .put("database", "${database}") |
| 342 | + .put("table", "${dstTable}") |
| 343 | + .put("serviceAccountType", "JSON") |
| 344 | + .put("schema", IMPORT_SCHEMA.toString()) |
| 345 | + .put("keys", "${keys}") |
| 346 | + .build(); |
| 347 | + |
| 348 | + String applicationName = SPANNER_PLUGIN_NAME + "-testReadWithImportQuery"; |
| 349 | + String nonExistentSinkTableName = "nonexistent_" + UUID.randomUUID().toString().replaceAll("-", "_"); |
| 350 | + ApplicationManager applicationManager = deployApplication(sourceProperties, sinkProperties, |
| 351 | + applicationName, engine); |
| 352 | + Map<String, String> args = new HashMap<>(); |
| 353 | + args.put("project", getProjectId()); |
| 354 | + args.put("instance", instance.getId().getInstance()); |
| 355 | + args.put("database", database.getId().getDatabase()); |
| 356 | + args.put("srcTable", SOURCE_TABLE_NAME); |
| 357 | + args.put("dstTable", nonExistentSinkTableName); |
| 358 | + args.put("keys", "ID"); |
| 359 | + startWorkFlow(applicationManager, ProgramRunStatus.COMPLETED, args); |
| 360 | + |
| 361 | + ResultSet resultSet = spanner.getDatabaseClient(database.getId()) |
| 362 | + .singleUse() |
| 363 | + .executeQuery(Statement.of(String.format("select * from %s;", nonExistentSinkTableName))); |
| 364 | + |
| 365 | + Assert.assertTrue(resultSet.next()); |
| 366 | + Map<String, Value> firstRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(0).asMap(); |
| 367 | + Assert.assertEquals(firstRowExpected.get("ID").getInt64(), resultSet.getLong("ID")); |
| 368 | + |
| 369 | + Assert.assertTrue(resultSet.next()); |
| 370 | + Assert.assertTrue(resultSet.next()); |
| 371 | + Map<String, Value> secondRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(2).asMap(); |
| 372 | + Assert.assertEquals(secondRowExpected.keySet().size(), resultSet.getColumnCount()); |
| 373 | + Assert.assertEquals(secondRowExpected.get("ID").getInt64(), resultSet.getLong("ID")); |
| 374 | + Assert.assertEquals(secondRowExpected.get("STRING_COL").getString(), resultSet.getString("StringCol")); |
| 375 | + Assert.assertEquals(secondRowExpected.get("BOOL_COL").getBool(), resultSet.getBoolean("BOOL_COL")); |
| 376 | + } |
| 377 | + |
| 378 | + |
309 | 379 | //TODO:(CDAP-16040) re-enable once plugin is fixed
|
310 | 380 | //@Test
|
311 | 381 | public void testReadAndStoreInNewTableWithNoSourceSchema() throws Exception {
|
|
0 commit comments