@@ -103,6 +103,7 @@ public class RewriteManifestsSparkAction
          .build();

  private static final String DATA_FILE_PARTITION_COLUMN_NAME = "data_file.partition";
  private static final String DATA_FILE_PATH_COLUMN_NAME = "data_file.file_path";

  private final Table table;
  private final int formatVersion;
@@ -329,7 +330,10 @@ private Column sortColumn() {
  }

  private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
    return df.repartitionByRange(numPartitions, col).sortWithinPartitions(col);
    // add xxhash64 of the file path as an extra range partitioning key to make sure there is enough parallelism
    return df.repartitionByRange(
            numPartitions, col, functions.xxhash64(df.col(DATA_FILE_PATH_COLUMN_NAME)))
        .sortWithinPartitions(col);
  }

  private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
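Note on the change above: the action range-partitions manifest entries by data_file.partition before writing new manifests, and when the table has only a few distinct partition values (or a single one, as in the parallelism test below) all entries fall into the same range, so a single reduce task writes every manifest. Adding functions.xxhash64 over data_file.file_path as a trailing range key spreads entries with the same partition value across the requested number of shuffle partitions. The minimal sketch below illustrates the effect; the local SparkSession, the partition/file_path column names, and the two-partition count are illustrative assumptions rather than code from this PR.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class RangePartitionParallelismSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[4]").appName("range-partition-sketch").getOrCreate();

    // 10,000 rows that all share one partition value but have distinct file paths,
    // mirroring the single-partition table used in the test below
    Dataset<Row> df =
        spark
            .range(10_000)
            .withColumn("partition", functions.lit("AAAA"))
            .withColumn(
                "file_path",
                functions.concat(functions.lit("file-"), functions.col("id").cast("string")));

    Column partitionCol = df.col("partition");

    // ranging on the partition value alone: every row falls into the same range,
    // so typically only one of the two shuffle partitions receives data
    df.repartitionByRange(2, partitionCol)
        .withColumn("pid", functions.spark_partition_id())
        .groupBy("pid")
        .count()
        .show();

    // adding xxhash64(file_path) as a tie-breaking range key spreads the rows
    // across both shuffle partitions
    df.repartitionByRange(2, partitionCol, functions.xxhash64(df.col("file_path")))
        .withColumn("pid", functions.spark_partition_id())
        .groupBy("pid")
        .count()
        .show();

    spark.stop();
  }
}

The first show() typically reports a single non-empty shuffle partition while the second reports two; in the action itself, sortWithinPartitions(col) still orders entries by partition value inside each shuffle partition.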
@@ -36,9 +36,11 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -82,6 +84,8 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -1124,6 +1128,70 @@ public void testRewriteManifestsAfterUpgradeToV3() throws IOException {
}
}

  @TestTemplate
  public void testRewriteManifestsParallelism() throws Exception {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    StructLike partition = TestHelpers.Row.of("AAAA");
    List<DataFile> dataFiles = Lists.newArrayList();
    for (int i = 0; i < 10000; i++) {
      dataFiles.add(FileGenerationUtil.generateDataFile(table, partition));
    }
    ManifestFile appendManifest = writeManifest(table, dataFiles);
    table.newFastAppend().appendManifest(appendManifest).commit();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
    assertThat(manifests).as("Should have 1 manifest before rewrite").hasSize(1);

    table
        .updateProperties()
        .set(
            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
            String.valueOf((manifests.get(0).length() + 1) / 2))
        .commit();

    // track reduce tasks that actually receive data
    Map<Integer, AtomicInteger> nonEmptyTasksByStage = Maps.newConcurrentMap();
    SparkListener listener =
        new SparkListener() {
          @Override
          public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
            if (taskEnd.taskMetrics() == null
                || taskEnd.taskMetrics().shuffleReadMetrics() == null) {
              return;
            }
            if (taskEnd.taskMetrics().shuffleReadMetrics().recordsRead() > 0) {
              nonEmptyTasksByStage
                  .computeIfAbsent(taskEnd.stageId(), k -> new AtomicInteger(0))
                  .incrementAndGet();
            }
          }
        };

    spark.sparkContext().addSparkListener(listener);
    try {
      SparkActions.get()
          .rewriteManifests(table)
          .rewriteIf(manifest -> true)
          .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
          .execute();
      spark.sparkContext().listenerBus().waitUntilEmpty();
    } finally {
      spark.sparkContext().removeSparkListener(listener);
    }

    // find the last reduce stage
    Optional<Integer> id = nonEmptyTasksByStage.keySet().stream().max(Integer::compareTo);
    assertThat(id).as("Expecting the job to have at least one reduce stage").isPresent();
    int nonEmptyReduceTasks = nonEmptyTasksByStage.get(id.get()).get();

    assertThat(nonEmptyReduceTasks).as("Both reducers should receive data").isEqualTo(2);
  }
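How the test verifies parallelism: the manifest target size is set to roughly half of the single existing manifest, so the rewrite should plan two output manifests and shuffle with two range partitions. Because every generated data file shares the partition value "AAAA", ranging on the partition value alone would send all 10,000 entries to one reduce task, and the final assertion would fail without the xxhash64 tie-breaker. The listener counts tasks whose shuffle-read metrics saw at least one record, keyed by stage, and the highest stage id is treated as the final reduce stage. A standalone sketch of the same measurement pattern follows; the class name, local session, and example job are assumptions for illustration and are not part of the PR.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class NonEmptyReduceTaskCounter extends SparkListener {
  private final Map<Integer, AtomicInteger> nonEmptyTasksByStage = new ConcurrentHashMap<>();

  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    // skip tasks without shuffle-read metrics (map-side tasks)
    if (taskEnd.taskMetrics() == null || taskEnd.taskMetrics().shuffleReadMetrics() == null) {
      return;
    }
    if (taskEnd.taskMetrics().shuffleReadMetrics().recordsRead() > 0) {
      nonEmptyTasksByStage
          .computeIfAbsent(taskEnd.stageId(), k -> new AtomicInteger(0))
          .incrementAndGet();
    }
  }

  public Map<Integer, AtomicInteger> results() {
    return nonEmptyTasksByStage;
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[2]").appName("reduce-task-counter").getOrCreate();
    NonEmptyReduceTaskCounter counter = new NonEmptyReduceTaskCounter();
    spark.sparkContext().addSparkListener(counter);
    try {
      // a two-partition range shuffle over a hashed key, analogous to the action's shuffle
      spark
          .range(10_000)
          .repartitionByRange(2, functions.xxhash64(functions.col("id")))
          .count();
      spark.sparkContext().listenerBus().waitUntilEmpty();
    } finally {
      spark.sparkContext().removeSparkListener(counter);
    }
    counter
        .results()
        .forEach(
            (stage, tasks) ->
                System.out.println("stage " + stage + ": " + tasks.get() + " non-empty reduce tasks"));
    spark.stop();
  }
}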

  private List<ThreeColumnRecord> actualRecords() {
    return spark
        .read()