Skip to content

Commit b75db0e

Browse files
authored
Merge pull request #1214 from PapaPiya/fix_kafka_sender
[PDR-17200][feat(kafka)]: 增加kafka sender version配置
2 parents e199be8 + 45d2ac9 commit b75db0e

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

sender/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,14 @@ var ModeKeyOptions = map[string][]Option{
986986
Description: "SASL密码(sasl_password)",
987987
ToolTip: "使用SASL协议进行鉴权的密码",
988988
},
989+
{
990+
KeyName: KeyKafkaVersion,
991+
ChooseOnly: false,
992+
Default: "",
993+
DefaultNoUse: false,
994+
Description: "kafka版本(kafka_version)",
995+
Advance: true,
996+
},
989997
{
990998
KeyName: KeyKafkaRetryMax,
991999
ChooseOnly: false,

sender/config/models.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ const (
222222
KeyKafkaCompression = "kafka_compression" //压缩模式,有none, gzip, snappy
223223
KeyKafkaTimeout = "kafka_timeout" //连接超时时间
224224
KeyKafkaKeepAlive = "kafka_keep_alive" //保持连接时长
225+
KeyKafkaVersion = "kafka_version" //版本
225226
KeyMaxMessageBytes = "max_message_bytes" //每条消息最大字节数
226227
KeyGZIPCompressionLevel = "gzip_compression_level" //GZIP压缩日志的策略
227228
KeyGZIPCompressionNo = "仅打包不压缩"

sender/kafka/kafka.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"time"
1010

1111
"github.com/Shopify/sarama"
12-
"github.com/json-iterator/go"
12+
jsoniter "github.com/json-iterator/go"
1313
"github.com/rcrowley/go-metrics"
1414

1515
"github.com/qiniu/log"
@@ -86,6 +86,7 @@ func NewSender(conf conf.MapConf) (kafkaSender sender.Sender, err error) {
8686
keepAlive, _ := conf.GetStringOr(KeyKafkaKeepAlive, "0")
8787
maxMessageBytes, _ := conf.GetIntOr(KeyMaxMessageBytes, 4*1024*1024)
8888
gzipCompressionLevel, _ := conf.GetStringOr(KeyGZIPCompressionLevel, KeyGZIPCompressionDefault)
89+
version, _ := conf.GetStringOr(KeyKafkaVersion, "")
8990

9091
name, _ := conf.GetStringOr(KeyName, fmt.Sprintf("kafkaSender:(kafkaUrl:%s,topic:%s)", hosts, topic))
9192

@@ -108,8 +109,10 @@ func NewSender(conf conf.MapConf) (kafkaSender sender.Sender, err error) {
108109
if !ok {
109110
return nil, fmt.Errorf("unknown compression mode: '%v'", compression)
110111
}
111-
if compressionMode == sarama.CompressionLZ4 {
112-
cfg.Version = sarama.V0_10_0_0
112+
if version != "" {
113+
if cfg.Version, err = sarama.ParseKafkaVersion(version); err != nil {
114+
return nil, err
115+
}
113116
}
114117
cfg.Producer.Compression = compressionMode
115118
cfg.Net.DialTimeout, err = time.ParseDuration(timeout)

0 commit comments

Comments
 (0)