Skip to content

Commit bf1f86b

Browse files
authored
prefer atomic values to atomic operations (#1048)
not sure why there's no atomic.NewUint32, so leave baseConnID alone
1 parent 447a516 commit bf1f86b

File tree

3 files changed

+7
-10
lines changed

3 files changed

+7
-10
lines changed

canal/canal.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Canal struct {
5151
includeTableRegex []*regexp.Regexp
5252
excludeTableRegex []*regexp.Regexp
5353

54-
delay *uint32
54+
delay atomic.Uint32
5555

5656
ctx context.Context
5757
cancel context.CancelFunc
@@ -85,8 +85,6 @@ func NewCanal(cfg *Config) (*Canal, error) {
8585
}
8686
c.master = &masterInfo{logger: c.cfg.Logger}
8787

88-
c.delay = new(uint32)
89-
9088
var err error
9189

9290
if err = c.prepareDumper(); err != nil {
@@ -195,7 +193,7 @@ func (c *Canal) prepareDumper() error {
195193
}
196194

197195
func (c *Canal) GetDelay() uint32 {
198-
return atomic.LoadUint32(c.delay)
196+
return c.delay.Load()
199197
}
200198

201199
// Run will first try to dump all data from MySQL master `mysqldump`,

canal/sync.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package canal
22

33
import (
44
"log/slog"
5-
"sync/atomic"
65
"time"
76

87
"github.com/go-mysql-org/go-mysql/mysql"
@@ -259,7 +258,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
259258
if now >= ev.Header.Timestamp {
260259
newDelay = now - ev.Header.Timestamp
261260
}
262-
atomic.StoreUint32(c.delay, newDelay)
261+
c.delay.Store(newDelay)
263262
}
264263

265264
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {

replication/parser.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type BinlogParser struct {
3333
timestampStringLocation *time.Location
3434

3535
// used to start/stop processing
36-
stopProcessing uint32
36+
stopProcessing atomic.Bool
3737

3838
useDecimal bool
3939
useFloatWithTrailingZero bool
@@ -54,11 +54,11 @@ func NewBinlogParser() *BinlogParser {
5454
}
5555

5656
func (p *BinlogParser) Stop() {
57-
atomic.StoreUint32(&p.stopProcessing, 1)
57+
p.stopProcessing.Store(true)
5858
}
5959

6060
func (p *BinlogParser) Resume() {
61-
atomic.StoreUint32(&p.stopProcessing, 0)
61+
p.stopProcessing.Store(false)
6262
}
6363

6464
func (p *BinlogParser) Reset() {
@@ -166,7 +166,7 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool,
166166
}
167167

168168
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
169-
for atomic.LoadUint32(&p.stopProcessing) != 1 {
169+
for !p.stopProcessing.Load() {
170170
done, err := p.parseSingleEvent(r, onEvent)
171171
if err != nil {
172172
if err == errMissingTableMapEvent {

0 commit comments

Comments
 (0)