Skip to content

Commit 500de05

Browse files
authored
Merge branch 'aliyun:master' into master
2 parents 00e4788 + 6671a18 commit 500de05

10 files changed

+165
-49
lines changed

Diff for: client_interface.go

+3
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ type ClientInterface interface {
189189
GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
190190
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error)
191191

192+
GetLogsV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error)
193+
GetLogLinesV2(project, logstore string, req *GetLogRequest) (*GetLogLinesResponse, error)
194+
192195
// #################### Index Operations #####################
193196
// CreateIndex ...
194197
CreateIndex(project, logstore string, index Index) error

Diff for: client_store.go

+12
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,18 @@ func (c *Client) GetLogLines(project, logstore string, topic string, from int64,
197197
return ls.GetLogLines(topic, from, to, queryExp, maxLineNum, offset, reverse)
198198
}
199199

200+
// GetLogsV2 ...
201+
func (c *Client) GetLogsV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error) {
202+
ls := convertLogstore(c, project, logstore)
203+
return ls.GetLogsV2(req)
204+
}
205+
206+
// GetLogLinesV2 ...
207+
func (c *Client) GetLogLinesV2(project, logstore string, req *GetLogRequest) (*GetLogLinesResponse, error) {
208+
ls := convertLogstore(c, project, logstore)
209+
return ls.GetLogLinesV2(req)
210+
}
211+
200212
// CreateIndex ...
201213
func (c *Client) CreateIndex(project, logstore string, index Index) error {
202214
ls := convertLogstore(c, project, logstore)

Diff for: consumer/worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (consumerWorker *ConsumerWorker) StopAndWait() {
4848
consumerWorker.workerShutDownFlag.Store(true)
4949
consumerWorker.consumerHeatBeat.shutDownHeart()
5050
consumerWorker.waitGroup.Wait()
51-
level.Info(consumerWorker.Logger).Log("msg", "consumer worker %v stopped", "consumer name", consumerWorker.client.option.ConsumerName)
51+
level.Info(consumerWorker.Logger).Log("msg", "consumer worker stopped", "consumer name", consumerWorker.client.option.ConsumerName)
5252
}
5353

5454
func (consumerWorker *ConsumerWorker) run() {

Diff for: example/consumer/copy_data/copy_data.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// README :
1414
// This is an E2E test, which creates another logstore under the same project to simulate consumption.
15-
// If you don't want to use this method, you can comment out 31-45 lines of code and override your own process function
15+
// If you don't want to use this method, you can comment out 60-67 lines of code and override your own process function
1616
// Be careful not to change the parameter type of process function.
1717

1818
var option consumerLibrary.LogHubConfig

Diff for: log_config.go

+30-26
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ func ConvertToApsaraLogConfigInputDetail(detail InputDetailInterface) (*ApsaraLo
137137
// RegexConfigInputDetail regex log config
138138
type RegexConfigInputDetail struct {
139139
LocalFileConfigInputDetail
140-
Key []string `json:"key"`
141-
LogBeginRegex string `json:"logBeginRegex"`
142-
Regex string `json:"regex"`
140+
Key []string `json:"key"`
141+
LogBeginRegex string `json:"logBeginRegex"`
142+
Regex string `json:"regex"`
143+
CustomizedFields string `json:"customizedFields,omitempty"`
143144
}
144145

145146
// InitRegexConfigInputDetail ...
@@ -221,11 +222,12 @@ func ConvertToJSONConfigInputDetail(detail InputDetailInterface) (*JSONConfigInp
221222
// DelimiterConfigInputDetail delimiter log config
222223
type DelimiterConfigInputDetail struct {
223224
LocalFileConfigInputDetail
224-
Separator string `json:"separator"`
225-
Quote string `json:"quote"`
226-
Key []string `json:"key"`
227-
TimeKey string `json:"timeKey"`
228-
AutoExtend bool `json:"autoExtend"`
225+
Separator string `json:"separator"`
226+
Quote string `json:"quote"`
227+
Key []string `json:"key"`
228+
TimeKey string `json:"timeKey"`
229+
AutoExtend bool `json:"autoExtend"`
230+
AcceptNoEnoughKeys bool `json:"acceptNoEnoughKeys"`
229231
}
230232

231233
// InitDelimiterConfigInputDetail ...
@@ -271,24 +273,26 @@ func ConvertToDelimiterConfigInputDetail(detail InputDetailInterface) (*Delimite
271273
// LocalFileConfigInputDetail all file input detail's basic config
272274
type LocalFileConfigInputDetail struct {
273275
CommonConfigInputDetail
274-
LogType string `json:"logType"`
275-
LogPath string `json:"logPath"`
276-
FilePattern string `json:"filePattern"`
277-
TimeFormat string `json:"timeFormat"`
278-
TopicFormat string `json:"topicFormat,omitempty"`
279-
Preserve bool `json:"preserve"`
280-
PreserveDepth int `json:"preserveDepth"`
281-
FileEncoding string `json:"fileEncoding,omitempty"`
282-
DiscardUnmatch bool `json:"discardUnmatch"`
283-
MaxDepth int `json:"maxDepth"`
284-
TailExisted bool `json:"tailExisted"`
285-
DiscardNonUtf8 bool `json:"discardNonUtf8"`
286-
DelaySkipBytes int `json:"delaySkipBytes"`
287-
IsDockerFile bool `json:"dockerFile"`
288-
DockerIncludeLabel map[string]string `json:"dockerIncludeLabel,omitempty"`
289-
DockerExcludeLabel map[string]string `json:"dockerExcludeLabel,omitempty"`
290-
DockerIncludeEnv map[string]string `json:"dockerIncludeEnv,omitempty"`
291-
DockerExcludeEnv map[string]string `json:"dockerExcludeEnv,omitempty"`
276+
LogType string `json:"logType"`
277+
LogPath string `json:"logPath"`
278+
FilePattern string `json:"filePattern"`
279+
TimeFormat string `json:"timeFormat"`
280+
TopicFormat string `json:"topicFormat,omitempty"`
281+
Preserve bool `json:"preserve"`
282+
PreserveDepth int `json:"preserveDepth"`
283+
FileEncoding string `json:"fileEncoding,omitempty"`
284+
DiscardUnmatch bool `json:"discardUnmatch"`
285+
MaxDepth int `json:"maxDepth"`
286+
TailExisted bool `json:"tailExisted"`
287+
DiscardNonUtf8 bool `json:"discardNonUtf8"`
288+
DelaySkipBytes int `json:"delaySkipBytes"`
289+
IsDockerFile bool `json:"dockerFile"`
290+
DockerIncludeLabel map[string]string `json:"dockerIncludeLabel,omitempty"`
291+
DockerExcludeLabel map[string]string `json:"dockerExcludeLabel,omitempty"`
292+
DockerIncludeEnv map[string]string `json:"dockerIncludeEnv,omitempty"`
293+
DockerExcludeEnv map[string]string `json:"dockerExcludeEnv,omitempty"`
294+
PluginDetail map[string]interface{} `json:"plugin,omitempty"`
295+
Advanced map[string]interface{} `json:"advanced,omitempty"`
292296
}
293297

294298
func GetFileConfigInputDetailType(detail InputDetailInterface) (string, bool) {

Diff for: log_store.go

+31-16
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ type LogStore struct {
2727
AutoSplit bool `json:"autoSplit"`
2828
MaxSplitShard int `json:"maxSplitShard"`
2929
AppendMeta bool `json:"appendMeta"`
30-
ArchiveSecons int `json:"archiveSeconds"`
3130
TelemetryType string `json:"telemetryType"`
31+
HotTTL uint32 `json:"hot_ttl,omitempty"`
3232

3333
CreateTime uint32 `json:"createTime,omitempty"`
3434
LastModifyTime uint32 `json:"lastModifyTime,omitempty"`
@@ -534,23 +534,14 @@ func (s *LogStore) GetHistograms(topic string, from int64, to int64, queryExp st
534534
}
535535

536536
// getLogs query logs with [from, to) time range
537-
func (s *LogStore) getLogs(topic string, from int64, to int64, queryExp string,
538-
maxLineNum int64, offset int64, reverse bool) (*http.Response, []byte, *GetLogsResponse, error) {
537+
func (s *LogStore) getLogs(req *GetLogRequest) (*http.Response, []byte, *GetLogsResponse, error) {
539538

540539
h := map[string]string{
541540
"x-log-bodyrawsize": "0",
542541
"Accept": "application/json",
543542
}
544543

545-
urlVal := url.Values{}
546-
urlVal.Add("type", "log")
547-
urlVal.Add("from", strconv.Itoa(int(from)))
548-
urlVal.Add("to", strconv.Itoa(int(to)))
549-
urlVal.Add("topic", topic)
550-
urlVal.Add("line", strconv.Itoa(int(maxLineNum)))
551-
urlVal.Add("offset", strconv.Itoa(int(offset)))
552-
urlVal.Add("reverse", strconv.FormatBool(reverse))
553-
urlVal.Add("query", queryExp)
544+
urlVal := req.ToURLParams()
554545

555546
uri := fmt.Sprintf("/logstores/%s?%s", s.Name, urlVal.Encode())
556547
r, err := request(s.project, "GET", uri, h, nil)
@@ -591,11 +582,24 @@ func (s *LogStore) getLogs(topic string, from int64, to int64, queryExp string,
591582
}, nil
592583
}
593584

594-
// GetJsonLogs query logs with [from, to) time range
585+
// GetLogLines query logs with [from, to) time range
595586
func (s *LogStore) GetLogLines(topic string, from int64, to int64, queryExp string,
596587
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
597588

598-
rsp, b, logRsp, err := s.getLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
589+
var req GetLogRequest
590+
req.Topic = topic
591+
req.From = from
592+
req.To = to
593+
req.Query = queryExp
594+
req.Lines = maxLineNum
595+
req.Offset = offset
596+
req.Reverse = reverse
597+
return s.GetLogLinesV2(&req)
598+
}
599+
600+
// GetLogLinesV2 query logs with [from, to) time range
601+
func (s *LogStore) GetLogLinesV2(req *GetLogRequest) (*GetLogLinesResponse, error) {
602+
rsp, b, logRsp, err := s.getLogs(req)
599603
if err != nil {
600604
return nil, err
601605
}
@@ -616,8 +620,20 @@ func (s *LogStore) GetLogLines(topic string, from int64, to int64, queryExp stri
616620
// GetLogs query logs with [from, to) time range
617621
func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
618622
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
623+
var req GetLogRequest
624+
req.Topic = topic
625+
req.From = from
626+
req.To = to
627+
req.Query = queryExp
628+
req.Lines = maxLineNum
629+
req.Offset = offset
630+
req.Reverse = reverse
631+
return s.GetLogsV2(&req)
632+
}
619633

620-
rsp, b, logRsp, err := s.getLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
634+
// GetLogsV2 query logs with [from, to) time range
635+
func (s *LogStore) GetLogsV2(req *GetLogRequest) (*GetLogsResponse, error) {
636+
rsp, b, logRsp, err := s.getLogs(req)
621637
if err == nil && len(b) != 0 {
622638
logs := []map[string]string{}
623639
err = json.Unmarshal(b, &logs)
@@ -626,7 +642,6 @@ func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
626642
}
627643
logRsp.Logs = logs
628644
}
629-
630645
return logRsp, err
631646
}
632647

Diff for: logstore_test.go

+34
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,39 @@ func createLogStore(s *LogstoreTestSuite) *LogStore {
6161
return store
6262
}
6363

64+
func (s *LogstoreTestSuite) TestCreateLogStoreWithNewRequestBody() {
65+
client := CreateNormalInterface(s.endpoint, s.accessKeyID, s.accessKeySecret, "")
66+
exist, ce := client.CheckProjectExist(s.projectName)
67+
s.Nil(ce)
68+
if !exist {
69+
_, cpe := client.CreateProject(s.projectName, "go sdk test")
70+
s.Nil(cpe)
71+
}
72+
defer client.DeleteProject(s.projectName)
73+
logStore := &LogStore{
74+
Name: s.logstoreName,
75+
TTL: 7,
76+
ShardCount: 2,
77+
WebTracking: false,
78+
AutoSplit: true,
79+
MaxSplitShard: 16,
80+
AppendMeta: false,
81+
}
82+
err := s.Project.CreateLogStoreV2(logStore)
83+
s.Nil(err)
84+
time.Sleep(time.Second * 10)
85+
store, err := s.Project.GetLogStore(s.logstoreName)
86+
s.Nil(err)
87+
s.Equal(s.logstoreName, store.Name)
88+
s.Equal(7, store.TTL)
89+
s.Equal(2, store.ShardCount)
90+
s.Equal(false, store.WebTracking)
91+
s.Equal(true, store.AutoSplit)
92+
s.Equal(16, store.MaxSplitShard)
93+
s.Equal(false, store.AppendMeta)
94+
s.Equal(uint32(0), store.HotTTL)
95+
}
96+
6497
func (s *LogstoreTestSuite) TestCheckLogStore() {
6598
store, err := s.Project.GetLogStore(s.logstoreName)
6699
s.Nil(err)
@@ -256,6 +289,7 @@ func (s *LogstoreTestSuite) TestGetLogs() {
256289
lResp, lErr := s.Logstore.GetLogs("", int64(beginTime), int64(endTime), "InternalServerError", 100, 0, false)
257290
s.Nil(lErr)
258291
s.Equal(lResp.Count, int64(logCount))
292+
fmt.Println(*lResp)
259293
}
260294

261295
func (s *LogstoreTestSuite) TestLogstore() {

Diff for: model.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,37 @@ package sls
22

33
import (
44
"encoding/json"
5+
"net/url"
6+
"strconv"
57
"strings"
68
)
79

10+
// GetLogRequest for GetLogsV2
11+
type GetLogRequest struct {
12+
From int64 // unix time, eg time.Now().Unix() - 900
13+
To int64 // unix time, eg time.Now().Unix()
14+
Topic string // @note topic is not used anymore, use __topic__ : xxx in query instead
15+
Lines int64 // max 100; offset, lines and reverse is ignored when use SQL in query
16+
Offset int64
17+
Reverse bool
18+
Query string
19+
PowerSQL bool
20+
}
21+
22+
func (glr *GetLogRequest) ToURLParams() url.Values {
23+
urlVal := url.Values{}
24+
urlVal.Add("type", "log")
25+
urlVal.Add("from", strconv.Itoa(int(glr.From)))
26+
urlVal.Add("to", strconv.Itoa(int(glr.To)))
27+
urlVal.Add("topic", glr.Topic)
28+
urlVal.Add("line", strconv.Itoa(int(glr.Lines)))
29+
urlVal.Add("offset", strconv.Itoa(int(glr.Offset)))
30+
urlVal.Add("reverse", strconv.FormatBool(glr.Reverse))
31+
urlVal.Add("powerSql", strconv.FormatBool(glr.PowerSQL))
32+
urlVal.Add("query", glr.Query)
33+
return urlVal
34+
}
35+
836
// GetHistogramsResponse defines response from GetHistograms call
937
type SingleHistogram struct {
1038
Progress string `json:"progress"`
@@ -36,7 +64,7 @@ type GetLogsResponse struct {
3664
// note: GetLogLinesResponse.Logs is nil when use GetLogLinesResponse
3765
type GetLogLinesResponse struct {
3866
GetLogsResponse
39-
Lines []json.RawMessage
67+
Lines []json.RawMessage
4068
}
4169

4270
func (resp *GetLogsResponse) IsComplete() bool {

Diff for: request.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,27 @@ import (
55
"crypto/md5"
66
"encoding/json"
77
"fmt"
8-
"github.com/go-kit/kit/log/level"
98
"io/ioutil"
109
"net/http"
1110
"net/http/httputil"
1211
"net/url"
1312
"time"
1413

14+
"github.com/go-kit/kit/log/level"
15+
1516
"github.com/cenkalti/backoff"
1617
"golang.org/x/net/context"
1718
)
1819

1920
// timeout configs
2021
var (
21-
defaultRequestTimeout = 10 * time.Second
22-
defaultRetryTimeout = 30 * time.Second
22+
defaultRequestTimeout = 60 * time.Second
23+
defaultRetryTimeout = 90 * time.Second
2324
defaultHttpClient = &http.Client{
2425
Timeout: defaultRequestTimeout,
2526
}
2627
)
2728

28-
2929
func retryReadErrorCheck(ctx context.Context, err error) (bool, error) {
3030
if err == nil {
3131
return false, nil

Diff for: token_auto_update_client.go

+20
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,26 @@ func (c *TokenAutoUpdateClient) GetHistograms(project, logstore string, topic st
669669
return
670670
}
671671

672+
func (c *TokenAutoUpdateClient) GetLogsV2(project, logstore string, req *GetLogRequest) (r *GetLogsResponse, err error) {
673+
for i := 0; i < c.maxTryTimes; i++ {
674+
r, err = c.logClient.GetLogsV2(project, logstore, req)
675+
if !c.processError(err) {
676+
return
677+
}
678+
}
679+
return
680+
}
681+
682+
func (c *TokenAutoUpdateClient) GetLogLinesV2(project, logstore string, req *GetLogRequest) (r *GetLogLinesResponse, err error) {
683+
for i := 0; i < c.maxTryTimes; i++ {
684+
r, err = c.logClient.GetLogLinesV2(project, logstore, req)
685+
if !c.processError(err) {
686+
return
687+
}
688+
}
689+
return
690+
}
691+
672692
func (c *TokenAutoUpdateClient) GetLogs(project, logstore string, topic string, from int64, to int64, queryExp string,
673693
maxLineNum int64, offset int64, reverse bool) (r *GetLogsResponse, err error) {
674694
for i := 0; i < c.maxTryTimes; i++ {

0 commit comments

Comments
 (0)