Skip to content

Commit c3b9ca9

Browse files
authored
syncer: release mysql dependencies in syncer test (2) (pingcap#226)
* split db test and syncer test
1 parent 2ed7cb6 commit c3b9ca9

File tree

4 files changed

+522
-345
lines changed

4 files changed

+522
-345
lines changed

syncer/db_test.go

+187-5
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,99 @@
1414
package syncer
1515

1616
import (
17+
"context"
18+
"database/sql"
19+
"fmt"
20+
"time"
21+
22+
"github.com/pingcap/dm/dm/config"
23+
"github.com/pingcap/dm/pkg/utils"
24+
1725
. "github.com/pingcap/check"
26+
"github.com/pingcap/tidb-tools/pkg/filter"
1827
gouuid "github.com/satori/go.uuid"
1928
"github.com/siddontang/go-mysql/mysql"
20-
21-
"github.com/pingcap/dm/pkg/utils"
29+
"github.com/siddontang/go-mysql/replication"
2230
)
2331

24-
func (s *testSyncerSuite) TestGetServerUUID(c *C) {
32+
var _ = Suite(&testDBSuite{})
33+
34+
type testDBSuite struct {
35+
db *sql.DB
36+
syncer *replication.BinlogSyncer
37+
streamer *replication.BinlogStreamer
38+
cfg *config.SubTaskConfig
39+
}
40+
41+
func (s *testDBSuite) SetUpSuite(c *C) {
42+
s.cfg = &config.SubTaskConfig{
43+
From: getDBConfigFromEnv(),
44+
To: getDBConfigFromEnv(),
45+
ServerID: 102,
46+
MetaSchema: "db_test",
47+
Name: "db_ut",
48+
Mode: config.ModeIncrement,
49+
Flavor: "mysql",
50+
}
51+
s.cfg.From.Adjust()
52+
s.cfg.To.Adjust()
53+
54+
dir := c.MkDir()
55+
s.cfg.RelayDir = dir
56+
57+
var err error
58+
dbAddr := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", s.cfg.From.User, s.cfg.From.Password, s.cfg.From.Host, s.cfg.From.Port)
59+
s.db, err = sql.Open("mysql", dbAddr)
60+
c.Assert(err, IsNil)
61+
62+
s.resetBinlogSyncer(c)
63+
_, err = s.db.Exec("SET GLOBAL binlog_format = 'ROW';")
64+
c.Assert(err, IsNil)
65+
}
66+
67+
func (s *testDBSuite) resetBinlogSyncer(c *C) {
68+
cfg := replication.BinlogSyncerConfig{
69+
ServerID: uint32(s.cfg.ServerID),
70+
Flavor: "mysql",
71+
Host: s.cfg.From.Host,
72+
Port: uint16(s.cfg.From.Port),
73+
User: s.cfg.From.User,
74+
Password: s.cfg.From.Password,
75+
UseDecimal: true,
76+
VerifyChecksum: true,
77+
}
78+
if s.cfg.Timezone != "" {
79+
timezone, err2 := time.LoadLocation(s.cfg.Timezone)
80+
c.Assert(err2, IsNil)
81+
cfg.TimestampStringLocation = timezone
82+
}
83+
84+
if s.syncer != nil {
85+
s.syncer.Close()
86+
}
87+
88+
pos, _, err := utils.GetMasterStatus(s.db, "mysql")
89+
c.Assert(err, IsNil)
90+
91+
s.syncer = replication.NewBinlogSyncer(cfg)
92+
s.streamer, err = s.syncer.StartSync(pos)
93+
c.Assert(err, IsNil)
94+
}
95+
96+
func (s *testDBSuite) TestGetServerUUID(c *C) {
2597
uuid, err := utils.GetServerUUID(s.db, "mysql")
2698
c.Assert(err, IsNil)
2799
_, err = gouuid.FromString(uuid)
28100
c.Assert(err, IsNil)
29101
}
30102

31-
func (s *testSyncerSuite) TestGetServerID(c *C) {
103+
func (s *testDBSuite) TestGetServerID(c *C) {
32104
id, err := utils.GetServerID(s.db)
33105
c.Assert(err, IsNil)
34106
c.Assert(id, Greater, int64(0))
35107
}
36108

37-
func (s *testSyncerSuite) TestBinaryLogs(c *C) {
109+
func (s *testDBSuite) TestBinaryLogs(c *C) {
38110
files, err := getBinaryLogs(s.db)
39111
c.Assert(err, IsNil)
40112
c.Assert(files, Not(HasLen), 0)
@@ -64,3 +136,113 @@ func (s *testSyncerSuite) TestBinaryLogs(c *C) {
64136
c.Assert(remainingSize, Equals, files[fileNum].size)
65137

66138
}
139+
140+
func (s *testDBSuite) TestTimezone(c *C) {
141+
s.cfg.BWList = &filter.Rules{
142+
DoDBs: []string{"~^tztest_.*"},
143+
IgnoreDBs: []string{"stest", "~^foo.*"},
144+
}
145+
146+
createSQLs := []string{
147+
"create database if not exists tztest_1",
148+
"create table if not exists tztest_1.t_1(id int, a timestamp)",
149+
}
150+
151+
testCases := []struct {
152+
sqls []string
153+
timezone string
154+
}{
155+
{
156+
[]string{
157+
"insert into tztest_1.t_1(id, a) values (1, '1990-04-15 01:30:12')",
158+
"insert into tztest_1.t_1(id, a) values (2, '1990-04-15 02:30:12')",
159+
"insert into tztest_1.t_1(id, a) values (3, '1990-04-15 03:30:12')",
160+
},
161+
"Asia/Shanghai",
162+
},
163+
{
164+
[]string{
165+
"insert into tztest_1.t_1(id, a) values (4, '1990-04-15 01:30:12')",
166+
"insert into tztest_1.t_1(id, a) values (5, '1990-04-15 02:30:12')",
167+
"insert into tztest_1.t_1(id, a) values (6, '1990-04-15 03:30:12')",
168+
},
169+
"America/Phoenix",
170+
},
171+
}
172+
queryTs := "select unix_timestamp(a) from `tztest_1`.`t_1` where id = ?"
173+
174+
dropSQLs := []string{
175+
"drop table tztest_1.t_1",
176+
"drop database tztest_1",
177+
}
178+
179+
defer func() {
180+
for _, sql := range dropSQLs {
181+
_, err := s.db.Exec(sql)
182+
c.Assert(err, IsNil)
183+
}
184+
}()
185+
186+
for _, sql := range createSQLs {
187+
_, err := s.db.Exec(sql)
188+
c.Assert(err, IsNil)
189+
}
190+
191+
for _, testCase := range testCases {
192+
s.cfg.Timezone = testCase.timezone
193+
syncer := NewSyncer(s.cfg)
194+
syncer.genRouter()
195+
s.resetBinlogSyncer(c)
196+
197+
// we should not use `sql.DB.Exec` to do query which depends on session variables
198+
// because `sql.DB.Exec` will choose a underlying Conn for every query from the connection pool
199+
// and different Conn using different session
200+
// ref: `sql.DB.Conn`
201+
// and `set @@global` is also not reasonable, because it can not affect sessions already exist
202+
// if we must ensure multi queries use the same session, we should use a transaction
203+
txn, err := s.db.Begin()
204+
c.Assert(err, IsNil)
205+
txn.Exec("set @@session.time_zone = ?", testCase.timezone)
206+
txn.Exec("set @@session.sql_mode = ''")
207+
for _, sql := range testCase.sqls {
208+
_, err = txn.Exec(sql)
209+
c.Assert(err, IsNil)
210+
}
211+
err = txn.Commit()
212+
c.Assert(err, IsNil)
213+
214+
location, err := time.LoadLocation(testCase.timezone)
215+
c.Assert(err, IsNil)
216+
217+
idx := 0
218+
for {
219+
if idx >= len(testCase.sqls) {
220+
break
221+
}
222+
e, err := s.streamer.GetEvent(context.Background())
223+
c.Assert(err, IsNil)
224+
switch ev := e.Event.(type) {
225+
case *replication.RowsEvent:
226+
skip, err := syncer.skipDMLEvent(string(ev.Table.Schema), string(ev.Table.Table), e.Header.EventType)
227+
c.Assert(err, IsNil)
228+
if skip {
229+
continue
230+
}
231+
232+
rowid := ev.Rows[0][0].(int32)
233+
var ts sql.NullInt64
234+
err2 := s.db.QueryRow(queryTs, rowid).Scan(&ts)
235+
c.Assert(err2, IsNil)
236+
c.Assert(ts.Valid, IsTrue)
237+
238+
raw := ev.Rows[0][1].(string)
239+
data, err := time.ParseInLocation("2006-01-02 15:04:05", raw, location)
240+
c.Assert(err, IsNil)
241+
c.Assert(data.Unix(), DeepEquals, ts.Int64)
242+
idx++
243+
default:
244+
continue
245+
}
246+
}
247+
}
248+
}

syncer/ddl_test.go

+21-21
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ package syncer
1515

1616
import (
1717
"bytes"
18-
"database/sql"
1918

2019
"github.com/pingcap/dm/dm/config"
2120
parserpkg "github.com/pingcap/dm/pkg/parser"
2221
"github.com/pingcap/dm/pkg/utils"
2322

23+
"github.com/DATA-DOG/go-sqlmock"
2424
. "github.com/pingcap/check"
2525
"github.com/pingcap/parser"
2626
"github.com/pingcap/tidb-tools/pkg/filter"
@@ -38,7 +38,9 @@ func (s *testSyncerSuite) TestTrimCtrlChars(c *C) {
3838
controlChars = append(controlChars, 0x7f)
3939

4040
var buf bytes.Buffer
41-
p, err := utils.GetParser(s.db, false)
41+
db, mock, err := sqlmock.New()
42+
c.Assert(err, IsNil)
43+
p, err := s.mockParser(db, mock)
4244
c.Assert(err, IsNil)
4345

4446
for _, char := range controlChars {
@@ -67,30 +69,20 @@ func (s *testSyncerSuite) TestAnsiQuotes(c *C) {
6769
"create table test.test (\"id\" int)",
6870
"insert into test.test (\"id\") values('a')",
6971
}
70-
result, err := s.db.Query("select @@global.sql_mode")
71-
var sqlMode sql.NullString
72-
c.Assert(err, IsNil)
73-
defer result.Close()
74-
for result.Next() {
75-
err = result.Scan(&sqlMode)
76-
c.Assert(err, IsNil)
77-
break
78-
}
79-
c.Assert(sqlMode.Valid, IsTrue)
8072

81-
_, err = s.db.Exec("set @@global.sql_mode='ANSI_QUOTES'")
73+
db, mock, err := sqlmock.New()
74+
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE").
75+
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
76+
AddRow("sql_mode", "ANSI_QUOTES"))
8277
c.Assert(err, IsNil)
83-
// recover original sql_mode
84-
defer s.db.Exec("set @@global.sql_mode = ?", sqlMode)
8578

86-
parser, err := utils.GetParser(s.db, false)
79+
parser, err := utils.GetParser(db, false)
8780
c.Assert(err, IsNil)
8881

8982
for _, sql := range ansiQuotesCases {
9083
_, err = parser.ParseOneStmt(sql, "", "")
9184
c.Assert(err, IsNil)
9285
}
93-
9486
}
9587

9688
func (s *testSyncerSuite) TestDDLWithDashComments(c *C) {
@@ -100,7 +92,9 @@ func (s *testSyncerSuite) TestDDLWithDashComments(c *C) {
10092
CREATE TABLE test.test_table_with_c (id int);
10193
`
10294

103-
parser, err := utils.GetParser(s.db, false)
95+
db, mock, err := sqlmock.New()
96+
c.Assert(err, IsNil)
97+
parser, err := s.mockParser(db, mock)
10498
c.Assert(err, IsNil)
10599

106100
_, err = parserpkg.Parse(parser, sql, "", "")
@@ -111,7 +105,9 @@ func (s *testSyncerSuite) TestCommentQuote(c *C) {
111105
sql := "ALTER TABLE schemadb.ep_edu_course_message_auto_reply MODIFY answer JSON COMMENT '回复的内容-格式为list,有两个字段:\"answerType\"://''发送客服消息类型:1-文本消息,2-图片,3-图文链接''; answer:回复内容';"
112106
expectedSQL := "ALTER TABLE `schemadb`.`ep_edu_course_message_auto_reply` MODIFY COLUMN `answer` JSON COMMENT '回复的内容-格式为list,有两个字段:\"answerType\"://''发送客服消息类型:1-文本消息,2-图片,3-图文链接''; answer:回复内容'"
113107

114-
parser, err := utils.GetParser(s.db, false)
108+
db, mock, err := sqlmock.New()
109+
c.Assert(err, IsNil)
110+
parser, err := s.mockParser(db, mock)
115111
c.Assert(err, IsNil)
116112

117113
stmt, err := parser.ParseOneStmt(sql, "", "")
@@ -340,7 +336,9 @@ func (s *testSyncerSuite) TestParseDDLSQL(c *C) {
340336
}
341337
syncer := NewSyncer(cfg)
342338

343-
parser, err := utils.GetParser(s.db, false)
339+
db, mock, err := sqlmock.New()
340+
c.Assert(err, IsNil)
341+
parser, err := s.mockParser(db, mock)
344342
c.Assert(err, IsNil)
345343

346344
for _, cs := range cases {
@@ -371,7 +369,9 @@ func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) {
371369
}
372370

373371
syncer := &Syncer{}
374-
parser, err := utils.GetParser(s.db, false)
372+
db, mock, err := sqlmock.New()
373+
c.Assert(err, IsNil)
374+
parser, err := s.mockParser(db, mock)
375375
c.Assert(err, IsNil)
376376

377377
for _, tc := range testCases {

0 commit comments

Comments
 (0)