Skip to content

Commit a2b7d52

Browse files
committed
sstable: enable compression levels
Enable compression levels by changing block.Compression to be composed of a Family and Level. Expose new enums to Cockroach.
1 parent dbc7e3b commit a2b7d52

19 files changed

+221
-78
lines changed

ingest_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func TestIngestLoadRand(t *testing.T) {
191191
},
192192
path: paths[i],
193193
}
194-
expected[i].tableMetadata.Stats.CompressionType = block.SnappyCompression
194+
expected[i].tableMetadata.Stats.CompressionType = block.SnappyCompressionFamily
195195
expected[i].StatsMarkValid()
196196

197197
func() {

internal/manifest/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ type TableStats struct {
7676
// Total size of value blocks and value index block.
7777
ValueBlocksSize uint64
7878
// CompressionType is the compression type of the table.
79-
CompressionType block.Compression
79+
CompressionType block.CompressionFamily
8080
// TombstoneDenseBlocksRatio is the ratio of data blocks in this table that
8181
// fulfills at least one of the following:
8282
// 1. The block contains at least options.Experimental.NumDeletionsThreshold

options.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,41 @@ const (
4343
type Compression = block.Compression
4444

4545
// Exported Compression constants.
46-
const (
46+
var (
4747
DefaultCompression = block.DefaultCompression
4848
NoCompression = block.NoCompression
4949
SnappyCompression = block.SnappyCompression
50-
ZstdCompression = block.ZstdCompression
51-
MinlzCompression = block.MinlzCompression
50+
ZstdCompression = block.DefaultZstdCompression
51+
MinlzCompression = block.DefaultMinlzCompression
52+
53+
// Zstd compression levels.
54+
ZstdCompressionLevel1 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel1}
55+
ZstdCompressionLevel2 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel2}
56+
ZstdCompressionLevel3 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel3}
57+
ZstdCompressionLevel4 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel4}
58+
ZstdCompressionLevel5 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel5}
59+
ZstdCompressionLevel6 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel6}
60+
ZstdCompressionLevel7 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel7}
61+
ZstdCompressionLevel8 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel8}
62+
ZstdCompressionLevel9 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel9}
63+
ZstdCompressionLevel10 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel10}
64+
ZstdCompressionLevel11 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel11}
65+
ZstdCompressionLevel12 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel12}
66+
ZstdCompressionLevel13 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel13}
67+
ZstdCompressionLevel14 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel14}
68+
ZstdCompressionLevel15 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel15}
69+
ZstdCompressionLevel16 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel16}
70+
ZstdCompressionLevel17 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel17}
71+
ZstdCompressionLevel18 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel18}
72+
ZstdCompressionLevel19 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel19}
73+
ZstdCompressionLevel20 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel20}
74+
ZstdCompressionLevel21 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel21}
75+
ZstdCompressionLevel22 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel22}
76+
77+
// Minlz compression levels.
78+
MinlzCompressionLevelFastest = block.Compression{Family: block.MinlzCompressionFamily, Level: block.MinlzLevelFastest}
79+
MinlzCompressionLevelBalanced = block.Compression{Family: block.MinlzCompressionFamily, Level: block.MinlzLevelBalanced}
80+
MinlzCompressionLevelSmallest = block.Compression{Family: block.MinlzCompressionFamily, Level: block.MinlzLevelSmallest}
5281
)
5382

5483
// FilterType exports the base.FilterType type.
@@ -1492,7 +1521,7 @@ func (o *Options) String() string {
14921521
fmt.Fprintf(&buf, " block_restart_interval=%d\n", l.BlockRestartInterval)
14931522
fmt.Fprintf(&buf, " block_size=%d\n", l.BlockSize)
14941523
fmt.Fprintf(&buf, " block_size_threshold=%d\n", l.BlockSizeThreshold)
1495-
fmt.Fprintf(&buf, " compression=%s\n", resolveDefaultCompression(l.Compression()))
1524+
fmt.Fprintf(&buf, " compression=%s\n", resolveDefaultCompression(l.Compression()).Family.String())
14961525
fmt.Fprintf(&buf, " filter_policy=%s\n", filterPolicyName(l.FilterPolicy))
14971526
fmt.Fprintf(&buf, " filter_type=%s\n", l.FilterType)
14981527
fmt.Fprintf(&buf, " index_block_size=%d\n", l.IndexBlockSize)
@@ -2149,8 +2178,8 @@ func (o *Options) MakeBlobWriterOptions(level int) blob.FileWriterOptions {
21492178
}
21502179

21512180
func resolveDefaultCompression(c Compression) Compression {
2152-
if c <= DefaultCompression || c >= block.NCompression {
2153-
c = SnappyCompression
2181+
if c.Family <= block.DefaultCompressionFamily || c.Family >= block.NCompressionFamily {
2182+
c = block.SnappyCompression
21542183
}
21552184
return c
21562185
}

sstable/blob/blob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type FileWriterOptions struct {
5757
}
5858

5959
func (o *FileWriterOptions) ensureDefaults() {
60-
if o.Compression <= block.DefaultCompression || o.Compression >= block.NCompression {
60+
if o.Compression.Family <= block.DefaultCompressionFamily || o.Compression.Family >= block.NCompressionFamily {
6161
o.Compression = block.SnappyCompression
6262
}
6363
if o.ChecksumType == block.ChecksumTypeNone {

sstable/blob/blob_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func scanFileWriterOptions(t *testing.T, td *datadriven.TestData) FileWriterOpti
6565
td.MaybeScanArgs(t, "target-block-size", &targetBlockSize)
6666
td.MaybeScanArgs(t, "block-size-threshold", &blockSizeThreshold)
6767
if cmdArg, ok := td.Arg("compression"); ok {
68-
compression = block.CompressionFromString(cmdArg.SingleVal(t))
68+
compression = block.FamilyToDefaultCompression[block.CompressionFromString(cmdArg.SingleVal(t))]
6969
}
7070
return FileWriterOptions{
7171
Compression: compression,

sstable/block/compression.go

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,97 @@ import (
1616
"github.com/cockroachdb/pebble/objstorage"
1717
)
1818

19-
// Compression is the per-block compression algorithm to use.
20-
type Compression int
19+
// CompressionFamily identifies a compression algorithm (e.g., Snappy, Zstd, Minlz).
20+
type CompressionFamily int
21+
22+
// CompressionLevel specifies the compression level for a given family.
23+
// Some families ignore this value if they don't support levels.
24+
type CompressionLevel int
25+
26+
// Compression is the per-block compression algorithm and level to use.
27+
// For families like Snappy, the level is ignored. For families like Zstd or Minlz,
28+
// the level adjusts compression ratio and speed.
29+
type Compression struct {
30+
Family CompressionFamily
31+
Level CompressionLevel
32+
}
33+
34+
// The available compression family types.
35+
const (
36+
DefaultCompressionFamily CompressionFamily = iota
37+
NoCompressionFamily
38+
SnappyCompressionFamily
39+
ZstdCompressionFamily
40+
MinlzCompressionFamily
41+
NCompressionFamily
42+
)
2143

22-
// The available compression types.
44+
// The available compression levels.
2345
const (
24-
DefaultCompression Compression = iota
25-
NoCompression
26-
SnappyCompression
27-
ZstdCompression
28-
MinlzCompression
29-
NCompression
46+
LevelDefault CompressionLevel = 0
47+
48+
// Zstd compression levels.
49+
ZstdLevelMin CompressionLevel = 1
50+
ZstdLevel1 CompressionLevel = 1
51+
ZstdLevel2 CompressionLevel = 2
52+
ZstdLevel3 CompressionLevel = 3 // Default for Zstd.
53+
ZstdLevelDefault CompressionLevel = ZstdLevel3
54+
ZstdLevel4 CompressionLevel = 4
55+
ZstdLevel5 CompressionLevel = 5
56+
ZstdLevel6 CompressionLevel = 6
57+
ZstdLevel7 CompressionLevel = 7
58+
ZstdLevel8 CompressionLevel = 8
59+
ZstdLevel9 CompressionLevel = 9
60+
ZstdLevel10 CompressionLevel = 10
61+
ZstdLevel11 CompressionLevel = 11
62+
ZstdLevel12 CompressionLevel = 12
63+
ZstdLevel13 CompressionLevel = 13
64+
ZstdLevel14 CompressionLevel = 14
65+
ZstdLevel15 CompressionLevel = 15
66+
ZstdLevel16 CompressionLevel = 16
67+
ZstdLevel17 CompressionLevel = 17
68+
ZstdLevel18 CompressionLevel = 18
69+
ZstdLevel19 CompressionLevel = 19
70+
ZstdLevel20 CompressionLevel = 20
71+
ZstdLevel21 CompressionLevel = 21
72+
ZstdLevel22 CompressionLevel = 22
73+
ZstdLevelMax CompressionLevel = 22
74+
75+
// Minlz compression levels.
76+
MinlzLevelMin CompressionLevel = 1
77+
MinlzLevelFastest CompressionLevel = 1 // Default for MinLZ.
78+
MinlzLevelDefault CompressionLevel = MinlzLevelFastest
79+
MinlzLevelBalanced CompressionLevel = 2
80+
MinlzLevelSmallest CompressionLevel = 3
81+
MinlzLevelMax CompressionLevel = 3
3082
)
3183

84+
var DefaultCompression = Compression{Family: DefaultCompressionFamily, Level: LevelDefault}
85+
var NoCompression = Compression{Family: NoCompressionFamily, Level: LevelDefault}
86+
var SnappyCompression = Compression{Family: SnappyCompressionFamily, Level: LevelDefault}
87+
var DefaultZstdCompression = Compression{Family: ZstdCompressionFamily, Level: LevelDefault}
88+
var DefaultMinlzCompression = Compression{Family: MinlzCompressionFamily, Level: LevelDefault}
89+
90+
var FamilyToDefaultCompression = map[CompressionFamily]Compression{
91+
DefaultCompressionFamily: DefaultCompression,
92+
NoCompressionFamily: NoCompression,
93+
SnappyCompressionFamily: SnappyCompression,
94+
ZstdCompressionFamily: DefaultZstdCompression,
95+
}
96+
3297
// String implements fmt.Stringer, returning a human-readable name for the
3398
// compression algorithm.
34-
func (c Compression) String() string {
99+
func (c CompressionFamily) String() string {
35100
switch c {
36-
case DefaultCompression:
101+
case DefaultCompressionFamily:
37102
return "Default"
38-
case NoCompression:
103+
case NoCompressionFamily:
39104
return "NoCompression"
40-
case SnappyCompression:
105+
case SnappyCompressionFamily:
41106
return "Snappy"
42-
case ZstdCompression:
107+
case ZstdCompressionFamily:
43108
return "ZSTD"
44-
case MinlzCompression:
109+
case MinlzCompressionFamily:
45110
return "Minlz"
46111
default:
47112
return "Unknown"
@@ -50,20 +115,20 @@ func (c Compression) String() string {
50115

51116
// CompressionFromString returns a CompressionFamily from its
52117
// string representation. Inverse of c.String() above.
53-
func CompressionFromString(s string) Compression {
118+
func CompressionFromString(s string) CompressionFamily {
54119
switch s {
55120
case "Default":
56-
return DefaultCompression
121+
return DefaultCompressionFamily
57122
case "NoCompression":
58-
return NoCompression
123+
return NoCompressionFamily
59124
case "Snappy":
60-
return SnappyCompression
125+
return SnappyCompressionFamily
61126
case "ZSTD":
62-
return ZstdCompression
127+
return ZstdCompressionFamily
63128
case "Minlz":
64-
return MinlzCompression
129+
return MinlzCompressionFamily
65130
default:
66-
return DefaultCompression
131+
return DefaultCompressionFamily
67132
}
68133
}
69134

@@ -222,10 +287,10 @@ func CompressAndChecksum(
222287
// Compress the buffer, discarding the result if the improvement isn't at
223288
// least 12.5%.
224289
algo := NoCompressionIndicator
225-
if compression != NoCompression {
226-
compressor := GetCompressor(compression)
290+
if compression.Family != NoCompressionFamily {
291+
compressor := GetCompressor(compression.Family)
227292
defer compressor.Close()
228-
algo, buf = compressor.Compress(buf, blockData)
293+
algo, buf = compressor.Compress(buf, blockData, compression.Level)
229294
if len(buf) >= len(blockData)-len(blockData)/8 {
230295
algo = NoCompressionIndicator
231296
}

sstable/block/compression_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ func TestCompressionRoundtrip(t *testing.T) {
2626
t.Logf("seed %d", seed)
2727
rng := rand.New(rand.NewPCG(0, seed))
2828

29-
for compression := DefaultCompression + 1; compression < NCompression; compression++ {
30-
if compression == NoCompression {
29+
for compression := DefaultCompressionFamily + 1; compression < NCompressionFamily; compression++ {
30+
if compression == NoCompressionFamily {
3131
continue
3232
}
3333
t.Run(compression.String(), func(t *testing.T) {
@@ -40,7 +40,7 @@ func TestCompressionRoundtrip(t *testing.T) {
4040
compressedBuf := make([]byte, 1+rng.IntN(1<<10 /* 1 KiB */))
4141
compressor := GetCompressor(compression)
4242
defer compressor.Close()
43-
btyp, compressed := compressor.Compress(compressedBuf, payload)
43+
btyp, compressed := compressor.Compress(compressedBuf, payload, LevelDefault)
4444
v, err := decompress(btyp, compressed)
4545
require.NoError(t, err)
4646
got := payload
@@ -151,12 +151,12 @@ func TestMinlzEncodingLimit(t *testing.T) {
151151
require.Fail(t, "Expected minlz.ErrTooLarge Error")
152152
}
153153

154-
c := GetCompressor(MinlzCompression)
154+
c := GetCompressor(MinlzCompressionFamily)
155155
defer c.Close()
156-
algo, _ := c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize-1))
156+
algo, _ := c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize-1), MinlzLevelDefault)
157157
require.Equal(t, algo, MinlzCompressionIndicator)
158-
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize))
158+
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize), MinlzLevelDefault)
159159
require.Equal(t, algo, MinlzCompressionIndicator)
160-
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize+1))
160+
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize+1), MinlzLevelDefault)
161161
require.Equal(t, algo, SnappyCompressionIndicator)
162162
}

sstable/block/compressor.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
type Compressor interface {
13-
Compress(dst, src []byte) (CompressionIndicator, []byte)
13+
Compress(dst, src []byte, level CompressionLevel) (CompressionIndicator, []byte)
1414

1515
// Close must be called when the Compressor is no longer needed.
1616
// After Close is called, the Compressor must not be used again.
@@ -25,42 +25,53 @@ var _ Compressor = noopCompressor{}
2525
var _ Compressor = snappyCompressor{}
2626
var _ Compressor = minlzCompressor{}
2727

28-
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
28+
func (noopCompressor) Compress(dst, src []byte, _ CompressionLevel) (CompressionIndicator, []byte) {
2929
panic("NoCompressionCompressor.Compress() should not be called.")
3030
}
3131
func (noopCompressor) Close() {}
3232

33-
func (snappyCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
33+
func (snappyCompressor) Compress(
34+
dst, src []byte, _ CompressionLevel,
35+
) (CompressionIndicator, []byte) {
3436
dst = dst[:cap(dst):cap(dst)]
3537
return SnappyCompressionIndicator, snappy.Encode(dst, src)
3638
}
3739

3840
func (snappyCompressor) Close() {}
3941

40-
func (minlzCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
41-
// Minlz cannot encode blocks greater than 8MB. Fall back to Snappy in those cases.
42+
func (minlzCompressor) Compress(
43+
dst, src []byte, level CompressionLevel,
44+
) (CompressionIndicator, []byte) {
45+
// Minlz cannot encode blocks greater than 8MiB. Fall back to Snappy in those cases.
4246
if len(src) > minlz.MaxBlockSize {
43-
return (snappyCompressor{}).Compress(dst, src)
47+
return (snappyCompressor{}).Compress(dst, src, LevelDefault)
4448
}
45-
46-
compressed, err := minlz.Encode(dst, src, minlz.LevelFastest)
49+
var encoderLevel int
50+
if level == LevelDefault {
51+
encoderLevel = int(MinlzLevelDefault)
52+
} else if level < MinlzLevelMin || level > MinlzLevelMax {
53+
panic("minlz compression: illegal level")
54+
} else {
55+
encoderLevel = int(level)
56+
}
57+
compressed, err := minlz.Encode(dst, src, encoderLevel)
4758
if err != nil {
48-
panic(errors.Wrap(err, "minlz compression"))
59+
panic(errors.Wrap(err, "Error while compressing using Minlz."))
4960
}
5061
return MinlzCompressionIndicator, compressed
5162
}
5263

5364
func (minlzCompressor) Close() {}
5465

55-
func GetCompressor(c Compression) Compressor {
66+
func GetCompressor(c CompressionFamily) Compressor {
5667
switch c {
57-
case NoCompression:
68+
case NoCompressionFamily:
5869
return noopCompressor{}
59-
case SnappyCompression:
70+
case SnappyCompressionFamily:
6071
return snappyCompressor{}
61-
case ZstdCompression:
72+
case ZstdCompressionFamily:
6273
return getZstdCompressor()
63-
case MinlzCompression:
74+
case MinlzCompressionFamily:
6475
return minlzCompressor{}
6576
default:
6677
panic("Invalid compression type.")

0 commit comments

Comments
 (0)