Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,9 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
}

// HandleDispatcherStatus handles the dispatcher status from the maintainer to process block events.
// Each dispatcher status may contain an ACK info or a dispatcher action or both.
// Each dispatcher status may contain an ACK info, an ignored-block hint, or a dispatcher action.
// If we get an ack info, we need to check whether the ack is for the ddl event in resend task map. If so, we can cancel the resend task.
// If we get an ignored-block hint for the current waiting event, we schedule one fast retry while keeping the slow fallback resend task.
// If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. If so, we can deal the ddl event based on the action.
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream.
// 2. If the action is a pass, we just need to pass the event
Expand All @@ -612,7 +613,22 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
d.cancelResendTask(identifier)
}

// Step2: deal with the dispatcher action
// Step2: deal with the ignored block status
ignoredBlockStatus := dispatcherStatus.GetIgnoredBlockStatus()
if ignoredBlockStatus != nil && d.blockEventStatus.ignoredStatusMatches(ignoredBlockStatus) {
identifier := BlockEventIdentifier{
CommitTs: ignoredBlockStatus.CommitTs,
IsSyncPoint: ignoredBlockStatus.IsSyncPoint,
}
if task := d.resendTaskMap.Get(identifier); task != nil {
_ = task.Execute()
} else {
log.Info("resendTask not found; fast resend path cannot be executed.", zap.Uint64("CommitTs", ignoredBlockStatus.CommitTs), zap.Bool("IsSyncPoint", ignoredBlockStatus.IsSyncPoint))
}
return false
}

// Step3: deal with the dispatcher action
action := dispatcherStatus.GetAction()
if action != nil {
pendingEvent := d.blockEventStatus.getEvent()
Expand Down Expand Up @@ -682,7 +698,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
}
}

// Step3: whether the outdate message or not, we need to return message show we have finished the event.
// Step4: whether the outdate message or not, we need to return message show we have finished the event.
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
Expand Down
52 changes: 52 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/config/kerneltype"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -522,6 +523,57 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) {
require.Equal(t, int32(1), flushCalls.Load())
}

func TestDispatcherIgnoresStaleIgnoredBlockStatus(t *testing.T) {
tableSpan := getUncompleteTableSpan()
tableSpan.KeyspaceID = getTestingKeyspaceID()
dispatcher := newDispatcherForTest(newDispatcherTestSink(t, common.MysqlSinkType).Sink(), tableSpan)

previousScheduler := GetDispatcherTaskScheduler()
taskScheduler := threadpool.NewThreadPool(1)
SetDispatcherTaskScheduler(taskScheduler)
defer func() {
SetDispatcherTaskScheduler(previousScheduler)
taskScheduler.Stop()
}()

ddlEvent := &commonEvent.DDLEvent{
FinishedTs: 30,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{1},
},
}
dispatcher.blockEventStatus.setBlockEvent(ddlEvent, heartbeatpb.BlockStage_WRITING)
identifier := BlockEventIdentifier{CommitTs: ddlEvent.FinishedTs}
dispatcher.resendTaskMap.Set(identifier, newResendTask(&heartbeatpb.TableSpanBlockStatus{
ID: dispatcher.GetId().ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: ddlEvent.FinishedTs,
BlockTables: &heartbeatpb.InfluencedTables{
InfluenceType: heartbeatpb.InfluenceType_Normal,
TableIDs: []int64{1},
},
Stage: heartbeatpb.BlockStage_WAITING,
},
Mode: dispatcher.GetMode(),
}, dispatcher, nil))
defer dispatcher.cancelResendTask(identifier)

await := dispatcher.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{
IgnoredBlockStatus: &heartbeatpb.IgnoredBlockStatus{
CommitTs: ddlEvent.FinishedTs,
},
})
require.False(t, await)

select {
case msg := <-dispatcher.GetBlockStatusesChan():
require.FailNow(t, "unexpected fast retry for stale ignored block status", "msg=%v", msg)
case <-time.After(200 * time.Millisecond):
}
}

// test uncompelete table span can correctly handle the ddl events
func TestUncompeleteTableSpanDispatcherHandleEvents(t *testing.T) {
count.Swap(0)
Expand Down
17 changes: 17 additions & 0 deletions downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bo
return b.blockCommitTs == action.CommitTs
}

// ignoredStatusMatches checks whether the ignored status is for the current pending ddl/sync point event.
func (b *BlockEventStatus) ignoredStatusMatches(ignored *heartbeatpb.IgnoredBlockStatus) bool {
b.mutex.Lock()
defer b.mutex.Unlock()

if b.blockPendingEvent == nil {
return false
}

if b.blockStage != heartbeatpb.BlockStage_WAITING {
return false
}

pendingIsSyncPoint := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent
return b.blockCommitTs == ignored.CommitTs && pendingIsSyncPoint == ignored.IsSyncPoint
}

func (b *BlockEventStatus) getEventCommitTs() (uint64, bool) {
b.mutex.Lock()
defer b.mutex.Unlock()
Expand Down
Loading
Loading