[Iceberg] Session property for target split size
ZacBlanco committed Feb 18, 2025
1 parent 8accda9 commit c1ec5a7
Showing 15 changed files with 210 additions and 46 deletions.
presto-docs/src/main/sphinx/connector/iceberg.rst (10 additions, 3 deletions)
@@ -357,9 +357,9 @@ connector using a WITH clause:

The following table properties, specific to the Presto Iceberg connector, are available:

======================================= =============================================================== ============
======================================= =============================================================== =========================
Property Name                           Description                                                     Default
======================================= =============================================================== ============
======================================= =============================================================== =========================
``format``                              Optionally specifies the format of table data files,           ``PARQUET``
                                        either ``PARQUET`` or ``ORC``.

@@ -388,7 +388,11 @@ Property Name Description

``metrics_max_inferred_column``         Optionally specifies the maximum number of columns for which    ``100``
                                        metrics are collected.
======================================= =============================================================== ============

``read.split.target-size``              The target size for an individual split when generating splits  ``134217728`` (128MB)
                                        for a table scan. Generated splits may still be larger or
                                        smaller than this value. Must be specified in bytes.
======================================= =============================================================== =========================

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
and a file system location of ``s3://test_bucket/test_schema/test_table``:
@@ -421,6 +425,9 @@ Property Name Description
``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property
                                                      ``iceberg.rows-for-metadata-optimization-threshold`` in the current
                                                      session.
``iceberg.target_split_size``                         Overrides the target split size, in bytes, for all tables in a query.
                                                      Set to 0 to use the value in each Iceberg table's
                                                      ``read.split.target-size`` property.
===================================================== ======================================================================
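
Taken together, the table and session properties reduce to a simple precedence rule: a non-zero session value wins, and 0 defers to the table. A minimal sketch of that rule with illustrative values (the actual resolution lives in `IcebergUtil.getTargetSplitSize`, shown later in this diff):

    public class SplitSizePrecedenceExample
    {
        public static void main(String[] args)
        {
            // Illustrative values only. A hypothetical session override such as
            // "SET SESSION iceberg.target_split_size = 268435456" beats the
            // table's read.split.target-size; a value of 0 defers to the table.
            long sessionValue = 268_435_456L;  // 256MB session override
            long tableValue = 134_217_728L;    // 128MB table default
            long effective = sessionValue == 0 ? tableValue : sessionValue;
            System.out.println(effective);     // prints 268435456
        }
    }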

Caching Support
IcebergAbstractMetadata.java
@@ -201,6 +201,7 @@
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;

public abstract class IcebergAbstractMetadata
@@ -719,6 +720,7 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));

SortOrder sortOrder = icebergTable.sortOrder();
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)
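
The split size surfaced into the metadata properties above comes from a small fallback lookup. A self-contained sketch mirroring the `IcebergUtil.getSplitSize` helper added later in this diff (the method name here is illustrative; `properties()` and the `TableProperties` constants are standard Iceberg API):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    public class SplitSizeLookupExample
    {
        // Fall back to Iceberg's built-in default (128MB) when the table has
        // no explicit read.split.target-size property.
        static long splitSize(Table table)
        {
            return Long.parseLong(table.properties()
                    .getOrDefault(TableProperties.SPLIT_SIZE,
                            String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT)));
        }
    }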
@@ -1127,6 +1129,9 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
case COMMIT_RETRIES:
updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue()));
break;
case SPLIT_SIZE:
updateProperties.set(TableProperties.SPLIT_SIZE, entry.getValue().toString());
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is not supported currently");
}
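
The new `SPLIT_SIZE` case persists the value through Iceberg's `UpdateProperties` API. A standalone sketch of that call path, assuming a `Table` handle obtained from a catalog lookup elsewhere:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    public class UpdateSplitSizeExample
    {
        // Sketch only: `table` would come from a catalog lookup in real code.
        static void setTargetSplitSize(Table table, long bytes)
        {
            table.updateProperties()
                    .set(TableProperties.SPLIT_SIZE, String.valueOf(bytes))
                    .commit();
        }
    }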
IcebergSessionProperties.java
@@ -40,6 +40,7 @@
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;

public final class IcebergSessionProperties
@@ -65,6 +66,7 @@ public final class IcebergSessionProperties
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter";
public static final String TARGET_SPLIT_SIZE = "target_split_size";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -189,6 +191,11 @@ public IcebergSessionProperties(
.add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER,
"The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics",
icebergConfig.getStatisticsKllSketchKParameter(),
false))
.add(longProperty(
TARGET_SPLIT_SIZE,
"The target split size. Set to 0 to use the iceberg table's read.split.target-size property",
0L,
false));

nessieConfig.ifPresent((config) -> propertiesBuilder
@@ -323,4 +330,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session)
{
return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class);
}

public static Long getTargetSplitSize(ConnectorSession session)
{
return session.getProperty(TARGET_SPLIT_SIZE, Long.class);
}
}
IcebergSplitManager.java
@@ -31,7 +31,6 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@@ -41,7 +40,6 @@
import java.util.concurrent.ThreadPoolExecutor;

import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
@@ -95,7 +93,7 @@ public ConnectorSplitSource getSplits(
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
.fromSnapshotExclusive(fromSnapshot)
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
}
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
@@ -117,8 +115,6 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
IcebergSplitSource splitSource = new IcebergSplitSource(
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
getMinimumAssignedSplitWeight(session),
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
return splitSource;
}
IcebergSplitSource.java
@@ -20,13 +20,13 @@
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

import java.io.IOException;
@@ -39,40 +39,45 @@

import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.iceberg.util.TableScanUtil.splitFiles;

public class IcebergSplitSource
implements ConnectorSplitSource
{
private CloseableIterator<FileScanTask> fileScanTaskIterator;

private final TableScan tableScan;
private final Closer closer = Closer.create();
private final double minimumAssignedSplitWeight;
private final ConnectorSession session;
private final long targetSplitSize;
private final NodeSelectionStrategy nodeSelectionStrategy;

private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;

public IcebergSplitSource(
ConnectorSession session,
TableScan tableScan,
CloseableIterable<FileScanTask> fileScanTaskIterable,
double minimumAssignedSplitWeight,
TupleDomain<IcebergColumnHandle> metadataColumnConstraints)
{
this.session = requireNonNull(session, "session is null");
this.tableScan = requireNonNull(tableScan, "tableScan is null");
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
requireNonNull(session, "session is null");
this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null");
closer.register(fileScanTaskIterator);
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
this.fileScanTaskIterator = closer.register(
splitFiles(
closer.register(tableScan.planFiles()),
targetSplitSize)
.iterator());
}

@Override
@@ -130,8 +135,8 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
getPartitionKeys(task),
PartitionSpecParser.toJson(spec),
partitionData.map(PartitionData::toJson),
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
nodeSelectionStrategy,
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
Optional.empty(),
getDataSequenceNumber(task.file()));
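
The `SplitWeight` expression above clamps a task's length, relative to the target split size, into the range `[minimumAssignedSplitWeight, 1.0]`. A worked example with assumed values:

    public class SplitWeightExample
    {
        public static void main(String[] args)
        {
            // Illustrative numbers: a 16MB file-scan task against the 128MB
            // default target split size gets 1/8 of a full split's weight,
            // bounded below by minimumAssignedSplitWeight and above by 1.0.
            double minimumAssignedSplitWeight = 0.05;
            long targetSplitSize = 134_217_728L;  // 128MB
            long taskLength = 16_777_216L;        // 16MB
            double proportion = Math.min(
                    Math.max((double) taskLength / targetSplitSize, minimumAssignedSplitWeight),
                    1.0);
            System.out.println(proportion);       // prints 0.125
        }
    }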
IcebergTableProperties.java
@@ -29,6 +29,7 @@
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ENGLISH;
@@ -47,6 +48,7 @@ public class IcebergTableProperties
public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max";
public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit";
public static final String METRICS_MAX_INFERRED_COLUMN = "metrics_max_inferred_column";
public static final String TARGET_SPLIT_SIZE = TableProperties.SPLIT_SIZE;
private static final String DEFAULT_FORMAT_VERSION = "2";

private final List<PropertyMetadata<?>> tableProperties;
@@ -133,6 +135,10 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
false,
value -> RowLevelOperationMode.fromName((String) value),
RowLevelOperationMode::modeName))
.add(longProperty(TARGET_SPLIT_SIZE,
"Desired size of split to generate during query scan planning",
TableProperties.SPLIT_SIZE_DEFAULT,
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
@@ -210,4 +216,9 @@ public static RowLevelOperationMode getUpdateMode(Map<String, Object> tablePrope
{
return (RowLevelOperationMode) tableProperties.get(UPDATE_MODE);
}

public static Long getTargetSplitSize(Map<String, Object> tableProperties)
{
return (Long) tableProperties.get(TableProperties.SPLIT_SIZE);
}
}
IcebergUtil.java
@@ -49,6 +49,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.units.DataSize;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
@@ -61,6 +62,7 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
@@ -157,6 +159,7 @@
import static com.google.common.collect.Streams.stream;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
import static java.lang.Double.parseDouble;
@@ -195,6 +198,8 @@
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
@@ -1176,6 +1181,9 @@ public static Map<String, String> populateTableProperties(ConnectorTableMetadata

Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));

propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));

return propertiesBuilder.build();
}

@@ -1286,4 +1294,23 @@ public static String dataLocation(Table icebergTable)
}
return dataLocation;
}

public static Long getSplitSize(Table table)
{
return Long.parseLong(table.properties()
.getOrDefault(SPLIT_SIZE,
String.valueOf(SPLIT_SIZE_DEFAULT)));
}

public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
{
return sessionValueProperty == 0 ?
succinctBytes(icebergScanTargetSplitSize) :
succinctBytes(sessionValueProperty);
}

public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
{
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
}
}
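
The overloads above return an airlift `DataSize` rather than a raw long; `succinctBytes` picks a readable unit for display, while `toBytes()` recovers the raw count used for splitting. A small illustrative example:

    import io.airlift.units.DataSize;

    public class SuccinctBytesExample
    {
        public static void main(String[] args)
        {
            // succinctBytes wraps a raw byte count in a DataSize that
            // renders with the largest fitting unit.
            DataSize size = DataSize.succinctBytes(134_217_728L);
            System.out.println(size);           // prints 128MB
            System.out.println(size.toBytes()); // prints 134217728
        }
    }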
