Skip to content

Commit

Permalink
[HUDI-8112] Fix TestHoodieActiveTimeline#testTimelineGetOperations te…
Browse files Browse the repository at this point in the history
…st logic error (apache#11814)
  • Loading branch information
usberkeley authored Aug 23, 2024
1 parent 5b31ef2 commit e1f70fd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public interface HoodieTimeline extends Serializable {
String SCHEMA_COMMIT_ACTION = "schemacommit";
String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION, REPLACE_COMMIT_ACTION, CLUSTERING_ACTION, INDEXING_ACTION};
COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION, CLUSTERING_ACTION, INDEXING_ACTION};

String COMMIT_EXTENSION = "." + COMMIT_ACTION;
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testTimelineWithSavepointAndHoles() {
@Test
public void testTimelineGetOperations() {
List<HoodieInstant> allInstants = getAllInstants();
Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
Supplier<Stream<HoodieInstant>> allInstantsSup = allInstants::stream;
timeline = new HoodieActiveTimeline(metaClient, true);
timeline.setInstants(allInstants);

Expand All @@ -346,31 +346,38 @@ public void testTimelineGetOperations() {
* @param actions The actions that should be present in the timeline being checked
*/
BiConsumer<HoodieTimeline, Set<String>> checkTimeline = (HoodieTimeline timeline, Set<String> actions) -> {
sup.get().filter(i -> actions.contains(i.getAction())).forEach(i -> assertTrue(timeline.containsInstant(i)));
sup.get().filter(i -> !actions.contains(i.getAction())).forEach(i -> assertFalse(timeline.containsInstant(i)));
List<HoodieInstant> expectedInstants = allInstantsSup.get().filter(i -> actions.contains(i.getAction())).collect(Collectors.toList());
List<HoodieInstant> unexpectedInstants = allInstantsSup.get().filter(i -> !actions.contains(i.getAction())).collect(Collectors.toList());

// The test set is the instant of all actions and states, so the instants returned by the timeline get operation cannot be empty.
// At the same time, it helps to detect errors such as incomplete test sets
assertFalse(expectedInstants.isEmpty());
expectedInstants.forEach(i -> assertTrue(timeline.containsInstant(i)));
unexpectedInstants.forEach(i -> assertFalse(timeline.containsInstant(i)));
};

// Test that various types of getXXX operations from HoodieActiveTimeline
// return the correct set of Instant
checkTimeline.accept(timeline.getCommitsTimeline(), CollectionUtils.createSet(
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.CLUSTERING_ACTION));
checkTimeline.accept(timeline.getWriteTimeline(), CollectionUtils.createSet(
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.CLUSTERING_ACTION));
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION,
HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.CLUSTERING_ACTION));
checkTimeline.accept(timeline.getCommitAndReplaceTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.CLUSTERING_ACTION));
checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION));
checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION));
checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION));
checkTimeline.accept(timeline.getRestoreTimeline(), Collections.singleton(HoodieTimeline.RESTORE_ACTION));
checkTimeline.accept(timeline.getSavePointTimeline(), Collections.singleton(HoodieTimeline.SAVEPOINT_ACTION));
checkTimeline.accept(timeline.getAllCommitsTimeline(), CollectionUtils.createSet(
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION,
HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.CLUSTERING_ACTION, HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.INDEXING_ACTION));

// Get some random Instants
Random rand = new Random();
Set<String> randomInstants = sup.get().filter(i -> rand.nextBoolean())
Set<String> randomActions = allInstantsSup.get().filter(i -> rand.nextBoolean())
.map(HoodieInstant::getAction).collect(Collectors.toSet());
checkTimeline.accept(timeline.getTimelineOfActions(randomInstants), randomInstants);
checkTimeline.accept(timeline.getTimelineOfActions(randomActions), randomActions);
}

@Test
Expand Down Expand Up @@ -738,21 +745,18 @@ private List<HoodieInstant> getAllInstants() {
// Following are not valid combinations of actions and state so we should
// not be generating them.
if (state == State.REQUESTED) {
if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION)
|| action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION)) {
continue;
}
}
if (state == State.INFLIGHT && action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
continue;
}
if (state == State.COMPLETED && action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
continue;
}
// Compaction complete is called commit complete
if (state == State.COMPLETED && action.equals(HoodieTimeline.COMPACTION_ACTION)) {
action = HoodieTimeline.COMMIT_ACTION;
}
// LogCompaction complete is called deltacommit complete
if (state == State.COMPLETED && action.equals(HoodieTimeline.LOG_COMPACTION_ACTION)) {
action = HoodieTimeline.DELTA_COMMIT_ACTION;
}
// Cluster complete is called replacecommit complete
if (state == State.COMPLETED && action.equals(HoodieTimeline.CLUSTERING_ACTION)) {
action = HoodieTimeline.REPLACE_COMMIT_ACTION;
Expand Down

0 comments on commit e1f70fd

Please sign in to comment.