
fix: Support Schema Evolution in iceberg #1723

Closed · wants to merge 3 commits
AbstractColumnReader.java:

@@ -63,6 +63,13 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
+  /**
+   * Whether to enable schema evolution in Comet. For instance, promoting an integer column to a
+   * long column, a float column to a double column, etc. This is automatically enabled when
+   * reading from Iceberg tables.
+   */
+  protected boolean supportsSchemaEvolution;
+
   public AbstractColumnReader(
       DataType type,
       Type fieldType,
@@ -80,9 +87,11 @@ public AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
-    TypeUtil.checkParquetType(descriptor, type);
+    this.supportsSchemaEvolution = supportsSchemaEvolution;
+    TypeUtil.checkParquetType(descriptor, type, supportsSchemaEvolution);
   }
 
   public ColumnDescriptor getDescriptor() {
@@ -120,7 +129,10 @@ public void close() {
 
   protected void initNative() {
     LOG.debug("initializing the native column reader");
-    DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null;
+    DataType readType =
+        ((boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() || supportsSchemaEvolution)
+            ? type
+            : null;
     boolean useLegacyDateTimestampOrNTZ =
         useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
     nativeHandle =
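To see concretely what the new flag changes, here is a minimal, self-contained sketch (illustrative names, not code from this PR) of the decision initNative() now makes: the Spark read type is handed to the native reader whenever the global Comet config or the per-reader flag is set, which is what allows, say, a Parquet INT32 column to be decoded as a Spark LongType.

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Hypothetical standalone mirror of the ternary in initNative(); the real
// method reads CometConf.COMET_SCHEMA_EVOLUTION_ENABLED() instead of taking
// the config value as a parameter.
public class ReadTypeSketch {
  static DataType readTypeFor(
      DataType sparkReadType, boolean globalConfEnabled, boolean supportsSchemaEvolution) {
    // Passing a non-null read type tells the native side which (possibly
    // wider) type to decode into, e.g. int -> long or float -> double.
    return (globalConfEnabled || supportsSchemaEvolution) ? sparkReadType : null;
  }

  public static void main(String[] args) {
    // Iceberg readers set supportsSchemaEvolution = true, so promotion works
    // even when the global config keeps its default value of false.
    System.out.println(readTypeFor(DataTypes.LongType, false, true));  // LongType
    System.out.println(readTypeFor(DataTypes.LongType, false, false)); // null
  }
}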
@@ -583,7 +583,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
               capacity,
               useDecimal128,
               useLazyMaterialization,
-              useLegacyDateTimestamp);
+              useLegacyDateTimestamp,
+              false);
       reader.setPageReader(rowGroupReader.getPageReader(columns.get(i)));
       columnReaders[i] = reader;
     }
ColumnReader.java:

@@ -99,8 +99,9 @@ public ColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(type, descriptor, useDecimal128, useLegacyDateTimestamp, supportsSchemaEvolution);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
     this.importer = importer;
LazyColumnReader.java:

@@ -49,8 +49,16 @@ public LazyColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(
+        sparkReadType,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLegacyDateTimestamp,
+        supportsSchemaEvolution);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }
MetadataColumnReader.java:

@@ -45,7 +45,7 @@ public class MetadataColumnReader extends AbstractColumnReader {
   public MetadataColumnReader(
       DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
-    super(type, descriptor, useDecimal128, false);
+    super(type, descriptor, useDecimal128, false, false);
 
     this.isConstant = isConstant;
  }
common/src/main/java/org/apache/comet/parquet/TypeUtil.java (7 changes: 5 additions & 2 deletions):

@@ -116,11 +116,14 @@ public static ColumnDescriptor convertToParquet(StructField field) {
    * @param descriptor descriptor for a Parquet primitive column
    * @param sparkType Spark read type
    */
-  public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType) {
+  public static void checkParquetType(
+      ColumnDescriptor descriptor, DataType sparkType, boolean allowTypePromotion) {
     PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
     LogicalTypeAnnotation logicalTypeAnnotation =
         descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
+    if (!allowTypePromotion) {
+      allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
+    }
 
     if (sparkType instanceof NullType) {
       return;
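The new argument takes precedence, and the global config is only consulted as a fallback, so the Iceberg read path can allow promotion without touching session configuration. A hedged sketch of that precedence plus the two widenings named in the new javadoc (the class and method names here are hypothetical, not Comet's TypeUtil API):

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

public class PromotionCheck {
  // Mirrors the fallback above: the per-call flag wins, and the global
  // config is only consulted when the flag is false.
  static boolean promotionAllowed(boolean allowTypePromotion, boolean globalConfEnabled) {
    return allowTypePromotion || globalConfEnabled;
  }

  // The widenings called out in the PR's javadoc. Comet may accept more;
  // this list is deliberately minimal.
  static boolean isWidening(DataType fileType, DataType readType) {
    return (fileType == DataTypes.IntegerType && readType == DataTypes.LongType)
        || (fileType == DataTypes.FloatType && readType == DataTypes.DoubleType);
  }

  public static void main(String[] args) {
    System.out.println(promotionAllowed(true, false));  // true: the Iceberg path
    System.out.println(promotionAllowed(false, false)); // false: both switches off
    System.out.println(isWidening(DataTypes.IntegerType, DataTypes.LongType)); // true
  }
}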
common/src/main/java/org/apache/comet/parquet/Utils.java (32 changes: 16 additions & 16 deletions):

@@ -28,33 +28,33 @@
 
 public class Utils {
 
-  /** This method is called from Apache Iceberg. */
-  public static ColumnReader getColumnReader(
-      DataType type,
-      ColumnDescriptor descriptor,
-      CometSchemaImporter importer,
-      int batchSize,
-      boolean useDecimal128,
-      boolean useLazyMaterialization) {
-    // TODO: support `useLegacyDateTimestamp` for Iceberg
-    return getColumnReader(
-        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
-  }
-
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     if (useLazyMaterialization && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     } else {
       return new ColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     }
   }

Review thread on the removed line "/** This method is called from Apache Iceberg. */":

Contributor: (nit) Shall we keep this comment? I think it is useful if it's still valid.

Author: We will use the same method for both Comet and Iceberg. The comment is not needed anymore.
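With the Iceberg-specific overload removed, an external caller such as Iceberg's Comet integration would invoke the single full-signature factory and opt into schema evolution explicitly. A sketch of what such a call site could look like (the wrapper class is hypothetical; passing true for useLegacyDateTimestamp mirrors the value the removed overload hard-coded):

import org.apache.arrow.c.CometSchemaImporter; // assumption: Comet's importer lives in this package
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.Utils;

// Hypothetical Iceberg-side helper, for illustration only.
class IcebergCometReaderFactory {
  static ColumnReader create(
      DataType sparkReadType,
      ColumnDescriptor descriptor,
      CometSchemaImporter importer,
      int batchSize,
      boolean useDecimal128,
      boolean useLazyMaterialization) {
    return Utils.getColumnReader(
        sparkReadType,
        descriptor,
        importer,
        batchSize,
        useDecimal128,
        useLazyMaterialization,
        true,  // useLegacyDateTimestamp: the removed wrapper hard-coded true
        true); // supportsSchemaEvolution: always on when reading Iceberg tables
  }
}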
CometScanRule.scala:

@@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometConf._
@@ -180,8 +179,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
     }
 
     if (s.isCometEnabled && schemaSupported) {
-      // When reading from Iceberg, we automatically enable type promotion
-      SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
       CometBatchScanExec(
         scanExec.clone().asInstanceOf[BatchScanExec],
         runtimeFilters = scanExec.runtimeFilters)
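The deleted lines enabled type promotion by mutating the session's SQLConf from inside a planner rule, a side effect that leaked the setting to every later scan in the session. The PR instead threads the flag through the reader constructors, scoping it to the Iceberg scan being planned. A toy contrast of the two approaches (hypothetical names, not Comet code):

// Illustrative contrast only; these names are hypothetical.
class GlobalConf {
  static boolean schemaEvolutionEnabled = false; // process-wide: leaks across scans
}

class ScopedReader {
  private final boolean supportsSchemaEvolution; // set per reader, affects only this scan

  ScopedReader(boolean supportsSchemaEvolution) {
    this.supportsSchemaEvolution = supportsSchemaEvolution;
  }

  boolean promotionOn() {
    // Same precedence the PR gives TypeUtil.checkParquetType: the per-reader
    // flag wins, and the global switch is only a fallback.
    return supportsSchemaEvolution || GlobalConf.schemaEvolutionEnabled;
  }
}

An Iceberg scan constructs its readers with true and leaves the global switch alone, so a plain Parquet scan planned afterwards still sees the default behavior.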