Skip to content

Commit

Permalink
support require_partition_filter=true for range partitioned table in …
Browse files Browse the repository at this point in the history
…DPO (#1326)
  • Loading branch information
isha97 authored Dec 18, 2024
1 parent f95e198 commit 8414275
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class BigQueryUtil {
public static final int DEFAULT_BIG_NUMERIC_PRECISION = 76;
public static final int DEFAULT_BIG_NUMERIC_SCALE = 38;
private static final int NO_VALUE = -1;
private static final long BIGQUERY_INTEGER_MIN_VALUE = Long.MIN_VALUE;
static final ImmutableSet<String> INTERNAL_ERROR_MESSAGES =
ImmutableSet.of(
"HTTP/2 error code: INTERNAL_ERROR",
Expand Down Expand Up @@ -714,37 +715,6 @@ public static String prepareQueryForLog(String query, int maxLength) {
: noNewLinesQuery;
}

/**
 * Builds the BigQuery {@code MERGE} statement text that overwrites partitions of a destination
 * table with the rows of a temporary table.
 *
 * <p>The generated statement cross-joins every row of the temporary table with the array
 * {@code [true, false]}, exposing the boolean under an injected column name. The {@code ON}
 * clause compares the source and target partition expressions and additionally requires the
 * injected boolean. Matched target rows are deleted; source rows that are not matched and whose
 * injected boolean is false are inserted using the destination schema's column list.
 *
 * @param destinationTableName name of the table being merged into; placed between backticks
 * @param temporaryTableName name of the table the rows are read from; placed between backticks
 * @param destinationDefinition definition whose schema field names form the INSERT column and
 *     VALUES lists
 * @param extractedPartitionedSource expression placed on the left-hand side of the {@code ON}
 *     equality
 * @param extractedPartitionedTarget expression placed on the right-hand side of the {@code ON}
 *     equality
 * @return the formatted MERGE statement
 */
static String getMergeQueryForPartitionedTable(
    String destinationTableName,
    String temporaryTableName,
    StandardTableDefinition destinationDefinition,
    String extractedPartitionedSource,
    String extractedPartitionedTarget) {
  // Every destination column, individually wrapped in backticks and comma separated.
  String quotedColumnList =
      destinationDefinition.getSchema().getFields().stream()
          .map(Field::getName)
          .collect(Collectors.joining("`,`", "`", "`"));

  // Fixed name for the injected boolean column produced by the UNNEST cross join.
  String flagColumn = "_" + 1234567890123456789L;

  String mergeTemplate =
      String.join(
          "\n",
          "MERGE `%s` AS target",
          "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source",
          "ON %s = %s AND %s",
          "WHEN MATCHED THEN DELETE",
          "WHEN NOT MATCHED AND NOT %s THEN",
          "INSERT(%s) VALUES(%s)");

  return String.format(
      mergeTemplate,
      destinationTableName,
      temporaryTableName,
      flagColumn,
      extractedPartitionedSource,
      extractedPartitionedTarget,
      flagColumn,
      flagColumn,
      quotedColumnList,
      quotedColumnList);
}

static String getQueryForTimePartitionedTable(
String destinationTableName,
String temporaryTableName,
Expand Down Expand Up @@ -829,12 +799,32 @@ static String getQueryForRangePartitionedTable(
end,
interval);

return getMergeQueryForPartitionedTable(
FieldList allFields = destinationDefinition.getSchema().getFields();
String commaSeparatedFields =
allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`"));
String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L);

String queryFormat =
"MERGE `%s` AS target\n"
+ "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n"
+ "ON %s = %s AND %s AND (target.%s >= %d OR target.%s IS NULL )\n"
+ "WHEN MATCHED THEN DELETE\n"
+ "WHEN NOT MATCHED AND NOT %s THEN\n"
+ "INSERT(%s) VALUES(%s)";
return String.format(
queryFormat,
destinationTableName,
temporaryTableName,
destinationDefinition,
booleanInjectedColumn,
extractedPartitionedSource,
extractedPartitionedTarget);
extractedPartitionedTarget,
booleanInjectedColumn,
partitionField,
BIGQUERY_INTEGER_MIN_VALUE,
partitionField,
booleanInjectedColumn,
commaSeparatedFields,
commaSeparatedFields);
}

// based on https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration, it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2119,7 +2119,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
+ "AS SELECT * FROM UNNEST([(1, 1000), "
+ "(8, 1005), ( 21, 1010), (83, 1020)])",
testDataset, testTable, orderId, orderCount));
Expand Down Expand Up @@ -2170,7 +2170,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
+ "AS SELECT * FROM UNNEST([(1, 1000), "
+ "(2, 1005), ( 150, 1010)])",
testDataset, testTable, orderId, orderCount));
Expand Down Expand Up @@ -2205,7 +2205,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
+ "AS SELECT * FROM UNNEST([(1, 1000), "
+ "(2, 1005), ( -1, 1010)])",
testDataset, testTable, orderId, orderCount));
Expand Down Expand Up @@ -2240,7 +2240,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
+ "AS SELECT * FROM UNNEST([(1, 1000), "
+ "(11, 1005), ( 100, 1010)])",
testDataset, testTable, orderId, orderCount));
Expand Down Expand Up @@ -2279,7 +2279,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+ "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
+ "AS SELECT * FROM UNNEST([(NULL, 1000), "
+ "(11, 1005)])",
testDataset, testTable, orderId, orderCount));
Expand Down

0 comments on commit 8414275

Please sign in to comment.