Merge pull request #247 from brokercap/v2.x
v2.1.1-beta
jc3wish authored Sep 17, 2023
2 parents 0ff7d2f + 6ced480 commit 201eec4
Showing 22 changed files with 2,043 additions and 611 deletions.
20 changes: 13 additions & 7 deletions Bristol/mysql/binlog.go
@@ -254,15 +254,20 @@ func (This *BinlogDump) startConnAndDumpBinlog() {
 	This.parser.callbackErrChan <- fmt.Errorf(StatusFlagName(STATUS_RUNNING))
 	This.parser.connectionId = connectionId
 	ctx, cancelFun := context.WithCancel(This.context.ctx)
-	go This.checkDumpConnection(ctx, connectionId)
+	go This.checkDumpConnection(ctx, cancelFun)
 	//*** get connection id end

 	This.checksumEnabled()
-	if This.parser.isGTID == false {
-		This.mysqlConn.DumpBinlog(This.parser, This.CallbackFun)
-	} else {
-		This.mysqlConn.DumpBinlogGtid(This.parser, This.CallbackFun)
-	}
+	go func() {
+		// defer is used here in case the DumpBinlog method itself fails and a reconnect is needed, rather than only when checkDumpConnection detects a broken connection
+		defer cancelFun()
+		if This.parser.isGTID == false {
+			This.mysqlConn.DumpBinlog(This.parser, This.CallbackFun)
+		} else {
+			This.mysqlConn.DumpBinlogGtid(This.parser, This.CallbackFun)
+		}
+	}()
+	<-ctx.Done()
 	This.BinlogConnCLose(true)
 	This.RLock()
 	This.Status = This.parser.dumpBinLogStatus
@@ -279,7 +284,7 @@ func (This *BinlogDump) startConnAndDumpBinlog() {
 	This.parser.KillConnect(connectionId)
 }

-func (This *BinlogDump) checkDumpConnection(ctx context.Context, connectionId string) {
+func (This *BinlogDump) checkDumpConnection(ctx context.Context, cancelFunc context.CancelFunc) {
 	defer func() {
 		if err := recover(); err != nil {
 			log.Println("binlog.go checkDumpConnection err:", err)
@@ -328,6 +333,7 @@ func (This *BinlogDump) checkDumpConnection(ctx context.Context, connectionId st
 		if m != nil {
 			if _, ok = m["TIME"]; !ok {
 				log.Println("This.mysqlConn close ,connectionId: ", connectionId)
+				cancelFunc()
 				This.BinlogConnCLose0(true)
 				return
 			}
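The change above turns the blocking DumpBinlog / DumpBinlogGtid call into a goroutine that cancels a shared context when it returns, while checkDumpConnection can cancel the same context once it notices the dump connection has died; the caller then only has to wait on ctx.Done() before cleaning up and reconnecting. Below is a minimal, self-contained sketch of that pattern; dump and watchConnection are illustrative stand-ins, not Bifrost APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dump stands in for the blocking DumpBinlog call; it returns when the stream ends or fails.
func dump(ctx context.Context) error {
	select {
	case <-time.After(100 * time.Millisecond):
		return errors.New("dump stream ended unexpectedly")
	case <-ctx.Done():
		return ctx.Err()
	}
}

// watchConnection stands in for checkDumpConnection: it may call cancel()
// when it decides the server-side connection is gone.
func watchConnection(ctx context.Context, cancel context.CancelFunc) {
	// ...periodically inspect the connection; on failure call cancel().
	<-ctx.Done()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go watchConnection(ctx, cancel)

	go func() {
		// Cancel in a defer so an abnormal return from dump also wakes the caller,
		// not only a failure detected by watchConnection.
		defer cancel()
		_ = dump(ctx)
	}()

	<-ctx.Done() // returns as soon as either goroutine cancels
	fmt.Println("dump finished or connection lost; caller can now reconnect")
}
```

The point of the deferred cancel is that either exit path, the dump returning abnormally or the watcher detecting a dead connection, wakes the same ctx.Done() and funnels into a single reconnect path.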
10 changes: 6 additions & 4 deletions README.EN.MD
@@ -42,7 +42,9 @@ Your star is the biggest support for Bifrost!!!
 | Kafka | NO | YES | canal | canal json to bifrost data |
 | kafka | NO | YES | bifrost | bifrost to kafka,kafka to bifrost |
 | kafka | NO | YES | debezium | debezium for mysql data |
-| Mongo | NO | YES | oplog | |
+| Mongo | NO | YES | oplog | |
+| kafka | NO | YES | customer json | customer json to bifrost struct |
+
 ---

 #### **WIKI** : [https://wiki.xbifrost.com/en](https://wiki.xbifrost.com/en)
@@ -115,11 +117,11 @@ After compiling, the corresponding platform name folder will be created in the t
 ##### Binary

 ```
-wget https://github.com/brokercap/Bifrost/releases/download/v2.0.6-beta/bifrost_v2.0.6-beta_Linux-amd64-bin.tar.gz
+wget https://github.com/brokercap/Bifrost/releases/download/v2.1.1-beta/bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz
-tar -zxvf bifrost_v2.0.6-beta_Linux-amd64-bin.tar.gz
+tar -zxvf bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz
-cd bifrost_v2.0.6-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
+cd bifrost_v2.1.1-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
 ```

7 changes: 4 additions & 3 deletions README.MD
@@ -40,6 +40,7 @@
 | kafka | NO | YES | bifrost | Bifrost can write to kafka as a source, and another Bifrost instance then consumes the kafka data |
 | kafka | NO | YES | debezium | debezium writes the data to kafka, and Bifrost consumes the kafka data and converts it automatically |
 | Mongo | NO | YES | oplog | |
+| kafka | NO | YES | customer json | supports converting custom JSON-format data into the Bifrost format |

 ---

@@ -119,11 +120,11 @@ make install prefix=./target
 ##### Binary installation
 `````sh

-wget https://github.com/brokercap/Bifrost/releases/download/v2.0.6-beta/bifrost_v2.0.6-beta_Linux-amd64-bin.tar.gz
+wget https://github.com/brokercap/Bifrost/releases/download/v2.1.1-beta/bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz

-tar -zxvf bifrost_v2.0.6-beta_Linux-amd64-bin.tar.gz
+tar -zxvf bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz

-cd bifrost_v2.0.6-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
+cd bifrost_v2.1.1-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*

 `````

13 changes: 13 additions & 0 deletions changelog.txt
@@ -1,3 +1,16 @@
+v2.1.1-beta 2023-09-17
+1. Fix the bug where the field mapping in the sync configuration was displayed as undefined
+    https://github.com/brokercap/Bifrost/issues/237
+2. output ClickHouse supports optionally configuring automatic creation of MergeTree engine tables
+    https://github.com/brokercap/Bifrost/issues/244
+3. Fix the bug where a disconnected mysql source connection went undetected
+    https://github.com/brokercap/Bifrost/issues/188
+
+v2.1.0-beta 2023-09-03
+1. Support converting custom-configured input kafka data formats into the Bifrost format
+2. output clickhouse supports converting the json.Number data type to String
+3. Fix the bug where a mongo input update event whose updated data dictionary was actually nil caused an exception
+
 v2.0.6-beta 2023-06-18
 1. Fix the bug where the clickhouse target showed the sync as successful although it actually failed
     https://github.com/brokercap/Bifrost/issues/233
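The v2.1.0 note above about converting json.Number to String on the clickhouse output is a generic encoding/json concern; the following is a minimal Go illustration of that kind of conversion, not Bifrost's own implementation:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	dec := json.NewDecoder(bytes.NewReader([]byte(`{"id": 9007199254740993, "price": 3.14}`)))
	dec.UseNumber() // keep numbers as json.Number instead of float64, so large integers lose no precision

	var row map[string]interface{}
	if err := dec.Decode(&row); err != nil {
		panic(err)
	}

	for k, v := range row {
		if n, ok := v.(json.Number); ok {
			row[k] = n.String() // hand the exact textual form to the target column
		}
	}
	fmt.Println(row) // map[id:9007199254740993 price:3.14]
}
```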
2 changes: 1 addition & 1 deletion config/version.go
@@ -16,4 +16,4 @@ limitations under the License.

 package config

-const VERSION = "v2.0.6-beta"
+const VERSION = "v2.1.1-beta"
168 changes: 168 additions & 0 deletions input/kafka/input_customer_json_data.go
@@ -0,0 +1,168 @@
/*
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kafka

import (
	"github.com/Shopify/sarama"
	"log"
	"runtime/debug"
	"strings"

	inputDriver "github.com/brokercap/Bifrost/input/driver"
	outputDriver "github.com/brokercap/Bifrost/plugin/driver"
)

const InputCustomerJsonData = "customer_json_kafka"

func init() {
	inputDriver.Register(InputCustomerJsonData, NewCustomerJsonDataInput, VERSION, BIFROST_VERSION)
}

type CustomerJsonDataInput struct {
	InputKafka
	pluginCustomerDataObj *outputDriver.PluginDataCustomerJson
	hadTransferConfig     bool
}

func NewCustomerJsonDataInput() inputDriver.Driver {
	return NewCustomerJsonDataInput0()
}

func NewCustomerJsonDataInput0() *CustomerJsonDataInput {
	c := &CustomerJsonDataInput{}
	c.Init()
	c.childCallBack = c.CallBack
	c.pluginCustomerDataObj, _ = outputDriver.NewPluginDataCustomerJson()
	return c
}

func (c *CustomerJsonDataInput) TransferConfig() {
	if c.hadTransferConfig {
		return
	}
	defer func() {
		c.hadTransferConfig = true
	}()

	key2rowPath := c.tansferConfigKey2Row(c.getConfig("input.key2row"))
	c.pluginCustomerDataObj.SetKey2Row(key2rowPath)

	databasePath := c.tansferConfigPath(c.getConfig("input.database"))
	c.pluginCustomerDataObj.SetDatabasePath(databasePath)

	tablePath := c.tansferConfigPath(c.getConfig("input.table"))
	c.pluginCustomerDataObj.SetTablePath(tablePath)

	pksPath := c.tansferConfigPath(c.getConfig("input.pks"))
	c.pluginCustomerDataObj.SetPksPath(pksPath)

	updateNewDataPath := c.tansferConfigPath(c.getConfig("input.update_new_data"))
	c.pluginCustomerDataObj.SetUpdateNewDataPath(updateNewDataPath)

	updateOldDataPath := c.tansferConfigPath(c.getConfig("input.update_old_data"))
	c.pluginCustomerDataObj.SetUpdateOldDataPath(updateOldDataPath)

	insertDataPath := c.tansferConfigPath(c.getConfig("input.insert_data"))
	c.pluginCustomerDataObj.SetInsertDataPath(insertDataPath)

	deleteDataPath := c.tansferConfigPath(c.getConfig("input.delete_data"))
	c.pluginCustomerDataObj.SetDeleteDataPath(deleteDataPath)

	eventTypePath := c.tansferConfigPath(c.getConfig("input.event.type"))
	c.pluginCustomerDataObj.SetEventTypePath(eventTypePath)

	if _, ok := c.config.ParamMap["input.event.type.val.insert"]; ok {
		c.pluginCustomerDataObj.SetEventTypeValInsert(c.config.ParamMap["input.event.type.val.insert"])
	}
	if _, ok := c.config.ParamMap["input.event.type.val.select"]; ok {
		c.pluginCustomerDataObj.SetEventTypeValSelect(c.config.ParamMap["input.event.type.val.select"])
	}
	if _, ok := c.config.ParamMap["input.event.type.val.update"]; ok {
		c.pluginCustomerDataObj.SetEventTypeValUpdate(c.config.ParamMap["input.event.type.val.update"])
	}
	if _, ok := c.config.ParamMap["input.event.type.val.delete"]; ok {
		c.pluginCustomerDataObj.SetEventTypeValDelete(c.config.ParamMap["input.event.type.val.delete"])
	}
}

func (c *CustomerJsonDataInput) getConfig(key string) *string {
	if val, ok := c.config.ParamMap[key]; ok {
		return &val
	}
	return nil
}

func (c *CustomerJsonDataInput) tansferConfigKey2Row(config *string) (key2Row []outputDriver.PluginCustomerJsonDataKey2Row) {
	if config == nil {
		return
	}
	tmpArr := strings.Split(*config, ",")
	for _, v := range tmpArr {
		tmpArr0 := strings.Split(v, ":")
		if len(tmpArr0) > 2 {
			log.Printf("[ERROR] input:%s childInit input.key2row: %s is not valid,use like a.b:name please! \n", InputCustomerJsonData, v)
			continue
		}
		var name string
		if len(tmpArr0) == 2 {
			name = tmpArr0[1]
		} else {
			name = tmpArr0[0]
		}
		keyPath := strings.Split(tmpArr0[0], ".")
		key2Row = append(key2Row, outputDriver.PluginCustomerJsonDataKey2Row{Name: name, Path: keyPath})
	}
	return
}

func (c *CustomerJsonDataInput) tansferConfigPath(config *string) (path []string) {
	if config == nil {
		return
	}
	path = strings.Split(*config, ".")
	return
}

func (c *CustomerJsonDataInput) CallBack(kafkaMsg *sarama.ConsumerMessage) error {
	if c.callback == nil {
		return nil
	}
	c.TransferConfig()
	defer func() {
		if err := recover(); err != nil {
			log.Printf("%s CallBack recover err:%+v \n", InputCustomerJsonData, err)
			log.Println(string(debug.Stack()))
		}
	}()
	err := c.pluginCustomerDataObj.Decoder(kafkaMsg.Value)
	if err != nil {
		return err
	}
	data := c.pluginCustomerDataObj.ToBifrostOutputPluginData()
	if data == nil {
		log.Printf("[ERROR] input:%s ToBifrostOutputPluginData nil, kafkaMsg:%+v \n", InputCustomerJsonData, string(kafkaMsg.Value))
		return nil
	}
	data.Gtid = c.SetTopicPartitionOffsetAndReturnGTID(kafkaMsg)
	data.EventSize = uint32(len(kafkaMsg.Value))
	data.BinlogFileNum = 1
	data.BinlogPosition = 0
	data.EventID = c.getNextEventID()
	data.AliasSchemaName = kafkaMsg.Topic
	data.AliasTableName = c.FormatPartitionTableName(kafkaMsg.Partition)
	c.ToInputCallback(data)
	return nil
}
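The customer-json input above is configured entirely through dot-separated JSON paths read in TransferConfig. The sketch below shows a hypothetical ParamMap for a debezium-like message shape; only the parameter keys come from the code above, while every value (db, tbl, op, before, after, meta.shard) is an assumption made for illustration.

```go
package main

import "fmt"

func main() {
	// Hypothetical parameter values for messages shaped like:
	//   {"db":"shop","tbl":"orders","op":"u","before":{...},"after":{...},"meta":{"shard":3}}
	// Only the keys mirror TransferConfig above; the values are made up for illustration.
	params := map[string]string{
		"input.database":              "db",        // dot-separated path to the schema name
		"input.table":                 "tbl",       // path to the table name
		"input.pks":                   "after.id",  // path to the primary key value
		"input.event.type":            "op",        // path to the event-type field
		"input.event.type.val.insert": "c",         // value of op that means insert
		"input.event.type.val.update": "u",
		"input.event.type.val.delete": "d",
		"input.insert_data":           "after",     // row image used for inserts
		"input.update_new_data":       "after",     // new row image for updates
		"input.update_old_data":       "before",    // old row image for updates
		"input.delete_data":           "before",    // row image used for deletes
		"input.key2row":               "meta.shard:shard_id", // extra column copied from the message
	}
	for k, v := range params {
		fmt.Printf("%s = %s\n", k, v)
	}
}
```

A key2row entry follows the path[:column] form handled by tansferConfigKey2Row: multiple entries are comma-separated, and an entry with more than one colon is logged as invalid and skipped.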