Skip to content

teach pebble about block level prefix synthesis #3350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,9 @@ type ExternalFile struct {
// is accessed as if those keys all instead have prefix SyntheticPrefix.
//
// SyntheticPrefix must be a prefix of both SmallestUserKey and LargestUserKey.
//
// NB: If the SyntheticPrefix is non-empty and the ContentPrefix is empty,
// then the read path will conduct block level prefix synthesis.
ContentPrefix, SyntheticPrefix []byte

// SyntheticSuffix will replace the suffix of every key in the file during
Expand Down
5 changes: 5 additions & 0 deletions internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,14 @@ func (m *FileMetadata) SyntheticSeqNum() sstable.SyntheticSeqNum {

// IterTransforms assembles the sstable.IterTransforms for this file: the
// synthetic sequence number, the synthetic suffix and, where applicable, the
// synthetic prefix.
//
// SyntheticPrefix is populated only when the file carries a prefix
// replacement rule and that rule reports that the prefix replacement iterator
// should not be used; otherwise it is left at its zero value.
func (m *FileMetadata) IterTransforms() sstable.IterTransforms {
	var transforms sstable.IterTransforms
	if pr := m.PrefixReplacement; pr != nil && !pr.UsePrefixReplacementIterator() {
		transforms.SyntheticPrefix = pr.SyntheticPrefix
	}
	transforms.SyntheticSeqNum = m.SyntheticSeqNum()
	transforms.SyntheticSuffix = m.SyntheticSuffix
	return transforms
}

Expand Down
75 changes: 65 additions & 10 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package sstable

import (
"bytes"
"context"
"encoding/binary"
"slices"
"unsafe"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -416,7 +418,9 @@ type blockIter struct {
// replacement rule, this will not be the case. Therefore, key should never
// be used after ikey is set.
key []byte
// fullKey is a buffer used for key prefix decompression.
// fullKey is a buffer used for key prefix decompression. Note that if
// transforms.SyntheticPrefix is not nil, fullKey always starts with that
// prefix.
fullKey []byte
// val contains the value the iterator is currently pointed at. If non-nil,
// this points to a slice of the block data.
Expand Down Expand Up @@ -454,7 +458,8 @@ type blockIter struct {
vbr *valueBlockReader
hasValuePrefix bool
}
synthSuffixBuf []byte
synthSuffixBuf []byte
firstUserKeyWithPrefixBuf []byte
}

// blockIter implements the base.InternalIterator interface.
Expand Down Expand Up @@ -484,7 +489,11 @@ func (i *blockIter) init(cmp Compare, split Split, block block, transforms IterT
i.numRestarts = numRestarts
i.ptr = unsafe.Pointer(&block[0])
i.data = block
i.fullKey = i.fullKey[:0]
if i.transforms.SyntheticPrefix != nil {
i.fullKey = append(i.fullKey[:0], i.transforms.SyntheticPrefix...)
} else {
i.fullKey = i.fullKey[:0]
}
i.val = nil
i.clearCache()
if i.restarts > 0 {
Expand Down Expand Up @@ -528,10 +537,11 @@ func (i *blockIter) isDataInvalidated() bool {

func (i *blockIter) resetForReuse() blockIter {
return blockIter{
fullKey: i.fullKey[:0],
cached: i.cached[:0],
cachedBuf: i.cachedBuf[:0],
data: nil,
fullKey: i.fullKey[:0],
cached: i.cached[:0],
cachedBuf: i.cachedBuf[:0],
firstUserKeyWithPrefixBuf: i.firstUserKeyWithPrefixBuf[:0],
data: nil,
}
}

Expand Down Expand Up @@ -602,7 +612,7 @@ func (i *blockIter) readEntry() {
value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}

shared += uint32(len(i.transforms.SyntheticPrefix))
unsharedKey := getBytes(ptr, int(unshared))
// TODO(sumeer): move this into the else block below.
i.fullKey = append(i.fullKey[:shared], unsharedKey...)
Expand Down Expand Up @@ -679,6 +689,12 @@ func (i *blockIter) readFirstKey() error {
i.firstUserKey = nil
return base.CorruptionErrorf("pebble/table: invalid firstKey in block")
}
if i.transforms.SyntheticPrefix != nil {
i.firstUserKeyWithPrefixBuf = slices.Grow(i.firstUserKeyWithPrefixBuf[:0], len(i.transforms.SyntheticPrefix)+len(i.firstUserKey))
i.firstUserKeyWithPrefixBuf = append(i.firstUserKeyWithPrefixBuf, i.transforms.SyntheticPrefix...)
i.firstUserKeyWithPrefixBuf = append(i.firstUserKeyWithPrefixBuf, i.firstUserKey...)
i.firstUserKey = i.firstUserKeyWithPrefixBuf
}
return nil
}

Expand Down Expand Up @@ -757,6 +773,24 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
if invariants.Enabled && i.isDataInvalidated() {
panic(errors.AssertionFailedf("invalidated blockIter used"))
}
searchKey := key
if i.transforms.SyntheticPrefix != nil {
// If the seek key does not start with SyntheticPrefix, it sorts either before
// or after the entire block, since every key in the block starts with
// SyntheticPrefix. To determine which, we need to compare against a valid
// key in the block. We use firstUserKey, which has the synthetic prefix.
if !bytes.HasPrefix(key, i.transforms.SyntheticPrefix) {
if i.cmp(i.firstUserKey, key) >= 0 {
return i.First()
}
// Set the offset to the end of the block to mimic the offset of an
// invalid iterator. This ensures a subsequent i.Prev() returns a valid
// result.
i.offset = i.restarts
i.nextOffset = i.restarts
return nil, base.LazyValue{}
}
searchKey = key[len(i.transforms.SyntheticPrefix):]
}

i.clearCache()
// Find the index of the smallest restart point whose key is > the key
Expand Down Expand Up @@ -821,7 +855,7 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
}
// Else k is invalid, and left as nil

if i.cmp(key, k) > 0 {
if i.cmp(searchKey, k) > 0 {
// The search key is greater than the user key at this restart point.
// Search beyond this restart point, since we are trying to find the
// first restart point with a user key >= the search key.
Expand Down Expand Up @@ -921,6 +955,24 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
if invariants.Enabled && i.isDataInvalidated() {
panic(errors.AssertionFailedf("invalidated blockIter used"))
}
searchKey := key
if i.transforms.SyntheticPrefix != nil {
// If the seek key does not start with SyntheticPrefix, it sorts either before
// or after the entire block, since every key in the block starts with
// SyntheticPrefix. To determine which, we need to compare against a valid
// key in the block. We use firstUserKey, which has the synthetic prefix.
if !bytes.HasPrefix(key, i.transforms.SyntheticPrefix) {
if i.cmp(i.firstUserKey, key) < 0 {
return i.Last()
}
// Set the offset to the beginning of the block to mimic an exhausted
// iterator that has conducted backward iteration. This ensures a
// subsequent Next() call returns the first key in the block.
i.offset = -1
i.nextOffset = 0
return nil, base.LazyValue{}
}
searchKey = key[len(i.transforms.SyntheticPrefix):]
}

i.clearCache()
// Find the index of the smallest restart point whose key is >= the key
Expand Down Expand Up @@ -985,7 +1037,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
}
// Else k is invalid, and left as nil

if i.cmp(key, k) > 0 {
if i.cmp(searchKey, k) > 0 {
// The search key is greater than the user key at this restart point.
// Search beyond this restart point, since we are trying to find the
// first restart point with a user key >= the search key.
Expand Down Expand Up @@ -1414,6 +1466,9 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue)
value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}
if i.transforms.SyntheticPrefix != nil {
shared += uint32(len(i.transforms.SyntheticPrefix))
}
// The starting position of the value.
valuePtr := unsafe.Pointer(uintptr(ptr) + uintptr(unshared))
i.nextOffset = int32(uintptr(valuePtr)-uintptr(i.ptr)) + int32(value)
Expand Down
Loading