Skip to content

fix: initialize lastCompletedTxnAndMetadata before recommitInstant in Flink StreamWriteOperatorCoordinator#19023

Open
suryaprasanna wants to merge 5 commits into
apache:masterfrom
suryaprasanna:fix-recommit-occ-conflict
Open

fix: initialize lastCompletedTxnAndMetadata before recommitInstant in Flink StreamWriteOperatorCoordinator#19023
suryaprasanna wants to merge 5 commits into
apache:masterfrom
suryaprasanna:fix-recommit-occ-conflict

Conversation

@suryaprasanna

Copy link
Copy Markdown
Contributor

Describe the issue this Pull Request addresses

During Flink job restart, StreamWriteOperatorCoordinator.restoreEvents() calls recommitInstant()
without initializing lastCompletedTxnAndMetadata. This causes OCC conflict resolution to use
INIT_INSTANT_TS as the baseline, treating all completed instants as conflict candidates. Since
streaming upserts write to the same file groups, this always throws a false
HoodieWriteConflictException, preventing the job from restarting.

Summary and Changelog

Added preTxnForRecommit() to initialize OCC conflict resolution state before recommitting inflight
instants during Flink job recovery. The method finds the last completed instant whose requested time
and completion time are both before the inflight instant, so only genuinely concurrent commits are
checked for conflicts.

Impact

No public API or config changes. Fixes a restart failure for Flink streaming jobs with OCC enabled.

Risk Level

low — the change only adds proper initialization of an existing field that was previously left
uninitialized in the recommit path. Normal (non-recommit) write paths are unaffected.

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

During Flink job restart, StreamWriteOperatorCoordinator.restoreEvents()
calls recommitInstant() without initializing lastCompletedTxnAndMetadata.
This causes OCC conflict resolution to use INIT_INSTANT_TS as baseline,
checking against ALL completed instants on the timeline instead of only
those that completed concurrently. Since streaming upserts repeatedly
write to the same file groups, this always finds overlapping file IDs
and throws HoodieWriteConflictException.

The fix initializes the transaction state before recommitting by finding
the last completed instant whose requested time and completion time are
both before the inflight instant's requested time. This ensures conflict
resolution only checks against genuinely concurrent commits, while still
detecting real conflicts from other writers during downtime.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR fixes a real bug where Flink job restart could fail with false OCC conflict exceptions by initializing lastCompletedTxnAndMetadata before the recommit path. One question worth double-checking inline around the comparator choice that drives the picked "last successful" instant. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.

.findInstantsBefore(currentInstantTime)
.getInstantsAsStream()
.filter(instant -> instant.getCompletionTime() != null
&& compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The V8+ SimpleConcurrentFileWritesConflictResolutionStrategy uses findInstantsAfter(lastSuccessful.requestedTime()) as its candidate-set cutoff, so picking max-by-completion-time can still leave already-completed instants in the candidate set. E.g. slow streaming commit A (req=T1, comp=T5) and fast clustering B (req=T2>T1, comp=T3<T5) both predate currentInstantTime — this picks A, and findInstantsAfter(T1) still includes B, potentially re-triggering the same false-conflict the PR is trying to fix. Would Comparator.comparing(HoodieInstant::requestedTime) be safer here (also matching the existing single-arg lastInstant() semantics)?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wombatu-kun thanks for the review, responded for this in the below comment please check and let me know your thoughts.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed -- switched to requestedTime for the max selection. The completionTime filter on the preceding line is retained to ensure we only consider fully completed instants.

@github-actions github-actions Bot added the size:S PR with lines of changes in (10, 100] label Jun 16, 2026

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

The only change since the last review is removing a blank line between import groups in TransactionUtils.java — a trivial whitespace cleanup with no functional impact. The prior finding about the Comparator.comparingLong(... getCompletionTime()) choice (versus Comparator.comparing(HoodieInstant::requestedTime)) interacting with SimpleConcurrentFileWritesConflictResolutionStrategy.findInstantsAfter(lastSuccessful.requestedTime()) remains open — no code change addresses it and no author reply justifies it. Worth a Hudi committer or PMC member taking another look at that thread before merge.

* @param currentInstantTime the requested time of the current inflight instant
* @return the last completed instant before the given instant, with its extra metadata
*/
public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test accompanies this change, though the PR checklist marks tests as added. getLastCompletedTxnInstantAndMetadata is directly unit-testable: a case seeding the timeline with interleaved completions - a slow commit requested early but completed late, plus a faster commit requested later but completed earlier, both before currentInstantTime - would pin down the baseline-selection behavior and objectively settle the comparator question raised in the open review thread on this method. An end-to-end Flink restart/recommit case over the OCC path would also guard against regressing the original false-conflict bug.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, added three unit tests in TestTransactionUtils covering interleaved completions, the completionTime exclusion filter, and the empty-result edge case. Also added testCommittingMultipleInstantsWithOCC in TestWriteMergeOnRead as the end-to-end guard.

.getInstantsAsStream()
.filter(instant -> instant.getCompletionTime() != null
&& compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime))
.max(Comparator.comparing(HoodieInstant::getCompletionTime)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably choose the max requestedTime, not max completionTime. The OCC strategy later uses findInstantsAfter(lastSuccessful.requestedTime()) as the candidate cutoff, so choosing an older slow commit by completion time can still leave newer, already-completed-before-current instants in the candidate set. For example, if A has req=T1/complete=T5 and B has req=T2/complete=T3, and both complete before current=T6, this picks A and conflict resolution still checks B even though B was not concurrent with the recommit. Picking the latest requested instant that also satisfies completionTime < currentInstantTime seems to match the cutoff semantics.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel OCC strategy is unnecessarily complicating the conflict resolution behavior. During conflict resolution, we should only care for completed commits that are committed after the current instant started, apart from that rest of the instants can be ignored. With that approach we can even remove the tracking of pendingInstants during the commit window and also timelineRefreshedWithinTransaction variables. So, I think we should make the change to fix it there as well. Let me know, if you think the same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks fine, let's give it a try.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the comparator: max requestedTime (among completed instants with completionTime < currentInstantTime) is the right choice for this PR. getCandidateInstantsV8AndAbove cuts the candidate set with findInstantsAfter(lastSuccessful.requestedTime()), and the pre-V8 branch (getCommitsCompletedSinceLastCommit) does the same, so keying the baseline on requestedTime keeps the two consistent and avoids leaving an already-completed-but-later-requested instant in the candidate set. It also matches the existing baseline path: preWrite uses the single-arg getLastCompletedTxnInstantAndMetadata, i.e. lastInstant() over the completed commits timeline.

On moving the fix into the OCC strategy (filter purely by completion time, drop pendingInflightAndRequestedInstants / timelineRefreshedWithinTransaction): the direction is cleaner, but resolveWriteConflictIfAny is the shared path for the Spark and Java writers too, and completion time is not reliably populated pre-V8 (the new method already has to null-guard getCompletionTime), so it would need version-gating plus broader cross-engine testing. My suggestion is to keep this PR to the minimal requestedTime fix and track the strategy rework as a separate PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough analysis. Fully aligned -- switched to max requestedTime and added unit tests with interleaved completion times to pin it down. Agree the broader strategy rework should be a separate PR.

// baseline: the last completed instant before this inflight instant was created.
// Without this, lastCompletedTxnAndMetadata is empty and conflict resolution checks
// against all completed instants on the timeline, causing false conflicts.
writeClient.preTxnForRecommit(tableState.operationType, this.metaClient, instant);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still uses the coordinator metaClient/timeline captured before the restore loop. restoreEvents() builds one completedTimeline and then recommits every buffered checkpoint against it; after the first recommit succeeds, this metaClient is not reloaded until after the whole loop. If there are multiple recovered checkpoint buffers, preTxnForRecommit(...) for the second instant may not see the first instant as completed, so OCC can treat that just-recommitted instant as a conflict candidate and fail with the same kind of false positive. Can we reload the active timeline before each recommit, or move the reload into this path before computing the completed timeline and initializing preTxnForRecommit?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 During my test, I allowed 3 times checkpoint failure

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Moved reloadActiveTimeline() inside the loop in restoreEvents() so each iteration sees the previously committed instant. Also found the same issue in commitInstants() (L611) -- when multiple buffered instants are committed in sequence during checkpointComplete, the second commit sees the first as a concurrent conflict. Fixed by refreshing the timeline and preTxn for subsequent instants (skipping the first to preserve multi-writer conflict detection).

@cshuo cshuo left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem can be reproduced by the following test case, can you add the case in TestWriteMergeOnRead?

@Test
  public void testCommittingMultipleInstantsWithOCC() throws Exception {
    // reset the config option
    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
    preparePipeline(conf)
        .consume(TestData.DATA_SET_INSERT)
        .checkpoint(1)
        .assertNextEvent(4, "par1,par2,par3,par4")
        .consume(TestData.DATA_SET_UPDATE_INSERT)
        .checkpoint(2)
        .assertNextEvent(4, "par1,par2,par3,par4")
        .checkpointComplete(2)
        .checkWrittenData(EXPECTED2)
        .end();
  }

@github-actions github-actions Bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Jun 17, 2026

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR initializes lastCompletedTxnAndMetadata properly before recommit and refreshes the timeline per-iteration, which fixes the single-inflight restart with OCC. One scenario worth double-checking around multi-inflight recovery is flagged inline. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One readability suggestion below — using a boolean[] as a mutable closure flag inside the stream pipeline reads as a workaround; a plain for-loop would express the "skip-first" intent more directly.

writeClient.getHeartbeatClient().start(instant);
}
// Initialize the transaction state so that OCC conflict resolution uses the correct
// baseline: the last completed instant before this inflight instant was created.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 @danny0405 could you sanity-check the multi-inflight restart scenario? If two inflight instants A (req=T1) and B (req=T2, T1<T2) are recommitted in order, when B reaches preTxnForRecommit, getLastCompletedTxnInstantAndMetadata excludes A (A.completionTime is now > T2), so the baseline falls back to some pre-T1 instant (or empty), and A then appears in B's OCC candidate set via findInstantsAfter(baseline.requestedTime()). With streaming upserts touching the same file groups, this would still throw the same false HoodieWriteConflictException the PR is trying to fix. Is multi-inflight recovery considered out of scope here, or am I missing a guard somewhere?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeline is now reloaded before each recommit in restoreEvents(), so when B is recommitted, A is already visible as completed on the refreshed timeline. preTxnForRecommit then correctly sets A as the baseline, and findInstantsAfter(A.requestedTime()) excludes A from B's candidate set.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still open for the restoreEvents path. A recommitted inflight instant does not retain an old completion time: transitionStateToComplete -> createCompleteFileInMetaPath in ActiveTimelineV2 stamps a fresh wall-clock completion time at recovery, so a sibling A recommitted before B always has completionTime(A) greater than B's requestedTime. The new two-arg getLastCompletedTxnInstantAndMetadata then drops A through its completionTime < currentInstantTime filter, so A is not selected as B's baseline; the baseline falls back to a pre-restart instant (or empty), and SimpleConcurrentFileWritesConflictResolutionStrategy's findInstantsAfter(baseline.requestedTime()) still leaves A in B's candidate set. With both writes touching the same file groups, 2+ buffered inflight instants throw the same false conflict; the per-iteration reload makes A visible but does not change baseline selection.

testCommittingMultipleInstantsWithOCC exercises commitInstants (checkpointComplete), not restoreEvents, so this path stays uncovered. A restoreEvents test with two buffered inflight instants under OCC would confirm it. The recommit baseline likely needs to pick the max-requestedTime completed instant below currentInstantTime without the completionTime upper bound (the lastInstant semantics the commitInstants preTxn path already uses), so the just-recommitted sibling is chosen as the baseline and then excluded by findInstantsAfter.

*/
private boolean commitInstants(long checkpointId) {
// use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
boolean[] isFirstInstant = {true};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the boolean[] isFirstInstant = {true} trick to fake a mutable flag inside a map reads as a workaround. Could you collect the filtered entries first and use a plain for loop (or iterate with an index) so the "skip the refresh on the first instant" intent is expressed directly?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. Refactored to collect the filtered entries first and use a plain for loop.

@suryaprasanna

Copy link
Copy Markdown
Contributor Author

The problem can be reproduced by the following test case, can you add the case in TestWriteMergeOnRead?

@Test
  public void testCommittingMultipleInstantsWithOCC() throws Exception {
    // reset the config option
    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
    preparePipeline(conf)
        .consume(TestData.DATA_SET_INSERT)
        .checkpoint(1)
        .assertNextEvent(4, "par1,par2,par3,par4")
        .consume(TestData.DATA_SET_UPDATE_INSERT)
        .checkpoint(2)
        .assertNextEvent(4, "par1,par2,par3,par4")
        .checkpointComplete(2)
        .checkWrittenData(EXPECTED2)
        .end();
  }

Added the test. While implementing it, discovered the same stale-baseline issue in commitInstants() (L611) -- when checkpointComplete(2) commits both checkpoints sequentially, the second sees the first as a concurrent conflict. Fixed that path too. All 50 tests in TestWriteMergeOnRead pass including existing multi-writer tests.

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this and iterating through the prior review rounds! The fix addresses the single-pending recovery case well. One subtle scenario worth tracing through in the inline comment — the multi-pending restart path (the case @HuangZhenQiu mentioned with multiple checkpoint failures) may still hit a false OCC conflict on the second+ iteration because the just-recommitted instant's reassigned completion time gets filtered out of the baseline. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.

*/
public void preTxnForRecommit(WriteOperationType operationType, HoodieTableMetaClient metaClient, String currentInstantTime) {
if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), config, metaClient.getTableConfig())) {
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, currentInstantTime);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Could you trace through the multi-pending restart case (e.g. @HuangZhenQiu's 3-checkpoint-failure scenario where A and B are both inflight on restart)? In iter 2 for B, A was just recommitted in iter 1, but its completionTime is reassigned to wall-clock-now (post-restart, so > B.requestedTime). The completionTime < currentInstantTime filter then excludes A from the baseline, falling back to some earlier C. SimpleConcurrentFileWritesConflictResolutionStrategy's findInstantsAfter(C.requestedTime) then picks up A as a candidate, and hasConflict(B, A) returns true on the shared file groups typical of streaming upserts — re-triggering the same HoodieWriteConflictException this PR is fixing. The new testCommittingMultipleInstantsWithOCC exercises the regular commitInstants path (which uses the no-arg preTxn), so it wouldn't catch this. @danny0405 thoughts on whether preTxnForRecommit should use the no-arg getLastCompletedTxnInstantAndMetadata here so the just-recommitted instant becomes the baseline?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

// Refresh the baseline for subsequent instants so that OCC conflict resolution
// sees the just-committed instant as completed, not as a concurrent conflict.
this.metaClient.reloadActiveTimeline();
this.writeClient.preTxn(tableState.operationType, this.metaClient);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not really work, the preTxn needs to be invoked before each instant starts, so that we can figure out what instants got complted during the current write window.

And another problem with Flink OCC is it does not work as expected, even if the conflict deteced and we throws, the Flink checkpoint already succeeded, there is no way to rollback to the precedding checkpoint by default.

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for iterating on this — the multi-pending recovery concern from the prior review is addressed: with preTxn now invoked inside doCommit and using the no-arg getLastCompletedTxnInstantAndMetadata, the just-recommitted instant A naturally becomes the baseline for the next iteration's recommit of B in restoreEvents, eliminating the false OCC conflict for the multi-checkpoint-failure restart case @HuangZhenQiu described. The boolean[] workaround in commitInstants is also gone now, and the stale-timeline concern in restoreEvents is effectively neutralized because the OCC baseline is refreshed per-commit. One unaddressed concern worth a careful look in the inline comment: moving preTxn out of startInstant and into doCommit shrinks the OCC detection window, which appears to leave normal (non-recovery) multi-writer conflicts undetected — @danny0405 noted earlier that preTxn should be invoked before each instant starts, and the new placement goes the other way. Separately, the new getLastCompletedTxnInstantAndMetadata(metaClient, currentInstantTime) overload in TransactionUtils is no longer used by production code (only by the new tests) — @danny0405's earlier point about it not being necessary for the fix still applies. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.

checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(this.metaClient));

// refresh the last txn metadata for OCC
this.writeClient.preTxn(tableState.operationType, this.metaClient, instant);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Following up on @danny0405's earlier point that preTxn should be invoked before each instant starts: moving it from startInstant into doCommit actually goes the other way and looks like it could regress normal multi-writer OCC. With the baseline captured microseconds before commit, SimpleConcurrentFileWritesConflictResolutionStrategy.getCandidateInstantsV8AndAbove calls findInstantsAfter(lastSuccessful.requestedTime()) (strict >), which excludes the most-recent-completed instant itself — so an external writer that started and completed during our write window becomes the new baseline and is silently dropped from the candidate set (and it's also not in pendingInflightAndRequestedInstants, so getCompletedInstantsDuringCurrentWriteOperation won't catch it either). Spark's BaseHoodieWriteClient.preWrite sets lastCompletedTxnAndMetadata at write start for exactly this reason. Could the previous shape — preTxn at startInstant plus the explicit between-iteration refresh in commitInstants — be restored to preserve the detection window?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@danny0405 danny0405 force-pushed the fix-recommit-occ-conflict branch from 8064260 to 03d4810 Compare June 18, 2026 10:35

@danny0405 danny0405 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the occ does not really work well, we need to think though the solution.

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

No new reviewable changes.

cc @yihua

@danny0405 danny0405 force-pushed the fix-recommit-occ-conflict branch from 2e97849 to 7a0e5d3 Compare June 18, 2026 13:00
@danny0405 danny0405 force-pushed the fix-recommit-occ-conflict branch from 7a0e5d3 to d25fd02 Compare June 18, 2026 13:54

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the iterations on this. The exclusion-list approach is clean, and the single-pending recommit path looks correct. However, I want to flag what looks like a remaining issue in the multi-pending recommit scenario (e.g. @HuangZhenQiu's 3-checkpoint-failure case) — please take a look at the inline comment. Once this is resolved, this should be ready for a Hudi committer or PMC member to take it from here. A couple of naming nits in the new TxnStateMemo class; everything else looks clean.

}
// Initialize the transaction state so same-writer instants can be excluded
// during OCC conflict resolution.
writeClient.preTxn(tableState.operationType, this.metaClient, instant, this.eventBuffers.getAllInstants());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 I think the multi-pending recommit case is still broken — could you double-check? With A and B both inflight on restart, after A's recommit succeeds, doCommit calls eventBuffers.reset(ckp_A), so when B's preTxn runs next, eventBuffers.getAllInstants() returns {B} only — A is no longer in the exclusion set. The coordinator's metaClient isn't reloaded between iterations either (the single reloadActiveTimeline() happens once at restoreEvents entry), so getLastCompletedTxnInstantAndMetadata(metaClient) still returns the pre-A txn as the cutoff. Inside the lock, preCommit builds a fresh table whose timeline now shows A completed; getCandidateInstants returns A via findInstantsAfter(preA.requestedTime()), the exclusion filter drops only B, and A vs B (same file groups from the same writer) throws HoodieWriteConflictException — exactly the failure this PR is fixing. Snapshotting eventBuffers.getAllInstants() once before the loop (or calling metaClient.reloadActiveTimeline() before each preTxn) would close this. @danny0405 could you sanity-check this trace?

- AI-generated; verify before applying. React 👍/👎 to flag quality.


private final Map<String, TxnState> memo = new HashMap<>();

public void memo(String instant,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you rename this to record or put? Using a noun as a verb (txnStateMemo.memo(...) at the call site) is unusual in Java APIs and reads a bit awkwardly.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return Option.ofNullable(memo.get(instant));
}

public void slip(String instant) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: slip doesn't carry an obvious meaning here — remove or evict would tell a reader immediately what this does without needing to look at the body.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@hudi-bot

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:M PR with lines of changes in (100, 300]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants