Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

respect MySQL server time zone #348

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
dist: xenial

language: go

go:
- "1.11"

services:
- elasticsearch

addons:
apt:
sources:
- mysql-5.7-trusty
packages:
- mysql-server
- mysql-client
- mysql

before_install:
- sudo mysql -e "use mysql; update user set authentication_string=PASSWORD('') where User='root'; update user set plugin='mysql_native_password';FLUSH PRIVILEGES;"
Expand Down
1 change: 1 addition & 0 deletions etc/river.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""
my_charset = "utf8"
my_timezone = "Local"

# Set true when elasticsearch use https
#es_https = false
Expand Down
22 changes: 18 additions & 4 deletions river/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ type SourceConfig struct {

// Config is the configuration
type Config struct {
MyAddr string `toml:"my_addr"`
MyUser string `toml:"my_user"`
MyPassword string `toml:"my_pass"`
MyCharset string `toml:"my_charset"`
MyAddr string `toml:"my_addr"`
MyUser string `toml:"my_user"`
MyPassword string `toml:"my_pass"`
MyCharset string `toml:"my_charset"`
MyTimezone TomeLocation `toml:"my_timezone"`

ESHttps bool `toml:"es_https"`
ESAddr string `toml:"es_addr"`
Expand Down Expand Up @@ -59,6 +60,7 @@ func NewConfigWithFile(name string) (*Config, error) {
// NewConfig creates a Config from data.
func NewConfig(data string) (*Config, error) {
var c Config
c.MyTimezone = TomeLocation{time.Local}

_, err := toml.Decode(data, &c)
if err != nil {
Expand All @@ -79,3 +81,15 @@ func (d *TomlDuration) UnmarshalText(text []byte) error {
d.Duration, err = time.ParseDuration(string(text))
return err
}

// TomeLocation supports time.Location codec for TOML format.
type TomeLocation struct {
*time.Location
}

// UnmarshalText implementes TOML UnmarshalText
func (l *TomeLocation) UnmarshalText(text []byte) error {
var err error
l.Location, err = time.LoadLocation(string(text))
return err
}
1 change: 1 addition & 0 deletions river/river_extra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) {
cfg.MyAddr = *myAddr
cfg.MyUser = "root"
cfg.MyPassword = ""
cfg.MyTimezone = TomeLocation{time.Local}
cfg.ESAddr = *esAddr

cfg.ServerID = 1001
Expand Down
18 changes: 17 additions & 1 deletion river/river_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var myAddr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr")
var esAddr = flag.String("es_addr", "127.0.0.1:9200", "Elasticsearch addr")
var dateTimeStr = time.Now().Format(mysql.TimeFormat)
var dateTimeStr = time.Now().In(time.UTC).Format(mysql.TimeFormat)
var dateStr = time.Now().Format(mysqlDateFormat)

func Test(t *testing.T) {
Expand Down Expand Up @@ -77,6 +77,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
cfg.MyUser = "root"
cfg.MyPassword = ""
cfg.MyCharset = "utf8"
cfg.MyTimezone = TomeLocation{time.UTC}
cfg.ESAddr = *esAddr

cfg.ServerID = 1001
Expand Down Expand Up @@ -205,6 +206,18 @@ type = "river"
c.Assert(cfg.Sources, HasLen, 1)
c.Assert(cfg.Sources[0].Tables, HasLen, 4)
c.Assert(cfg.Rules, HasLen, 4)
c.Assert(cfg.MyTimezone.Location, Equals, time.Local)

str = `
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""
my_charset = "utf8"
my_timezone = "Asia/Shanghai"
`
cfg, err = NewConfig(str)
c.Assert(err, IsNil)
c.Assert(cfg.MyTimezone.Location.String(), Equals, "Asia/Shanghai")
}

func (s *riverTestSuite) testExecute(c *C, query string, args ...interface{}) {
Expand Down Expand Up @@ -369,6 +382,9 @@ func (s *riverTestSuite) TestRiver(c *C) {
c.Assert(r.Found, IsTrue)
tdt, _ := time.Parse(time.RFC3339, r.Source["tdatetime"].(string))
c.Assert(tdt.Format(mysql.TimeFormat), Equals, dateTimeStr)
tdtZoneName, tdtZoneOffset := tdt.Zone()
c.Assert(tdtZoneName, Equals, "UTC")
c.Assert(tdtZoneOffset, Equals, 0)
c.Assert(r.Source["tdate"], Equals, dateStr)

r = s.testElasticGet(c, "20")
Expand Down
2 changes: 1 addition & 1 deletion river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in
case schema.TYPE_DATETIME, schema.TYPE_TIMESTAMP:
switch v := value.(type) {
case string:
vt, err := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local)
vt, err := time.ParseInLocation(mysql.TimeFormat, string(v), r.c.MyTimezone.Location)
if err != nil || vt.IsZero() { // failed to parse date or zero date
return nil
}
Expand Down