Skip to content

Commit

Permalink
Relocate BigQuery Partition Utils (#62)
Browse files Browse the repository at this point in the history
Move `table/restrictions/BigQueryPartition` to
`common/utils/BigQueryPartitionUtils`

---------

Co-authored-by: Pablo Rodriguez Defino <[email protected]>
  • Loading branch information
jayehwhyehentee and prodriguezdefino authored Dec 11, 2023
1 parent 9748984 commit 05133bf
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* the License.
*/

package com.google.cloud.flink.bigquery.table.restrictions;
package com.google.cloud.flink.bigquery.common.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -47,7 +47,7 @@

/** Utility class to handle the BigQuery partition conversions to Flink types and structures. */
@Internal
public class BigQueryPartition {
public class BigQueryPartitionUtils {

/**
* Below durations are added to current partitionId to obtain the next tentative partitionId,
Expand Down Expand Up @@ -105,7 +105,7 @@ public class BigQueryPartition {
SQL_YEAR_FORMAT.setTimeZone(UTC_TIME_ZONE);
}

private BigQueryPartition() {}
private BigQueryPartitionUtils() {}

/** Represents the partition types that BigQuery can use in partitioned tables. */
public enum PartitionType {
Expand Down Expand Up @@ -369,13 +369,13 @@ static PartitionIdWithInfoAndStatus retrievePartitionInfoWithStatus(
new PartitionIdWithInfoAndStatus(
partition.getPartitionId(),
partition.getInfo(),
BigQueryPartition.PartitionStatus.COMPLETED))
BigQueryPartitionUtils.PartitionStatus.COMPLETED))
.findFirst()
.orElse(
new PartitionIdWithInfoAndStatus(
partition.getPartitionId(),
partition.getInfo(),
BigQueryPartition.PartitionStatus.IN_PROGRESS));
BigQueryPartitionUtils.PartitionStatus.IN_PROGRESS));
}

static Instant retrieveEpochSecondsFromParsedTemporal(SimpleDateFormat sdf, String tsString) {
Expand Down Expand Up @@ -443,7 +443,7 @@ public static PartitionIdWithInfoAndStatus checkPartitionCompleted(
return new PartitionIdWithInfoAndStatus(
partition.getPartitionId(),
partition.getInfo(),
BigQueryPartition.PartitionStatus.COMPLETED);
BigQueryPartitionUtils.PartitionStatus.COMPLETED);
default:
throw new IllegalArgumentException(
"Partition type not supported: " + partition.getInfo().getPartitionType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
import com.google.cloud.flink.bigquery.common.utils.SchemaTransform;
import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition;
import org.threeten.bp.Duration;

import java.io.IOException;
Expand Down Expand Up @@ -228,7 +228,7 @@ public List<PartitionIdWithInfoAndStatus> retrievePartitionsStatus(
.stream()
.map(
pInfo ->
BigQueryPartition
BigQueryPartitionUtils
.checkPartitionCompleted(
pInfo))
.collect(Collectors.toList()))
Expand Down Expand Up @@ -268,9 +268,9 @@ public Optional<TablePartitionInfo> retrievePartitionColumnInfo(
Optional.of(
new TablePartitionInfo(
tp.getField(),
BigQueryPartition.PartitionType.valueOf(
tp.getType()),
BigQueryPartition
BigQueryPartitionUtils.PartitionType
.valueOf(tp.getType()),
BigQueryPartitionUtils
.retrievePartitionColumnType(
tableInfo.getSchema(),
tp.getField()),
Expand All @@ -280,7 +280,8 @@ public Optional<TablePartitionInfo> retrievePartitionColumnInfo(
Optional.of(
new TablePartitionInfo(
tableInfo.getRangePartitioning().getField(),
BigQueryPartition.PartitionType.INT_RANGE,
BigQueryPartitionUtils.PartitionType
.INT_RANGE,
StandardSQLTypeName.INT64,
bqStreamingBufferOldestEntryTime)));
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.flink.annotation.Internal;

import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;

import java.util.Objects;

Expand All @@ -27,10 +27,12 @@
public class PartitionIdWithInfoAndStatus {
private final String partitionId;
private final TablePartitionInfo info;
private final BigQueryPartition.PartitionStatus status;
private final BigQueryPartitionUtils.PartitionStatus status;

public PartitionIdWithInfoAndStatus(
String partitionId, TablePartitionInfo info, BigQueryPartition.PartitionStatus status) {
String partitionId,
TablePartitionInfo info,
BigQueryPartitionUtils.PartitionStatus status) {
this.partitionId = partitionId;
this.info = info;
this.status = status;
Expand All @@ -44,7 +46,7 @@ public TablePartitionInfo getInfo() {
return info;
}

public BigQueryPartition.PartitionStatus getStatus() {
public BigQueryPartitionUtils.PartitionStatus getStatus() {
return status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition.PartitionType;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils.PartitionType;

import java.time.Instant;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import org.apache.flink.table.types.logical.RowType;

import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.services.TablePartitionInfo;
import com.google.cloud.flink.bigquery.source.BigQuerySource;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.reader.deserializer.AvroToRowDataDeserializationSchema;
import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition;
import com.google.cloud.flink.bigquery.table.restrictions.BigQueryRestriction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -245,7 +245,7 @@ private static List<Map<String, String>> transformPartitionIds(
* we retrieve the existing partition ids and transform them into valid values given the
* column data type
*/
return BigQueryPartition.partitionValuesFromIdAndDataType(
return BigQueryPartitionUtils.partitionValuesFromIdAndDataType(
dataClient.retrieveTablePartitions(projectId, dataset, table),
partitionInfo.getColumnType())
.stream()
Expand Down Expand Up @@ -275,8 +275,11 @@ private static String rebuildRestrictionsApplyingPartitions(
.flatMap(map -> map.entrySet().stream())
.map(
entry ->
BigQueryPartition.formatPartitionRestrictionBasedOnInfo(
partitionInfo, entry.getKey(), entry.getValue()))
BigQueryPartitionUtils
.formatPartitionRestrictionBasedOnInfo(
partitionInfo,
entry.getKey(),
entry.getValue()))
.collect(Collectors.joining(" OR "));
return currentRestriction + " AND (" + partitionRestrictions + ")";
}
Expand Down
Loading

0 comments on commit 05133bf

Please sign in to comment.