Skip to content

Commit

Permalink
[HUDI-8103] Introduce Table Write Version and version table configs (apache#11866)
Browse files Browse the repository at this point in the history

*  Introduce Table Write Version and version table configs

 - Cleanup HoodieTableMetaClient static helper methods
 - Streamline creation of meta client and table initialization across the tests

* Wiring in the table version in all write paths

 - Add hoodie.write.auto.upgrade write config to control automatic upgrade behavior
 - Only table versions 6 and 8 are supported as valid values for hoodie.write.table.version
 - Handle rollback done in startCommit* methods and as part of upgrade, w.r.t table version mismatches
 - Validate that write and table versions match, on all write operations and tooling.
 - Cleanup HoodieBaseClient's children around createTable
 - Chase down adhoc creation of HoodieTable, without creating writeClient, handle case-by-case
 - Renaming few methods that are out of convention

* Adding code to validate table configs based on table versions

 - Configs set, that belong to higher table versions are skipped
 - Tests across all major write paths, to ensure table creation is supported at given version
 - Tests covering table config validation and other paths.

* Match MT writer version with data table writer version

 - Fix tests in Flink compaction by updating table version for metadata table as well
  • Loading branch information
vinothchandar authored Sep 23, 2024
1 parent e4cc7f0 commit 5a171e9
Show file tree
Hide file tree
Showing 107 changed files with 1,199 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void setUp() throws Exception {
fileSystem = hiveSyncConfig.getHadoopFileSystem();
fileSystem.mkdirs(new Path(tablePath));
StorageConfiguration<?> configuration = HadoopFSUtils.getStorageConf(new Configuration());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient metaClient = HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void createHoodieTable() throws IOException {
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
metaClient = HoodieTableMetaClient.withPropertyBuilder()
metaClient = HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ protected static void clean(JavaSparkContext jsc, String basePath, String propsF

protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false)) {

HoodieWriteConfig config = client.getConfig();
HoodieEngineContext context = client.getEngineContext();
HoodieSparkTable table = HoodieSparkTable.create(config, context);
client.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), config);
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand Down Expand Up @@ -112,6 +113,16 @@ public String connect(
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " loaded";
}

/**
 * Creates a Hudi table at the given path, defaulting the table version to the
 * current {@link HoodieTableVersion}. Delegates to the full {@code createTable}
 * overload that additionally accepts an explicit table version code.
 *
 * @param path          base path where the table is created
 * @param name          table name
 * @param tableTypeStr  table type as a string (e.g. COPY_ON_WRITE, MERGE_ON_READ)
 * @param archiveFolder archive log folder for the table
 * @param layoutVersion timeline layout version to use
 * @param payloadClass  fully-qualified payload class name
 * @return the status message produced by the delegated createTable call
 * @throws IOException if table creation fails
 */
public String createTable(
final String path,
final String name,
final String tableTypeStr,
String archiveFolder,
Integer layoutVersion,
final String payloadClass) throws IOException {
return createTable(path, name, tableTypeStr, archiveFolder, layoutVersion, HoodieTableVersion.current().versionCode(), payloadClass);
}

/**
* Create a Hoodie Table if it does not exist.
*
Expand All @@ -130,6 +141,8 @@ public String createTable(
defaultValue = ShellOption.NULL) String archiveFolder,
@ShellOption(value = {"--layoutVersion"}, help = "Specific Layout Version to use",
defaultValue = ShellOption.NULL) Integer layoutVersion,
@ShellOption(value = {"--tableVersion"}, help = "Specific table Version to create table as",
defaultValue = ShellOption.NULL) Integer tableVersion,
@ShellOption(value = {"--payloadClass"}, defaultValue = "org.apache.hudi.common.model.HoodieAvroPayload",
help = "Payload Class") final String payloadClass) throws IOException {

Expand All @@ -149,12 +162,13 @@ public String createTable(
throw new IllegalStateException("Table already existing in path : " + path);
}

HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient.newTableBuilder()
.setTableType(tableTypeStr)
.setTableName(name)
.setArchiveLogFolder(archiveFolder)
.setPayloadClassName(payloadClass)
.setTimelineLayoutVersion(layoutVersion)
.setTableVersion(tableVersion == null ? HoodieTableVersion.current().versionCode() : tableVersion)
.initTable(HoodieCLI.conf.newInstance(), path);
// Now connect to ensure loading works
return connect(path, layoutVersion, false, 0, 0, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static String getHoodieTableVersionName(String versionOption, boolean overrideWi

try {
int versionCode = Integer.parseInt(versionOption);
return HoodieTableVersion.versionFromCode(versionCode).name();
return HoodieTableVersion.fromVersionCode(versionCode).name();
} catch (NumberFormatException e) {
// The version option from the CLI is not a number, returns the original String
return versionOption;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void init() throws IOException {

@Test
public void testMetadataDelete() throws Exception {
HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
.setTableName(tableName())
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testShowFailedCommits() {
@Test
public void testRepairDeprecatedPartition() throws IOException {
tablePath = tablePath + "/repair_test/";
HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
.setTableName(tableName())
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
Expand Down Expand Up @@ -386,7 +386,7 @@ public void testRepairDeprecatedPartition() throws IOException {
@Test
public void testRenamePartition() throws IOException {
tablePath = tablePath + "/rename_partition_test/";
HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
.setTableName(tableName())
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimeGeneratorType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -158,13 +159,14 @@ public void testDefaultCreate() {
public void testCreateWithSpecifiedValues() {
// Test create with specified values
Object result = shell.evaluate(() -> "create --path " + tablePath + " --tableName " + tableName
+ " --tableType MERGE_ON_READ --archiveLogFolder archive");
+ " --tableType MERGE_ON_READ --archiveLogFolder archive --tableVersion 6");
assertTrue(ShellEvaluationResultUtil.isSuccess(result));
assertEquals("Metadata for table " + tableName + " loaded", result.toString());
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
assertEquals(metaPath + StoragePath.SEPARATOR + "archive", client.getArchivePath());
assertEquals(tablePath, client.getBasePath().toString());
assertEquals(metaPath, client.getMetaPath().toString());
assertEquals(HoodieTableVersion.SIX, client.getTableConfig().getTableVersion());
assertEquals(HoodieTableType.MERGE_ON_READ, client.getTableType());
}

Expand Down Expand Up @@ -237,7 +239,8 @@ public void testFetchTableSchema() throws Exception {
HoodieCLI.conf = storageConf();
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

String schemaStr = "{\n"
+ " \"type\" : \"record\",\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -77,7 +78,8 @@ public void init() throws IOException {
// Create table and connect
new TableCommand().createTable(
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

initMetaClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestTable;
Expand Down Expand Up @@ -76,7 +77,8 @@ public void init() throws IOException {
// Create table and connect
new TableCommand().createTable(
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

initMetaClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -83,7 +84,8 @@ public void init() throws IOException {
// Create table and connect
new TableCommand().createTable(
basePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

initMetaClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.storage.StoragePath;
Expand Down Expand Up @@ -59,7 +60,8 @@ public void init() throws IOException {
// Create table and connect
new TableCommand().createTable(
tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
Expand Down Expand Up @@ -101,7 +102,8 @@ public void init() throws Exception {
// Create cow table and connect
new TableCommand().createTable(
cowTablePath, "cow_table", HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

Expand All @@ -124,7 +126,8 @@ public void init() throws Exception {
// Create mor table and connect
new TableCommand().createTable(
morTablePath, "mor_table", HoodieTableType.MERGE_ON_READ.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");
HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

morTable.addDeltaCommit("20160401010101");
Expand All @@ -146,7 +149,8 @@ public void init() throws Exception {
// Create cow table and connect
new TableCommand().createTable(
cowNonPartitionedTablePath, "cow_table_non_partitioned", HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
Expand Down Expand Up @@ -71,7 +72,8 @@ public void init() throws IOException {
// Create table and connect
new TableCommand().createTable(
tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
"", TimelineLayoutVersion.VERSION_1, HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CommonClientUtils;

import com.codahale.metrics.Timer;
import org.slf4j.Logger;
Expand All @@ -79,6 +80,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -675,7 +677,22 @@ protected Option<String> scheduleTableServiceInternal(String instantTime, Option
return option;
}

protected abstract HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf);
/**
 * Instantiates a {@link HoodieTable} via the supplied factory function and, unless
 * validation is skipped, checks that the writer's configured table version matches
 * the version recorded in the table's config.
 *
 * @param config         write config used to build the table
 * @param createTableFn  factory producing the table from (config, engine context)
 * @param skipValidation when true, the table-version check is bypassed
 * @return the newly created table instance
 */
protected HoodieTable createTableAndValidate(HoodieWriteConfig config,
                                             BiFunction<HoodieWriteConfig,
                                                 HoodieEngineContext, HoodieTable> createTableFn,
                                             boolean skipValidation) {
  final HoodieTable newTable = createTableFn.apply(config, context);
  if (skipValidation) {
    return newTable;
  }
  // Ensure the write config's table version agrees with what the table itself records.
  CommonClientUtils.validateTableVersion(newTable.getMetaClient().getTableConfig(), config);
  return newTable;
}

/**
 * Creates a {@link HoodieTable} for the given config and storage configuration,
 * with table-version validation enabled (delegates with {@code skipValidation = false}).
 *
 * @param config      write config for the table
 * @param storageConf storage configuration to use
 * @return the created table
 */
protected HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf) {
return createTable(config, storageConf, false);
}

protected abstract HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf, boolean skipValidation);

/**
* Executes a clustering plan on a table, serially before or after an insert/upsert action.
Expand Down Expand Up @@ -942,6 +959,10 @@ protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo
}

/**
 * Rolls back the given failed-write instants, with the table-version check enabled
 * (delegates with {@code skipVersionCheck = false}).
 *
 * @param instantsToRollback instants to roll back, mapped to any pending rollback info
 * @param skipLocking        whether to skip acquiring locks during rollback
 */
protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
rollbackFailedWrites(instantsToRollback, skipLocking, false);
}

protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking, boolean skipVersionCheck) {
// sort in reverse order of commit times
LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
.stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
Expand All @@ -956,7 +977,7 @@ protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo
HeartbeatUtils.deleteHeartbeatFile(storage, basePath, entry.getKey(), config);
break;
} else {
rollback(entry.getKey(), entry.getValue(), skipLocking);
rollback(entry.getKey(), entry.getValue(), skipLocking, skipVersionCheck);
HeartbeatUtils.deleteHeartbeatFile(storage, basePath, entry.getKey(), config);
}
}
Expand Down Expand Up @@ -1031,10 +1052,11 @@ private List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
* will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)
*/
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
boolean skipLocking, boolean skipVersionCheck) throws HoodieRollbackException {
final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp())
.orElseGet(() -> createNewInstantTime(!skipLocking));
return rollback(commitInstantTime, pendingRollbackInfo, rollbackInstantTime, skipLocking);
return rollback(commitInstantTime, pendingRollbackInfo, rollbackInstantTime, skipLocking, skipVersionCheck);
}

/**
Expand All @@ -1047,11 +1069,11 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
*/
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String rollbackInstantTime,
boolean skipLocking) throws HoodieRollbackException {
boolean skipLocking, boolean skipVersionCheck) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf);
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
Expand Down
Loading

0 comments on commit 5a171e9

Please sign in to comment.