Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,10 +1086,11 @@ type EventFilterRule struct {
// regular expression
IgnoreSQL []string `toml:"ignore_sql" json:"ignore_sql"`
// sql expression
IgnoreInsertValueExpr string `json:"ignore_insert_value_expr"`
IgnoreUpdateNewValueExpr string `json:"ignore_update_new_value_expr"`
IgnoreUpdateOldValueExpr string `json:"ignore_update_old_value_expr"`
IgnoreDeleteValueExpr string `json:"ignore_delete_value_expr"`
IgnoreInsertValueExpr string `json:"ignore_insert_value_expr"`
IgnoreUpdateNewValueExpr string `json:"ignore_update_new_value_expr"`
IgnoreUpdateOldValueExpr string `json:"ignore_update_old_value_expr"`
IgnoreDeleteValueExpr string `json:"ignore_delete_value_expr"`
IgnoreUpdateOnlyColumns []string `json:"ignore_update_only_columns,omitempty"`
}

// ToInternalEventFilterRule converts EventFilterRule to *config.EventFilterRule
Expand All @@ -1101,6 +1102,7 @@ func (e EventFilterRule) ToInternalEventFilterRule() *config.EventFilterRule {
IgnoreUpdateNewValueExpr: &e.IgnoreUpdateNewValueExpr,
IgnoreUpdateOldValueExpr: &e.IgnoreUpdateOldValueExpr,
IgnoreDeleteValueExpr: &e.IgnoreDeleteValueExpr,
IgnoreUpdateOnlyColumns: e.IgnoreUpdateOnlyColumns,
}
if len(e.IgnoreEvent) != 0 {
res.IgnoreEvent = make([]bf.EventType, len(e.IgnoreEvent))
Expand All @@ -1119,6 +1121,10 @@ func ToAPIEventFilterRule(er *config.EventFilterRule) EventFilterRule {
IgnoreUpdateOldValueExpr: util.GetOrZero(er.IgnoreUpdateOldValueExpr),
IgnoreDeleteValueExpr: util.GetOrZero(er.IgnoreDeleteValueExpr),
}
if len(er.IgnoreUpdateOnlyColumns) != 0 {
res.IgnoreUpdateOnlyColumns = make([]string, len(er.IgnoreUpdateOnlyColumns))
copy(res.IgnoreUpdateOnlyColumns, er.IgnoreUpdateOnlyColumns)
}
if len(er.Matcher) != 0 {
res.Matcher = make([]string, len(er.Matcher))
copy(res.Matcher, er.Matcher)
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type DispatcherService interface {
GetCheckpointTs() uint64
HandleEvents(events []DispatcherEvent, wakeCallback func()) (block bool)
IsOutputRawChangeEvent() bool
EnableIgnoreUpdateOnlyColumns() bool
}

// Dispatcher defines the interface for event dispatchers that are responsible for receiving events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func TestHandleEventsIgnoreSpecialTableOnNonMySQLSink(t *testing.T) {
}
}

func TestEnableIgnoreUpdateOnlyColumns(t *testing.T) {
require.True(t, newTestBasicDispatcher(t, common.KafkaSinkType, false).EnableIgnoreUpdateOnlyColumns())
require.False(t, newTestBasicDispatcher(t, common.MysqlSinkType, false).EnableIgnoreUpdateOnlyColumns())
require.False(t, newTestBasicDispatcher(t, common.PulsarSinkType, false).EnableIgnoreUpdateOnlyColumns())
}

func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) {
dispatcher := newTestBasicDispatcher(t, common.MysqlSinkType, false)
dispatcher.tableModeCompatibilityChecked = true
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ func (d *BasicDispatcher) IsOutputRawChangeEvent() bool {
return d.sharedInfo.outputRawChangeEvent
}

func (d *BasicDispatcher) EnableIgnoreUpdateOnlyColumns() bool {
return d.sink.SinkType() == common.KafkaSinkType
}

func (d *BasicDispatcher) GetRouter() routing.Router {
return d.sharedInfo.GetRouter()
}
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func toEventFilterRulePB(rule *config.EventFilterRule) *eventpb.EventFilterRule
IgnoreUpdateOldValueExpr: util.GetOrZero(rule.IgnoreUpdateOldValueExpr),
IgnoreDeleteValueExpr: util.GetOrZero(rule.IgnoreDeleteValueExpr),
}
eventFilterPB.IgnoreUpdateOnlyColumns = append(eventFilterPB.IgnoreUpdateOnlyColumns, rule.IgnoreUpdateOnlyColumns...)

eventFilterPB.Matcher = append(eventFilterPB.Matcher, rule.Matcher...)

Expand Down
44 changes: 23 additions & 21 deletions downstreamadapter/eventcollector/dispatcher_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,20 +519,21 @@ func (s *dispatcherSession) newDispatcherRegisterRequest(serverID string, onlyRe
TableSpan: s.target.GetTableSpan(),
StartTs: startTs,
// ServerId is the id of the request sender.
ServerId: serverID,
ActionType: eventpb.ActionType_ACTION_TYPE_REGISTER,
FilterConfig: s.target.GetFilterConfig(),
EnableSyncPoint: s.target.EnableSyncPoint(),
SyncPointInterval: uint64(syncPointInterval.Seconds()),
SyncPointTs: syncpoint.CalculateStartSyncPointTs(startTs, syncPointInterval, s.target.GetSkipSyncpointAtStartTs()),
OnlyReuse: onlyReuse,
BdrMode: s.target.GetBDRMode(),
Mode: s.target.GetMode(),
Epoch: 0,
Timezone: s.target.GetTimezone(),
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
ServerId: serverID,
ActionType: eventpb.ActionType_ACTION_TYPE_REGISTER,
FilterConfig: s.target.GetFilterConfig(),
EnableSyncPoint: s.target.EnableSyncPoint(),
SyncPointInterval: uint64(syncPointInterval.Seconds()),
SyncPointTs: syncpoint.CalculateStartSyncPointTs(startTs, syncPointInterval, s.target.GetSkipSyncpointAtStartTs()),
OnlyReuse: onlyReuse,
BdrMode: s.target.GetBDRMode(),
Mode: s.target.GetMode(),
Epoch: 0,
Timezone: s.target.GetTimezone(),
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
EnableIgnoreUpdateOnlyColumns: s.target.EnableIgnoreUpdateOnlyColumns(),
},
}
}
Expand Down Expand Up @@ -561,13 +562,14 @@ func (s *dispatcherSession) newDispatcherResetRequest(serverID string, resetTs u
SyncPointInterval: uint64(syncPointInterval.Seconds()),
SyncPointTs: syncpoint.CalculateStartSyncPointTs(resetTs, syncPointInterval, skipSyncpointSameAsResetTs),
// OnlyReuse: false,
BdrMode: s.target.GetBDRMode(),
Mode: s.target.GetMode(),
Epoch: epoch,
Timezone: s.target.GetTimezone(),
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
BdrMode: s.target.GetBDRMode(),
Mode: s.target.GetMode(),
Epoch: epoch,
Timezone: s.target.GetTimezone(),
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
EnableIgnoreUpdateOnlyColumns: s.target.EnableIgnoreUpdateOnlyColumns(),
},
}
}
Expand Down
26 changes: 21 additions & 5 deletions downstreamadapter/eventcollector/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ type mockDispatcher struct {
id common.DispatcherID
changefeedID common.ChangeFeedID
handleEvents func(events []dispatcher.DispatcherEvent, wakeCallback func()) (block bool)
handleError func(err error)
events []dispatcher.DispatcherEvent
checkPointTs uint64
tableSpan *heartbeatpb.TableSpan

skipSyncpointAtStartTs bool
skipSyncpointAtStartTs bool
router routing.Router
enableIgnoreUpdateOnlyColumns bool
}

func newMockDispatcher(id common.DispatcherID, startTs uint64) *mockDispatcher {
Expand Down Expand Up @@ -98,10 +101,6 @@ func (m *mockDispatcher) GetTableSpan() *heartbeatpb.TableSpan {
}
}

func (m *mockDispatcher) GetRouter() routing.Router {
return routing.Router{}
}

func (m *mockDispatcher) GetBDRMode() bool {
return false
}
Expand Down Expand Up @@ -155,6 +154,20 @@ func (m *mockDispatcher) IsOutputRawChangeEvent() bool {
return false
}

func (m *mockDispatcher) EnableIgnoreUpdateOnlyColumns() bool {
return m.enableIgnoreUpdateOnlyColumns
}

func (m *mockDispatcher) GetRouter() routing.Router {
return m.router
}

func (m *mockDispatcher) HandleError(err error) {
if m.handleError != nil {
m.handleError(err)
}
}

// mockEvent implements the Event interface for testing
type mockEvent struct {
eventType int
Expand Down Expand Up @@ -1689,6 +1702,7 @@ func TestRegistrationEntrypoints(t *testing.T) {

// Create a mock dispatcher and event collector
mockDisp := newMockDispatcher(dispatcherID, 0)
mockDisp.enableIgnoreUpdateOnlyColumns = true
mockEventCollector := newTestEventCollector(localServerID)
stat := newDispatcherStat(mockDisp, mockEventCollector, nil)

Expand All @@ -1703,6 +1717,7 @@ func TestRegistrationEntrypoints(t *testing.T) {
require.Equal(t, eventpb.ActionType_ACTION_TYPE_REGISTER, req.ActionType)
require.False(t, req.OnlyReuse, "OnlyReuse should be false for local registration")
require.Equal(t, dispatcherID.ToPB(), req.DispatcherId)
require.True(t, req.EnableIgnoreUpdateOnlyColumns())
case <-time.After(1 * time.Second):
require.Fail(t, "timed out waiting for message")
}
Expand All @@ -1719,6 +1734,7 @@ func TestRegistrationEntrypoints(t *testing.T) {
require.Equal(t, eventpb.ActionType_ACTION_TYPE_REGISTER, req.ActionType)
require.True(t, req.OnlyReuse, "OnlyReuse should be true for remote registration")
require.Equal(t, dispatcherID.ToPB(), req.DispatcherId)
require.True(t, req.EnableIgnoreUpdateOnlyColumns())
case <-time.After(1 * time.Second):
require.Fail(t, "timed out waiting for message")
}
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/eventcollector/event_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (m *mockEventDispatcher) IsOutputRawChangeEvent() bool {
return false
}

func (m *mockEventDispatcher) EnableIgnoreUpdateOnlyColumns() bool {
return false
}

func newMessage(id node.ID, msg messaging.IOTypeT) *messaging.TargetMessage {
targetMessage := messaging.NewSingleTargetMessage(id, messaging.EventCollectorTopic, msg)
targetMessage.From = id
Expand Down
Loading
Loading