Commit

use parquet reader for cdc reading (apache#11775)
Co-authored-by: Jonathan Vexler <=>
jonvex authored Aug 15, 2024
1 parent 31eea3b commit d4a4d9c
Showing 1 changed file with 9 additions and 21 deletions.
@@ -113,17 +113,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val cdcFileReader = if (isCDC) {
-      super.buildReaderWithPartitionValues(
-        spark,
-        tableSchema.structTypeSchema,
-        StructType(Nil),
-        tableSchema.structTypeSchema,
-        Nil,
-        options,
-        new Configuration(hadoopConf))
-    }
-
     val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, sanitizedTableName)
     val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
     val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
@@ -174,29 +163,27 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
           }
         // CDC queries.
         case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping =>
-          val fileSplits = hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
-          val fileGroupSplit: HoodieCDCFileGroupSplit = HoodieCDCFileGroupSplit(fileSplits)
-          buildCDCRecordIterator(
-            fileGroupSplit, cdcFileReader.asInstanceOf[PartitionedFile => Iterator[InternalRow]],
-            storageConf, fileIndexProps, requiredSchema)
+          buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)

         case _ => parquetFileReader.value.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
       }
     }
   }

-  private def setSchemaEvolutionConfigs(conf: StorageConfiguration[_]): Unit = {
+  private def setSchemaEvolutionConfigs(conf: StorageConfiguration[Configuration]): Unit = {
     if (internalSchemaOpt.isPresent) {
       conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tableState.tablePath)
       conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
     }
   }

-  protected def buildCDCRecordIterator(cdcFileGroupSplit: HoodieCDCFileGroupSplit,
-                                       cdcFileReader: PartitionedFile => Iterator[InternalRow],
-                                       storageConf: StorageConfiguration[_],
+  protected def buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping,
+                                       parquetFileReader: SparkParquetReader,
+                                       storageConf: StorageConfiguration[Configuration],
                                        props: TypedProperties,
                                        requiredSchema: StructType): Iterator[InternalRow] = {
+    val fileSplits = hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
+    val cdcFileGroupSplit: HoodieCDCFileGroupSplit = HoodieCDCFileGroupSplit(fileSplits)
     props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, tableName)
     val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
     val metaClient = HoodieTableMetaClient.builder
@@ -205,7 +192,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       cdcFileGroupSplit,
       metaClient,
       storageConf,
-      cdcFileReader,
+      (file: PartitionedFile) =>
+        parquetFileReader.read(file, tableSchema.structTypeSchema, new StructType(), internalSchemaOpt, Seq.empty, storageConf),
       tableSchema,
       cdcSchema,
       requiredSchema,
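
Note on the change above: the CDC path no longer builds a dedicated reader via super.buildReaderWithPartitionValues; it reuses the shared SparkParquetReader and hands CDCFileGroupIterator a PartitionedFile => Iterator[InternalRow] closure over it. A minimal sketch of that closure, restating the added lines from the diff (the readBaseFile name is illustrative, not part of the patch):

    // Read a CDC base file with the shared parquet reader:
    // full table schema, empty partition schema, no pushed-down filters.
    val readBaseFile: PartitionedFile => Iterator[InternalRow] = file =>
      parquetFileReader.read(
        file,
        tableSchema.structTypeSchema, // project the full table schema
        new StructType(),             // no partition columns
        internalSchemaOpt,            // schema-evolution info, if any
        Seq.empty,                    // no filters
        storageConf)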