Skip to content

Commit

Permalink
Refactor for BigQueryPartition class (#54)
Browse files Browse the repository at this point in the history
In preparation for the unbounded source implementation. Test coverage
for partition related methods was improved as well.

---------

Co-authored-by: Jayant Jain <[email protected]>
Co-authored-by: Jayant Jain <[email protected]>
  • Loading branch information
3 people authored Dec 8, 2023
1 parent 351d608 commit 9748984
Show file tree
Hide file tree
Showing 12 changed files with 1,015 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,4 @@ static TableFieldSchema fieldToTableFieldSchema(Field field) {
public static TableSchema bigQuerySchemaToTableSchema(com.google.cloud.bigquery.Schema schema) {
return new TableSchema().setFields(fieldListToListOfTableFieldSchema(schema.getFields()));
}

public static StandardSQLTypeName bigQueryTableFieldSchemaTypeToSQLType(
String tableFieldSchemaType) {
return BIG_QUERY_TO_SQL_TYPES.getOrDefault(tableFieldSchemaType, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.threeten.bp.Duration;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -213,6 +215,34 @@ public List<String> retrieveTablePartitions(String project, String dataset, Stri
}
}

/**
 * Retrieves the completion status for every partition of the given BigQuery table.
 *
 * <p>Resolves the table's partition column info and, when present, maps each of the
 * table's partition ids to its completion status; returns an empty list when the table
 * has no partition column info.
 *
 * @param project the GCP project id of the table.
 * @param dataset the BigQuery dataset of the table.
 * @param table the BigQuery table name.
 * @return the list of partition ids with their info and completion status.
 * @throws RuntimeException wrapping any failure while interacting with BigQuery.
 */
public List<PartitionIdWithInfoAndStatus> retrievePartitionsStatus(
        String project, String dataset, String table) {
    try {
        return retrievePartitionColumnInfo(project, dataset, table)
                .map(
                        info -> {
                            List<PartitionIdWithInfoAndStatus> partitionStatuses =
                                    new ArrayList<>();
                            for (PartitionIdWithInfo partitionWithInfo :
                                    info.toPartitionsWithInfo(
                                            retrieveTablePartitions(project, dataset, table))) {
                                partitionStatuses.add(
                                        BigQueryPartition.checkPartitionCompleted(
                                                partitionWithInfo));
                            }
                            return partitionStatuses;
                        })
                .orElse(new ArrayList<>());
    } catch (Exception ex) {
        throw new RuntimeException(
                String.format(
                        "Problems while trying to retrieve table partitions status"
                                + " (table: %s.%s.%s).",
                        project, dataset, table),
                ex);
    }
}

@Override
public Optional<TablePartitionInfo> retrievePartitionColumnInfo(
String project, String dataset, String table) {
Expand All @@ -227,6 +257,11 @@ public Optional<TablePartitionInfo> retrievePartitionColumnInfo(
&& tableInfo.getTimePartitioning() == null) {
return Optional.empty();
}
Instant bqStreamingBufferOldestEntryTime =
Optional.ofNullable(tableInfo.getStreamingBuffer())
.map(sbuffer -> sbuffer.getOldestEntryTime().longValue())
.map(millisFromEpoch -> Instant.ofEpochMilli(millisFromEpoch))
.orElse(Instant.MAX);
return Optional.ofNullable(tableInfo.getTimePartitioning())
.map(
tp ->
Expand All @@ -238,14 +273,16 @@ public Optional<TablePartitionInfo> retrievePartitionColumnInfo(
BigQueryPartition
.retrievePartitionColumnType(
tableInfo.getSchema(),
tp.getField()))))
tp.getField()),
bqStreamingBufferOldestEntryTime)))
.orElseGet(
() ->
Optional.of(
new TablePartitionInfo(
tableInfo.getRangePartitioning().getField(),
BigQueryPartition.PartitionType.INT_RANGE,
StandardSQLTypeName.INT64)));
StandardSQLTypeName.INT64,
bqStreamingBufferOldestEntryTime)));
} catch (Exception ex) {
throw new RuntimeException(
String.format(
Expand Down Expand Up @@ -279,7 +316,6 @@ public Job dryRunQuery(String projectId, String query) {
.setUseLegacySql(false);
/** first we need to execute a dry-run to understand the expected query location. */
return BigQueryUtils.dryRunQuery(bigquery, projectId, queryConfiguration, null);

} catch (Exception ex) {
throw new RuntimeException(
"Problems occurred while trying to dry-run a BigQuery query job.", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static <T> FailsafeExecutor<T> buildRetriableExecutorForOperation(String operati

/**
 * Runs the provided operation through the failsafe executor, applying whatever
 * retry policies the executor was configured with.
 *
 * @param <T> the operation's result type.
 * @param failsafeExecutor the executor that guards the operation.
 * @param operation the operation to execute.
 * @return the operation's result.
 */
static <T> T executeOperation(
        FailsafeExecutor<T> failsafeExecutor, CheckedSupplier<T> operation) {
    return failsafeExecutor.get(operation::get);
}

static Job runInsertJob(Bigquery client, String projectId, Job job) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2023 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.services;

import org.apache.flink.annotation.Internal;

import java.util.Objects;

/**
 * Represents a BigQuery table partition identifier paired with the partition column
 * information of the table it belongs to.
 */
@Internal
public class PartitionIdWithInfo {
    private final String partitionId;
    private final TablePartitionInfo info;

    /**
     * Creates a partition id with its table's partition column information.
     *
     * @param partitionId the BigQuery partition identifier.
     * @param info the partition column information of the owning table.
     */
    public PartitionIdWithInfo(String partitionId, TablePartitionInfo info) {
        this.partitionId = partitionId;
        this.info = info;
    }

    /** Returns the BigQuery partition identifier. */
    public String getPartitionId() {
        return partitionId;
    }

    /** Returns the partition column information of the owning table. */
    public TablePartitionInfo getInfo() {
        return info;
    }

    @Override
    public int hashCode() {
        int hash = 5;
        hash = 97 * hash + Objects.hashCode(this.getInfo());
        hash = 97 * hash + Objects.hashCode(this.getPartitionId());
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // Collapsed null/class checks; semantics are identical to the separate checks.
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final PartitionIdWithInfo other = (PartitionIdWithInfo) obj;
        return Objects.equals(this.getPartitionId(), other.getPartitionId())
                && Objects.equals(this.getInfo(), other.getInfo());
    }

    @Override
    public String toString() {
        return "PartitionIdWithInfo{" + "partitionId=" + partitionId + ", info=" + info + '}';
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2023 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.services;

import org.apache.flink.annotation.Internal;

import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition;

import java.util.Objects;

/**
 * Represents a BigQuery table partition identifier paired with the partition column
 * information of its table and the completion status of the partition.
 */
@Internal
public class PartitionIdWithInfoAndStatus {
    private final String partitionId;
    private final TablePartitionInfo info;
    private final BigQueryPartition.PartitionStatus status;

    /**
     * Creates a partition id with its table's partition column information and status.
     *
     * @param partitionId the BigQuery partition identifier.
     * @param info the partition column information of the owning table.
     * @param status the completion status of the partition.
     */
    public PartitionIdWithInfoAndStatus(
            String partitionId, TablePartitionInfo info, BigQueryPartition.PartitionStatus status) {
        this.partitionId = partitionId;
        this.info = info;
        this.status = status;
    }

    /** Returns the BigQuery partition identifier. */
    public String getPartitionId() {
        return partitionId;
    }

    /** Returns the partition column information of the owning table. */
    public TablePartitionInfo getInfo() {
        return info;
    }

    /** Returns the completion status of the partition. */
    public BigQueryPartition.PartitionStatus getStatus() {
        return status;
    }

    @Override
    public int hashCode() {
        int hash = 7;
        hash = 17 * hash + Objects.hashCode(this.getPartitionId());
        hash = 17 * hash + Objects.hashCode(this.getInfo());
        hash = 17 * hash + Objects.hashCode(this.getStatus());
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // Collapsed null/class checks; semantics are identical to the separate checks.
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final PartitionIdWithInfoAndStatus other = (PartitionIdWithInfoAndStatus) obj;
        // Enum-typed status is safely compared with ==.
        return Objects.equals(this.getPartitionId(), other.getPartitionId())
                && Objects.equals(this.getInfo(), other.getInfo())
                && this.getStatus() == other.getStatus();
    }

    @Override
    public String toString() {
        return "PartitionIdWithInfoAndStatus{"
                + "partitionId="
                + partitionId
                + ", info="
                + info
                + ", status="
                + status
                + '}';
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,34 @@

import org.apache.flink.annotation.Internal;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition;
import com.google.cloud.flink.bigquery.table.restrictions.BigQueryPartition.PartitionType;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Represents the information of the BigQuery table's partition. */
@Internal
public class TablePartitionInfo {

private final String columnName;
private final StandardSQLTypeName columnType;
private final BigQueryPartition.PartitionType partitionType;
private final PartitionType partitionType;
private final Instant streamingBufferOldestEntryTime;

public TablePartitionInfo(
String columnName,
BigQueryPartition.PartitionType partitionType,
StandardSQLTypeName columnType) {
PartitionType partitionType,
StandardSQLTypeName columnType,
Instant sbOldestEntryTime) {
this.columnName = columnName;
this.columnType = columnType;
this.partitionType = partitionType;
this.streamingBufferOldestEntryTime = sbOldestEntryTime;
}

public String getColumnName() {
Expand All @@ -46,10 +56,24 @@ public StandardSQLTypeName getColumnType() {
return columnType;
}

public BigQueryPartition.PartitionType getPartitionType() {
public PartitionType getPartitionType() {
return partitionType;
}

/**
 * Returns the oldest entry time in the table's streaming buffer; defaults to
 * {@code Instant.MAX} when the table reports no streaming buffer.
 */
public Instant getStreamingBufferOldestEntryTime() {
return streamingBufferOldestEntryTime;
}

/**
 * Pairs each of the given partition ids with this table partition info.
 *
 * @param partitionIds the partition ids of the table; may be {@code null}.
 * @return a list with one entry per partition id; empty when {@code partitionIds}
 *     is {@code null}.
 */
public List<PartitionIdWithInfo> toPartitionsWithInfo(List<String> partitionIds) {
    // Guard clause instead of an Optional pipeline; behavior is unchanged.
    if (partitionIds == null) {
        return Lists.newArrayList();
    }
    return partitionIds.stream()
            .map(partitionId -> new PartitionIdWithInfo(partitionId, this))
            .collect(Collectors.toList());
}

@Override
public String toString() {
return "TablePartitionInfo{"
Expand Down
Loading

0 comments on commit 9748984

Please sign in to comment.