Skip to content

Commit 9a152f5

Browse files
authored
Feat cgo-zstd (aliyun#284)
* test: add two types of zstd compressor * feat: mv cgo to extension * feat: add cgo-extensions * chore: update readme * chore: readme * chore: rm * refine name * rename package name * refine readme * update readme
1 parent 8b8880e commit 9a152f5

File tree

6 files changed

+179
-21
lines changed

6 files changed

+179
-21
lines changed

cgo/README.md

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
## 依赖
2+
使用此扩展,需要设置 go env 中的 CGO_ENABLED=1,且环境中已安装了合适的编译器,linux 上通常是 gcc。
3+
可以通过以下命令查看当前 CGO_ENABLED 是否打开。
4+
5+
```bash
6+
go env | grep CGO_ENABLED
7+
```
8+
查看默认的编译器。
9+
```bash
10+
go env | grep CC
11+
```
12+
13+
如果 CGO_ENABLED 值是 1,则可跳过下面开启 CGO_ENABLED 的步骤。
14+
15+
### 全局永久开启
16+
```bash
17+
go env -w CGO_ENABLED=1
18+
```
19+
20+
### 临时开启
21+
```bash
22+
CGO_ENABLED=1 go build
23+
```
24+
25+
## 使用方法
26+
开启 cgo-zstd 扩展
27+
28+
```golang
29+
import (
30+
cgo "github.com/aliyun/aliyun-log-go-sdk/cgo"
31+
sls "github.com/aliyun/aliyun-log-go-sdk"
32+
)
33+
cgo.SetZstdCgoCompressor(1)
34+
```
35+
36+
37+
使用 zstd 压缩写入日志的示例
38+
```golang
39+
import (
40+
"time"
41+
42+
cgo "github.com/aliyun/aliyun-log-go-sdk/cgo"
43+
sls "github.com/aliyun/aliyun-log-go-sdk"
44+
"github.com/golang/protobuf/proto"
45+
)
46+
47+
func main() {
48+
cgo.SetZstdCgoCompressor(1)
49+
client := sls.CreateNormalInterface("endpoint",
50+
"accessKeyId", "accessKeySecret", "")
51+
lg := &sls.LogGroup{
52+
Logs: []*sls.Log{
53+
{
54+
Time: proto.Uint32(uint32(time.Now().Unix())),
55+
Contents: []*sls.LogContent{
56+
{
57+
Key: proto.String("HELLO"),
58+
Value: proto.String("world"),
59+
},
60+
},
61+
},
62+
},
63+
}
64+
err := client.PostLogStoreLogsV2(
65+
"your-project",
66+
"your-logstore",
67+
&sls.PostLogStoreLogsRequest{
68+
LogGroup: lg,
69+
CompressType: sls.Compress_ZSTD, // 指定压缩方式为 ZSTD
70+
},
71+
)
72+
if err != nil {
73+
panic(err)
74+
}
75+
76+
}
77+
```

cgo/compressor.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package cgo
2+
3+
import (
4+
"sync"
5+
6+
"github.com/DataDog/zstd"
7+
sls "github.com/aliyun/aliyun-log-go-sdk"
8+
)
9+
10+
func SetZstdCgoCompressor(compressLevel int) error {
11+
sls.SetZstdCompressor(newZstdCompressor(compressLevel))
12+
return nil
13+
}
14+
15+
type zstdCompressor struct {
16+
ctxPool sync.Pool
17+
level int
18+
}
19+
20+
func newZstdCompressor(level int) *zstdCompressor {
21+
res := &zstdCompressor{
22+
level: level,
23+
}
24+
res.ctxPool = sync.Pool{
25+
New: func() interface{} {
26+
return zstd.NewCtx()
27+
},
28+
}
29+
return res
30+
}
31+
32+
func (c *zstdCompressor) Compress(src, dst []byte) ([]byte, error) {
33+
zstdCtx := c.ctxPool.Get().(zstd.Ctx)
34+
defer c.ctxPool.Put(zstdCtx)
35+
return zstdCtx.CompressLevel(dst, src, c.level)
36+
}
37+
38+
func (c *zstdCompressor) Decompress(src, dst []byte) ([]byte, error) {
39+
zstdCtx := c.ctxPool.Get().(zstd.Ctx)
40+
defer c.ctxPool.Put(zstdCtx)
41+
return zstdCtx.Decompress(dst, src)
42+
}

compressor.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package sls
2+
3+
import (
4+
"github.com/klauspost/compress/zstd"
5+
)
6+
7+
var slsZstdCompressor LogCompressor = NewZstdCompressor(zstd.SpeedFastest)
8+
9+
func SetZstdCompressor(compressor LogCompressor) error {
10+
slsZstdCompressor = compressor
11+
return nil
12+
}
13+
14+
type LogCompressor interface {
15+
// Compress src into dst. If you have a buffer to use, you can pass it to
16+
// prevent allocation. If it is too small, or if nil is passed, a new buffer
17+
// will be allocated and returned.
18+
Compress(src, dst []byte) ([]byte, error)
19+
// Decompress src into dst. If you have a buffer to use, you can pass it to
20+
// prevent allocation. If it is too small, or if nil is passed, a new buffer
21+
// will be allocated and returned.
22+
Decompress(src, dst []byte) ([]byte, error)
23+
}
24+
25+
type ZstdCompressor struct {
26+
writer *zstd.Encoder
27+
reader *zstd.Decoder
28+
level zstd.EncoderLevel
29+
}
30+
31+
func NewZstdCompressor(level zstd.EncoderLevel) *ZstdCompressor {
32+
res := &ZstdCompressor{
33+
level: level,
34+
}
35+
res.writer, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(res.level))
36+
res.reader, _ = zstd.NewReader(nil)
37+
return res
38+
}
39+
40+
func (c *ZstdCompressor) Compress(src, dst []byte) ([]byte, error) {
41+
if dst != nil {
42+
return c.writer.EncodeAll(src, dst[:0]), nil
43+
}
44+
return c.writer.EncodeAll(src, nil), nil
45+
}
46+
47+
func (c *ZstdCompressor) Decompress(src, dst []byte) ([]byte, error) {
48+
if dst != nil {
49+
return c.reader.DecodeAll(src, dst[:0])
50+
}
51+
return c.reader.DecodeAll(src, nil)
52+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/aliyun/aliyun-log-go-sdk
33
go 1.19
44

55
require (
6+
github.com/DataDog/zstd v1.5.5
67
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d
78
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.4
89
github.com/alibabacloud-go/sts-20150401/v2 v2.0.1

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
22
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
33
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
44
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
5+
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
6+
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
57
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
68
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d h1:wvStE9wLpws31NiWUx+38wny1msZ/tm+eL5xmm4Y7So=
79
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRyW7fC9XJOWQ+NdAv8VLG7ys7l3x4ozEGLUQ=

log_store.go

+5-21
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,12 @@ import (
1313

1414
"github.com/go-kit/kit/log/level"
1515
"github.com/gogo/protobuf/proto"
16-
"github.com/klauspost/compress/zstd"
1716
"github.com/pierrec/lz4"
1817
)
1918

2019
// this file is deprecated and no maintenance
2120
// see client_logstore.go
2221

23-
var (
24-
zstdReader = newZstdReader()
25-
zstdWriter = newZstdWriter()
26-
)
27-
28-
func newZstdReader() *zstd.Decoder {
29-
r, _ := zstd.NewReader(nil)
30-
return r
31-
}
32-
33-
func newZstdWriter() *zstd.Encoder {
34-
w, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
35-
return w
36-
}
37-
3822
// LogStore defines LogStore struct
3923
type LogStore struct {
4024
Name string `json:"logstoreName"`
@@ -185,7 +169,7 @@ func (s *LogStore) PutRawLog(rawLogData []byte) (err error) {
185169
}
186170
outLen = n
187171
case Compress_ZSTD:
188-
out = zstdWriter.EncodeAll(rawLogData, nil)
172+
out, _ = slsZstdCompressor.Compress(rawLogData, nil)
189173
h = map[string]string{
190174
"x-log-compresstype": "zstd",
191175
"x-log-bodyrawsize": strconv.Itoa(len(rawLogData)),
@@ -255,7 +239,7 @@ func (s *LogStore) PostRawLogs(body []byte, hashKey *string) (err error) {
255239
outLen = n
256240
case Compress_ZSTD:
257241
// Compress body with zstd
258-
out = zstdWriter.EncodeAll(body, nil)
242+
out, _ = slsZstdCompressor.Compress(body, nil)
259243
h = map[string]string{
260244
"x-log-compresstype": "zstd",
261245
"x-log-bodyrawsize": strconv.Itoa(len(body)),
@@ -327,7 +311,7 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
327311
outLen = n
328312
case Compress_ZSTD:
329313
// Compress body with zstd
330-
out = zstdWriter.EncodeAll(body, nil)
314+
out, _ = slsZstdCompressor.Compress(body, nil)
331315
h = map[string]string{
332316
"x-log-compresstype": "zstd",
333317
"x-log-bodyrawsize": strconv.Itoa(len(body)),
@@ -408,7 +392,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
408392
outLen = n
409393
case Compress_ZSTD:
410394
// Compress body with zstd
411-
out = zstdWriter.EncodeAll(body, nil)
395+
out, _ = slsZstdCompressor.Compress(body, nil)
412396
h = map[string]string{
413397
"x-log-compresstype": "zstd",
414398
"x-log-bodyrawsize": strconv.Itoa(len(body)),
@@ -590,7 +574,7 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullL
590574
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, pullLogMeta.RawSize)
591575
}
592576
case Compress_ZSTD:
593-
out, err = zstdReader.DecodeAll(buf, out[:0])
577+
out, err = slsZstdCompressor.Decompress(buf, out)
594578
if err != nil {
595579
return nil, nil, err
596580
}

0 commit comments

Comments
 (0)