maintainer: fast retry temporarily ignored WAITING statuses (#4808)#5204
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a fast retry mechanism for temporarily ignored WAITING statuses by adding an IgnoredBlockStatus to the heartbeat protocol and handling it in both the dispatcher and the maintainer barrier. However, there are multiple critical issues where unresolved git merge conflict markers have been left in the code across several files, including maintainer/barrier.go, event_dispatcher_test.go, and the generated heartbeat.pb.go file. These conflict markers must be resolved and cleaned up before the pull request can be merged.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| <<<<<<< HEAD | ||
| log.Info("Get block status from unexisted dispatcher, ignore it", zap.String("changefeed", request.ChangefeedID.GetName()), zap.String("dispatcher", dispatcherID.String()), zap.Uint64("commitTs", status.State.BlockTs)) | ||
| continue | ||
| } else { | ||
| if !b.spanController.IsReplicating(task) { | ||
| log.Info("Get block status from unreplicating dispatcher, ignore it", zap.String("changefeed", request.ChangefeedID.GetName()), zap.String("dispatcher", dispatcherID.String()), zap.Uint64("commitTs", status.State.BlockTs)) | ||
| ======= | ||
| log.Info("Get block status from unexisted dispatcher, ignore it", | ||
| zap.String("changefeed", request.ChangefeedID.GetName()), | ||
| zap.String("dispatcher", dispatcherID.String()), | ||
| zap.Uint64("commitTs", status.State.BlockTs), | ||
| zap.Int64("mode", b.mode)) | ||
| continue | ||
| } else { | ||
| if !b.spanController.IsReplicating(task) { | ||
| log.Info("Get block status from unreplicating dispatcher, ignore it", | ||
| zap.String("changefeed", request.ChangefeedID.GetName()), | ||
| zap.String("dispatcher", dispatcherID.String()), | ||
| zap.Uint64("commitTs", status.State.BlockTs), | ||
| zap.Int64("mode", b.mode)) | ||
| // A newly added dispatcher may report its first WAITING barrier before the add | ||
| // operator moves it from scheduling to replicating. We still cannot admit that | ||
| // status into barrier, but silently dropping it would leave dispatcher waiting | ||
| // for the slow 5s resend timer. Return IgnoredBlockStatus so it keeps the live | ||
| // WAITING state locally and schedules a fast retry instead. | ||
| dispatcherStatus = append(dispatcherStatus, &heartbeatpb.DispatcherStatus{ | ||
| InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{ | ||
| InfluenceType: heartbeatpb.InfluenceType_Normal, | ||
| DispatcherIDs: []*heartbeatpb.DispatcherID{status.ID}, | ||
| }, | ||
| IgnoredBlockStatus: &heartbeatpb.IgnoredBlockStatus{ | ||
| CommitTs: status.State.BlockTs, | ||
| IsSyncPoint: status.State.IsSyncPoint, | ||
| }, | ||
| }) | ||
| >>>>>>> fcc173191 (maintainer: fast retry temporarily ignored WAITING statuses (#4808)) |
There was a problem hiding this comment.
Unresolved git merge conflict markers found in maintainer/barrier.go. Please clean up the conflict markers and keep the correct implementation.
log.Info("Get block status from unexisted dispatcher, ignore it",
zap.String("changefeed", request.ChangefeedID.GetName()),
zap.String("dispatcher", dispatcherID.String()),
zap.Uint64("commitTs", status.State.BlockTs),
zap.Int64("mode", b.mode))
continue
} else {
if !b.spanController.IsReplicating(task) {
log.Info("Get block status from unreplicating dispatcher, ignore it",
zap.String("changefeed", request.ChangefeedID.GetName()),
zap.String("dispatcher", dispatcherID.String()),
zap.Uint64("commitTs", status.State.BlockTs),
zap.Int64("mode", b.mode))
// A newly added dispatcher may report its first WAITING barrier before the add
// operator moves it from scheduling to replicating. We still cannot admit that
// status into barrier, but silently dropping it would leave dispatcher waiting
// for the slow 5s resend timer. Return IgnoredBlockStatus so it keeps the live
// WAITING state locally and schedules a fast retry instead.
dispatcherStatus = append(dispatcherStatus, &heartbeatpb.DispatcherStatus{
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
InfluenceType: heartbeatpb.InfluenceType_Normal,
DispatcherIDs: []*heartbeatpb.DispatcherID{status.ID},
},
IgnoredBlockStatus: &heartbeatpb.IgnoredBlockStatus{
CommitTs: status.State.BlockTs,
IsSyncPoint: status.State.IsSyncPoint,
},
})There was a problem hiding this comment.
keep the cp change and add the mode log from HEAD
| <<<<<<< HEAD | ||
| ======= |
| return pendingEvent == nil && blockStage == heartbeatpb.BlockStage_NONE | ||
| }, time.Second, 10*time.Millisecond) | ||
| require.Equal(t, int32(1), flushCalls.Load()) | ||
| >>>>>>> fcc173191 (maintainer: fast retry temporarily ignored WAITING statuses (#4808)) |
| return nil | ||
| } | ||
|
|
||
| <<<<<<< HEAD |
|
/cherry-pick-invite -h |
|
@zier-one: adding LGTM is restricted to approvers and reviewers in OWNERS files. DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: hongyunyan, zier-one The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This is an automated cherry-pick of #4808
What problem does this PR solve?
Issue Number: close #4810
When a non-DDL dispatcher reports a
WAITINGblock status while the maintainer still sees that task as temporarily non-replicating, the status used to be silently ignored. The dispatcher then had to wait for the regular 5-second resend loop before retrying, which unnecessarily delayed barrier recovery and convergence.What is changed and how it works?
IgnoredBlockStatustoheartbeatpb.DispatcherStatusso the maintainer can explicitly tell a dispatcher that its currentWAITINGstatus is temporarily ignored and should be retried soon, and regenerateheartbeat.pb.go.HandleDispatcherStatusnow has an ignored-block branch. It schedules a fast retry only when the hint strictly matches the current in-flightWAITINGblock event.ResendTaskkeeps the original 5-second periodic resend as the fallback path and adds a one-shot fast retry starting at 50ms with exponential backoff up to the same interval; ACK still cancels both the slow and fast resend paths.Before:

After:

Check List
Tests
[x] Unit test
Release note
Summary by CodeRabbit
Bug Fixes
New Features
Tests