Add configs for sink table creation (#190)
1. Accept additional configs in the BQ sink (Datastream and Table API) for table creation, including partitioning and clustering.
2. Create a default schema provider in the sink if the user does not provide one.
jayehwhyehentee authored Dec 18, 2024
1 parent 4900b08 commit 4228405
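
For orientation, here is a minimal Datastream-side sketch of how the two changes above compose (illustrative only, not part of this commit: the project, dataset, table, column names, and region are placeholders, and AvroToProtoSerializer plus the import paths are assumed from the connector's existing layout):

import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
import java.util.Arrays;

// Placeholder destination table.
BigQueryConnectOptions connectOptions =
        BigQueryConnectOptions.builder()
                .setProjectId("my-project")
                .setDataset("my_dataset")
                .setTable("my_table")
                .build();

// Note: no schemaProvider(...) call. Per change 2, the sink now falls back to
// new BigQuerySchemaProviderImpl(connectOptions) when none is supplied.
BigQuerySinkConfig sinkConfig =
        BigQuerySinkConfig.newBuilder()
                .connectOptions(connectOptions)
                .deliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .serializer(new AvroToProtoSerializer())      // assumed serializer impl
                .streamExecutionEnvironment(env)              // env defined elsewhere
                .enableTableCreation(true)                    // new in this change
                .partitionField("created_at")                 // placeholder column
                .partitionType(TimePartitioning.Type.DAY)
                .partitionExpirationMillis(90L * 24 * 60 * 60 * 1000) // 90 days
                .clusteredFields(Arrays.asList("customer_id"))
                .region("us-central1")
                .build();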
Showing 10 changed files with 451 additions and 128 deletions.
BigQueryBaseSink.java

@@ -21,6 +21,7 @@
 import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
 import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
+import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProviderImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +46,14 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {

     BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
         validateSinkConfig(sinkConfig);
-        this.connectOptions = sinkConfig.getConnectOptions();
-        this.schemaProvider = sinkConfig.getSchemaProvider();
-        this.serializer = sinkConfig.getSerializer();
-        this.tablePath =
+        connectOptions = sinkConfig.getConnectOptions();
+        if (sinkConfig.getSchemaProvider() == null) {
+            schemaProvider = new BigQuerySchemaProviderImpl(connectOptions);
+        } else {
+            schemaProvider = sinkConfig.getSchemaProvider();
+        }
+        serializer = sinkConfig.getSerializer();
+        tablePath =
                 String.format(
                         "projects/%s/datasets/%s/tables/%s",
                         connectOptions.getProjectId(),
@@ -63,9 +68,6 @@ private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {
         if (sinkConfig.getSerializer() == null) {
             throw new IllegalArgumentException("BigQuery serializer cannot be null");
         }
-        if (sinkConfig.getSchemaProvider() == null) {
-            throw new IllegalArgumentException("BigQuery schema provider cannot be null");
-        }
     }

     /** Ensures Sink's parallelism does not exceed the allowed maximum when scaling Flink job. */
BigQuerySinkConfig.java

@@ -26,6 +26,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.types.logical.LogicalType;

+import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
 import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
@@ -35,6 +36,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.List;
 import java.util.Objects;

 /**
@@ -53,6 +55,12 @@ public class BigQuerySinkConfig {
     private final DeliveryGuarantee deliveryGuarantee;
     private final BigQuerySchemaProvider schemaProvider;
     private final BigQueryProtoSerializer serializer;
+    private final boolean enableTableCreation;
+    private final String partitionField;
+    private final TimePartitioning.Type partitionType;
+    private final Long partitionExpirationMillis;
+    private final List<String> clusteredFields;
+    private final String region;

     public static Builder newBuilder() {
         return new Builder();
@@ -61,7 +69,16 @@ public static Builder newBuilder() {
     @Override
     public int hashCode() {
         return Objects.hash(
-                this.connectOptions, this.deliveryGuarantee, this.schemaProvider, this.serializer);
+                connectOptions,
+                deliveryGuarantee,
+                schemaProvider,
+                serializer,
+                enableTableCreation,
+                partitionField,
+                partitionType,
+                partitionExpirationMillis,
+                clusteredFields,
+                region);
     }

     @Override
@@ -76,25 +93,40 @@ public boolean equals(Object obj) {
             return false;
         }
         BigQuerySinkConfig object = (BigQuerySinkConfig) obj;
-        if (this.getConnectOptions() == object.getConnectOptions()
+        return (this.getConnectOptions() == object.getConnectOptions()
                 && (this.getSerializer().getClass() == object.getSerializer().getClass())
-                && (this.getDeliveryGuarantee() == object.getDeliveryGuarantee())) {
-            BigQuerySchemaProvider thisSchemaProvider = this.getSchemaProvider();
-            BigQuerySchemaProvider objSchemaProvider = object.getSchemaProvider();
-            return thisSchemaProvider.getAvroSchema().equals(objSchemaProvider.getAvroSchema());
-        }
-        return false;
+                && (this.getDeliveryGuarantee() == object.getDeliveryGuarantee())
+                && (this.enableTableCreation() == object.enableTableCreation())
+                && (Objects.equals(this.getPartitionField(), object.getPartitionField()))
+                && (this.getPartitionType() == object.getPartitionType())
+                && (Objects.equals(
+                        this.getPartitionExpirationMillis(), object.getPartitionExpirationMillis()))
+                && (Objects.equals(this.getClusteredFields(), object.getClusteredFields()))
+                && (Objects.equals(this.getRegion(), object.getRegion()))
+                && (Objects.equals(this.getSchemaProvider(), object.getSchemaProvider())));
     }

     private BigQuerySinkConfig(
             BigQueryConnectOptions connectOptions,
             DeliveryGuarantee deliveryGuarantee,
             BigQuerySchemaProvider schemaProvider,
-            BigQueryProtoSerializer serializer) {
+            BigQueryProtoSerializer serializer,
+            boolean enableTableCreation,
+            String partitionField,
+            TimePartitioning.Type partitionType,
+            Long partitionExpirationMillis,
+            List<String> clusteredFields,
+            String region) {
         this.connectOptions = connectOptions;
         this.deliveryGuarantee = deliveryGuarantee;
         this.schemaProvider = schemaProvider;
         this.serializer = serializer;
+        this.enableTableCreation = enableTableCreation;
+        this.partitionField = partitionField;
+        this.partitionType = partitionType;
+        this.partitionExpirationMillis = partitionExpirationMillis;
+        this.clusteredFields = clusteredFields;
+        this.region = region;
     }

     public BigQueryConnectOptions getConnectOptions() {
@@ -113,13 +145,43 @@ public BigQuerySchemaProvider getSchemaProvider() {
         return schemaProvider;
     }

+    public boolean enableTableCreation() {
+        return enableTableCreation;
+    }
+
+    public String getPartitionField() {
+        return partitionField;
+    }
+
+    public TimePartitioning.Type getPartitionType() {
+        return partitionType;
+    }
+
+    public Long getPartitionExpirationMillis() {
+        return partitionExpirationMillis;
+    }
+
+    public List<String> getClusteredFields() {
+        return clusteredFields;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
     /** Builder for BigQuerySinkConfig. */
     public static class Builder {

         private BigQueryConnectOptions connectOptions;
         private DeliveryGuarantee deliveryGuarantee;
         private BigQuerySchemaProvider schemaProvider;
         private BigQueryProtoSerializer serializer;
+        private boolean enableTableCreation;
+        private String partitionField;
+        private TimePartitioning.Type partitionType;
+        private Long partitionExpirationMillis;
+        private List<String> clusteredFields;
+        private String region;
         private StreamExecutionEnvironment env;

         public Builder connectOptions(BigQueryConnectOptions connectOptions) {
@@ -142,6 +204,36 @@ public Builder serializer(BigQueryProtoSerializer serializer) {
             return this;
         }

+        public Builder enableTableCreation(boolean enableTableCreation) {
+            this.enableTableCreation = enableTableCreation;
+            return this;
+        }
+
+        public Builder partitionField(String partitionField) {
+            this.partitionField = partitionField;
+            return this;
+        }
+
+        public Builder partitionType(TimePartitioning.Type partitionType) {
+            this.partitionType = partitionType;
+            return this;
+        }
+
+        public Builder partitionExpirationMillis(Long partitionExpirationMillis) {
+            this.partitionExpirationMillis = partitionExpirationMillis;
+            return this;
+        }
+
+        public Builder clusteredFields(List<String> clusteredFields) {
+            this.clusteredFields = clusteredFields;
+            return this;
+        }
+
+        public Builder region(String region) {
+            this.region = region;
+            return this;
+        }
+
         public Builder streamExecutionEnvironment(
                 StreamExecutionEnvironment streamExecutionEnvironment) {
             this.env = streamExecutionEnvironment;
@@ -153,7 +245,16 @@ public BigQuerySinkConfig build() {
                 validateStreamExecutionEnvironment(env);
             }
             return new BigQuerySinkConfig(
-                    connectOptions, deliveryGuarantee, schemaProvider, serializer);
+                    connectOptions,
+                    deliveryGuarantee,
+                    schemaProvider,
+                    serializer,
+                    enableTableCreation,
+                    partitionField,
+                    partitionType,
+                    partitionExpirationMillis,
+                    clusteredFields,
+                    region);
         }
     }

@@ -164,13 +265,25 @@ public BigQuerySinkConfig build() {
     public static BigQuerySinkConfig forTable(
             BigQueryConnectOptions connectOptions,
             DeliveryGuarantee deliveryGuarantee,
-            LogicalType logicalType) {
+            LogicalType logicalType,
+            boolean enableTableCreation,
+            String partitionField,
+            TimePartitioning.Type partitionType,
+            Long partitionExpirationMillis,
+            List<String> clusteredFields,
+            String region) {
         return new BigQuerySinkConfig(
                 connectOptions,
                 deliveryGuarantee,
                 new BigQuerySchemaProviderImpl(
                         BigQueryTableSchemaProvider.getAvroSchemaFromLogicalSchema(logicalType)),
-                new RowDataToProtoSerializer());
+                new RowDataToProtoSerializer(),
+                enableTableCreation,
+                partitionField,
+                partitionType,
+                partitionExpirationMillis,
+                clusteredFields,
+                region);
     }

     public static void validateStreamExecutionEnvironment(StreamExecutionEnvironment env) {
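For the Table API path, a sketch of calling the widened forTable (again illustrative: connectOptions and logicalType are assumed to be in scope, and the values are placeholders):

BigQuerySinkConfig sinkConfig =
        BigQuerySinkConfig.forTable(
                connectOptions,                  // defined elsewhere
                DeliveryGuarantee.AT_LEAST_ONCE,
                logicalType,                     // defined elsewhere
                true,                            // enableTableCreation
                "created_at",                    // partitionField (placeholder)
                TimePartitioning.Type.MONTH,     // partitionType
                null,                            // partitionExpirationMillis (unset)
                Arrays.asList("customer_id"),    // clusteredFields (placeholder)
                "eu");                           // region (placeholder)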
BigQuerySchemaProviderImpl.java

@@ -42,6 +42,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;

 /**
@@ -92,6 +93,29 @@ public Schema getAvroSchema() {
         return this.avroSchema;
     }

+    @Override
+    public int hashCode() {
+        Schema thisAvroSchema = getAvroSchema();
+        if (thisAvroSchema == null) {
+            return Integer.MIN_VALUE;
+        }
+        return thisAvroSchema.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        return Objects.equals(getAvroSchema(), ((BigQuerySchemaProviderImpl) obj).getAvroSchema());
+    }
+
     // ----------- Initialize Maps between Avro Schema to Descriptor Proto schema -------------
     static {
         /*
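These overrides make schema providers comparable by their Avro schema, which the new BigQuerySinkConfig.equals relies on. A small sketch (avroSchema assumed parsed elsewhere; the Schema-accepting constructor is the one used by forTable above):

BigQuerySchemaProvider a = new BigQuerySchemaProviderImpl(avroSchema);
BigQuerySchemaProvider b = new BigQuerySchemaProviderImpl(avroSchema);
// Equal Avro schemas now imply equal providers with equal hash codes.
assert a.equals(b) && a.hashCode() == b.hashCode();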
BigQueryDynamicTableFactory.java

@@ -78,6 +78,12 @@ public Set<ConfigOption<?>> optionalOptions() {
         additionalOptions.add(BigQueryConnectorOptions.DELIVERY_GUARANTEE);
         additionalOptions.add(BigQueryConnectorOptions.PARTITION_DISCOVERY_INTERVAL);
         additionalOptions.add(BigQueryConnectorOptions.SINK_PARALLELISM);
+        additionalOptions.add(BigQueryConnectorOptions.ENABLE_TABLE_CREATION);
+        additionalOptions.add(BigQueryConnectorOptions.PARTITION_FIELD);
+        additionalOptions.add(BigQueryConnectorOptions.PARTITION_TYPE);
+        additionalOptions.add(BigQueryConnectorOptions.PARTITION_EXPIRATION_MILLIS);
+        additionalOptions.add(BigQueryConnectorOptions.CLUSTERED_FIELDS);
+        additionalOptions.add(BigQueryConnectorOptions.REGION);

         return additionalOptions;
     }
@@ -101,6 +107,12 @@ public Set<ConfigOption<?>> forwardOptions() {
         forwardOptions.add(BigQueryConnectorOptions.DELIVERY_GUARANTEE);
         forwardOptions.add(BigQueryConnectorOptions.PARTITION_DISCOVERY_INTERVAL);
         forwardOptions.add(BigQueryConnectorOptions.SINK_PARALLELISM);
+        forwardOptions.add(BigQueryConnectorOptions.ENABLE_TABLE_CREATION);
+        forwardOptions.add(BigQueryConnectorOptions.PARTITION_FIELD);
+        forwardOptions.add(BigQueryConnectorOptions.PARTITION_TYPE);
+        forwardOptions.add(BigQueryConnectorOptions.PARTITION_EXPIRATION_MILLIS);
+        forwardOptions.add(BigQueryConnectorOptions.CLUSTERED_FIELDS);
+        forwardOptions.add(BigQueryConnectorOptions.REGION);

         return forwardOptions;
     }
@@ -146,8 +158,14 @@ public DynamicTableSink createDynamicTableSink(Context context) {

         return new BigQueryDynamicTableSink(
                 configProvider.translateBigQueryConnectOptions(),
-                configProvider.translateDeliveryGuarantee(),
+                configProvider.getDeliveryGuarantee(),
                 context.getPhysicalRowDataType().getLogicalType(),
-                configProvider.getParallelism().orElse(null));
+                configProvider.getParallelism().orElse(null),
+                configProvider.enableTableCreation(),
+                configProvider.getPartitionField().orElse(null),
+                configProvider.getPartitionType().orElse(null),
+                configProvider.getPartitionExpirationMillis().orElse(null),
+                configProvider.getClusteredFields().orElse(null),
+                configProvider.getRegion().orElse(null));
     }
 }
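On the SQL/Table API side, the newly registered options can then be set like any other connector option. A hedged sketch using Flink's TableDescriptor (the "bigquery" factory identifier and the options' value types are assumptions, not shown in this diff):

TableDescriptor descriptor =
        TableDescriptor.forConnector("bigquery")  // identifier assumed
                .schema(schema)                    // schema defined elsewhere
                .option(BigQueryConnectorOptions.ENABLE_TABLE_CREATION, true)
                .option(BigQueryConnectorOptions.PARTITION_FIELD, "created_at")
                .option(BigQueryConnectorOptions.REGION, "us-central1")
                .build();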