Skip to content

Commit 7f3b46b

Browse files
committed
feat: consumer add log id
1 parent 7e1c08e commit 7f3b46b

File tree

8 files changed

+228
-9
lines changed

8 files changed

+228
-9
lines changed

consumer/worker_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,40 @@ func TestConsumerQueryNoData(t *testing.T) {
8282
worker.StopAndWait()
8383

8484
}
85+
86+
func TestConsumerWithLogId(t *testing.T) {
87+
option := LogHubConfig{
88+
Endpoint: os.Getenv("LOG_TEST_ENDPOINT"),
89+
CredentialsProvider: sls.NewStaticCredentialsProvider(
90+
os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
91+
os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), ""),
92+
Project: os.Getenv("LOG_TEST_PROJECT"),
93+
Logstore: os.Getenv("LOG_TEST_LOGSTORE"),
94+
ConsumerGroupName: "test-consumer",
95+
ConsumerName: "test-consumer-1",
96+
CursorPosition: END_CURSOR,
97+
}
98+
99+
worker := InitConsumerWorkerWithCheckpointTracker(option, process_with_log_id)
100+
101+
worker.Start()
102+
time.Sleep(time.Second * 2000)
103+
worker.StopAndWait()
104+
105+
}
106+
107+
func process_with_log_id(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
108+
fmt.Printf("time: %s, shardId %d processing works success, logGroupSize: %d, currentCursor: %s\n",
109+
time.Now().Format("2006-01-02 15:04:05 000"),
110+
shardId, len(logGroupList.LogGroups),
111+
checkpointTracker.GetCurrentCursor())
112+
for _, logGroup := range logGroupList.LogGroups {
113+
lgoGroupId := logGroup.GetLogGroupId()
114+
fmt.Println("log group id: ", lgoGroupId)
115+
for i, log := range logGroup.Logs {
116+
log_id := logGroup.GetLogId(i)
117+
fmt.Printf("log %d has %d keys, and log id: %s\n", i, len(log.Contents), log_id)
118+
}
119+
}
120+
return "", nil
121+
}

example/consumer/with_log_id/main.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"time"
7+
8+
sls "github.com/aliyun/aliyun-log-go-sdk"
9+
consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
10+
)
11+
12+
func main() {
13+
option := consumerLibrary.LogHubConfig{
14+
Endpoint: os.Getenv("LOG_TEST_ENDPOINT"),
15+
CredentialsProvider: sls.NewStaticCredentialsProvider(
16+
os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
17+
os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), ""),
18+
Project: os.Getenv("LOG_TEST_PROJECT"),
19+
Logstore: os.Getenv("LOG_TEST_LOGSTORE"),
20+
ConsumerGroupName: "test-consumer",
21+
ConsumerName: "test-consumer-1",
22+
CursorPosition: consumerLibrary.END_CURSOR,
23+
}
24+
25+
worker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process_with_log_id)
26+
27+
worker.Start()
28+
defer worker.StopAndWait()
29+
for {
30+
time.Sleep(time.Second)
31+
}
32+
// worker.StopAndWait()
33+
}
34+
35+
func process_with_log_id(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
36+
fmt.Printf("time: %s, shardId %d processing works success, logGroupSize: %d,\n",
37+
time.Now().Format("2006-01-02 15:04:05 000"),
38+
shardId, len(logGroupList.LogGroups))
39+
40+
// start consume logs
41+
for _, logGroup := range logGroupList.LogGroups {
42+
// logGroupId is empty string if failed
43+
logGroupId := logGroup.GetLogGroupId() // eg: "4|MTczMjE1NDI4NzIxMzIzMjAzNg=="
44+
fmt.Println("log group id: ", logGroupId)
45+
46+
for i, log := range logGroup.Logs {
47+
// log id is empty string if failed
48+
log_id := logGroup.GetLogId(i) // eg: "4|MTczMjE1NDI4NzIxMzIzMjAzNg==|5|4"
49+
fmt.Printf("log %d has %d keys, and log id: %s\n", i, len(log.Contents), log_id)
50+
}
51+
}
52+
return "", nil
53+
}

log.pb.go

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

log_store.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -547,11 +547,13 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogM
547547
if err != nil {
548548
return nil, nil, err
549549
}
550+
readLastCursor, _ := parseHeaderString(r.Header, "X-Log-Read-Last-Cursor")
550551
pullMeta := &PullLogMeta{
551-
RawSize: rawSize,
552-
NextCursor: nextCursor,
553-
Netflow: netflow,
554-
Count: count,
552+
RawSize: rawSize,
553+
NextCursor: nextCursor,
554+
Netflow: netflow,
555+
Count: count,
556+
readLastCursor: readLastCursor,
555557
}
556558
// If query is not nil, extract more headers
557559
if plr.Query != "" {
@@ -640,7 +642,9 @@ func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm
640642
if err != nil {
641643
return nil, nil, err
642644
}
643-
645+
if plm.Count > 0 && plm.readLastCursor != "" {
646+
gl.addIdIfPossible(plm.readLastCursor, plr.ShardID)
647+
}
644648
return
645649
}
646650

model.go

+55-4
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package sls
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"net/http"
67
"net/url"
78
"strconv"
89
"strings"
10+
11+
"github.com/go-kit/kit/log/level"
912
)
1013

1114
// GetLogRequest for GetLogsV2
@@ -75,10 +78,11 @@ func (plr *PullLogRequest) ToURLParams() url.Values {
7578
}
7679

7780
type PullLogMeta struct {
78-
NextCursor string
79-
Netflow int
80-
RawSize int
81-
Count int
81+
NextCursor string
82+
Netflow int
83+
RawSize int
84+
Count int
85+
readLastCursor string // int64 string, eg: "1732154287213232020"
8286
// these fields are only present when query is set
8387
RawSizeBeforeQuery int // processed raw size before query
8488
Lines int // result lines after query
@@ -375,3 +379,50 @@ type ListStoreViewsResponse struct {
375379
Count int `json:"count"`
376380
StoreViews []string `json:"storeviews"`
377381
}
382+
383+
type logGroupIdentity struct {
384+
cursor string
385+
shard int
386+
}
387+
388+
// GetLogGroupId returns the log group id (shard|cursor)
389+
// If id is unknown, returns empty string
390+
func (l *LogGroup) GetLogGroupId() string {
391+
if l.identity == nil {
392+
return ""
393+
}
394+
return fmt.Sprintf("%d|%s", l.identity.shard, l.identity.cursor)
395+
}
396+
397+
// index must be less than len(LogGroup.Logs)
398+
// if no log id found, the returned log id is empty string
399+
func (l *LogGroup) GetLogId(index int) string {
400+
if index > len(l.Logs) {
401+
return ""
402+
}
403+
logGroupId := l.GetLogGroupId()
404+
if logGroupId == "" {
405+
return ""
406+
}
407+
return fmt.Sprintf("%s|%d|%d", logGroupId, len(l.Logs), index)
408+
}
409+
410+
func (l *LogGroupList) addIdIfPossible(readLastCursor string, shard int) error {
411+
lastCursorInt, err := strconv.ParseInt(readLastCursor, 10, 64)
412+
if err != nil {
413+
if IsDebugLevelMatched(1) {
414+
level.Error(Logger).Log("msg", "decode readLastCursor failed",
415+
"cursor", readLastCursor, "err", err)
416+
}
417+
return err
418+
}
419+
cursor := lastCursorInt - int64(len(l.LogGroups)) + 1
420+
for i := 0; i < len(l.LogGroups); i++ {
421+
l.LogGroups[i].identity = &logGroupIdentity{
422+
cursor: encodeCursor(cursor),
423+
shard: shard,
424+
}
425+
cursor++
426+
}
427+
return nil
428+
}

model_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"testing"
77

8+
"github.com/golang/protobuf/proto"
89
"github.com/stretchr/testify/assert"
910
)
1011

@@ -70,3 +71,39 @@ func TestIndex_MarshalJSON(t *testing.T) {
7071
}
7172

7273
}
74+
75+
func TestLogGroupIdentity(t *testing.T) {
76+
77+
sampleLog := &Log{
78+
Time: proto.Uint32(1732774880),
79+
Contents: []*LogContent{
80+
{
81+
Key: proto.String("test"),
82+
Value: proto.String("test"),
83+
},
84+
},
85+
}
86+
logGroupList := &LogGroupList{
87+
LogGroups: []*LogGroup{
88+
{
89+
Logs: []*Log{sampleLog},
90+
},
91+
{
92+
Logs: []*Log{sampleLog, sampleLog},
93+
},
94+
{
95+
Logs: []*Log{sampleLog, sampleLog, sampleLog},
96+
},
97+
},
98+
}
99+
err := logGroupList.addIdIfPossible("MTcyOTA3MDYxODQyODA0NDY1NQ==", 0)
100+
assert.NoError(t, err)
101+
102+
assert.Equal(t, "0|MTcyOTA3MDYxODQyODA0NDY1NQ==", logGroupList.LogGroups[2].GetLogGroupId())
103+
assert.Equal(t, "0|MTcyOTA3MDYxODQyODA0NDY1NA==", logGroupList.LogGroups[1].GetLogGroupId())
104+
assert.Equal(t, "0|MTcyOTA3MDYxODQyODA0NDY1Mw==", logGroupList.LogGroups[0].GetLogGroupId())
105+
empty := &LogGroup{}
106+
assert.Equal(t, empty.GetLogGroupId(), "")
107+
assert.Equal(t, "0|MTcyOTA3MDYxODQyODA0NDY1NA==|2|1", logGroupList.LogGroups[1].GetLogId(1))
108+
assert.Equal(t, "0|MTcyOTA3MDYxODQyODA0NDY1NQ==|3|0", logGroupList.LogGroups[2].GetLogId(0))
109+
}

util_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package sls
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestCursorEncode(t *testing.T) {
10+
assert.Equal(t, encodeCursor(1729070618428044655), "MTcyOTA3MDYxODQyODA0NDY1NQ==")
11+
cursor, err := decodeCursor("MTcyOTA3MDYxODQyODA0NDY1NQ==")
12+
assert.Nil(t, err)
13+
assert.Equal(t, cursor, int64(1729070618428044655))
14+
15+
assert.Equal(t, encodeCursor(0), "MA==")
16+
cursor, err = decodeCursor("MA==")
17+
assert.Nil(t, err)
18+
assert.Equal(t, cursor, int64(0))
19+
}

utils.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sls
22

33
import (
4+
"encoding/base64"
45
"fmt"
56
"net/http"
67
"strconv"
@@ -50,3 +51,19 @@ func parseHeaderString(header http.Header, headerName string) (string, error) {
5051
}
5152
return v[0], nil
5253
}
54+
55+
func decodeCursor(cursor string) (int64, error) {
56+
c, err := base64.StdEncoding.DecodeString(cursor)
57+
if err != nil {
58+
return 0, err
59+
}
60+
cursorInt, err := strconv.ParseInt(string(c), 10, 64)
61+
if err != nil {
62+
return 0, err
63+
}
64+
return cursorInt, nil
65+
}
66+
67+
func encodeCursor(cursor int64) string {
68+
return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(cursor, 10)))
69+
}

0 commit comments

Comments
 (0)