Skip to content

Commit

Permalink
Update HiveWriterFactory to use actualStorageFormat
Browse files Browse the repository at this point in the history
Use `actualStorageFormat` because the storage format decision has
already been made; duplicating that logic in HiveWriterFactory
could result in bugs.

Also, the getTableStorageFormat and getPartitionStorageFormat APIs
will be deprecated in HiveWritableTableHandle.
  • Loading branch information
kewang1024 authored and amitkdutta committed Oct 31, 2024
1 parent 4391c9b commit 8f630f9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
handle.getTableName(),
isCreateTable,
handle.getInputColumns(),
handle.getTableStorageFormat(),
handle.getPartitionStorageFormat(),
handle.getActualStorageFormat(),
handle.getCompressionCodec(),
additionalTableParameters,
bucketCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ public class HiveWriterFactory
private final List<String> partitionColumnNames;
private final List<Type> partitionColumnTypes;

private final HiveStorageFormat tableStorageFormat;
private final HiveStorageFormat partitionStorageFormat;
private final HiveStorageFormat storageFormat;
private final HiveCompressionCodec compressionCodec;
private final Map<String, String> additionalTableParameters;
private final LocationHandle locationHandle;
Expand Down Expand Up @@ -158,8 +157,7 @@ public HiveWriterFactory(
String tableName,
boolean isCreateTable,
List<HiveColumnHandle> inputColumns,
HiveStorageFormat tableStorageFormat,
HiveStorageFormat partitionStorageFormat,
HiveStorageFormat storageFormat,
HiveCompressionCodec compressionCodec,
Map<String, String> additionalTableParameters,
OptionalInt bucketCount,
Expand Down Expand Up @@ -188,8 +186,7 @@ public HiveWriterFactory(
this.tableName = requireNonNull(tableName, "tableName is null");
this.isCreateTable = isCreateTable;

this.tableStorageFormat = requireNonNull(tableStorageFormat, "tableStorageFormat is null");
this.partitionStorageFormat = requireNonNull(partitionStorageFormat, "partitionStorageFormat is null");
this.storageFormat = requireNonNull(storageFormat, "storageFormat is null");
this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null");
this.additionalTableParameters = ImmutableMap.copyOf(requireNonNull(additionalTableParameters, "additionalTableParameters is null"));
this.locationHandle = requireNonNull(locationHandle, "locationHandle is null");
Expand Down Expand Up @@ -392,7 +389,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
.collect(toList()),
writerParameters.getOutputStorageFormat(),
schema,
partitionStorageFormat.getEstimatedWriterSystemMemoryUsage(),
storageFormat.getEstimatedWriterSystemMemoryUsage(),
conf,
typeManager,
session);
Expand Down Expand Up @@ -449,7 +446,7 @@ private WriterParameters getWriterParametersForNewUnpartitionedTable()
UpdateMode.NEW,
createHiveSchema(dataColumns),
locationService.getTableWriteInfo(locationHandle),
fromHiveStorageFormat(tableStorageFormat));
fromHiveStorageFormat(storageFormat));
}

private WriterParameters getWriterParametersForNewPartitionedTable(String partitionName)
Expand All @@ -472,7 +469,7 @@ private WriterParameters getWriterParametersForNewPartitionedTable(String partit
UpdateMode.NEW,
createHiveSchema(dataColumns),
writeInfo,
fromHiveStorageFormat(partitionStorageFormat));
fromHiveStorageFormat(storageFormat));
}

private WriterParameters getWriterParametersForExistingUnpartitionedTable(OptionalInt bucketNumber)
Expand All @@ -490,7 +487,7 @@ private WriterParameters getWriterParametersForExistingUnpartitionedTable(Option
UpdateMode.APPEND,
getHiveSchema(table),
locationService.getTableWriteInfo(locationHandle),
fromHiveStorageFormat(tableStorageFormat));
fromHiveStorageFormat(storageFormat));
}

private WriterParameters getWriterParametersForExistingPartitionedTable(String partitionName, OptionalInt bucketNumber)
Expand Down Expand Up @@ -520,7 +517,7 @@ private WriterParameters getWriterParametersForAppendPartition(String partitionN
UpdateMode.NEW,
getHiveSchema(table),
locationService.getPartitionWriteInfo(locationHandle, Optional.empty(), partitionName),
fromHiveStorageFormat(partitionStorageFormat));
fromHiveStorageFormat(storageFormat));
}
// Append to an existing partition
checkState(!immutablePartitions);
Expand Down Expand Up @@ -550,7 +547,7 @@ private WriterParameters getWriterParametersForOverwritePartition(String partiti
UpdateMode.OVERWRITE,
getHiveSchema(table),
writeInfo,
fromHiveStorageFormat(partitionStorageFormat));
fromHiveStorageFormat(storageFormat));
}

private WriterParameters getWriterParametersForImmutablePartition(String partitionName)
Expand All @@ -570,7 +567,7 @@ private WriterParameters getWriterParametersForImmutablePartition(String partiti
UpdateMode.NEW,
getHiveSchema(table),
writerInfo,
fromHiveStorageFormat(partitionStorageFormat));
fromHiveStorageFormat(storageFormat));
}

private void validateSchema(Optional<String> partitionName, Properties schema)
Expand Down

0 comments on commit 8f630f9

Please sign in to comment.