Skip to content

Commit 58258ad

Browse files
authored
syncer: reset db connections during resume (pingcap#254)
1 parent caa7fc1 commit 58258ad

File tree

4 files changed

+52
-3
lines changed

4 files changed

+52
-3
lines changed

dm/master/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1870,7 +1870,7 @@ func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, name
18701870
resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout)
18711871
var queryResp *pb.QueryTaskOperationResponse
18721872
if err != nil {
1873-
log.L().Error("fail to query task operation", log.ShortError(err))
1873+
log.L().Error("fail to query task operation", zap.String("task", name), zap.Int64("operation log ID", opLogID), log.ShortError(err))
18741874
} else {
18751875
queryResp = resp.QueryTaskOperation
18761876
respLog := queryResp.Log

syncer/checkpoint.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ type CheckPoint interface {
127127
// Close closes the CheckPoint
128128
Close()
129129

130+
// ResetConn resets database connections owned by the Checkpoint
131+
ResetConn() error
132+
130133
// Clear clears all checkpoints
131134
Clear() error
132135

@@ -242,6 +245,11 @@ func (cp *RemoteCheckPoint) Close() {
242245
closeConns(cp.tctx, cp.db)
243246
}
244247

248+
// ResetConn implements CheckPoint.ResetConn
249+
func (cp *RemoteCheckPoint) ResetConn() error {
250+
return cp.db.ResetConn(cp.tctx)
251+
}
252+
245253
// Clear implements CheckPoint.Clear
246254
func (cp *RemoteCheckPoint) Clear() error {
247255
cp.Lock()

syncer/syncer.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,34 @@ func (s *Syncer) reset() {
503503
}
504504
}
505505

506+
func (s *Syncer) resetDBs() error {
507+
var err error
508+
509+
// toDBs share the same `*sql.DB` in underlying `*baseconn.BaseConn`, currently the `BaseConn.ResetConn`
510+
// can only reset the `*sql.DB` and point to the new `*sql.DB`, it is hard to reset all the `*sql.DB` by
511+
// calling `BaseConn.ResetConn` once. On the other side if we simply change the underlying value of a
512+
// `*sql.DB` by `*conn.DB = *db`, there exists some data race and invalid memory address in db driver.
513+
// So we use the close and recreate way here.
514+
closeConns(s.tctx, s.toDBs...)
515+
s.toDBs, err = createConns(s.cfg, s.cfg.To, s.cfg.WorkerCount, maxDMLConnectionTimeout)
516+
if err != nil {
517+
return terror.WithScope(err, terror.ScopeDownstream)
518+
}
519+
s.tctx.L().Info("createDBs", zap.String("toDBs baseConn", fmt.Sprintf("%p", s.toDBs[0].baseConn.DB)))
520+
521+
err = s.ddlDB.ResetConn(s.tctx)
522+
if err != nil {
523+
return terror.WithScope(err, terror.ScopeDownstream)
524+
}
525+
526+
err = s.checkpoint.ResetConn()
527+
if err != nil {
528+
return terror.WithScope(err, terror.ScopeDownstream)
529+
}
530+
531+
return nil
532+
}
533+
506534
// Process implements the dm.Unit interface.
507535
func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) {
508536
syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Add(0)
@@ -1392,7 +1420,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
13921420

13931421
table, columns, err := s.getTable(schemaName, tableName)
13941422
if err != nil {
1395-
return err
1423+
return terror.WithScope(err, terror.ScopeDownstream)
13961424
}
13971425
rows, err := s.mappingDML(originSchema, originTable, columns, ev.Rows)
13981426
if err != nil {
@@ -2205,6 +2233,17 @@ func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult) {
22052233

22062234
// continue the processing
22072235
s.reset()
2236+
// reset database conns
2237+
err := s.resetDBs()
2238+
if err != nil {
2239+
pr <- pb.ProcessResult{
2240+
IsCanceled: false,
2241+
Errors: []*pb.ProcessError{
2242+
unit.NewProcessError(pb.ErrorType_UnknownError, errors.ErrorStack(err)),
2243+
},
2244+
}
2245+
return
2246+
}
22082247
s.Process(ctx, pr)
22092248
}
22102249

syncer/syncer_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1513,7 +1513,9 @@ func (s *testSyncerSuite) TestRun(c *C) {
15131513

15141514
ctx, cancel = context.WithCancel(context.Background())
15151515
resultCh = make(chan pb.ProcessResult)
1516-
go syncer.Resume(ctx, resultCh)
1516+
// simulate `syncer.Resume` here, but doesn't reset database conns
1517+
syncer.reset()
1518+
go syncer.Process(ctx, resultCh)
15171519

15181520
expectJobs2 := []*expectJob{
15191521
{

0 commit comments

Comments
 (0)