[HUDI-8215] Support composite compaction strategy (apache#11963)
TheR1sing3un authored Sep 23, 2024 · 1 parent 1d1dc5d · commit e4cc7f0
Showing 8 changed files with 148 additions and 22 deletions.
@@ -146,7 +146,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Compaction strategy decides which file groups are picked up for "
+ "compaction during each compaction run. By default, Hudi picks the log file "
- + "with most accumulated unmerged data");
+ + "with most accumulated unmerged data. The strategy can be composed with multiple strategies by concatenating the class names with ','.");

public static final ConfigProperty<String> TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
.key("hoodie.compaction.daybased.target.partitions")
@@ -408,8 +408,15 @@ public Builder approxRecordSize(int recordSizeEstimate) {
return this;
}

- public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
-   compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategy.getClass().getName());
+ public Builder withCompactionStrategy(CompactionStrategy... compactionStrategies) {
+   StringBuilder compactionStrategyBuilder = new StringBuilder();
+   for (CompactionStrategy compactionStrategy : compactionStrategies) {
+     compactionStrategyBuilder.append(compactionStrategy.getClass().getName()).append(",");
+   }
+   if (compactionStrategyBuilder.length() > 0) {
+     compactionStrategyBuilder.deleteCharAt(compactionStrategyBuilder.length() - 1);
+   }
+   compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategyBuilder.toString());
return this;
}

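For orientation, a minimal sketch of what the new varargs builder enables. It is illustrative only (the path and thresholds are placeholders); the two strategy classes are existing Hudi strategies, and the builder simply joins their class names with ',' — a manual String.join — which is equivalent to setting hoodie.compaction.strategy to the comma-separated names by hand.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.DayBasedCompactionStrategy;

// Illustrative composition: restrict compaction to recent partitions, then cap IO.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi-table") // placeholder path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCompactionStrategy(new DayBasedCompactionStrategy(), new BoundedIOCompactionStrategy())
        .withTargetPartitionsPerDayBasedCompaction(1)
        .withTargetIOPerCompactionInMB(500)
        .build())
    .build();
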
@@ -83,6 +83,7 @@
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+ import org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.orc.CompressionKind;
@@ -1655,7 +1656,11 @@ public int getInlineCompactDeltaSecondsMax() {
}

public CompactionStrategy getCompactionStrategy() {
- return ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY));
+ String compactionStrategiesStr = getString(HoodieCompactionConfig.COMPACTION_STRATEGY);
+ String[] compactionStrategyArr = compactionStrategiesStr.split(",");
+ List<CompactionStrategy> compactionStrategies = Arrays.stream(compactionStrategyArr)
+     .map(className -> (CompactionStrategy) ReflectionUtils.loadClass(className)).collect(Collectors.toList());
+ return compactionStrategies.size() == 1 ? compactionStrategies.get(0) : new CompositeCompactionStrategy(compactionStrategies);
}

public Long getTargetIOPerCompactionInMB() {
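
A sketch of the resulting resolution behavior (illustrative assertions, not from this commit's tests; it assumes the standard Hudi strategy classes, whose no-arg constructors ReflectionUtils.loadClass relies on):

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;

// A single configured class name resolves to that strategy directly ...
HoodieWriteConfig single = HoodieWriteConfig.newBuilder().withPath("/tmp")
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCompactionStrategy(new LogFileSizeBasedCompactionStrategy()).build())
    .build();
assert single.getCompactionStrategy() instanceof LogFileSizeBasedCompactionStrategy;

// ... while two class names are wrapped, in order, in a CompositeCompactionStrategy.
HoodieWriteConfig chained = HoodieWriteConfig.newBuilder().withPath("/tmp")
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCompactionStrategy(new LogFileSizeBasedCompactionStrategy(), new BoundedIOCompactionStrategy()).build())
    .build();
assert chained.getCompactionStrategy() instanceof CompositeCompactionStrategy;

Note that the configured value is split on ',' without trimming, so whitespace around the separators would break class loading.
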
@@ -88,7 +88,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr
int allPartitionSize = partitionPaths.size();

// filter the partition paths if needed to reduce list status
- partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);
+ partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
writeConfig.getCompactionStrategy().getClass().getSimpleName(), partitionPaths.size(), allPartitionSize);
if (partitionPaths.isEmpty()) {
@@ -185,7 +185,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr

protected abstract boolean filterLogCompactionOperations();

- protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
+ protected List<String> filterPartitionPathsByStrategy(List<String> partitionPaths) {
return partitionPaths;
}

@@ -27,6 +27,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+ import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,21 +41,25 @@ public class HoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K,

private static final Logger LOG = LoggerFactory.getLogger(HoodieCompactionPlanGenerator.class);

+ private final CompactionStrategy compactionStrategy;
+
public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
+ this.compactionStrategy = writeConfig.getCompactionStrategy();
+ LOG.info("Compaction Strategy used is: " + compactionStrategy.toString());
}

@Override
protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations) {
// Filter the compactions with the passed in filter. This lets us choose most effective
// compactions only
- return writeConfig.getCompactionStrategy().generateCompactionPlan(writeConfig, operations,
+ return compactionStrategy.generateCompactionPlan(writeConfig, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
}

@Override
- protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
-   return writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
+ protected List<String> filterPartitionPathsByStrategy(List<String> partitionPaths) {
+   return compactionStrategy.filterPartitionPaths(writeConfig, partitionPaths);
}

@Override
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.compact.strategy;

import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.List;

/**
 * CompositeCompactionStrategy chains multiple compaction strategies together.
 * The chained strategies act as a pipeline, combining their conditions with `and` semantics rather than `or`.
 * The order of the strategies in the chain matters, as the output of one strategy is passed as input to the next.
 */
public class CompositeCompactionStrategy extends CompactionStrategy {

  private List<CompactionStrategy> strategies;

  public CompositeCompactionStrategy(List<CompactionStrategy> strategies) {
    this.strategies = strategies;
  }

  @Override
  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
    List<HoodieCompactionOperation> finalOperations = operations;
    for (CompactionStrategy strategy : strategies) {
      finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, pendingCompactionPlans);
    }
    return finalOperations;
  }

  @Override
  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
    List<String> finalPartitionPaths = allPartitionPaths;
    for (CompactionStrategy strategy : strategies) {
      finalPartitionPaths = strategy.filterPartitionPaths(writeConfig, finalPartitionPaths);
    }
    return finalPartitionPaths;
  }

  @Override
  public String toString() {
    StringBuilder builder = new StringBuilder();
    builder.append("CompactionStrategyChain [");
    for (CompactionStrategy strategy : strategies) {
      builder.append(strategy.getClass());
      builder.append(" ===> ");
    }
    builder.append("]");
    return builder.toString();
  }
}
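
Because each stage consumes the previous stage's output, the chain is order-sensitive. A short usage sketch of the class on its own (writeConfig and allPartitionPaths are assumed to exist; the two strategy classes are existing Hudi ones):

import java.util.Arrays;
import java.util.List;

import org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.DayBasedCompactionStrategy;

// Day-based partition filtering runs first, so the IO bound only sees operations
// from the partitions that survive it. Reversing the list changes the outcome,
// as the new testCompositeCompactionStrategy below demonstrates with two toy strategies.
CompositeCompactionStrategy strategy = new CompositeCompactionStrategy(Arrays.asList(
    new DayBasedCompactionStrategy(), new BoundedIOCompactionStrategy()));
List<String> candidatePartitions = strategy.filterPartitionPaths(writeConfig, allPartitionPaths);
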
@@ -63,7 +63,7 @@ public void testUnBounded() {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());
assertEquals(operations, returned, "UnBounded should not re-order or filter");
}

@@ -79,7 +79,7 @@ public void testBoundedIOSimple() {
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(), "BoundedIOCompaction should have resulted in fewer compactions");
assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted in 2 compactions being chosen");
@@ -103,7 +103,7 @@ public void testLogFileSizeCompactionSimple() {
.withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions");
@@ -137,7 +137,7 @@ public void testDayBasedCompactionSimple() {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build();

- List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
+ List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions");

List<HoodieCompactionOperation> operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions);
@@ -182,11 +182,11 @@ public void testDayBasedCompactionWithIOBounded() {
.build())
.build();

- List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
+ List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions");

List<HoodieCompactionOperation> operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertEquals(1, returned.size(),
"DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer compactions");
@@ -241,7 +241,7 @@ public void testBoundedPartitionAwareCompactionSimple() {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions");
@@ -290,7 +290,7 @@ public void testUnboundedPartitionAwareCompactionSimple() {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"UnBoundedPartitionAwareCompactionStrategy should not include last "
@@ -312,7 +312,7 @@ public void testLogFileLengthBasedCompactionStrategy() {
.withCompactionLogFileNumThreshold(2).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"LogFileLengthBasedCompactionStrategy should have resulted in fewer compactions");
@@ -331,8 +331,43 @@ public void testLogFileLengthBasedCompactionStrategy() {
// TOTAL_IO_MB: ( 120 + 90 ) * 2 + 521 + 521 + 60 + 10 + 80
assertEquals(1594, (long) returnedSize,
"Should chose the first 2 compactions which should result in a total IO of 1594 MB");
}

+ @Test
+ public void testCompositeCompactionStrategy() {
+   HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+       HoodieCompactionConfig.newBuilder().withCompactionStrategy(new NumStrategy(), new PrefixStrategy()).withTargetIOPerCompactionInMB(1024)
+           .withCompactionLogFileNumThreshold(2).build()).build();
+   List<String> allPartitionPaths = Arrays.asList(
+       "2017/01/01", "2018/01/02", "2017/02/01"
+   );
+   List<String> returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths);
+   // filter by num first and then filter by prefix
+   assertEquals(1, returned.size());
+   assertEquals("2017/01/01", returned.get(0));
+
+   writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+       HoodieCompactionConfig.newBuilder().withCompactionStrategy(new PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024)
+           .withCompactionLogFileNumThreshold(2).build()).build();
+   returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths);
+   // filter by prefix first and then filter by num
+   assertEquals(2, returned.size());
+   assertEquals("2017/01/01", returned.get(0));
+   assertEquals("2017/02/01", returned.get(1));
+ }
+
+ public static class NumStrategy extends CompactionStrategy {
+   @Override
+   public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
+     return allPartitionPaths.stream().limit(2).collect(Collectors.toList());
+   }
+ }
+
+ public static class PrefixStrategy extends CompactionStrategy {
+   @Override
+   public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
+     return allPartitionPaths.stream().filter(s -> s.startsWith("2017")).collect(Collectors.toList());
+   }
+ }

@Test
@@ -40,6 +40,7 @@
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StoragePath;
+ import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -277,9 +278,10 @@ public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig,
*/
public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
- HoodieCompactionConfig compactionConfig = compactionStrategyClass
+ Option<CompactionStrategy> strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass);
+ HoodieCompactionConfig compactionConfig = strategyOpt
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
-     .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+     .withCompactionStrategy(strategy).build())
.orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -53,6 +53,7 @@
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;
+ import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
@@ -392,9 +393,10 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas
*/
public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
- HoodieCompactionConfig compactionConfig = compactionStrategyClass
+ Option<CompactionStrategy> strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass);
+ HoodieCompactionConfig compactionConfig = strategyOpt
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
-     .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+     .withCompactionStrategy(strategy).build())
.orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
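
For illustration, how a caller might exercise this path (a sketch assuming this helper is hudi-utilities' UtilHelpers; jsc, schemaStr, and props are assumed to exist). The strategy class is loaded reflectively from a single fully qualified name, so the comma-separated composite form applies to the hoodie.compaction.strategy config rather than to this parameter:

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

// Illustrative call: one strategy class name, resolved via ReflectionUtils.loadClass.
SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
    jsc, "/tmp/hudi-table", schemaStr, 2,
    Option.of("org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy"),
    props);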
