Skip to content

Commit e823b20

Browse files
committed
Address comment: add a new method in ConnectorSplitSource to avoid adding a customized split source
1 parent bd06a6b commit e823b20

File tree

7 files changed: 51 additions & 199 deletions

7 files changed: 51 additions & 199 deletions

presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java

Lines changed: 0 additions & 137 deletions
This file was deleted.

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,6 @@
4242
import com.facebook.presto.spi.ConnectorNewTableLayout;
4343
import com.facebook.presto.spi.ConnectorOutputTableHandle;
4444
import com.facebook.presto.spi.ConnectorSession;
45-
import com.facebook.presto.spi.ConnectorSplitSource;
4645
import com.facebook.presto.spi.ConnectorTableHandle;
4746
import com.facebook.presto.spi.ConnectorTableLayout;
4847
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
@@ -327,9 +326,9 @@ protected abstract void updateIcebergViewProperties(
327326

328327
public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);
329328

330-
public Optional<ConnectorSplitSource> getSplitSourceInCurrentCallProcedureTransaction()
329+
public Optional<IcebergProcedureContext> getProcedureContext()
331330
{
332-
return procedureContext.flatMap(IcebergProcedureContext::getConnectorSplitSource);
331+
return this.procedureContext;
333332
}
334333

335334
/**

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -13,10 +13,10 @@
1313
*/
1414
package com.facebook.presto.iceberg;
1515

16-
import com.facebook.presto.spi.ConnectorSplitSource;
1716
import com.facebook.presto.spi.connector.ConnectorProcedureContext;
1817
import org.apache.iceberg.DataFile;
1918
import org.apache.iceberg.DeleteFile;
19+
import org.apache.iceberg.FileScanTask;
2020
import org.apache.iceberg.Table;
2121
import org.apache.iceberg.Transaction;
2222

@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.Optional;
2727
import java.util.Set;
28+
import java.util.function.Consumer;
2829

2930
import static java.util.Objects.requireNonNull;
3031

@@ -36,7 +37,7 @@ public class IcebergProcedureContext
3637
final Map<String, Object> relevantData = new HashMap<>();
3738
Optional<Table> table = Optional.empty();
3839
Transaction transaction;
39-
Optional<ConnectorSplitSource> connectorSplitSource = Optional.empty();
40+
Optional<Consumer<FileScanTask>> fileScanTaskConsumer = Optional.empty();
4041

4142
public void setTable(Table table)
4243
{
@@ -58,15 +59,15 @@ public Transaction getTransaction()
5859
return transaction;
5960
}
6061

61-
public void setConnectorSplitSource(ConnectorSplitSource connectorSplitSource)
62+
public void setFileScanTaskConsumer(Consumer<FileScanTask> fileScanTaskConsumer)
6263
{
63-
requireNonNull(connectorSplitSource, "connectorSplitSource is null");
64-
this.connectorSplitSource = Optional.of(connectorSplitSource);
64+
requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null");
65+
this.fileScanTaskConsumer = Optional.of(fileScanTaskConsumer);
6566
}
6667

67-
public Optional<ConnectorSplitSource> getConnectorSplitSource()
68+
public Optional<Consumer<FileScanTask>> getFileScanTaskConsumer()
6869
{
69-
return this.connectorSplitSource;
70+
return this.fileScanTaskConsumer;
7071
}
7172

7273
public Set<DataFile> getScannedDataFiles()
@@ -89,7 +90,6 @@ public void destroy()
8990
this.relevantData.clear();
9091
this.scannedDataFiles.clear();
9192
this.fullyAppliedDeleteFiles.clear();
92-
this.connectorSplitSource.ifPresent(ConnectorSplitSource::close);
93-
this.connectorSplitSource = Optional.empty();
93+
this.fileScanTaskConsumer = Optional.empty();
9494
}
9595
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java

Lines changed: 5 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@
3636
import org.weakref.jmx.Managed;
3737
import org.weakref.jmx.Nested;
3838

39-
import java.util.Optional;
4039
import java.util.concurrent.ExecutorService;
4140
import java.util.concurrent.ThreadPoolExecutor;
4241

@@ -84,14 +83,6 @@ public ConnectorSplitSource getSplits(
8483

8584
TupleDomain<IcebergColumnHandle> predicate = getNonMetadataColumnConstraints(layoutHandle
8685
.getValidPredicate());
87-
ConnectorMetadata connectorMetadata = transactionManager.get(transaction);
88-
if (connectorMetadata != null) {
89-
IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata;
90-
Optional<ConnectorSplitSource> connectorSplitSource = icebergMetadata.getSplitSourceInCurrentCallProcedureTransaction();
91-
if (connectorSplitSource.isPresent()) {
92-
return connectorSplitSource.get();
93-
}
94-
}
9586

9687
Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());
9788

@@ -129,6 +120,11 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
129120
session,
130121
tableScan,
131122
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
123+
ConnectorMetadata connectorMetadata = transactionManager.get(transaction);
124+
if (connectorMetadata != null) {
125+
IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata;
126+
icebergMetadata.getProcedureContext().ifPresent(splitSource::initDistributedProcedureContext);
127+
}
132128
return splitSource;
133129
}
134130
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import com.facebook.presto.spi.ConnectorSplitSource;
2121
import com.facebook.presto.spi.SplitWeight;
2222
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
23+
import com.facebook.presto.spi.connector.ConnectorProcedureContext;
2324
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
2425
import com.google.common.collect.ImmutableList;
2526
import com.google.common.io.Closer;
@@ -65,6 +66,8 @@ public class IcebergSplitSource
6566

6667
private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;
6768

69+
private Optional<ConnectorProcedureContext> procedureContext = Optional.empty();
70+
6871
public IcebergSplitSource(
6972
ConnectorSession session,
7073
TableScan tableScan,
@@ -91,6 +94,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
9194
Iterator<FileScanTask> iterator = limit(fileScanTaskIterator, maxSize);
9295
while (iterator.hasNext()) {
9396
FileScanTask task = iterator.next();
97+
procedureContext.map(IcebergProcedureContext.class::cast)
98+
.flatMap(IcebergProcedureContext::getFileScanTaskConsumer)
99+
.ifPresent(consumer -> consumer.accept(task));
94100
IcebergSplit icebergSplit = (IcebergSplit) toIcebergSplit(task);
95101
if (metadataColumnsMatchPredicates(metadataColumnConstraints, icebergSplit.getPath(), icebergSplit.getDataSequenceNumber())) {
96102
splits.add(icebergSplit);
@@ -99,6 +105,12 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
99105
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
100106
}
101107

108+
@Override
109+
public void initDistributedProcedureContext(ConnectorProcedureContext procedureContext)
110+
{
111+
this.procedureContext = Optional.of(requireNonNull(procedureContext, "procedureContext is null"));
112+
}
113+
102114
@Override
103115
public boolean isFinished()
104116
{

presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java

Lines changed: 17 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,6 @@
1818
import com.facebook.presto.common.type.TypeManager;
1919
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
2020
import com.facebook.presto.spi.ConnectorSession;
21-
import com.facebook.presto.spi.ConnectorSplitSource;
22-
import com.facebook.presto.spi.FixedSplitSource;
2321
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
2422
import com.facebook.presto.spi.procedure.DistributedProcedure;
2523
import com.facebook.presto.spi.procedure.DistributedProcedure.Argument;
@@ -36,24 +34,19 @@
3634
import org.apache.iceberg.RewriteFiles;
3735
import org.apache.iceberg.Snapshot;
3836
import org.apache.iceberg.Table;
39-
import org.apache.iceberg.TableScan;
4037
import org.apache.iceberg.types.Type;
41-
import org.apache.iceberg.util.TableScanUtil;
4238

4339
import javax.inject.Inject;
4440
import javax.inject.Provider;
4541

4642
import java.util.Collection;
4743
import java.util.HashSet;
4844
import java.util.List;
49-
import java.util.Optional;
5045
import java.util.Set;
5146
import java.util.function.Consumer;
5247

5348
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
54-
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
5549
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
56-
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
5750
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
5851
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
5952
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
@@ -100,40 +93,23 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec
10093
Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
10194
IcebergTableHandle tableHandle = layoutHandle.getTable();
10295

103-
ConnectorSplitSource splitSource;
104-
if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
105-
splitSource = new FixedSplitSource(ImmutableList.of());
106-
}
107-
else {
108-
TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
109-
TableScan tableScan = icebergTable.newScan()
110-
.filter(toIcebergExpression(predicate))
111-
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());
112-
113-
Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
114-
procedureContext.getScannedDataFiles().add(task.file());
115-
if (!task.deletes().isEmpty()) {
116-
task.deletes().forEach(deleteFile -> {
117-
if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
118-
!icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
119-
!predicate.isAll()) {
120-
// Equality files with an unpartitioned spec are applied as global deletes
121-
// So they should not be cleaned up unless the whole table is optimized
122-
return;
123-
}
124-
procedureContext.getFullyAppliedDeleteFiles().add(deleteFile);
125-
});
126-
}
127-
};
128-
129-
splitSource = new CallDistributedProcedureSplitSource(
130-
session,
131-
tableScan,
132-
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
133-
Optional.of(fileScanTaskConsumer),
134-
getMinimumAssignedSplitWeight(session));
135-
}
136-
procedureContext.setConnectorSplitSource(splitSource);
96+
TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
97+
Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
98+
procedureContext.getScannedDataFiles().add(task.file());
99+
if (!task.deletes().isEmpty()) {
100+
task.deletes().forEach(deleteFile -> {
101+
if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
102+
!icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
103+
!predicate.isAll()) {
104+
// Equality files with an unpartitioned spec are applied as global deletes
105+
// So they should not be cleaned up unless the whole table is optimized
106+
return;
107+
}
108+
procedureContext.getFullyAppliedDeleteFiles().add(deleteFile);
109+
});
110+
}
111+
};
112+
procedureContext.setFileScanTaskConsumer(fileScanTaskConsumer);
137113

138114
return new IcebergDistributedProcedureHandle(
139115
tableHandle.getSchemaName(),

0 commit comments

Comments
 (0)