Skip to content

Commit

Permalink
Remove duplicate code for getting Iceberg table of various catalogTypes
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd authored and tdcmeehan committed Jan 18, 2024
1 parent f75f9db commit d11d79f
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.rule.BaseSubfieldExtractionRewriter;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergHiveMetadata;
import com.facebook.presto.iceberg.IcebergResourceFactory;
import com.facebook.presto.iceberg.IcebergTableHandle;
import com.facebook.presto.iceberg.IcebergTableLayoutHandle;
import com.facebook.presto.iceberg.IcebergTransactionManager;
Expand Down Expand Up @@ -57,8 +53,7 @@
import static com.facebook.presto.hive.rule.FilterPushdownUtils.getDomainPredicate;
import static com.facebook.presto.hive.rule.FilterPushdownUtils.getPredicateColumnNames;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
Expand All @@ -75,8 +70,6 @@ public class IcebergFilterPushdown
private final RowExpressionService rowExpressionService;
private final StandardFunctionResolution functionResolution;
private final FunctionMetadataManager functionMetadataManager;
private final IcebergResourceFactory resourceFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final IcebergTransactionManager icebergTransactionManager;

Expand All @@ -85,16 +78,12 @@ public IcebergFilterPushdown(
StandardFunctionResolution functionResolution,
FunctionMetadataManager functionMetadataManager,
IcebergTransactionManager transactionManager,
IcebergResourceFactory resourceFactory,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager)
{
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
this.icebergTransactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

Expand All @@ -111,16 +100,12 @@ public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, Variable
functionResolution,
functionMetadataManager,
icebergTransactionManager,
resourceFactory,
hdfsEnvironment,
typeManager), maxSubplan);
}

public static class SubfieldExtractionRewriter
extends BaseSubfieldExtractionRewriter
{
private final IcebergResourceFactory resourceFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;

public SubfieldExtractionRewriter(
Expand All @@ -130,14 +115,10 @@ public SubfieldExtractionRewriter(
StandardFunctionResolution functionResolution,
FunctionMetadataManager functionMetadataManager,
IcebergTransactionManager icebergTransactionManager,
IcebergResourceFactory resourceFactory,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager)
{
super(session, idAllocator, rowExpressionService, functionResolution, functionMetadataManager, tableHandle -> getConnectorMetadata(icebergTransactionManager, tableHandle));

this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

Expand All @@ -156,15 +137,7 @@ protected ConnectorPushdownFilterResult getConnectorPushdownFilterResult(
checkArgument(metadata instanceof IcebergAbstractMetadata, "metadata must be IcebergAbstractMetadata");
checkArgument(tableHandle instanceof IcebergTableHandle, "tableHandle must be IcebergTableHandle");

Table icebergTable;
if (metadata instanceof IcebergHiveMetadata) {
ExtendedHiveMetastore metastore = ((IcebergHiveMetadata) metadata).getMetastore();
icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
}
else {
icebergTable = getNativeIcebergTable(resourceFactory, session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
}

Table icebergTable = getIcebergTable(metadata, session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
TupleDomain<ColumnHandle> unenforcedConstraint = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), not(Predicates.in(getPartitionKeyColumnHandles(icebergTable, typeManager)))));

TupleDomain<Subfield> domainPredicate = getDomainPredicate(decomposedFilter, unenforcedConstraint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package com.facebook.presto.iceberg.optimizer;

import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.iceberg.IcebergResourceFactory;
import com.facebook.presto.iceberg.IcebergTransactionManager;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
Expand All @@ -40,20 +38,16 @@ public IcebergPlanOptimizerProvider(
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
FunctionMetadataManager functionMetadataManager,
IcebergResourceFactory resourceFactory,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager)
{
requireNonNull(transactionManager, "transactionManager is null");
requireNonNull(rowExpressionService, "rowExpressionService is null");
requireNonNull(functionResolution, "functionResolution is null");
requireNonNull(functionMetadataManager, "functionMetadataManager is null");
requireNonNull(resourceFactory, "resourceFactory is null");
requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
requireNonNull(typeManager, "typeManager is null");
this.planOptimizers = ImmutableSet.of(
new IcebergPlanOptimizer(functionResolution, rowExpressionService, transactionManager),
new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, resourceFactory, hdfsEnvironment, typeManager),
new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, typeManager),
new IcebergParquetDereferencePushDown(transactionManager, rowExpressionService, typeManager));
}

Expand Down

0 comments on commit d11d79f

Please sign in to comment.