@@ -20,6 +20,7 @@
package org.apache.druid.iceberg.input;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.DruidException;
import org.apache.druid.iceberg.filter.IcebergFilter;
@@ -28,34 +29,36 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/*
* Druid wrapper for an Iceberg catalog.
* The configured catalog is used to load the specified Iceberg table and retrieve the underlying live data files up to the latest snapshot.
* This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files.
* This applies column projections to read only required columns from the data files, reducing data transfer and memory usage.
*/

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
public abstract class IcebergCatalog
{
public abstract class IcebergCatalog {
public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
private static final Logger log = new Logger(IcebergCatalog.class);

public abstract Catalog retrieveCatalog();

public boolean isCaseSensitive()
{
public boolean isCaseSensitive() {
return true;
}

@@ -68,16 +71,17 @@ public boolean isCaseSensitive()
* @param snapshotTime Datetime that will be used to fetch the most recent snapshot as of this time
* @param residualFilterMode Controls how residual filters are handled. When filtering on non-partition
* columns, residual rows may be returned that need row-level filtering.
* @param columnsFilter Column filter used to project the table scan. If null, all columns are read.
* @return a list of data file paths
*/
public List<String> extractSnapshotDataFiles(
String tableNamespace,
String tableName,
IcebergFilter icebergFilter,
DateTime snapshotTime,
ResidualFilterMode residualFilterMode
)
{
String tableNamespace,
String tableName,
IcebergFilter icebergFilter,
DateTime snapshotTime,
ResidualFilterMode residualFilterMode,
@Nullable ColumnsFilter columnsFilter
) {
Catalog catalog = retrieveCatalog();
Namespace namespace = Namespace.of(tableNamespace);
String tableIdentifier = tableNamespace + "." + tableName;
@@ -88,15 +92,27 @@ public List<String> extractSnapshotDataFiles(
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream()
.filter(tableId -> tableId.toString().equals(tableIdentifier))
.findFirst()
.orElseThrow(() -> new IAE(
" Couldn't retrieve table identifier for '%s'. Please verify that the table exists in the given catalog",
tableIdentifier
));
.filter(tableId -> tableId.toString().equals(tableIdentifier))
.findFirst()
.orElseThrow(() -> new IAE(
" Couldn't retrieve table identifier for '%s'. Please verify that the table exists in the given catalog",
tableIdentifier
));

long start = System.currentTimeMillis();
TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan();
Table table = catalog.loadTable(icebergTableIdentifier);
TableScan tableScan = table.newScan();

if (columnsFilter != null) {
List<String> projectedColumns = table
.schema()
.columns()
.stream()
.map(Types.NestedField::name)
.filter(columnsFilter::apply)
.collect(Collectors.toList());
tableScan = tableScan.select(new ArrayList<>(projectedColumns));
[P2] Projection is discarded before data is read

This selects projected columns on the Iceberg TableScan, but the method only returns task.file().location() afterward. The projected FileScanTask schema is discarded, and IcebergInputSource builds the warehouse delegate from the same raw file paths, so Druid's Parquet reader still opens the original files without the Iceberg projection. The new test also manually projects with Parquet.read(...).project(...), so it would pass even if this select had no effect. To make column projection work, the projected schema/split information needs to be carried into the reader path or pruning needs to happen in the delegate input format.

}

if (icebergFilter != null) {
tableScan = icebergFilter.filter(tableScan);
@@ -124,17 +140,17 @@ public List<String> extractSnapshotDataFiles(
// Handle residual filter based on mode
if (detectedResidual != null) {
String message = StringUtils.format(
"Iceberg filter produced residual expression that requires row-level filtering. "
+ "This typically means the filter is on a non-partition column. "
+ "Residual rows may be ingested unless filtered by transformSpec. "
+ "Residual filter: [%s]",
detectedResidual
"Iceberg filter produced residual expression that requires row-level filtering. "
+ "This typically means the filter is on a non-partition column. "
+ "Residual rows may be ingested unless filtered by transformSpec. "
+ "Residual filter: [%s]",
detectedResidual
);

if (residualFilterMode == ResidualFilterMode.FAIL) {
throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(message);
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(message);
}
log.warn(message);
}
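
To make the [P2] review comment above concrete, the following is a minimal sketch of the flow it describes, assuming an already-loaded Iceberg Table and a pre-computed projected column list (the class and method names are illustrative, not part of this PR). The projection narrows the TableScan, but only raw file locations are returned, so the delegate input source never sees the projected schema.

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class ProjectionSketch
{
  static List<String> planDataFilePaths(Table table, List<String> projectedColumns) throws IOException
  {
    // The projection narrows tableScan.schema(), just as the PR does via select(...).
    TableScan tableScan = table.newScan().select(projectedColumns);

    List<String> paths = new ArrayList<>();
    try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
      for (FileScanTask task : tasks) {
        // Only the file location survives; the projected schema attached to the scan
        // is never handed to whatever later re-opens these files.
        paths.add(task.file().location());
      }
    }
    return paths;
  }
}

As the comment suggests, making the projection effective would mean carrying the projected schema (or the FileScanTask splits) into the delegate reader path, or pruning columns in the delegate input format itself.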
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -114,7 +115,7 @@ public InputSourceReader reader(
)
{
if (!isLoaded) {
retrieveIcebergDatafiles();
retrieveIcebergDatafiles(inputRowSchema);
}
return getDelegateInputSource().reader(inputRowSchema, inputFormat, temporaryDirectory);
}
@@ -126,7 +127,7 @@ public Stream<InputSplit<List<String>>> createSplits(
) throws IOException
{
if (!isLoaded) {
retrieveIcebergDatafiles();
retrieveIcebergDatafiles(null);
}
return getDelegateInputSource().createSplits(inputFormat, splitHintSpec);
}
@@ -135,7 +136,7 @@ public Stream<InputSplit<List<String>>> createSplits(
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
{
if (!isLoaded) {
retrieveIcebergDatafiles();
retrieveIcebergDatafiles(null);
}
return getDelegateInputSource().estimateNumSplits(inputFormat, splitHintSpec);
}
@@ -194,14 +195,21 @@ public SplittableInputSource getDelegateInputSource()
return delegateInputSource;
}

protected void retrieveIcebergDatafiles()
private ColumnsFilter getColumnsFilter(@Nullable final InputRowSchema inputRowSchema)
{
List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
return inputRowSchema == null ? null : inputRowSchema.getColumnsFilter();
}

protected void retrieveIcebergDatafiles(@Nullable final InputRowSchema inputRowSchema)
{
final ColumnsFilter columnsFilter = getColumnsFilter(inputRowSchema);
final List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
getNamespace(),
getTableName(),
getIcebergFilter(),
getSnapshotTime(),
getResidualFilterMode()
getResidualFilterMode(),
columnsFilter
);
if (snapshotDataFiles.isEmpty()) {
delegateInputSource = new EmptyInputSource();
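
For reference, a hedged caller-side sketch of the flow this change adds, using only types that appear in this diff (the column names "id", "name", and "price" are illustrative): the InputRowSchema supplied to reader(...) carries a ColumnsFilter, which retrieveIcebergDatafiles(...) forwards to the catalog, while the createSplits/estimateNumSplits paths pass null and keep the full-table scan behavior.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;

public final class ColumnsFilterFlowSketch
{
  public static void main(String[] args)
  {
    // An ingestion schema that only needs "id" and "name".
    InputRowSchema inputRowSchema = new InputRowSchema(
        new TimestampSpec("id", "auto", null),
        new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("name"))),
        ColumnsFilter.inclusionBased(ImmutableSet.of("id", "name"))
    );

    // Mirrors IcebergInputSource#getColumnsFilter: a null schema (createSplits,
    // estimateNumSplits) yields no filter and therefore no projection.
    ColumnsFilter columnsFilter = inputRowSchema.getColumnsFilter();

    // extractSnapshotDataFiles keeps only the Iceberg schema columns the filter accepts.
    System.out.println(columnsFilter.apply("name"));  // true  -> projected
    System.out.println(columnsFilter.apply("price")); // false -> pruned
  }
}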
@@ -21,10 +21,16 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +61,7 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -126,6 +133,91 @@ public void testInputSource() throws IOException
}
}

// Reads the table without InputRowSchema and then once per column using InputRowSchema column filters.
@Test
public void testReadTableWithAndWithoutInputRowSchema() throws IOException
{
// Read the table with no InputRowSchema to preserve the legacy full-table scan behavior.
final IcebergInputSource inputSourceWithoutSchema = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceFactory(),
null,
null
);

inputSourceWithoutSchema.retrieveIcebergDatafiles(null);

final List<File> fullScanFiles = ((LocalInputSource) inputSourceWithoutSchema.getDelegateInputSource())
.getFiles();
Assert.assertEquals(1, fullScanFiles.size());

try (final CloseableIterable<Record> datafileReader = Parquet.read(Files.localInput(fullScanFiles.get(0)))
.project(tableSchema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
tableSchema,
fileSchema
))
.build()) {
for (Record record : datafileReader) {
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
}

// Read the table once per column using InputRowSchema column filters to verify single-column projections.
for (final Types.NestedField column : tableSchema.columns()) {
final IcebergInputSource inputSourceWithSchema = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceFactory(),
null,
null
);
final InputRowSchema inputRowSchema = createInputRowSchema(
ColumnsFilter.inclusionBased(ImmutableSet.of(column.name()))
);

inputSourceWithSchema.retrieveIcebergDatafiles(inputRowSchema);

final List<File> projectedFiles = ((LocalInputSource) inputSourceWithSchema.getDelegateInputSource())
.getFiles();
Assert.assertEquals(1, projectedFiles.size());

final Schema projectedSchema = new Schema(column);
try (final CloseableIterable<Record> datafileReader = Parquet.read(Files.localInput(projectedFiles.get(0)))
.project(projectedSchema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
projectedSchema,
fileSchema
))
.build()) {
for (Record record : datafileReader) {
Assert.assertEquals(1, record.size());
Assert.assertEquals(tableData.get(column.name()), record.get(0));
}
}
}
}

private InputRowSchema createInputRowSchema(final ColumnsFilter columnsFilter)
{
return new InputRowSchema(
new TimestampSpec("id", "auto", null),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("id"),
new StringDimensionSchema("name")
)
),
columnsFilter
);
}

@Test
public void testInputSourceWithEmptySource() throws IOException
{