Skip to content

Commit 28a2e74

Browse files
dgupta2323Deepak Gupta
andauthored
Enhancement to allow backup binlogs using a specified GTID set (#1050)
* Enhancement to allow syncing binlogs using a specified GTID set * Run gofumpt --------- Co-authored-by: Deepak Gupta <[email protected]>
1 parent 4a082cf commit 28a2e74

File tree

2 files changed

+167
-26
lines changed

2 files changed

+167
-26
lines changed

replication/backup.go

Lines changed: 89 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p mysql.Position, timeout t
2727
}
2828
}
2929

30+
func (b *BinlogSyncer) StartBackupGTID(backupDir string, gset mysql.GTIDSet, timeout time.Duration) error {
31+
err := os.MkdirAll(backupDir, 0o755)
32+
if err != nil {
33+
return errors.Trace(err)
34+
}
35+
if b.cfg.SynchronousEventHandler == nil {
36+
return b.StartBackupWithHandlerAndGTID(gset, timeout, func(filename string) (io.WriteCloser, error) {
37+
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
38+
})
39+
} else {
40+
return b.StartSynchronousBackupWithGTID(gset, timeout)
41+
}
42+
}
43+
3044
// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
3145
// The process will continue until the timeout is reached or an error occurs.
3246
// This method should not be used together with SynchronousEventHandler.
@@ -54,52 +68,72 @@ func (b *BinlogSyncer) StartBackupWithHandler(p mysql.Position, timeout time.Dur
5468
backupHandler := &BackupEventHandler{
5569
handler: handler,
5670
}
57-
5871
s, err := b.StartSync(p)
5972
if err != nil {
6073
return errors.Trace(err)
6174
}
75+
return processWithHandler(b, s, backupHandler, timeout)
76+
}
6277

63-
defer func() {
64-
if backupHandler.w != nil {
65-
closeErr := backupHandler.w.Close()
66-
if retErr == nil {
67-
retErr = closeErr
68-
}
69-
}
70-
}()
78+
// StartBackupWithHandlerAndGTID starts the backup process for the binary log using the specified GTID set and handler.
79+
// - gset: The GTID set from which to begin the backup.
80+
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
81+
// If set to 0, a default very long timeout (30 days) is used instead.
82+
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
83+
func (b *BinlogSyncer) StartBackupWithHandlerAndGTID(gset mysql.GTIDSet, timeout time.Duration,
84+
handler func(binlogFilename string) (io.WriteCloser, error),
85+
) (retErr error) {
86+
if timeout == 0 {
87+
// a very long timeout here
88+
timeout = 30 * 3600 * 24 * time.Second
89+
}
90+
if b.cfg.SynchronousEventHandler != nil {
91+
return errors.New("StartBackupWithHandlerAndGTID cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackupWithGTID instead.")
92+
}
7193

72-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
73-
defer cancel()
94+
// Force use raw mode
95+
b.parser.SetRawMode(true)
7496

75-
for {
76-
select {
77-
case <-ctx.Done():
78-
return nil
79-
case <-b.ctx.Done():
80-
return nil
81-
case err := <-s.ech:
82-
return errors.Trace(err)
83-
case e := <-s.ch:
84-
err = backupHandler.HandleEvent(e)
85-
if err != nil {
86-
return errors.Trace(err)
87-
}
88-
}
97+
// Set up the backup event handler
98+
backupHandler := &BackupEventHandler{
99+
handler: handler,
100+
}
101+
102+
s, err := b.StartSyncGTID(gset)
103+
if err != nil {
104+
return errors.Trace(err)
89105
}
106+
return processWithHandler(b, s, backupHandler, timeout)
90107
}
91108

92109
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
93110
func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Duration) error {
94111
if b.cfg.SynchronousEventHandler == nil {
95112
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
96113
}
97-
98114
s, err := b.StartSync(p)
99115
if err != nil {
100116
return errors.Trace(err)
101117
}
102118

119+
return process(b, s, timeout)
120+
}
121+
122+
// StartSynchronousBackupWithGTID starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig with a specified GTID set.
123+
func (b *BinlogSyncer) StartSynchronousBackupWithGTID(gset mysql.GTIDSet, timeout time.Duration) error {
124+
if b.cfg.SynchronousEventHandler == nil {
125+
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackupWithGTID")
126+
}
127+
128+
s, err := b.StartSyncGTID(gset)
129+
if err != nil {
130+
return errors.Trace(err)
131+
}
132+
133+
return process(b, s, timeout)
134+
}
135+
136+
func process(b *BinlogSyncer, s *BinlogStreamer, timeout time.Duration) error {
103137
var ctx context.Context
104138
var cancel context.CancelFunc
105139

@@ -123,6 +157,35 @@ func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Dur
123157
}
124158
}
125159

160+
func processWithHandler(b *BinlogSyncer, s *BinlogStreamer, backupHandler *BackupEventHandler, timeout time.Duration) (retErr error) {
161+
defer func() {
162+
if backupHandler.w != nil {
163+
closeErr := backupHandler.w.Close()
164+
if retErr == nil {
165+
retErr = closeErr
166+
}
167+
}
168+
}()
169+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
170+
defer cancel()
171+
172+
for {
173+
select {
174+
case <-ctx.Done():
175+
return nil
176+
case <-b.ctx.Done():
177+
return nil
178+
case err := <-s.ech:
179+
return errors.Trace(err)
180+
case e := <-s.ch:
181+
err := backupHandler.HandleEvent(e)
182+
if err != nil {
183+
return errors.Trace(err)
184+
}
185+
}
186+
}
187+
}
188+
126189
// BackupEventHandler handles writing events for backup
127190
type BackupEventHandler struct {
128191
handler func(binlogFilename string) (io.WriteCloser, error)

replication/backup_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package replication
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"os"
78
"path"
89
"time"
910

11+
"github.com/google/uuid"
12+
1013
"github.com/stretchr/testify/require"
1114

1215
"github.com/go-mysql-org/go-mysql/mysql"
@@ -61,6 +64,16 @@ func (t *testSyncerSuite) TestSyncBackup() {
6164
testBackup(t, true) // true indicates synchronous mode
6265
}
6366

67+
// TestAsyncBackupWithGTID runs the backup process in asynchronous mode with GTID and verifies binlog file creation.
68+
func (t *testSyncerSuite) TestAsyncBackupWithGTID() {
69+
testBackUpWithGTID(t, false) // false indicates asynchronous mode
70+
}
71+
72+
// TestSyncBackupWithGTID runs the backup process in synchronous mode with GTID and verifies binlog file creation.
73+
func (t *testSyncerSuite) TestSyncBackupWithGTID() {
74+
testBackUpWithGTID(t, true) // true indicates synchronous mode
75+
}
76+
6477
// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
6578
func testBackup(t *testSyncerSuite, isSynchronous bool) {
6679
t.setupTest(mysql.MySQLFlavor)
@@ -111,6 +124,71 @@ func testBackup(t *testSyncerSuite, isSynchronous bool) {
111124
}
112125
}
113126

127+
func testBackUpWithGTID(t *testSyncerSuite, isSynchronous bool) {
128+
t.setupTest(mysql.MySQLFlavor)
129+
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled
130+
131+
binlogDir := "./var"
132+
os.RemoveAll(binlogDir)
133+
timeout := 3 * time.Second
134+
135+
if isSynchronous {
136+
// Set up a BackupEventHandler for synchronous mode
137+
backupHandler := NewBackupEventHandler(
138+
func(filename string) (io.WriteCloser, error) {
139+
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
140+
},
141+
)
142+
t.b.cfg.SynchronousEventHandler = backupHandler
143+
} else {
144+
// Ensure SynchronousEventHandler is nil for asynchronous mode
145+
t.b.cfg.SynchronousEventHandler = nil
146+
}
147+
148+
r, err := t.c.Execute("SELECT @@gtid_mode")
149+
require.NoError(t.T(), err)
150+
modeOn, _ := r.GetString(0, 0)
151+
if modeOn != "ON" {
152+
t.T().Skip("GTID mode is not ON")
153+
}
154+
155+
r, err = t.c.Execute("SHOW GLOBAL VARIABLES LIKE 'SERVER_UUID'")
156+
require.NoError(t.T(), err)
157+
158+
var masterUuid uuid.UUID
159+
if s, _ := r.GetString(0, 1); len(s) > 0 && s != "NONE" {
160+
masterUuid, err = uuid.Parse(s)
161+
require.NoError(t.T(), err)
162+
}
163+
164+
set, _ := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d-%d", masterUuid.String(), 1, 2))
165+
done := make(chan bool)
166+
167+
// Start the backup process in a goroutine
168+
go func() {
169+
err := t.b.StartBackupGTID(binlogDir, set, timeout)
170+
require.NoError(t.T(), err)
171+
done <- true
172+
}()
173+
174+
failTimeout := 2 * timeout
175+
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
176+
defer cancel()
177+
178+
// Wait for the backup to complete or timeout
179+
select {
180+
case <-done:
181+
files, err := os.ReadDir(binlogDir)
182+
require.NoError(t.T(), err, "Failed to read binlog directory")
183+
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
184+
mode := modeLabel(isSynchronous)
185+
t.T().Logf("Backup completed successfully in %s mode using GTID with %d binlog file(s).", mode, len(files))
186+
case <-ctx.Done():
187+
mode := modeLabel(isSynchronous)
188+
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
189+
}
190+
}
191+
114192
func modeLabel(isSynchronous bool) string {
115193
if isSynchronous {
116194
return "synchronous"

0 commit comments

Comments
 (0)