Skip to content

Commit e8f9ed9

Browse files
authored
feat: support override consumer group config and refine/fix some code (aliyun#197)
* feat: support override consumer group config and refine/fix some code 1. override old consumer group rather than skip 2. fix UpdateConsumerGroup always failed 3. refine codes * refine: modify consumer group Stringer impl and log * fix: no need to update when there's no difference
1 parent 9e39fa5 commit e8f9ed9

File tree

7 files changed

+99
-21
lines changed

7 files changed

+99
-21
lines changed

client_consumer.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ type ConsumerGroup struct {
1919
InOrder bool `json:"order"`
2020
}
2121

22+
func (cg *ConsumerGroup) String() string {
23+
return fmt.Sprintf("%+v", cg)
24+
}
25+
2226
// ConsumerGroupCheckPoint type define
2327
type ConsumerGroupCheckPoint struct {
2428
ShardID int `json:"shard"`
2529
CheckPoint string `json:"checkpoint"`
26-
UpdateTime int64 `json:"updateTime"`
30+
UpdateTime int64 `json:"updateTime"`
2731
Consumer string `json:"consumer"`
2832
}
2933

@@ -35,6 +39,9 @@ func (c *Client) CreateConsumerGroup(project, logstore string, cg ConsumerGroup)
3539
}
3640

3741
body, err := json.Marshal(cg)
42+
if err != nil {
43+
return err
44+
}
3845
uri := fmt.Sprintf("/logstores/%v/consumergroups", logstore)
3946
_, err = c.request(project, "POST", uri, h, body)
4047
if err != nil {
@@ -50,7 +57,15 @@ func (c *Client) UpdateConsumerGroup(project, logstore string, cg ConsumerGroup)
5057
"Content-Type": "application/json",
5158
}
5259

53-
body, err := json.Marshal(cg)
60+
updates := make(map[string]interface{})
61+
updates["order"] = cg.InOrder
62+
if cg.Timeout > 0 {
63+
updates["timeout"] = cg.Timeout
64+
}
65+
body, err := json.Marshal(updates)
66+
if err != nil {
67+
return err
68+
}
5469
uri := fmt.Sprintf("/logstores/%v/consumergroups/%v", logstore, cg.ConsumerGroupName)
5570
_, err = c.request(project, "PUT", uri, h, body)
5671
if err != nil {

consumer/consumer_client.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package consumerLibrary
22

33
import (
4+
"fmt"
45
"time"
56

67
sls "github.com/aliyun/aliyun-log-go-sdk"
@@ -31,15 +32,15 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
3132
AccessKeyID: option.AccessKeyID,
3233
AccessKeySecret: option.AccessKeySecret,
3334
SecurityToken: option.SecurityToken,
34-
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
35+
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
3536
}
3637
if option.HTTPClient != nil {
3738
client.SetHTTPClient(option.HTTPClient)
3839
}
3940
consumerGroup := sls.ConsumerGroup{
40-
option.ConsumerGroupName,
41-
option.HeartbeatIntervalInSecond * 3,
42-
option.InOrder,
41+
ConsumerGroupName: option.ConsumerGroupName,
42+
Timeout: option.HeartbeatIntervalInSecond * 3,
43+
InOrder: option.InOrder,
4344
}
4445
consumerClient := &ConsumerClient{
4546
option,
@@ -51,18 +52,37 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
5152
return consumerClient
5253
}
5354

54-
func (consumer *ConsumerClient) createConsumerGroup() {
55-
err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup)
55+
func (consumer *ConsumerClient) createConsumerGroup() error {
56+
consumerGroups, err := consumer.client.ListConsumerGroup(consumer.option.Project, consumer.option.Logstore)
5657
if err != nil {
57-
if slsError, ok := err.(*sls.Error); ok {
58-
if slsError.Code == "ConsumerGroupAlreadyExist" {
59-
level.Info(consumer.logger).Log("msg", "New consumer join the consumer group", "consumer name", consumer.option.ConsumerName, "group name", consumer.option.ConsumerGroupName)
58+
return fmt.Errorf("list consumer group failed: %w", err)
59+
}
60+
alreadyExist := false
61+
for _, cg := range consumerGroups {
62+
if cg.ConsumerGroupName == consumer.consumerGroup.ConsumerGroupName {
63+
alreadyExist = true
64+
if (*cg) != consumer.consumerGroup {
65+
level.Info(consumer.logger).Log("msg", "this config is different from original config, try to override it", "old_config", cg)
6066
} else {
61-
level.Error(consumer.logger).Log("msg", "create consumer group error", "error", err)
62-
67+
level.Info(consumer.logger).Log("msg", "new consumer join the consumer group", "consumer name", consumer.option.ConsumerName,
68+
"group name", consumer.option.ConsumerGroupName)
69+
return nil
6370
}
6471
}
6572
}
73+
if alreadyExist {
74+
if err := consumer.client.UpdateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
75+
return fmt.Errorf("update consumer group failed: %w", err)
76+
}
77+
} else {
78+
if err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
79+
if slsError, ok := err.(*sls.Error); !ok || slsError.Code != "ConsumerGroupAlreadyExist" {
80+
return fmt.Errorf("create consumer group failed: %w", err)
81+
}
82+
}
83+
}
84+
85+
return nil
6686
}
6787

6888
func (consumer *ConsumerClient) heartBeat(heart []int) ([]int, error) {

consumer/consumer_client_test.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package consumerLibrary
22

33
import (
4-
"github.com/aliyun/aliyun-log-go-sdk"
4+
"fmt"
55
"testing"
6+
7+
sls "github.com/aliyun/aliyun-log-go-sdk"
8+
"github.com/go-kit/kit/log"
9+
"github.com/stretchr/testify/assert"
610
)
711

812
func InitOption() LogHubConfig {
@@ -36,7 +40,6 @@ func consumerGroup() sls.ConsumerGroup {
3640
}
3741

3842
func TestConsumerClient_createConsumerGroup(t *testing.T) {
39-
4043
type fields struct {
4144
option LogHubConfig
4245
client *sls.Client
@@ -57,3 +60,41 @@ func TestConsumerClient_createConsumerGroup(t *testing.T) {
5760
consumer.createConsumerGroup()
5861
}
5962
}
63+
64+
func internalGetConsumerGroup(client *sls.Client, project, logstore, groupName string) (sls.ConsumerGroup, error) {
65+
cgs, err := client.ListConsumerGroup(project, logstore)
66+
if err != nil {
67+
return sls.ConsumerGroup{}, err
68+
}
69+
for _, cg := range cgs {
70+
if cg.ConsumerGroupName == groupName {
71+
return *cg, nil
72+
}
73+
}
74+
75+
return sls.ConsumerGroup{}, fmt.Errorf("consumer group not found")
76+
}
77+
78+
func TestConsumerClient_updateConsumerGroup(t *testing.T) {
79+
logger := log.NewNopLogger()
80+
oldOption := InitOption()
81+
newOption := oldOption
82+
newOption.HeartbeatIntervalInSecond += 20
83+
oldClient := initConsumerClient(oldOption, logger)
84+
newClient := initConsumerClient(newOption, logger)
85+
// ready
86+
_ = oldClient.client.DeleteConsumerGroup(oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
87+
assert.NotEqual(t, newClient.consumerGroup, oldClient.consumerGroup)
88+
// old config
89+
assert.Nil(t, oldClient.createConsumerGroup())
90+
cg, err := internalGetConsumerGroup(oldClient.client, oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
91+
assert.Nil(t, err)
92+
assert.Equal(t, cg, oldClient.consumerGroup)
93+
// new config
94+
assert.Nil(t, newClient.createConsumerGroup())
95+
cg, err = internalGetConsumerGroup(oldClient.client, oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
96+
assert.Nil(t, err)
97+
assert.Equal(t, cg, newClient.consumerGroup)
98+
// clean
99+
_ = oldClient.client.DeleteConsumerGroup(oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
100+
}

consumer/shard_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (consumer *ShardConsumerWorker) consume() {
164164
func (consumer *ShardConsumerWorker) consumerShutDown() {
165165
consumer.consumerShutDownFlag = true
166166
if !consumer.isShutDownComplete() {
167-
if consumer.getIsFlushCheckpointDoneStatus() == true {
167+
if consumer.getIsFlushCheckpointDoneStatus() {
168168
consumer.consume()
169169
} else {
170170
return

consumer/worker.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str
3434
do: do,
3535
Logger: logger,
3636
}
37-
consumerClient.createConsumerGroup()
37+
if err := consumerClient.createConsumerGroup(); err != nil {
38+
level.Error(consumerWorker.Logger).Log(
39+
"msg", "possibly failed to create or update consumer group, please check worker run log",
40+
"err", err)
41+
}
3842
return consumerWorker
3943
}
4044

@@ -65,7 +69,7 @@ func (consumerWorker *ConsumerWorker) run() {
6569
break
6670
}
6771
shardConsumer := consumerWorker.getShardConsumer(shard)
68-
if shardConsumer.getConsumerIsCurrentDoneStatus() == true {
72+
if shardConsumer.getConsumerIsCurrentDoneStatus() {
6973
shardConsumer.consume()
7074
} else {
7175
continue

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,14 +354,12 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
354354
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
355355
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
356356
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
357-
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE=
358357
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
359358
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
360359
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
361360
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
362361
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
363362
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
364-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
365363
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
366364
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
367365
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

log_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,6 @@ type LogConfig struct {
490490
OutputType string `json:"outputType"`
491491
OutputDetail OutputDetail `json:"outputDetail"`
492492

493-
CreateTime uint32 `json:"createTime,omitempty`
493+
CreateTime uint32 `json:"createTime,omitempty"`
494494
LastModifyTime uint32 `json:"lastModifyTime,omitempty"`
495495
}

0 commit comments

Comments
 (0)