Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store masterinfo data in elasticsearch #253

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions etc/river.toml
Original file line number Diff line number Diff line change
@@ -16,6 +16,9 @@ es_pass = ""
# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
# Beta feature: Store to elasticsearch
# data_dir = "es:http://username:password@hostname:9200/index/type?id=1001"
# recommend id = server_id
data_dir = "./var"

# Inner Http status address
175 changes: 175 additions & 0 deletions river/elasticsearch_masterinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package river

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"

"github.com/juju/errors"
"github.com/siddontang/go-mysql-elasticsearch/elastic"
"github.com/siddontang/go-mysql/mysql"
"gopkg.in/birkirb/loggers.v1/log"
)

type elasticsearchMasterInfo struct {
sync.RWMutex

es *elastic.Client

Name string
Pos uint32
index string
docType string
id string
}

func loadElasticsearchMasterInfo(dataPath string) (masterInfo, error) {
var m elasticsearchMasterInfo

if !strings.HasPrefix(dataPath, "es:") {
return &m, errors.Errorf("error elasticsearch prefix: %s", dataPath)
}
esURL, err := url.Parse(dataPath[3:])
if err != nil {
return &m, err
}

cfg := new(elastic.ClientConfig)
cfg.Addr = esURL.Host
if esURL.User != nil {
cfg.User = esURL.User.Username()
cfg.Password, _ = esURL.User.Password()
}
cfg.Https = esURL.Scheme == "https"
m.es = elastic.NewClient(cfg)

paths := strings.Split(esURL.Path, "/")
m.index = paths[1]
m.docType = paths[2]

m.id = "1"
id := esURL.Query().Get("id")
if id != "" {
m.id = id
}

err = m.loadMasterInfo()
if err != nil {
return nil, err
}

return &m, nil
}

func (m *elasticsearchMasterInfo) Save(pos mysql.Position) error {
log.Infof("save position %s", pos)

m.Lock()
defer m.Unlock()

m.Name = pos.Name
m.Pos = pos.Pos
doc := map[string]interface{}{
"name": m.Name,
"pos": m.Pos,
}
err := m.es.Update(m.index, m.docType, m.id, doc)
if err != nil {
log.Errorf("ES MasterInfo save error: %s", err)
return err
}
return nil
}

func (m *elasticsearchMasterInfo) Position() mysql.Position {
m.RLock()
defer m.RUnlock()

return mysql.Position{
Name: m.Name,
Pos: m.Pos,
}
}

func (m *elasticsearchMasterInfo) Close() error {
pos := m.Position()
return m.Save(pos)
}

func (m *elasticsearchMasterInfo) loadMasterInfo() error {
mapping, err := m.es.GetMapping(m.index, m.docType)
if err != nil || mapping.Code == http.StatusNotFound {
err = m.createMapping()
}

info, err := m.es.Get(m.index, m.docType, m.id)
if err == nil && (info.Code == http.StatusOK || info.Code == http.StatusNotFound) {
item := info.ResponseItem
if item.Found {
source := item.Source
m.Name = source["name"].(string)
m.Pos = uint32(source["pos"].(float64))
}
return nil
}
return errors.Wrap(err, errors.New("loadMasterInfo error"))
}

func (m *elasticsearchMasterInfo) createMapping() error {
version, _ := m.getEsVersion()
nameType := "keyword"
if version < 5 {
nameType = "string"
}
mapping := map[string]interface{}{
"properties": map[string]interface{}{
"name": map[string]interface{}{
"type": nameType,
},
"pos": map[string]interface{}{
"type": "long",
},
},
}

return m.es.CreateMapping(m.index, m.docType, mapping)
}

func (m *elasticsearchMasterInfo) getEsVersion() (int64, error) {
reqURL := fmt.Sprintf("%s://%s/", m.es.Protocol, m.es.Addr)
resp, err := m.es.DoRequest("GET", reqURL, bytes.NewBuffer(nil))
if err != nil {
return 0, err
}
defer resp.Body.Close()

var esinfo EsinfoResponse
err = json.NewDecoder(resp.Body).Decode(&esinfo)

version := esinfo.Version.Number
if version == "" {
return 0, errors.Errorf("unknow version")
}
v := regexp.MustCompile("^\\d+").FindString(version)
mainVersion, err := strconv.ParseInt(v, 10, 32)
if err == nil {
return mainVersion, nil
}
return 0, errors.Errorf("unknow version")
}

type EsinfoResponse struct {
Name string `json:"name"`
ClusterName string `json:"cluster_name"`
Version EsVersionResponse `json:"version"`
}

type EsVersionResponse struct {
Number string
}
12 changes: 6 additions & 6 deletions river/master.go → river/file_masterinfo.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ import (
"gopkg.in/birkirb/loggers.v1/log"
)

type masterInfo struct {
type fileMasterInfo struct {
sync.RWMutex

Name string `toml:"bin_name"`
@@ -24,8 +24,8 @@ type masterInfo struct {
lastSaveTime time.Time
}

func loadMasterInfo(dataDir string) (*masterInfo, error) {
var m masterInfo
func loadFileMasterInfo(dataDir string) (masterInfo, error) {
var m fileMasterInfo

if len(dataDir) == 0 {
return &m, nil
@@ -50,7 +50,7 @@ func loadMasterInfo(dataDir string) (*masterInfo, error) {
return &m, errors.Trace(err)
}

func (m *masterInfo) Save(pos mysql.Position) error {
func (m *fileMasterInfo) Save(pos mysql.Position) error {
log.Infof("save position %s", pos)

m.Lock()
@@ -82,7 +82,7 @@ func (m *masterInfo) Save(pos mysql.Position) error {
return errors.Trace(err)
}

func (m *masterInfo) Position() mysql.Position {
func (m *fileMasterInfo) Position() mysql.Position {
m.RLock()
defer m.RUnlock()

@@ -92,7 +92,7 @@ func (m *masterInfo) Position() mysql.Position {
}
}

func (m *masterInfo) Close() error {
func (m *fileMasterInfo) Close() error {
pos := m.Position()

return m.Save(pos)
21 changes: 21 additions & 0 deletions river/masterinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package river

import (
"strings"

"github.com/siddontang/go-mysql/mysql"
)

type masterInfo interface {
Save(pos mysql.Position) error
Position() mysql.Position
Close() error
}

func loadMasterInfo(dataPath string) (masterInfo, error) {
if strings.HasPrefix(dataPath, "es:") {
return loadElasticsearchMasterInfo(dataPath)
}

return loadFileMasterInfo(dataPath)
}
2 changes: 1 addition & 1 deletion river/river.go
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ type River struct {

st *stat

master *masterInfo
master masterInfo

syncCh chan interface{}
}