Skip to content

Commit e1fc872

Browse files
authored
Merge branch 'aliyun:master' into master
2 parents bb59d30 + 8e36402 commit e1fc872

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+2633
-729
lines changed

.github/workflows/go-ut.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
- name: Set up Go
2020
uses: actions/setup-go@v3
2121
with:
22-
go-version: 1.17
22+
go-version: 1.19
2323

2424
- name: Build
2525
run: go build -v ./...

.github/workflows/go.yml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,16 @@ jobs:
1616
- name: Set up Go
1717
uses: actions/setup-go@v2
1818
with:
19-
go-version: 1.13
19+
go-version: 1.19
20+
21+
- name: Build
22+
run: go build -v ./
23+
24+
- name: Build consumer
25+
run: go build -v ./consumer/...
26+
27+
- name: Build producer
28+
run: go build -v ./producer/...
29+
30+
- name: Build util
31+
run: go build -v ./util/...

README.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ go get -u github.com/aliyun/aliyun-log-go-sdk
3838
Client = sls.CreateNormalInterfaceV2(Endpoint, credentialsProvider)
3939
```
4040

41+
为了防止出现配置错误,您可以在创建 Client 之后,测试 Client 是否能成功调用 SLS API
42+
```go
43+
_, err := Client.ListProject()
44+
if err != nil {
45+
panic(err)
46+
}
47+
```
48+
4149
2. **创建project**
4250

4351
参考 [log_project.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/project/log_project.go)文件
@@ -97,9 +105,7 @@ go get -u github.com/aliyun/aliyun-log-go-sdk
97105

98106
5. **写数据**
99107

100-
参考[put_log.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/loghub/put_log.go)
101-
102-
这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失。
108+
这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失, 对于MetricStore类型, 使用`PutLogsWithMetricStoreURL`API 发送数据可以提升大基数时间线下查询性能 。
103109

104110
```go
105111
logs := []*sls.Log{}
@@ -143,8 +149,6 @@ go get -u github.com/aliyun/aliyun-log-go-sdk
143149

144150
6.**读数据**
145151

146-
参考[pull_log.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/loghub/pull_log.go)
147-
148152
这里展示了使用SDK中原生API接口调用去拉取数据的方式,我们不推荐使用这种方式去读取消费logstore中的数据,推荐使用SDK中 [consumer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/consumer) 消费组去拉取数据,消费组提供自动负载均衡以及失败重试等机制,并且会自动保存拉取断点,再次拉取不会拉取重复数据。
149153

150154
```go

cgo/README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
## 依赖
2+
使用此扩展,需要设置 go env 中的 CGO_ENABLED=1,且环境中已安装了合适的编译器,linux 上通常是 gcc。
3+
可以通过以下命令查看当前 CGO_ENABLED 是否打开。
4+
5+
```bash
6+
go env | grep CGO_ENABLED
7+
```
8+
查看默认的编译器。
9+
```bash
10+
go env | grep CC
11+
```
12+
13+
如果 CGO_ENABLED 值是 1,则可跳过下面开启 CGO_ENABLED 的步骤。
14+
15+
### 全局永久开启
16+
```bash
17+
go env -w CGO_ENABLED=1
18+
```
19+
20+
### 临时开启
21+
```bash
22+
CGO_ENABLED=1 go build
23+
```
24+
25+
## 使用方法
26+
开启 cgo-zstd 扩展
27+
28+
```golang
29+
import (
30+
cgo "github.com/aliyun/aliyun-log-go-sdk/cgo"
31+
sls "github.com/aliyun/aliyun-log-go-sdk"
32+
)
33+
cgo.SetZstdCgoCompressor(1)
34+
```
35+
36+
37+
使用 zstd 压缩写入日志的示例
38+
```golang
39+
import (
40+
"time"
41+
42+
cgo "github.com/aliyun/aliyun-log-go-sdk/cgo"
43+
sls "github.com/aliyun/aliyun-log-go-sdk"
44+
"github.com/golang/protobuf/proto"
45+
)
46+
47+
func main() {
48+
cgo.SetZstdCgoCompressor(1)
49+
client := sls.CreateNormalInterface("endpoint",
50+
"accessKeyId", "accessKeySecret", "")
51+
lg := &sls.LogGroup{
52+
Logs: []*sls.Log{
53+
{
54+
Time: proto.Uint32(uint32(time.Now().Unix())),
55+
Contents: []*sls.LogContent{
56+
{
57+
Key: proto.String("HELLO"),
58+
Value: proto.String("world"),
59+
},
60+
},
61+
},
62+
},
63+
}
64+
err := client.PostLogStoreLogsV2(
65+
"your-project",
66+
"your-logstore",
67+
&sls.PostLogStoreLogsRequest{
68+
LogGroup: lg,
69+
CompressType: sls.Compress_ZSTD, // 指定压缩方式为 ZSTD
70+
},
71+
)
72+
if err != nil {
73+
panic(err)
74+
}
75+
76+
}
77+
```

cgo/compressor.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package cgo
2+
3+
import (
4+
"sync"
5+
6+
"github.com/DataDog/zstd"
7+
sls "github.com/aliyun/aliyun-log-go-sdk"
8+
)
9+
10+
func SetZstdCgoCompressor(compressLevel int) error {
11+
sls.SetZstdCompressor(newZstdCompressor(compressLevel))
12+
return nil
13+
}
14+
15+
type zstdCompressor struct {
16+
ctxPool sync.Pool
17+
level int
18+
}
19+
20+
func newZstdCompressor(level int) *zstdCompressor {
21+
res := &zstdCompressor{
22+
level: level,
23+
}
24+
res.ctxPool = sync.Pool{
25+
New: func() interface{} {
26+
return zstd.NewCtx()
27+
},
28+
}
29+
return res
30+
}
31+
32+
func (c *zstdCompressor) Compress(src, dst []byte) ([]byte, error) {
33+
zstdCtx := c.ctxPool.Get().(zstd.Ctx)
34+
defer c.ctxPool.Put(zstdCtx)
35+
return zstdCtx.CompressLevel(dst, src, c.level)
36+
}
37+
38+
func (c *zstdCompressor) Decompress(src, dst []byte) ([]byte, error) {
39+
zstdCtx := c.ctxPool.Get().(zstd.Ctx)
40+
defer c.ctxPool.Put(zstdCtx)
41+
return zstdCtx.Decompress(dst, src)
42+
}

client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import (
88
"net/http"
99
"net/url"
1010
"strconv"
11+
"strings"
1112
"sync"
1213
"time"
14+
15+
"github.com/aliyun/aliyun-log-go-sdk/util"
1316
)
1417

1518
// GlobalForceUsingHTTP if GlobalForceUsingHTTP is true, then all request will use HTTP(ignore LogProject's UsingHTTP flag)
@@ -28,6 +31,7 @@ var MaxCompletedRetryLatency = 5 * time.Minute
2831
const (
2932
Compress_LZ4 = iota // 0
3033
Compress_None // 1
34+
Compress_ZSTD // 2
3135
Compress_Max // max compress type(just for filter invalid compress type)
3236
)
3337

@@ -165,6 +169,11 @@ func (c *Client) SetHTTPClient(client *http.Client) {
165169
c.HTTPClient = client
166170
}
167171

172+
// SetRetryTimeout set retry timeout
173+
func (c *Client) SetRetryTimeout(timeout time.Duration) {
174+
c.RetryTimeOut = timeout
175+
}
176+
168177
// SetAuthVersion set signature version that the client used
169178
func (c *Client) SetAuthVersion(version AuthVersionType) {
170179
c.accessKeyLock.Lock()
@@ -395,3 +404,11 @@ func (c *Client) DeleteProject(name string) error {
395404
func (c *Client) Close() error {
396405
return nil
397406
}
407+
408+
func (c *Client) setSignV4IfInAcdr(endpoint string) {
409+
region, err := util.ParseRegion(endpoint)
410+
if err == nil && strings.Contains(region, "-acdr-ut-") {
411+
c.AuthVersion = AuthV4
412+
c.Region = region
413+
}
414+
}

client_interface.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package sls
33
import (
44
"net/http"
55
"time"
6+
7+
"github.com/aliyun/aliyun-log-go-sdk/util"
68
)
79

810
// CreateNormalInterface create a normal client.
@@ -11,10 +13,10 @@ import (
1113
// If you keep using long-lived AccessKeyID and AccessKeySecret,
1214
// use the example code below.
1315
//
14-
// provider := NewStaticCredProvider(accessKeyID, accessKeySecret, securityToken)
16+
// provider := NewStaticCredentialsProvider(accessKeyID, accessKeySecret, securityToken)
1517
// client := CreateNormalInterfaceV2(endpoint, provider)
1618
func CreateNormalInterface(endpoint, accessKeyID, accessKeySecret, securityToken string) ClientInterface {
17-
return &Client{
19+
client := &Client{
1820
Endpoint: endpoint,
1921
AccessKeyID: accessKeyID,
2022
AccessKeySecret: accessKeySecret,
@@ -26,6 +28,8 @@ func CreateNormalInterface(endpoint, accessKeyID, accessKeySecret, securityToken
2628
securityToken,
2729
),
2830
}
31+
client.setSignV4IfInAcdr(endpoint)
32+
return client
2933
}
3034

3135
// CreateNormalInterfaceV2 create a normal client, with a CredentialsProvider.
@@ -35,13 +39,15 @@ func CreateNormalInterface(endpoint, accessKeyID, accessKeySecret, securityToken
3539
//
3640
// See [credentials_provider.go] for more details.
3741
func CreateNormalInterfaceV2(endpoint string, credentialsProvider CredentialsProvider) ClientInterface {
38-
return &Client{
42+
client := &Client{
3943
Endpoint: endpoint,
4044
credentialsProvider: credentialsProvider,
4145
}
46+
client.setSignV4IfInAcdr(endpoint)
47+
return client
4248
}
4349

44-
type UpdateTokenFunction = func() (accessKeyID, accessKeySecret, securityToken string, expireTime time.Time, err error)
50+
type UpdateTokenFunction = util.UpdateTokenFunction
4551

4652
// CreateTokenAutoUpdateClient create a TokenAutoUpdateClient,
4753
// this client will auto fetch security token and retry when operation is `Unauthorized`
@@ -79,6 +85,8 @@ type ClientInterface interface {
7985
SetUserAgent(userAgent string)
8086
// SetHTTPClient set a custom http client, all request will send to sls by this client
8187
SetHTTPClient(client *http.Client)
88+
// SetRetryTimeout set retry timeout, client will retry util retry timeout
89+
SetRetryTimeout(timeout time.Duration)
8290
// #################### Client Operations #####################
8391
// ResetAccessKeyToken reset client's access key token
8492
ResetAccessKeyToken(accessKeyID, accessKeySecret, securityToken string)
@@ -162,6 +170,20 @@ type ClientInterface interface {
162170
// ListEventStore returns all eventStore names of project p.
163171
ListEventStore(project string, offset, size int) ([]string, error)
164172

173+
// #################### StoreView Operations #####################
174+
// CreateStoreView creates a new storeView.
175+
CreateStoreView(project string, storeView *StoreView) error
176+
// UpdateStoreView updates a storeView.
177+
UpdateStoreView(project string, storeView *StoreView) error
178+
// DeleteStoreView deletes a storeView.
179+
DeleteStoreView(project string, storeViewName string) error
180+
// GetStoreView returns storeView.
181+
GetStoreView(project string, storeViewName string) (*StoreView, error)
182+
// ListStoreViews returns all storeView names of a project.
183+
ListStoreViews(project string, req *ListStoreViewsRequest) (*ListStoreViewsResponse, error)
184+
// GetStoreViewIndex returns all index config of logstores in the storeView, only support storeType logstore.
185+
GetStoreViewIndex(project string, storeViewName string) (*GetStoreViewIndexResponse, error)
186+
165187
// #################### Logtail Operations #####################
166188
// ListMachineGroup returns machine group name list and the total number of machine groups.
167189
// The offset starts from 0 and the size is the max number of machine groups could be returned.
@@ -220,7 +242,6 @@ type ClientInterface interface {
220242
CreateEtlMeta(project string, etlMeta *EtlMeta) (err error)
221243
UpdateEtlMeta(project string, etlMeta *EtlMeta) (err error)
222244
DeleteEtlMeta(project string, etlMetaName, etlMetaKey string) (err error)
223-
listEtlMeta(project string, etlMetaName, etlMetaKey, etlMetaTag string, offset, size int) (total int, count int, etlMeta []*EtlMeta, err error)
224245
GetEtlMeta(project string, etlMetaName, etlMetaKey string) (etlMeta *EtlMeta, err error)
225246
ListEtlMeta(project string, etlMetaName string, offset, size int) (total int, count int, etlMetaList []*EtlMeta, err error)
226247
ListEtlMetaWithTag(project string, etlMetaName, etlMetaTag string, offset, size int) (total int, count int, etlMetaList []*EtlMeta, err error)
@@ -237,12 +258,14 @@ type ClientInterface interface {
237258
MergeShards(project, logstore string, shardID int) (shards []*Shard, err error)
238259

239260
// #################### Log Operations #####################
261+
PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error)
240262
// PutLogs put logs into logstore.
241263
// The callers should transform user logs into LogGroup.
242264
PutLogs(project, logstore string, lg *LogGroup) (err error)
243265
// PostLogStoreLogs put logs into Shard logstore by hashKey.
244266
// The callers should transform user logs into LogGroup.
245267
PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKey *string) (err error)
268+
PostLogStoreLogsV2(project, logstore string, req *PostLogStoreLogsRequest) (err error)
246269
// PostRawLogWithCompressType put logs into logstore with specific compress type and hashKey.
247270
PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error)
248271
// PutLogsWithCompressType put logs into logstore with specific compress type.
@@ -262,14 +285,18 @@ type ClientInterface interface {
262285
// The nextCursor is the next curosr can be used to read logs at next time.
263286
GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
264287
logGroupMaxCount int) (out []byte, nextCursor string, err error)
288+
// Deprecated: Use GetLogsBytesWithQuery instead.
265289
GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error)
290+
GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error)
266291
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
267292
// The logGroupMaxCount is the max number of logGroup could be returned.
268293
// The nextCursor is the next cursor can be used to read logs at next time.
269294
// @note if you want to pull logs continuous, set endCursor = ""
270295
PullLogs(project, logstore string, shardID int, cursor, endCursor string,
271296
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
297+
// Deprecated: Use PullLogsWithQuery instead.
272298
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
299+
PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error)
273300
// GetHistograms query logs with [from, to) time range
274301
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
275302
// GetLogs query logs with [from, to) time range

client_job.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const (
1616
DataSourceKafka DataSourceType = "Kafka"
1717
DataSourceCMS DataSourceType = "AliyunCloudMonitor"
1818
DataSourceGeneral DataSourceType = "General"
19+
DataSourceS3 DataSourceType = "S3"
1920

2021
OSSDataFormatTypeLine OSSDataFormatType = "Line"
2122
OSSDataFormatTypeMultiline OSSDataFormatType = "Multiline"
@@ -190,7 +191,23 @@ type (
190191
TimePattern string `json:"timePattern"`
191192
TimeFormat string `json:"timeFormat"`
192193
TimeZone string `json:"timeZone"`
194+
Communication string `json:"communication"`
195+
NameResolutions string `json:"nameResolutions"`
193196
AdditionalProps map[string]string `json:"additionalProps"`
197+
VpcId string `json:"vpcId"`
198+
}
199+
200+
S3Source struct {
201+
DataSource
202+
AWSAccessKey string `json:"awsAccessKey"`
203+
AWSAccessKeySecret string `json:"awsAccessKeySecret"`
204+
AWSRegion string `json:"awsRegion"`
205+
Bucket string `json:"bucket"`
206+
Prefix string `json:"prefix,omitempty"`
207+
Pattern string `json:"pattern,omitempty"`
208+
CompressionCodec string `json:"compressionCodec,omitempty"`
209+
Encoding string `json:"encoding,omitempty"`
210+
Format interface{} `json:"format,omitempty"`
194211
}
195212

196213
// ingestion JDBC source

0 commit comments

Comments
 (0)