Skip to content

Commit 2ed7cb6

Browse files
authored
dm-worker: refine db operation (pingcap#236)
* syncer db conn * loader db refine * remove useless code * add db retry policy * remove syncer max retry config * add normal retry policy * update retry strategy * add context for retry * update to terror * add invalid strategy * update retry invalid connection * update retry strategy * add retry strategy comment * add reset connection logic * use same baseConn in Conns * udpate syncer dbs * unify query in log * share same *DB by all conns * split load execute sql with different retryFn * unify retry logic to pkg * put retry error together * unify syncer conn helper function * update retry logic and some tests error formats * add baseConn test * make baseconn a pkg * abstract retry strategy * add querySQL args * add whiteList error retry logic * unify retry error in syncer/loader * keep max-retry in syncerConifg
1 parent cb56783 commit 2ed7cb6

File tree

41 files changed

+801
-526
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+801
-526
lines changed

dm/config/subtask.go

-4
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,6 @@ func (c *SubTaskConfig) Adjust() error {
219219
c.MetaSchema = defaultMetaSchema
220220
}
221221

222-
if c.MaxRetry == 0 {
223-
c.MaxRetry = 1
224-
}
225-
226222
if !c.DisableHeartbeat {
227223
c.EnableHeartbeat = true
228224
}

dm/config/task.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ var (
5555
// SyncerConfig
5656
defaultWorkerCount = 16
5757
defaultBatch = 100
58-
defaultMaxRetry = 100
5958
)
6059

6160
// Meta represents binlog's meta pos
@@ -184,7 +183,8 @@ type SyncerConfig struct {
184183
MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly
185184
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
186185
Batch int `yaml:"batch" toml:"batch" json:"batch"`
187-
MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`
186+
// deprecated
187+
MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`
188188

189189
// refine following configs to top level configs?
190190
AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
@@ -198,7 +198,6 @@ func defaultSyncerConfig() SyncerConfig {
198198
return SyncerConfig{
199199
WorkerCount: defaultWorkerCount,
200200
Batch: defaultBatch,
201-
MaxRetry: defaultMaxRetry,
202201
}
203202
}
204203

dm/dm-ansible/conf/task.yaml.example

-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ mysql-instances: # one or more source database, config more source d
7474
syncer:
7575
worker-count: 16
7676
batch: 100
77-
max-retry: 100
7877

7978
# other common configs shared by all instances
8079

@@ -150,4 +149,3 @@ syncers: # syncer process unit specific configs, mysql insta
150149
global:
151150
worker-count: 16
152151
batch: 100
153-
max-retry: 100

dm/master/server_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ syncers:
118118
global:
119119
worker-count: 16
120120
batch: 100
121-
max-retry: 100
122121
`
123122

124123
var (

dm/master/task.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ mysql-instances: # one or more source database, config more source d
7474
syncer:
7575
worker-count: 16
7676
batch: 100
77-
max-retry: 100
7877

7978
# other common configs shared by all instances
8079

@@ -150,4 +149,3 @@ syncers: # syncer process unit specific configs, mysql insta
150149
global:
151150
worker-count: 16
152151
batch: 100
153-
max-retry: 100

dm/worker/subtask.toml

-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ meta-file = "./syncer.subTaskA.meta"
5858
worker-count = 16
5959
batch = 1000
6060

61-
# max-retry is used for retry when network interruption.
62-
max-retry = 100
6361

6462
# target database timezone, all timestamp event in binlog will translate to format time based on this timezone, default use local timezone
6563
# timezone = "Asia/Shanghai"

loader/checkpoint.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (cp *RemoteCheckPoint) prepare() error {
112112

113113
func (cp *RemoteCheckPoint) createSchema() error {
114114
sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema)
115-
err := cp.conn.executeSQL(cp.tctx, []string{sql2}, true)
115+
err := cp.conn.executeSQL(cp.tctx, []string{sql2})
116116
return terror.WithScope(err, terror.ScopeDownstream)
117117
}
118118

@@ -131,7 +131,7 @@ func (cp *RemoteCheckPoint) createTable() error {
131131
);
132132
`
133133
sql2 := fmt.Sprintf(createTable, tableName)
134-
err := cp.conn.executeSQL(cp.tctx, []string{sql2}, true)
134+
err := cp.conn.executeSQL(cp.tctx, []string{sql2})
135135
return terror.WithScope(err, terror.ScopeDownstream)
136136
}
137137

@@ -143,7 +143,7 @@ func (cp *RemoteCheckPoint) Load() error {
143143
}()
144144

145145
query := fmt.Sprintf("SELECT `filename`,`cp_schema`,`cp_table`,`offset`,`end_pos` from `%s`.`%s` where `id`=?", cp.schema, cp.table)
146-
rows, err := cp.conn.querySQL(cp.tctx, query, queryRetryCount, cp.id)
146+
rows, err := cp.conn.querySQL(cp.tctx, query, cp.id)
147147
if err != nil {
148148
return terror.WithScope(err, terror.ScopeDownstream)
149149
}
@@ -279,7 +279,8 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
279279
zap.String("table", fields[1]),
280280
zap.Int64("offset", 0),
281281
zap.Int64("end position", endPos))
282-
err := cp.conn.executeCheckpointSQL(cp.tctx, sql2, maxRetryCount, cp.id, filename, fields[0], fields[1], 0, endPos)
282+
args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos}
283+
err := cp.conn.executeSQL(cp.tctx, []string{sql2}, args)
283284
if err != nil {
284285
if isErrDupEntry(err) {
285286
cp.tctx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
@@ -305,14 +306,14 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
305306
// Clear implements CheckPoint.Clear
306307
func (cp *RemoteCheckPoint) Clear() error {
307308
sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id)
308-
err := cp.conn.executeSQL(cp.tctx, []string{sql2}, true)
309+
err := cp.conn.executeSQL(cp.tctx, []string{sql2})
309310
return terror.WithScope(err, terror.ScopeDownstream)
310311
}
311312

312313
// Count implements CheckPoint.Count
313314
func (cp *RemoteCheckPoint) Count() (int, error) {
314315
query := fmt.Sprintf("SELECT COUNT(id) FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table)
315-
rows, err := cp.conn.querySQL(cp.tctx, query, queryRetryCount, cp.id)
316+
rows, err := cp.conn.querySQL(cp.tctx, query, cp.id)
316317
if err != nil {
317318
return 0, terror.WithScope(err, terror.ScopeDownstream)
318319
}

loader/checkpoint_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
118118
defer closeConn(conn)
119119
for _, cs := range cases {
120120
sql2 := cp.GenSQL(cs.filename, cs.endPos)
121-
err = conn.executeSQL(tctx, []string{sql2}, true)
121+
err = conn.executeSQL(tctx, []string{sql2})
122122
c.Assert(err, IsNil)
123123
}
124124

0 commit comments

Comments
 (0)