- 
                Notifications
    You must be signed in to change notification settings 
- Fork 1k
feat: Charset parameter for Mysql connection #1027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
ef55c4c
              6855741
              e6565cd
              cb4f084
              8d7d13c
              f81397d
              f77308e
              824d650
              61a148b
              8cc642f
              05b87bb
              2025086
              fa2e8fa
              e95e5cd
              fdd11a6
              d947c9e
              73477d3
              eb2ea8e
              007d419
              c6c9248
              d15a7ce
              4298599
              1f7d701
              98108dd
              411b75d
              9bd5965
              c349655
              e7d51c3
              c3593a7
              fd2c582
              3d213ae
              b34a98d
              7222b87
              b4851e0
              b939f18
              88f0609
              0799a57
              a2053cd
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -65,6 +65,10 @@ func NewCanal(cfg *Config) (*Canal, error) { | |
|  | ||
| c.ctx, c.cancel = context.WithCancel(context.Background()) | ||
|  | ||
| if cfg.WaitTimeBetweenConnectionSeconds > 0 { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should be  | ||
| cfg.WaitTimeBetweenConnectionSeconds = time.Duration(5) * time.Second | ||
| } | ||
|  | ||
| c.dumpDoneCh = make(chan struct{}) | ||
| c.eventHandler = &DummyEventHandler{} | ||
| c.parser = parser.New() | ||
|  | @@ -192,6 +196,7 @@ func (c *Canal) RunFrom(pos mysql.Position) error { | |
| return c.Run() | ||
| } | ||
|  | ||
| // Start from selected GTIDSet | ||
| func (c *Canal) StartFromGTID(set mysql.GTIDSet) error { | ||
| c.master.UpdateGTIDSet(set) | ||
|  | ||
|  | @@ -238,15 +243,17 @@ func (c *Canal) run() error { | |
| } | ||
|  | ||
| func (c *Canal) Close() { | ||
| log.Infof("closing canal") | ||
| log.Debugf("closing canal") | ||
| c.m.Lock() | ||
| defer c.m.Unlock() | ||
|  | ||
| c.cancel() | ||
| c.syncer.Close() | ||
| c.connLock.Lock() | ||
| c.conn.Close() | ||
| c.conn = nil | ||
| if c.conn != nil { | ||
| c.conn.Close() | ||
| c.conn = nil | ||
| } | ||
| c.connLock.Unlock() | ||
|  | ||
| _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) | ||
|  | @@ -413,20 +420,21 @@ func (c *Canal) checkBinlogRowFormat() error { | |
|  | ||
| func (c *Canal) prepareSyncer() error { | ||
| cfg := replication.BinlogSyncerConfig{ | ||
| ServerID: c.cfg.ServerID, | ||
| Flavor: c.cfg.Flavor, | ||
| User: c.cfg.User, | ||
| Password: c.cfg.Password, | ||
| Charset: c.cfg.Charset, | ||
| HeartbeatPeriod: c.cfg.HeartbeatPeriod, | ||
| ReadTimeout: c.cfg.ReadTimeout, | ||
| UseDecimal: c.cfg.UseDecimal, | ||
| ParseTime: c.cfg.ParseTime, | ||
| SemiSyncEnabled: c.cfg.SemiSyncEnabled, | ||
| MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, | ||
| DisableRetrySync: c.cfg.DisableRetrySync, | ||
| TimestampStringLocation: c.cfg.TimestampStringLocation, | ||
| TLSConfig: c.cfg.TLSConfig, | ||
| ServerID: c.cfg.ServerID, | ||
| Flavor: c.cfg.Flavor, | ||
| User: c.cfg.User, | ||
| Password: c.cfg.Password, | ||
| Charset: c.cfg.Charset, | ||
| HeartbeatPeriod: c.cfg.HeartbeatPeriod, | ||
| ReadTimeout: c.cfg.ReadTimeout, | ||
| UseDecimal: c.cfg.UseDecimal, | ||
| ParseTime: c.cfg.ParseTime, | ||
| SemiSyncEnabled: c.cfg.SemiSyncEnabled, | ||
| MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, | ||
| DisableRetrySync: c.cfg.DisableRetrySync, | ||
| TimestampStringLocation: c.cfg.TimestampStringLocation, | ||
| TLSConfig: c.cfg.TLSConfig, | ||
| WaitTimeBetweenConnectionSeconds: c.cfg.WaitTimeBetweenConnectionSeconds, | ||
| } | ||
|  | ||
| if strings.Contains(c.cfg.Addr, "/") { | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -2,6 +2,7 @@ package canal | |
|  | ||
| import ( | ||
| "fmt" | ||
| "strings" | ||
| "sync/atomic" | ||
| "time" | ||
|  | ||
|  | @@ -22,15 +23,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { | |
| if err != nil { | ||
| return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err) | ||
| } | ||
| log.Infof("start sync binlog at binlog file %v", pos) | ||
| log.Debugf("start sync binlog at binlog file %v", pos) | ||
|          | ||
| return s, nil | ||
| } else { | ||
| gsetClone := gset.Clone() | ||
| s, err := c.syncer.StartSyncGTID(gset) | ||
| if err != nil { | ||
| return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err) | ||
| } | ||
| log.Infof("start sync binlog at GTID set %v", gsetClone) | ||
| log.Debugf("start sync binlog at GTID set %v", gsetClone) | ||
| return s, nil | ||
| } | ||
| } | ||
|  | @@ -65,7 +66,7 @@ func (c *Canal) runSyncBinlog() error { | |
| switch e := ev.Event.(type) { | ||
| case *replication.RotateEvent: | ||
| fakeRotateLogName = string(e.NextLogName) | ||
| log.Infof("received fake rotate event, next log name is %s", e.NextLogName) | ||
| log.Debugf("received fake rotate event, next log name is %s", e.NextLogName) | ||
| } | ||
|  | ||
| continue | ||
|  | @@ -92,7 +93,7 @@ func (c *Canal) runSyncBinlog() error { | |
| case *replication.RotateEvent: | ||
| pos.Name = string(e.NextLogName) | ||
| pos.Pos = uint32(e.Position) | ||
| log.Infof("rotate binlog to %s", pos) | ||
| log.Debugf("rotate binlog to %s", pos) | ||
| savePos = true | ||
| force = true | ||
| if err = c.eventHandler.OnRotate(e); err != nil { | ||
|  | @@ -142,7 +143,15 @@ func (c *Canal) runSyncBinlog() error { | |
| case *replication.QueryEvent: | ||
| stmts, _, err := c.parser.Parse(string(e.Query), "", "") | ||
| if err != nil { | ||
| log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) | ||
| msg := err.Error() | ||
| if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parsing error message like this is a bad practice. Don't you have an error code should could catch? | ||
| // Cut the first row from the message since it contain the procedure call and not the entire message | ||
| fl := strings.Split(msg, "\n") | ||
| log.Debugf("parse SP Error: (%s)", fl[0]) | ||
| } else { | ||
| log.Debugf("parse query(%s) err %v", e.Query, err) | ||
| } | ||
| log.Debugln("will skip this event") | ||
| continue | ||
| } | ||
| for _, stmt := range stmts { | ||
|  | @@ -232,7 +241,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { | |
|  | ||
| func (c *Canal) updateTable(db, table string) (err error) { | ||
| c.ClearTableCache([]byte(db), []byte(table)) | ||
| log.Infof("table structure changed, clear table cache: %s.%s\n", db, table) | ||
| log.Debugf("table structure changed, clear table cache: %s.%s\n", db, table) | ||
| if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { | ||
| return errors.Trace(err) | ||
| } | ||
|  | @@ -270,38 +279,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { | |
| return errors.Errorf("%s not supported now", e.Header.EventType) | ||
| } | ||
| events := newRowsEvent(t, action, ev.Rows, e.Header) | ||
| events.Header.Gtid = c.SyncedGTIDSet() | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain why this is changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 | ||
| return c.eventHandler.OnRow(events) | ||
| } | ||
|  | ||
| func (c *Canal) FlushBinlog() error { | ||
| _, err := c.Execute("FLUSH BINARY LOGS") | ||
| return errors.Trace(err) | ||
| } | ||
|  | ||
| func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { | ||
| timer := time.NewTimer(timeout) | ||
| for { | ||
| select { | ||
| case <-timer.C: | ||
| return errors.Errorf("wait position %v too long > %s", pos, timeout) | ||
| default: | ||
| err := c.FlushBinlog() | ||
| if err != nil { | ||
| return errors.Trace(err) | ||
| } | ||
| curPos := c.master.Position() | ||
| if curPos.Compare(pos) >= 0 { | ||
| return nil | ||
| } else { | ||
| log.Debugf("master pos is %v, wait catching %v", curPos, pos) | ||
| time.Sleep(100 * time.Millisecond) | ||
| } | ||
| } | ||
| } | ||
|  | ||
| return nil | ||
| } | ||
| 
      Comment on lines
    
      -276
     to 
      -303
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing this break the API of the package I don't see why they were moved as functions accepting Canal | ||
|  | ||
| func (c *Canal) GetMasterPos() (mysql.Position, error) { | ||
| rr, err := c.Execute("SHOW MASTER STATUS") | ||
| if err != nil { | ||
|  | @@ -336,12 +317,3 @@ func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) { | |
| } | ||
| return gset, nil | ||
| } | ||
|  | ||
| func (c *Canal) CatchMasterPos(timeout time.Duration) error { | ||
| pos, err := c.GetMasterPos() | ||
| if err != nil { | ||
| return errors.Trace(err) | ||
| } | ||
|  | ||
| return c.WaitUntilPos(pos, timeout) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the log level should be kept, other developers may rely on this behaviour.
Also for other files.