Skip to content

[WIP] sstable: enable compression levels #4513

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestIngestLoadRand(t *testing.T) {
},
path: paths[i],
}
expected[i].tableMetadata.Stats.CompressionType = block.SnappyCompression
expected[i].tableMetadata.Stats.CompressionType = block.SnappyCompressionFamily
expected[i].StatsMarkValid()

func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type TableStats struct {
// Total size of value blocks and value index block.
ValueBlocksSize uint64
// CompressionType is the compression type of the table.
CompressionType block.Compression
CompressionType block.CompressionFamily
// TombstoneDenseBlocksRatio is the ratio of data blocks in this table that
// fulfills at least one of the following:
// 1. The block contains at least options.Experimental.NumDeletionsThreshold
Expand Down
41 changes: 35 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,41 @@ const (
type Compression = block.Compression

// Exported Compression constants.
const (
var (
DefaultCompression = block.DefaultCompression
NoCompression = block.NoCompression
SnappyCompression = block.SnappyCompression
ZstdCompression = block.ZstdCompression
MinlzCompression = block.MinlzCompression
ZstdCompression = block.DefaultZstdCompression
MinlzCompression = block.DefaultMinlzCompression

// Zstd compression levels.
ZstdCompressionLevel1 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel1}
ZstdCompressionLevel2 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel2}
ZstdCompressionLevel3 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel3}
ZstdCompressionLevel4 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel4}
ZstdCompressionLevel5 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel5}
ZstdCompressionLevel6 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel6}
ZstdCompressionLevel7 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel7}
ZstdCompressionLevel8 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel8}
ZstdCompressionLevel9 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel9}
ZstdCompressionLevel10 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel10}
ZstdCompressionLevel11 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel11}
ZstdCompressionLevel12 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel12}
ZstdCompressionLevel13 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel13}
ZstdCompressionLevel14 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel14}
ZstdCompressionLevel15 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel15}
ZstdCompressionLevel16 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel16}
ZstdCompressionLevel17 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel17}
ZstdCompressionLevel18 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel18}
ZstdCompressionLevel19 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel19}
ZstdCompressionLevel20 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel20}
ZstdCompressionLevel21 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel21}
ZstdCompressionLevel22 = block.Compression{Family: block.ZstdCompressionFamily, Level: block.ZstdLevel22}

// Minlz compression levels.
MinlzCompressionLevelFastest = block.Compression{Family: block.MinlzCompressionFamily, Level: block.MinlzLevelFastest}
MinlzCompressionLevelBalanced = block.Compression{Family: block.MinlzCompressionFamily, Level: block.MinlzLevelBalanced}
MinlzCompressionLevelSmallest = block.Compression{Family: block.MinlzCompressionFamily, Level: block.MinlzLevelSmallest}
)

// FilterType exports the base.FilterType type.
Expand Down Expand Up @@ -1492,7 +1521,7 @@ func (o *Options) String() string {
fmt.Fprintf(&buf, " block_restart_interval=%d\n", l.BlockRestartInterval)
fmt.Fprintf(&buf, " block_size=%d\n", l.BlockSize)
fmt.Fprintf(&buf, " block_size_threshold=%d\n", l.BlockSizeThreshold)
fmt.Fprintf(&buf, " compression=%s\n", resolveDefaultCompression(l.Compression()))
fmt.Fprintf(&buf, " compression=%s\n", resolveDefaultCompression(l.Compression()).Family.String())
fmt.Fprintf(&buf, " filter_policy=%s\n", filterPolicyName(l.FilterPolicy))
fmt.Fprintf(&buf, " filter_type=%s\n", l.FilterType)
fmt.Fprintf(&buf, " index_block_size=%d\n", l.IndexBlockSize)
Expand Down Expand Up @@ -2149,8 +2178,8 @@ func (o *Options) MakeBlobWriterOptions(level int) blob.FileWriterOptions {
}

func resolveDefaultCompression(c Compression) Compression {
if c <= DefaultCompression || c >= block.NCompression {
c = SnappyCompression
if c.Family <= block.DefaultCompressionFamily || c.Family >= block.NCompressionFamily {
c = block.SnappyCompression
}
return c
}
Expand Down
2 changes: 1 addition & 1 deletion sstable/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type FileWriterOptions struct {
}

func (o *FileWriterOptions) ensureDefaults() {
if o.Compression <= block.DefaultCompression || o.Compression >= block.NCompression {
if o.Compression.Family <= block.DefaultCompressionFamily || o.Compression.Family >= block.NCompressionFamily {
o.Compression = block.SnappyCompression
}
if o.ChecksumType == block.ChecksumTypeNone {
Expand Down
2 changes: 1 addition & 1 deletion sstable/blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func scanFileWriterOptions(t *testing.T, td *datadriven.TestData) FileWriterOpti
td.MaybeScanArgs(t, "target-block-size", &targetBlockSize)
td.MaybeScanArgs(t, "block-size-threshold", &blockSizeThreshold)
if cmdArg, ok := td.Arg("compression"); ok {
compression = block.CompressionFromString(cmdArg.SingleVal(t))
compression = block.FamilyToDefaultCompression[block.CompressionFromString(cmdArg.SingleVal(t))]
}
return FileWriterOptions{
Compression: compression,
Expand Down
115 changes: 90 additions & 25 deletions sstable/block/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,97 @@ import (
"github.com/cockroachdb/pebble/objstorage"
)

// Compression is the per-block compression algorithm to use.
type Compression int
// CompressionFamily identifies a compression algorithm (e.g., Snappy, Zstd, Minlz).
type CompressionFamily int

// CompressionLevel specifies the compression level for a given family.
// Some families ignore this value if they don't support levels.
type CompressionLevel int

// Compression is the per-block compression algorithm and level to use.
// For families like Snappy, the level is ignored. For families like Zstd or Minlz,
// the level adjusts compression ratio and speed.
type Compression struct {
Family CompressionFamily
Level CompressionLevel
}

// The available compression family types.
const (
DefaultCompressionFamily CompressionFamily = iota
NoCompressionFamily
SnappyCompressionFamily
ZstdCompressionFamily
MinlzCompressionFamily
NCompressionFamily
)

// The available compression types.
// The available compression levels.
const (
DefaultCompression Compression = iota
NoCompression
SnappyCompression
ZstdCompression
MinlzCompression
NCompression
LevelDefault CompressionLevel = 0

// Zstd compression levels.
ZstdLevelMin CompressionLevel = 1
ZstdLevel1 CompressionLevel = 1
ZstdLevel2 CompressionLevel = 2
ZstdLevel3 CompressionLevel = 3 // Default for Zstd.
ZstdLevelDefault CompressionLevel = ZstdLevel3
ZstdLevel4 CompressionLevel = 4
ZstdLevel5 CompressionLevel = 5
ZstdLevel6 CompressionLevel = 6
ZstdLevel7 CompressionLevel = 7
ZstdLevel8 CompressionLevel = 8
ZstdLevel9 CompressionLevel = 9
ZstdLevel10 CompressionLevel = 10
ZstdLevel11 CompressionLevel = 11
ZstdLevel12 CompressionLevel = 12
ZstdLevel13 CompressionLevel = 13
ZstdLevel14 CompressionLevel = 14
ZstdLevel15 CompressionLevel = 15
ZstdLevel16 CompressionLevel = 16
ZstdLevel17 CompressionLevel = 17
ZstdLevel18 CompressionLevel = 18
ZstdLevel19 CompressionLevel = 19
ZstdLevel20 CompressionLevel = 20
ZstdLevel21 CompressionLevel = 21
ZstdLevel22 CompressionLevel = 22
ZstdLevelMax CompressionLevel = 22

// Minlz compression levels.
MinlzLevelMin CompressionLevel = 1
MinlzLevelFastest CompressionLevel = 1 // Default for MinLZ.
MinlzLevelDefault CompressionLevel = MinlzLevelFastest
MinlzLevelBalanced CompressionLevel = 2
MinlzLevelSmallest CompressionLevel = 3
MinlzLevelMax CompressionLevel = 3
)

var DefaultCompression = Compression{Family: DefaultCompressionFamily, Level: LevelDefault}
var NoCompression = Compression{Family: NoCompressionFamily, Level: LevelDefault}
var SnappyCompression = Compression{Family: SnappyCompressionFamily, Level: LevelDefault}
var DefaultZstdCompression = Compression{Family: ZstdCompressionFamily, Level: LevelDefault}
var DefaultMinlzCompression = Compression{Family: MinlzCompressionFamily, Level: LevelDefault}

var FamilyToDefaultCompression = map[CompressionFamily]Compression{
DefaultCompressionFamily: DefaultCompression,
NoCompressionFamily: NoCompression,
SnappyCompressionFamily: SnappyCompression,
ZstdCompressionFamily: DefaultZstdCompression,
}

// String implements fmt.Stringer, returning a human-readable name for the
// compression algorithm.
func (c Compression) String() string {
func (c CompressionFamily) String() string {
switch c {
case DefaultCompression:
case DefaultCompressionFamily:
return "Default"
case NoCompression:
case NoCompressionFamily:
return "NoCompression"
case SnappyCompression:
case SnappyCompressionFamily:
return "Snappy"
case ZstdCompression:
case ZstdCompressionFamily:
return "ZSTD"
case MinlzCompression:
case MinlzCompressionFamily:
return "Minlz"
default:
return "Unknown"
Expand All @@ -50,20 +115,20 @@ func (c Compression) String() string {

// CompressionFromString returns an sstable.Compression from its
// string representation. Inverse of c.String() above.
func CompressionFromString(s string) Compression {
func CompressionFromString(s string) CompressionFamily {
switch s {
case "Default":
return DefaultCompression
return DefaultCompressionFamily
case "NoCompression":
return NoCompression
return NoCompressionFamily
case "Snappy":
return SnappyCompression
return SnappyCompressionFamily
case "ZSTD":
return ZstdCompression
return ZstdCompressionFamily
case "Minlz":
return MinlzCompression
return MinlzCompressionFamily
default:
return DefaultCompression
return DefaultCompressionFamily
}
}

Expand Down Expand Up @@ -222,10 +287,10 @@ func CompressAndChecksum(
// Compress the buffer, discarding the result if the improvement isn't at
// least 12.5%.
algo := NoCompressionIndicator
if compression != NoCompression {
compressor := GetCompressor(compression)
if compression.Family != NoCompressionFamily {
compressor := GetCompressor(compression.Family)
defer compressor.Close()
algo, buf = compressor.Compress(buf, blockData)
algo, buf = compressor.Compress(buf, blockData, compression.Level)
if len(buf) >= len(blockData)-len(blockData)/8 {
algo = NoCompressionIndicator
}
Expand Down
14 changes: 7 additions & 7 deletions sstable/block/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestCompressionRoundtrip(t *testing.T) {
t.Logf("seed %d", seed)
rng := rand.New(rand.NewPCG(0, seed))

for compression := DefaultCompression + 1; compression < NCompression; compression++ {
if compression == NoCompression {
for compression := DefaultCompressionFamily + 1; compression < NCompressionFamily; compression++ {
if compression == NoCompressionFamily {
continue
}
t.Run(compression.String(), func(t *testing.T) {
Expand All @@ -40,7 +40,7 @@ func TestCompressionRoundtrip(t *testing.T) {
compressedBuf := make([]byte, 1+rng.IntN(1<<10 /* 1 KiB */))
compressor := GetCompressor(compression)
defer compressor.Close()
btyp, compressed := compressor.Compress(compressedBuf, payload)
btyp, compressed := compressor.Compress(compressedBuf, payload, LevelDefault)
v, err := decompress(btyp, compressed)
require.NoError(t, err)
got := payload
Expand Down Expand Up @@ -151,12 +151,12 @@ func TestMinlzEncodingLimit(t *testing.T) {
require.Fail(t, "Expected minlz.ErrTooLarge Error")
}

c := GetCompressor(MinlzCompression)
c := GetCompressor(MinlzCompressionFamily)
defer c.Close()
algo, _ := c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize-1))
algo, _ := c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize-1), MinlzLevelDefault)
require.Equal(t, algo, MinlzCompressionIndicator)
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize))
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize), MinlzLevelDefault)
require.Equal(t, algo, MinlzCompressionIndicator)
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize+1))
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize+1), MinlzLevelDefault)
require.Equal(t, algo, SnappyCompressionIndicator)
}
39 changes: 25 additions & 14 deletions sstable/block/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type Compressor interface {
Compress(dst, src []byte) (CompressionIndicator, []byte)
Compress(dst, src []byte, level CompressionLevel) (CompressionIndicator, []byte)

// Close must be called when the Compressor is no longer needed.
// After Close is called, the Compressor must not be used again.
Expand All @@ -25,42 +25,53 @@ var _ Compressor = noopCompressor{}
var _ Compressor = snappyCompressor{}
var _ Compressor = minlzCompressor{}

func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
func (noopCompressor) Compress(dst, src []byte, _ CompressionLevel) (CompressionIndicator, []byte) {
panic("NoCompressionCompressor.Compress() should not be called.")
}
func (noopCompressor) Close() {}

func (snappyCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
func (snappyCompressor) Compress(
dst, src []byte, _ CompressionLevel,
) (CompressionIndicator, []byte) {
dst = dst[:cap(dst):cap(dst)]
return SnappyCompressionIndicator, snappy.Encode(dst, src)
}

func (snappyCompressor) Close() {}

func (minlzCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
// Minlz cannot encode blocks greater than 8MB. Fall back to Snappy in those cases.
func (minlzCompressor) Compress(
dst, src []byte, level CompressionLevel,
) (CompressionIndicator, []byte) {
// Minlz cannot encode blocks greater than 8MiB. Fall back to Snappy in those cases.
if len(src) > minlz.MaxBlockSize {
return (snappyCompressor{}).Compress(dst, src)
return (snappyCompressor{}).Compress(dst, src, LevelDefault)
}

compressed, err := minlz.Encode(dst, src, minlz.LevelFastest)
var encoderLevel int
if level == LevelDefault {
encoderLevel = int(MinlzLevelDefault)
} else if level < MinlzLevelMin || level > MinlzLevelMax {
panic("minlz compression: illegal level")
} else {
encoderLevel = int(level)
}
compressed, err := minlz.Encode(dst, src, encoderLevel)
if err != nil {
panic(errors.Wrap(err, "minlz compression"))
panic(errors.Wrap(err, "Error while compressing using Minlz."))
}
return MinlzCompressionIndicator, compressed
}

func (minlzCompressor) Close() {}

func GetCompressor(c Compression) Compressor {
func GetCompressor(c CompressionFamily) Compressor {
switch c {
case NoCompression:
case NoCompressionFamily:
return noopCompressor{}
case SnappyCompression:
case SnappyCompressionFamily:
return snappyCompressor{}
case ZstdCompression:
case ZstdCompressionFamily:
return getZstdCompressor()
case MinlzCompression:
case MinlzCompressionFamily:
return minlzCompressor{}
default:
panic("Invalid compression type.")
Expand Down
Loading