Skip to content

Commit 6c99b4b

Browse files
dvilaverdedvilaverde
andauthored
reduce memory allocations when zlib compression is enabled (#880)
* simple metrics to capture packet data size both compressed & uncompressed * reusing zlib readers and writers via a sync.Pool * cache zlib writer/readers on a connection to avoid excessive Writer creation, which reduces memory allocations * fixing issue when reading large packet responses * revert to standard go zlib implemenation * Revert "revert to standard go zlib implemenation" This reverts commit a7893fa. * Revert "fixing issue when reading large packet responses" This reverts commit 4f512d8. * Revert "cache zlib writer/readers on a connection to avoid excessive Writer creation, which reduces memory allocations" This reverts commit 25e0cee. * refactoring to reduce memory allocations * cleanup changes in prep for PR into upstream * make zlib poll return ReadCloser instead of Reader * fixing broken tests and linting issues * removed duplicate assignment * update link to doxygen docs * addressing PR review feedback --------- Co-authored-by: dvilaverde <[email protected]>
1 parent 014dcd7 commit 6c99b4b

File tree

6 files changed

+190
-97
lines changed

6 files changed

+190
-97
lines changed

client/req.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,23 @@ func (c *Conn) writeCommandStr(command byte, arg string) error {
3939
func (c *Conn) writeCommandUint32(command byte, arg uint32) error {
4040
c.ResetSequence()
4141

42-
return c.WritePacket([]byte{
43-
0x05, //5 bytes long
44-
0x00,
45-
0x00,
46-
0x00, //sequence
42+
buf := utils.ByteSliceGet(9)
4743

48-
command,
44+
buf.B[0] = 0x05 //5 bytes long
45+
buf.B[1] = 0x00
46+
buf.B[2] = 0x00
47+
buf.B[3] = 0x00 //sequence
4948

50-
byte(arg),
51-
byte(arg >> 8),
52-
byte(arg >> 16),
53-
byte(arg >> 24),
54-
})
49+
buf.B[4] = command
50+
51+
buf.B[5] = byte(arg)
52+
buf.B[6] = byte(arg >> 8)
53+
buf.B[7] = byte(arg >> 16)
54+
buf.B[8] = byte(arg >> 24)
55+
56+
err := c.WritePacket(buf.B)
57+
utils.ByteSlicePut(buf)
58+
return err
5559
}
5660

5761
func (c *Conn) writeCommandStrStr(command byte, arg1 string, arg2 string) error {

client/stmt.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"math"
88

99
. "github.com/go-mysql-org/go-mysql/mysql"
10+
"github.com/go-mysql-org/go-mysql/utils"
1011
"github.com/pingcap/errors"
1112
)
1213

@@ -146,37 +147,44 @@ func (s *Stmt) write(args ...interface{}) error {
146147
length += len(paramValues[i])
147148
}
148149

149-
data := make([]byte, 4, 4+length)
150+
data := utils.BytesBufferGet()
151+
defer func() {
152+
utils.BytesBufferPut(data)
153+
}()
154+
if data.Len() < length+4 {
155+
data.Grow(4 + length)
156+
}
150157

151-
data = append(data, COM_STMT_EXECUTE)
152-
data = append(data, byte(s.id), byte(s.id>>8), byte(s.id>>16), byte(s.id>>24))
158+
data.Write([]byte{0, 0, 0, 0})
159+
data.WriteByte(COM_STMT_EXECUTE)
160+
data.Write([]byte{byte(s.id), byte(s.id >> 8), byte(s.id >> 16), byte(s.id >> 24)})
153161

154162
//flag: CURSOR_TYPE_NO_CURSOR
155-
data = append(data, 0x00)
163+
data.WriteByte(0x00)
156164

157165
//iteration-count, always 1
158-
data = append(data, 1, 0, 0, 0)
166+
data.Write([]byte{1, 0, 0, 0})
159167

160168
if s.params > 0 {
161-
data = append(data, nullBitmap...)
169+
data.Write(nullBitmap)
162170

163171
//new-params-bound-flag
164-
data = append(data, newParamBoundFlag)
172+
data.WriteByte(newParamBoundFlag)
165173

166174
if newParamBoundFlag == 1 {
167175
//type of each parameter, length: num-params * 2
168-
data = append(data, paramTypes...)
176+
data.Write(paramTypes)
169177

170178
//value of each parameter
171179
for _, v := range paramValues {
172-
data = append(data, v...)
180+
data.Write(v)
173181
}
174182
}
175183
}
176184

177185
s.conn.ResetSequence()
178186

179-
return s.conn.WritePacket(data)
187+
return s.conn.WritePacket(data.Bytes())
180188
}
181189

182190
func (c *Conn) Prepare(query string) (*Stmt, error) {

compress/zlib.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package compress
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"sync"
7+
8+
"github.com/klauspost/compress/zlib"
9+
)
10+
11+
const DefaultCompressionLevel = 6
12+
13+
var (
14+
zlibReaderPool *sync.Pool
15+
zlibWriterPool sync.Pool
16+
)
17+
18+
func init() {
19+
zlibReaderPool = &sync.Pool{
20+
New: func() interface{} {
21+
return nil
22+
},
23+
}
24+
25+
zlibWriterPool = sync.Pool{
26+
New: func() interface{} {
27+
w, err := zlib.NewWriterLevel(new(bytes.Buffer), DefaultCompressionLevel)
28+
if err != nil {
29+
panic(err)
30+
}
31+
return w
32+
},
33+
}
34+
}
35+
36+
var _ io.WriteCloser = zlibWriter{}
37+
var _ io.ReadCloser = zlibReader{}
38+
39+
type zlibWriter struct {
40+
w *zlib.Writer
41+
}
42+
43+
type zlibReader struct {
44+
r io.ReadCloser
45+
}
46+
47+
func GetPooledZlibWriter(target io.Writer) (io.WriteCloser, error) {
48+
w := zlibWriterPool.Get().(*zlib.Writer)
49+
w.Reset(target)
50+
51+
return zlibWriter{
52+
w: w,
53+
}, nil
54+
}
55+
56+
func GetPooledZlibReader(src io.Reader) (io.ReadCloser, error) {
57+
var (
58+
rc io.ReadCloser
59+
err error
60+
)
61+
62+
if r := zlibReaderPool.Get(); r != nil {
63+
rc = r.(io.ReadCloser)
64+
if rc.(zlib.Resetter).Reset(src, nil) != nil {
65+
return nil, err
66+
}
67+
} else {
68+
if rc, err = zlib.NewReader(src); err != nil {
69+
return nil, err
70+
}
71+
}
72+
73+
return zlibReader{
74+
r: rc,
75+
}, nil
76+
}
77+
78+
func (c zlibWriter) Write(data []byte) (n int, err error) {
79+
return c.w.Write(data)
80+
}
81+
82+
func (c zlibWriter) Close() error {
83+
err := c.w.Close()
84+
zlibWriterPool.Put(c.w)
85+
return err
86+
}
87+
88+
func (d zlibReader) Read(buf []byte) (n int, err error) {
89+
return d.r.Read(buf)
90+
}
91+
92+
func (d zlibReader) Close() error {
93+
err := d.r.Close()
94+
zlibReaderPool.Put(d.r)
95+
return err
96+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/goccy/go-json v0.10.2
1010
github.com/google/uuid v1.3.0
1111
github.com/jmoiron/sqlx v1.3.3
12-
github.com/klauspost/compress v1.17.1
12+
github.com/klauspost/compress v1.17.8
1313
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
1414
github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67
1515
github.com/shopspring/decimal v1.2.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
1919
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
2020
github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk=
2121
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
22-
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
23-
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
22+
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
23+
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
2424
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
2525
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
2626
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=

0 commit comments

Comments
 (0)