Skip to content

Commit 8cbb63b

Browse files
authored
Merge branch 'master' into master
2 parents a0552d7 + 28a2e74 commit 8cbb63b

File tree

5 files changed

+214
-57
lines changed

5 files changed

+214
-57
lines changed

replication/backup.go

Lines changed: 89 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p mysql.Position, timeout t
2727
}
2828
}
2929

30+
func (b *BinlogSyncer) StartBackupGTID(backupDir string, gset mysql.GTIDSet, timeout time.Duration) error {
31+
err := os.MkdirAll(backupDir, 0o755)
32+
if err != nil {
33+
return errors.Trace(err)
34+
}
35+
if b.cfg.SynchronousEventHandler == nil {
36+
return b.StartBackupWithHandlerAndGTID(gset, timeout, func(filename string) (io.WriteCloser, error) {
37+
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
38+
})
39+
} else {
40+
return b.StartSynchronousBackupWithGTID(gset, timeout)
41+
}
42+
}
43+
3044
// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
3145
// The process will continue until the timeout is reached or an error occurs.
3246
// This method should not be used together with SynchronousEventHandler.
@@ -54,52 +68,72 @@ func (b *BinlogSyncer) StartBackupWithHandler(p mysql.Position, timeout time.Dur
5468
backupHandler := &BackupEventHandler{
5569
handler: handler,
5670
}
57-
5871
s, err := b.StartSync(p)
5972
if err != nil {
6073
return errors.Trace(err)
6174
}
75+
return processWithHandler(b, s, backupHandler, timeout)
76+
}
6277

63-
defer func() {
64-
if backupHandler.w != nil {
65-
closeErr := backupHandler.w.Close()
66-
if retErr == nil {
67-
retErr = closeErr
68-
}
69-
}
70-
}()
78+
// StartBackupWithHandlerAndGTID starts the backup process for the binary log using the specified GTID set and handler.
79+
// - gset: The GTID set from which to begin the backup.
80+
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
81+
// If set to 0, a default very long timeout (30 days) is used instead.
82+
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
83+
func (b *BinlogSyncer) StartBackupWithHandlerAndGTID(gset mysql.GTIDSet, timeout time.Duration,
84+
handler func(binlogFilename string) (io.WriteCloser, error),
85+
) (retErr error) {
86+
if timeout == 0 {
87+
// a very long timeout here
88+
timeout = 30 * 3600 * 24 * time.Second
89+
}
90+
if b.cfg.SynchronousEventHandler != nil {
91+
return errors.New("StartBackupWithHandlerAndGTID cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackupWithGTID instead.")
92+
}
7193

72-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
73-
defer cancel()
94+
// Force use raw mode
95+
b.parser.SetRawMode(true)
7496

75-
for {
76-
select {
77-
case <-ctx.Done():
78-
return nil
79-
case <-b.ctx.Done():
80-
return nil
81-
case err := <-s.ech:
82-
return errors.Trace(err)
83-
case e := <-s.ch:
84-
err = backupHandler.HandleEvent(e)
85-
if err != nil {
86-
return errors.Trace(err)
87-
}
88-
}
97+
// Set up the backup event handler
98+
backupHandler := &BackupEventHandler{
99+
handler: handler,
100+
}
101+
102+
s, err := b.StartSyncGTID(gset)
103+
if err != nil {
104+
return errors.Trace(err)
89105
}
106+
return processWithHandler(b, s, backupHandler, timeout)
90107
}
91108

92109
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
93110
func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Duration) error {
94111
if b.cfg.SynchronousEventHandler == nil {
95112
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
96113
}
97-
98114
s, err := b.StartSync(p)
99115
if err != nil {
100116
return errors.Trace(err)
101117
}
102118

119+
return process(b, s, timeout)
120+
}
121+
122+
// StartSynchronousBackupWithGTID starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig with a specified GTID set.
123+
func (b *BinlogSyncer) StartSynchronousBackupWithGTID(gset mysql.GTIDSet, timeout time.Duration) error {
124+
if b.cfg.SynchronousEventHandler == nil {
125+
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackupWithGTID")
126+
}
127+
128+
s, err := b.StartSyncGTID(gset)
129+
if err != nil {
130+
return errors.Trace(err)
131+
}
132+
133+
return process(b, s, timeout)
134+
}
135+
136+
func process(b *BinlogSyncer, s *BinlogStreamer, timeout time.Duration) error {
103137
var ctx context.Context
104138
var cancel context.CancelFunc
105139

@@ -123,6 +157,35 @@ func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Dur
123157
}
124158
}
125159

160+
func processWithHandler(b *BinlogSyncer, s *BinlogStreamer, backupHandler *BackupEventHandler, timeout time.Duration) (retErr error) {
161+
defer func() {
162+
if backupHandler.w != nil {
163+
closeErr := backupHandler.w.Close()
164+
if retErr == nil {
165+
retErr = closeErr
166+
}
167+
}
168+
}()
169+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
170+
defer cancel()
171+
172+
for {
173+
select {
174+
case <-ctx.Done():
175+
return nil
176+
case <-b.ctx.Done():
177+
return nil
178+
case err := <-s.ech:
179+
return errors.Trace(err)
180+
case e := <-s.ch:
181+
err := backupHandler.HandleEvent(e)
182+
if err != nil {
183+
return errors.Trace(err)
184+
}
185+
}
186+
}
187+
}
188+
126189
// BackupEventHandler handles writing events for backup
127190
type BackupEventHandler struct {
128191
handler func(binlogFilename string) (io.WriteCloser, error)

replication/backup_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package replication
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"os"
78
"path"
89
"time"
910

11+
"github.com/google/uuid"
12+
1013
"github.com/stretchr/testify/require"
1114

1215
"github.com/go-mysql-org/go-mysql/mysql"
@@ -61,6 +64,16 @@ func (t *testSyncerSuite) TestSyncBackup() {
6164
testBackup(t, true) // true indicates synchronous mode
6265
}
6366

67+
// TestAsyncBackupWithGTID runs the backup process in asynchronous mode with GTID and verifies binlog file creation.
68+
func (t *testSyncerSuite) TestAsyncBackupWithGTID() {
69+
testBackUpWithGTID(t, false) // false indicates asynchronous mode
70+
}
71+
72+
// TestSyncBackupWithGTID runs the backup process in synchronous mode with GTID and verifies binlog file creation.
73+
func (t *testSyncerSuite) TestSyncBackupWithGTID() {
74+
testBackUpWithGTID(t, true) // true indicates synchronous mode
75+
}
76+
6477
// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
6578
func testBackup(t *testSyncerSuite, isSynchronous bool) {
6679
t.setupTest(mysql.MySQLFlavor)
@@ -111,6 +124,71 @@ func testBackup(t *testSyncerSuite, isSynchronous bool) {
111124
}
112125
}
113126

127+
func testBackUpWithGTID(t *testSyncerSuite, isSynchronous bool) {
128+
t.setupTest(mysql.MySQLFlavor)
129+
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled
130+
131+
binlogDir := "./var"
132+
os.RemoveAll(binlogDir)
133+
timeout := 3 * time.Second
134+
135+
if isSynchronous {
136+
// Set up a BackupEventHandler for synchronous mode
137+
backupHandler := NewBackupEventHandler(
138+
func(filename string) (io.WriteCloser, error) {
139+
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
140+
},
141+
)
142+
t.b.cfg.SynchronousEventHandler = backupHandler
143+
} else {
144+
// Ensure SynchronousEventHandler is nil for asynchronous mode
145+
t.b.cfg.SynchronousEventHandler = nil
146+
}
147+
148+
r, err := t.c.Execute("SELECT @@gtid_mode")
149+
require.NoError(t.T(), err)
150+
modeOn, _ := r.GetString(0, 0)
151+
if modeOn != "ON" {
152+
t.T().Skip("GTID mode is not ON")
153+
}
154+
155+
r, err = t.c.Execute("SHOW GLOBAL VARIABLES LIKE 'SERVER_UUID'")
156+
require.NoError(t.T(), err)
157+
158+
var masterUuid uuid.UUID
159+
if s, _ := r.GetString(0, 1); len(s) > 0 && s != "NONE" {
160+
masterUuid, err = uuid.Parse(s)
161+
require.NoError(t.T(), err)
162+
}
163+
164+
set, _ := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d-%d", masterUuid.String(), 1, 2))
165+
done := make(chan bool)
166+
167+
// Start the backup process in a goroutine
168+
go func() {
169+
err := t.b.StartBackupGTID(binlogDir, set, timeout)
170+
require.NoError(t.T(), err)
171+
done <- true
172+
}()
173+
174+
failTimeout := 2 * timeout
175+
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
176+
defer cancel()
177+
178+
// Wait for the backup to complete or timeout
179+
select {
180+
case <-done:
181+
files, err := os.ReadDir(binlogDir)
182+
require.NoError(t.T(), err, "Failed to read binlog directory")
183+
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
184+
mode := modeLabel(isSynchronous)
185+
t.T().Logf("Backup completed successfully in %s mode using GTID with %d binlog file(s).", mode, len(files))
186+
case <-ctx.Done():
187+
mode := modeLabel(isSynchronous)
188+
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
189+
}
190+
}
191+
114192
func modeLabel(isSynchronous bool) string {
115193
if isSynchronous {
116194
return "synchronous"

replication/row_event.go

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,7 +1317,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial boo
13171317
n = 4
13181318
t := binary.LittleEndian.Uint32(data)
13191319
if t == 0 {
1320-
v = formatZeroTime(0, 0)
1320+
v = "0000-00-00 00:00:00"
13211321
} else {
13221322
v = e.parseFracTime(fracTime{
13231323
Time: time.Unix(int64(t), 0),
@@ -1332,26 +1332,37 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial boo
13321332
n = 8
13331333
i64 := binary.LittleEndian.Uint64(data)
13341334
if i64 == 0 {
1335-
v = formatZeroTime(0, 0)
1335+
v = "0000-00-00 00:00:00"
13361336
} else {
13371337
d := i64 / 1000000
13381338
t := i64 % 1000000
1339-
v = e.parseFracTime(fracTime{
1340-
Time: time.Date(
1341-
int(d/10000),
1342-
time.Month((d%10000)/100),
1343-
int(d%100),
1344-
int(t/10000),
1345-
int((t%10000)/100),
1346-
int(t%100),
1347-
0,
1348-
time.UTC,
1349-
),
1350-
Dec: 0,
1351-
})
1339+
years := int(d / 10000)
1340+
months := int(d%10000) / 100
1341+
days := int(d % 100)
1342+
hours := int(t / 10000)
1343+
minutes := int(t%10000) / 100
1344+
seconds := int(t % 100)
1345+
if !e.parseTime || months == 0 || days == 0 {
1346+
v = fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d",
1347+
years, months, days, hours, minutes, seconds)
1348+
} else {
1349+
v = e.parseFracTime(fracTime{
1350+
Time: time.Date(
1351+
years,
1352+
time.Month(months),
1353+
days,
1354+
hours,
1355+
minutes,
1356+
seconds,
1357+
0,
1358+
time.UTC,
1359+
),
1360+
Dec: 0,
1361+
})
1362+
}
13521363
}
13531364
case mysql.MYSQL_TYPE_DATETIME2:
1354-
v, n, err = decodeDatetime2(data, meta)
1365+
v, n, err = decodeDatetime2(data, meta, e.parseTime)
13551366
v = e.parseFracTime(v)
13561367
case mysql.MYSQL_TYPE_TIME:
13571368
n = 3
@@ -1676,7 +1687,7 @@ func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Loc
16761687

16771688
const DATETIMEF_INT_OFS int64 = 0x8000000000
16781689

1679-
func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
1690+
func decodeDatetime2(data []byte, dec uint16, parseTime bool) (interface{}, int, error) {
16801691
// get datetime binary length
16811692
n := int(5 + (dec+1)/2)
16821693

@@ -1726,8 +1737,8 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
17261737
// minute = 0 = 0b000000
17271738
// second = 0 = 0b000000
17281739
// integer value = 0b1100100000010110000100000000000000000 = 107420450816
1729-
if intPart < 107420450816 {
1730-
return formatBeforeUnixZeroTime(year, month, day, hour, minute, second, int(frac), int(dec)), n, nil
1740+
if !parseTime || intPart < 107420450816 || month == 0 || day == 0 {
1741+
return formatDatetime(year, month, day, hour, minute, second, int(frac), int(dec)), n, nil
17311742
}
17321743

17331744
return fracTime{

replication/row_event_test.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,8 @@ func TestDecodeDatetime2(t *testing.T) {
675675
}{
676676
{[]byte("\xfe\xf3\xff\x7e\xfb"), 0, true, "9999-12-31 23:59:59"},
677677
{[]byte("\x99\x9a\xb8\xf7\xaa"), 0, true, "2016-10-28 15:30:42"},
678+
{[]byte("\x99\x98\x38\xf7\xaa"), 0, false, "2016-00-28 15:30:42"},
679+
{[]byte("\x99\x9a\x80\xf7\xaa"), 0, false, "2016-10-00 15:30:42"},
678680
{[]byte("\x99\x02\xc2\x00\x00"), 0, true, "1970-01-01 00:00:00"},
679681
{[]byte("\x80\x00\x00\x00\x00"), 0, false, "0000-00-00 00:00:00"},
680682
{[]byte("\x80\x00\x02\xf1\x05"), 0, false, "0000-00-01 15:04:05"},
@@ -684,17 +686,20 @@ func TestDecodeDatetime2(t *testing.T) {
684686
{[]byte("\x80\x03\x82\x00\x00\x01\xe2\x40"), uint16(6), false, "0001-01-01 00:00:00.123456"},
685687
}
686688
for _, tc := range testcases {
687-
value, _, err := decodeDatetime2(tc.data, tc.dec)
688-
require.NoError(t, err)
689-
switch v := value.(type) {
690-
case fracTime:
691-
require.True(t, tc.getFracTime)
692-
require.Equal(t, tc.expected, v.String())
693-
case string:
694-
require.False(t, tc.getFracTime)
695-
require.Equal(t, tc.expected, v)
696-
default:
697-
require.FailNow(t, "invalid value type: %T", value)
689+
for _, parseTime := range []bool{true, false} {
690+
value, _, err := decodeDatetime2(tc.data, tc.dec, parseTime)
691+
require.NoError(t, err)
692+
switch v := value.(type) {
693+
case fracTime:
694+
require.True(t, parseTime)
695+
require.True(t, tc.getFracTime)
696+
require.Equal(t, tc.expected, v.String())
697+
case string:
698+
require.False(t, parseTime && tc.getFracTime)
699+
require.Equal(t, tc.expected, v)
700+
default:
701+
require.FailNow(t, "invalid value type: %T", value)
702+
}
698703
}
699704
}
700705
}

replication/time.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func formatZeroTime(frac int, dec int) string {
4444
return s[0 : len(s)-(6-dec)]
4545
}
4646

47-
func formatBeforeUnixZeroTime(year, month, day, hour, minute, second, frac, dec int) string {
47+
func formatDatetime(year, month, day, hour, minute, second, frac, dec int) string {
4848
if dec == 0 {
4949
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second)
5050
}

0 commit comments

Comments
 (0)