Skip to content
Draft

[WIP] #5149

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,10 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
efs = append(efs, ef.ToInternalEventFilterRule())
}
res.Filter = &config.FilterConfig{
Rules: c.Filter.Rules,
IgnoreTxnStartTs: c.Filter.IgnoreTxnStartTs,
EventFilters: efs,
Rules: c.Filter.Rules,
IgnoreTxnStartTs: c.Filter.IgnoreTxnStartTs,
DebugSkipDDLTypes: c.Filter.DebugSkipDDLTypes,
EventFilters: efs,
}
}
if c.Consistent != nil {
Expand Down Expand Up @@ -695,10 +696,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}

res.Filter = &FilterConfig{
Rules: cloned.Filter.Rules,
IgnoreTxnStartTs: cloned.Filter.IgnoreTxnStartTs,
EventFilters: efs,
Rules: cloned.Filter.Rules,
IgnoreTxnStartTs: cloned.Filter.IgnoreTxnStartTs,
DebugSkipDDLTypes: make([]bf.EventType, len(cloned.Filter.DebugSkipDDLTypes)),
EventFilters: efs,
}
copy(res.Filter.DebugSkipDDLTypes, cloned.Filter.DebugSkipDDLTypes)
}
if cloned.Sink != nil {
var dispatchRules []*DispatchRule
Expand Down Expand Up @@ -1065,9 +1068,10 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
// FilterConfig represents filter config for a changefeed
// This is a duplicate of config.FilterConfig
type FilterConfig struct {
Rules []string `json:"rules,omitempty"`
IgnoreTxnStartTs []uint64 `json:"ignore_txn_start_ts,omitempty"`
EventFilters []EventFilterRule `json:"event_filters,omitempty"`
Rules []string `json:"rules,omitempty"`
IgnoreTxnStartTs []uint64 `json:"ignore_txn_start_ts,omitempty"`
DebugSkipDDLTypes []bf.EventType `json:"debug_skip_ddl_types,omitempty"`
EventFilters []EventFilterRule `json:"event_filters,omitempty"`
}

// MounterConfig represents mounter config for a changefeed
Expand Down
10 changes: 7 additions & 3 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,15 @@ func (d *DispatcherMap[T]) ForEach(fn func(id common.DispatcherID, dispatcher T)

func toFilterConfigPB(filter *config.FilterConfig) *eventpb.InnerFilterConfig {
filterConfig := &eventpb.InnerFilterConfig{
Rules: filter.Rules,
IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
EventFilters: make([]*eventpb.EventFilterRule, 0),
Rules: filter.Rules,
IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
DebugSkipDdlTypes: make([]string, 0, len(filter.DebugSkipDDLTypes)),
EventFilters: make([]*eventpb.EventFilterRule, 0),
}

for _, eventType := range filter.DebugSkipDDLTypes {
filterConfig.DebugSkipDdlTypes = append(filterConfig.DebugSkipDdlTypes, string(eventType))
}
for _, eventFilterRule := range filter.EventFilters {
filterConfig.EventFilters = append(filterConfig.EventFilters, toEventFilterRulePB(eventFilterRule))
}
Expand Down
211 changes: 134 additions & 77 deletions eventpb/event.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions eventpb/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message InnerFilterConfig {
repeated string rules = 1;
repeated uint64 ignore_txn_start_ts = 2;
repeated EventFilterRule EventFilters = 3;
repeated string debug_skip_ddl_types = 4;
}

message FilterConfig {
Expand Down
37 changes: 33 additions & 4 deletions logservice/schemastore/schema_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,40 @@ func (s *keyspaceSchemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) {
return
}
}

if !filter.IsSysSchema(ddlEvent.Job.SchemaName) {
s.unsortedCache.addDDLEvent(ddlEvent)
}
}

func filterIgnoredDDLEvents(events []commonEvent.DDLEvent, tableFilter filter.Filter) []commonEvent.DDLEvent {
if tableFilter == nil || len(events) == 0 {
return events
}

filteredEvents := events[:0]
for _, event := range events {
skipped, err := tableFilter.ShouldSkipDDLEventInSchemaStore(event.GetDDLType(), event.Query)
if err != nil {
log.Warn("schema store ddl filter failed",
zap.Any("type", event.GetDDLType()),
zap.Uint64("finishedTs", event.FinishedTs),
zap.String("query", event.Query),
zap.Error(err))
filteredEvents = append(filteredEvents, event)
continue
}
if skipped {
log.Info("skip ddl event by debug-skip-ddl-types",
zap.Any("type", event.GetDDLType()),
zap.Uint64("finishedTs", event.FinishedTs),
zap.String("query", event.Query))
continue
}
filteredEvents = append(filteredEvents, event)
}
return filteredEvents
}

// TODO tenfyzhong 2025-09-13 13:40:26 use a chan to decoupling
// advancePendingResolvedTs will be call by ddlJobFetcher when it fetched a new ddl event
// it will update the pendingResolvedTs and notify the updateResolvedTs goroutine to apply the ddl event
Expand Down Expand Up @@ -432,7 +460,7 @@ func (s *schemaStore) FetchTableDDLEvents(
if err != nil {
return nil, err
}
return events, nil
return filterIgnoredDDLEvents(events, tableFilter), nil
}

// FetchTableTriggerDDLEvents returns the next ddl events which finishedTs are within the range (start, end]
Expand All @@ -453,8 +481,9 @@ func (s *schemaStore) FetchTableTriggerDDLEvents(keyspaceMeta common.KeyspaceMet
return nil, 0, err
}

filteredEvents := filterIgnoredDDLEvents(events, tableFilter)
if len(events) == limit {
return events, events[limit-1].FinishedTs, nil
return filteredEvents, events[limit-1].FinishedTs, nil
}
end := currentResolvedTs
// after we get currentResolvedTs, there may be new ddl events with FinishedTs > currentResolvedTs
Expand All @@ -469,7 +498,7 @@ func (s *schemaStore) FetchTableTriggerDDLEvents(keyspaceMeta common.KeyspaceMet
zap.Int("limit", limit),
zap.Uint64("end", end),
zap.Any("events", events))
return events, end, nil
return filteredEvents, end, nil
}

// RegisterKeyspace register a keyspace to fetch table ddl
Expand Down
120 changes: 120 additions & 0 deletions logservice/schemastore/schema_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ import (
"time"

"github.com/pingcap/log"
bf "github.com/pingcap/ticdc/pkg/binlog-filter"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
cdcfilter "github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -110,6 +115,121 @@ func TestIgnoreDDLByCommitTs(t *testing.T) {
require.NotContains(t, tableNames, "t2")
}

func TestDebugSkipDDLByType(t *testing.T) {
mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

dir := t.TempDir()
pstorage := newPersistentStorageForTest(dir, nil)
defer func() {
err := pstorage.close()
require.NoError(t, err)
}()

store := &keyspaceSchemaStore{
pdClock: mockPDClock,
unsortedCache: newDDLCache(),
dataStorage: pstorage,
notifyCh: make(chan any, 1),
}
store.resolvedTs.Store(pstorage.gcTs)
store.pendingResolvedTs.Store(pstorage.gcTs)

tableFilter, err := cdcfilter.NewFilter(&config.FilterConfig{
Rules: []string{"*.*"},
DebugSkipDDLTypes: []bf.EventType{bf.RenameTable},
}, "", false, false)
require.NoError(t, err)

ddlJobs := []DDLJobWithCommitTs{
{
Job: buildCreateSchemaJobForTest(100, "test", 1000),
CommitTs: 1000,
},
{
Job: buildCreateTableJobForTest(100, 200, "t1", 1010),
CommitTs: 1010,
},
{
Job: buildRenameTableJobForTest(100, 200, "t2", 1020, &model.InvolvingSchemaInfo{
Database: "test",
Table: "t1",
}),
CommitTs: 1020,
},
}
for _, ddl := range ddlJobs {
ddl.Job.BinlogInfo.SchemaVersion = 100
}

for _, ddl := range ddlJobs {
store.writeDDLEvent(ddl)
}
store.advancePendingResolvedTs(1020)
store.tryUpdateResolvedTs()

require.Eventually(t, func() bool {
return store.resolvedTs.Load() >= 1020
}, 5*time.Second, 10*time.Millisecond)

tables, err := pstorage.getAllPhysicalTables(1020, nil)
require.NoError(t, err)
require.Len(t, tables, 1)
require.Equal(t, "t2", tables[0].TableName)

err = pstorage.registerTable(200, 1010)
require.NoError(t, err)
ss := &schemaStore{
keyspaceSchemaStoreMap: map[uint32]*keyspaceSchemaStore{
common.DefaultKeyspace.ID: store,
},
}
tableEvents, err := ss.FetchTableDDLEvents(common.DefaultKeyspace, common.NewDispatcherID(), 200, tableFilter, 1010, 1020)
require.NoError(t, err)
require.Empty(t, tableEvents)

triggerEvents, end, err := ss.FetchTableTriggerDDLEvents(common.DefaultKeyspace, common.NewDispatcherID(), tableFilter, 1010, 10)
require.NoError(t, err)
require.Empty(t, triggerEvents)
require.Equal(t, uint64(1020), end)
}

func TestDebugSkipDDLEventByTypeCoversSupportedDDLTypes(t *testing.T) {
for _, actionType := range cdcfilter.SupportedDDLActionTypes() {
ddlEvent := commonEvent.DDLEvent{
Type: byte(actionType),
Query: "ddl",
}
eventType := cdcfilter.DDLToEventType(actionType)
require.NotEqual(t, bf.NullEvent, eventType, actionType.String())

require.True(t,
shouldSkipDDLEventInSchemaStore(t, ddlEvent, []bf.EventType{eventType}),
actionType.String())
require.True(t,
shouldSkipDDLEventInSchemaStore(t, ddlEvent, []bf.EventType{bf.AllDDL}),
actionType.String())

if cdcfilter.IsAlterTableDDL(actionType) {
require.True(t,
shouldSkipDDLEventInSchemaStore(t, ddlEvent, []bf.EventType{bf.AlterTable}),
actionType.String())
}
}
}

func shouldSkipDDLEventInSchemaStore(t *testing.T, ddlEvent commonEvent.DDLEvent, skipDDLTypes []bf.EventType) bool {
t.Helper()
tableFilter, err := cdcfilter.NewFilter(&config.FilterConfig{
Rules: []string{"*.*"},
DebugSkipDDLTypes: skipDDLTypes,
}, "", false, false)
require.NoError(t, err)
skipped, err := tableFilter.ShouldSkipDDLEventInSchemaStore(ddlEvent.GetDDLType(), ddlEvent.Query)
require.NoError(t, err)
return skipped
}

func TestGetAllPhysicalTablesReturnsSnapshotLostByGCError(t *testing.T) {
dir := t.TempDir()
pstorage := newPersistentStorageForTest(dir, nil)
Expand Down
6 changes: 5 additions & 1 deletion pkg/binlog-filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func ClassifyEvent(event EventType) (EventType, error) {
AlterIndexVisibility,
AlterTTLInfo,
AlterTTLRemove,
MultiSchemaChange:
MultiSchemaChange,
AddForeignKey,
DropForeignKey,
AddFullTextIndex,
CreateHybridIndex:
return incompatibleDDL, nil
default:
return NullEvent, errors.NotValidf("event type %s", event)
Expand Down
6 changes: 5 additions & 1 deletion pkg/binlog-filter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func toEventType(es string) (EventType, error) {
AddTablePartition,
DropTablePartition,
TruncateTablePartition,
AddForeignKey,
DropForeignKey,

IncompatibleDDLChanges,
ValueRangeDecrease,
Expand Down Expand Up @@ -118,7 +120,9 @@ func toEventType(es string) (EventType, error) {
AlterIndexVisibility,
AlterTTLInfo,
AlterTTLRemove,
MultiSchemaChange:
MultiSchemaChange,
AddFullTextIndex,
CreateHybridIndex:
return event, nil
case CreateSchema: // alias of CreateDatabase
return CreateDatabase, nil
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func (c *DebugConfig) ValidateAndAdjust() error {
if err := c.Scheduler.ValidateAndAdjust(); err != nil {
return errors.Trace(err)
}
if c.SchemaStore == nil {
c.SchemaStore = NewDefaultSchemaStoreConfig()
}
if err := c.SchemaStore.ValidateAndAdjust(); err != nil {
return errors.Trace(err)
}

return nil
}
Expand Down Expand Up @@ -113,6 +119,14 @@ func NewDefaultSchemaStoreConfig() *SchemaStoreConfig {
}
}

// ValidateAndAdjust validates and adjusts the schema store configuration.
func (c *SchemaStoreConfig) ValidateAndAdjust() error {
if c.IgnoreDDLCommitTs == nil {
c.IgnoreDDLCommitTs = []uint64{}
}
return nil
}

// EventServiceConfig represents config for event service
type EventServiceConfig struct {
ScanTaskQueueSize int `toml:"scan-task-queue-size" json:"scan_task_queue_size"`
Expand Down
14 changes: 8 additions & 6 deletions pkg/config/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ import (

// FilterConfig represents filter config for a changefeed
type FilterConfig struct {
Rules []string `toml:"rules" json:"rules"`
IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"`
EventFilters []*EventFilterRule `toml:"event-filters" json:"event-filters"`
Rules []string `toml:"rules" json:"rules"`
IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"`
DebugSkipDDLTypes []bf.EventType `toml:"debug-skip-ddl-types" json:"debug-skip-ddl-types"`
EventFilters []*EventFilterRule `toml:"event-filters" json:"event-filters"`
}

func NewDefaultFilterConfig() *FilterConfig {
return &FilterConfig{
Rules: []string{"*.*"},
IgnoreTxnStartTs: []uint64{},
EventFilters: []*EventFilterRule{},
Rules: []string{"*.*"},
IgnoreTxnStartTs: []uint64{},
DebugSkipDDLTypes: []bf.EventType{},
EventFilters: []*EventFilterRule{},
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/config/server_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ func TestSchedulerConfigValidateAndAdjust(t *testing.T) {
require.Error(t, conf.ValidateAndAdjust())
}

func TestSchemaStoreConfigValidateAndAdjust(t *testing.T) {
t.Parallel()
conf := GetDefaultServerConfig().Clone().Debug.SchemaStore
conf.IgnoreDDLCommitTs = nil
require.NoError(t, conf.ValidateAndAdjust())
require.NotNil(t, conf.IgnoreDDLCommitTs)
}

func TestIsValidClusterID(t *testing.T) {
cases := []struct {
id string
Expand Down
Loading
Loading