Skip to content

opt(stream): add option to directly copy over tables from lower levels (#1700) #1872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# golangci-lint v2 configuration.
# NOTE(review): the scraped diff lost all YAML indentation; structure below is
# reconstructed to the standard golangci v2 schema — confirm against the PR.
version: "2"
linters:
  # Start from no linters and opt in explicitly.
  default: none
  enable:
    - errcheck
    - gosec
    - govet
    - ineffassign
    - lll
    - staticcheck
    - unconvert
    - unused
  settings:
    gosec:
      # Suppressed gosec rules (e.g. G404: weak RNG is fine for non-crypto use).
      excludes:
        - G114
        - G204
        - G306
        - G404
        - G115
    lll:
      line-length: 120
    staticcheck:
      checks:
        - all
        # SA1019: allow use of deprecated identifiers.
        - -SA1019
  exclusions:
    generated: lax
    presets:
      - comments
      - common-false-positives
      - legacy
      - std-error-handling
    paths:
      - third_party$
      - builtin$
      - examples$
formatters:
  enable:
    - gofmt
    - goimports
  exclusions:
    generated: lax
    paths:
      - third_party$
      - builtin$
      - examples$
28 changes: 14 additions & 14 deletions .trunk/trunk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@
version: 0.1

cli:
version: 1.22.10
version: 1.24.0

# Trunk provides extensibility via plugins. (https://docs.trunk.io/plugins)
plugins:
sources:
- id: trunk
ref: v1.6.7
ref: v1.7.1
uri: https://github.com/trunk-io/plugins

# Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes)
runtimes:
enabled:
- [email protected]
- node@18.20.5
- node@22.16.0
- [email protected]

# This is the section where you manage your linters. (https://docs.trunk.io/check/configuration)
Expand All @@ -26,23 +26,23 @@ lint:
paths:
- "*.pb.go"
enabled:
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected].369
- [email protected].447
- git-diff-check
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- svgo@3.3.2
- svgo@4.0.0
- [email protected]
- trufflehog@3.88.7
- yamllint@1.35.1
- trufflehog@3.89.2
- yamllint@1.37.1
actions:
enabled:
- trunk-announce
Expand Down
1 change: 1 addition & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,7 @@ func (db *DB) StreamDB(outOptions Options) error {
// Stream contents of DB to the output DB.
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
stream.FullCopy = true

stream.Send = func(buf *z.Buffer) error {
return writer.Write(buf)
Expand Down
20 changes: 10 additions & 10 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
// that the tables are sorted in the right order.
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
filterTables := func(tables []*table.Table) []*table.Table {
if opt.SinceTs > 0 {
tmp := tables[:0]
for _, t := range tables {
if t.MaxVersion() < opt.SinceTs {
continue
}
tmp = append(tmp, t)
if opt.SinceTs == 0 {
return tables
}
out := tables[:0]
for _, t := range tables {
if t.MaxVersion() < opt.SinceTs {
continue
}
tables = tmp
out = append(out, t)
}
return tables
return out
}

if len(opt.Prefix) == 0 {
Expand Down Expand Up @@ -480,7 +480,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
for i := 0; i < len(tables); i++ {
iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
}
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
res := &Iterator{
txn: txn,
iitr: table.NewMergeIterator(iters, opt.Reverse),
Expand Down
82 changes: 46 additions & 36 deletions key_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"crypto/aes"
"crypto/rand"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"os"
Expand Down Expand Up @@ -328,17 +329,16 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
defer kr.Unlock()
// Key might have generated by another go routine. So,
// checking once again.
key, valid = validKey()
if valid {
if key, valid := validKey(); valid {
return key, nil
}
k := make([]byte, len(kr.opt.EncryptionKey))
iv, err := y.GenerateIV()
if err != nil {
return nil, err
}
_, err = rand.Read(k)
if err != nil {

if _, err := rand.Read(k); err != nil {
return nil, err
}
// Otherwise Increment the KeyID and generate new datakey.
Expand All @@ -349,25 +349,40 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
CreatedAt: time.Now().Unix(),
Iv: iv,
}
// Don't store the datakey on file if badger is running in InMemory mode.
if !kr.opt.InMemory {
// Store the datekey.
buf := &bytes.Buffer{}
if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
return nil, err
}
// Persist the datakey to the disk
if _, err = kr.fp.Write(buf.Bytes()); err != nil {
return nil, err
}
}
// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
dk.Data = k
kr.lastCreated = dk.CreatedAt
kr.dataKeys[kr.nextKeyID] = dk
// Don't store the datakey on file if badger is running in InMemory mode.
if kr.opt.InMemory {
return dk, nil

}
// Store the datekey.
if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
return nil, err
}
return dk, nil
}

// AddKey registers dk in the registry and, unless running in-memory,
// persists it to the registry file. If dk.KeyId is not already present,
// the registry's next available key ID is assigned to dk. It returns the
// (possibly reassigned) key ID.
//
// NOTE(review): storeDataKey encrypts dk.Data in place, and dk is placed in
// kr.dataKeys before that call — so the in-memory copy ends up holding the
// encrypted bytes. Confirm this matches how LatestDataKey expects to read it.
func (kr *KeyRegistry) AddKey(dk *pb.DataKey) (uint64, error) {
	// Without an encryption key, the datakey cannot be encrypted and stored.
	if len(kr.opt.EncryptionKey) == 0 {
		return 0, errors.New("no encryption key found, cannot add data key")
	}

	if _, ok := kr.dataKeys[dk.KeyId]; !ok {
		// KeyId does not exist yet; assign the next available KeyId.
		kr.nextKeyID++
		dk.KeyId = kr.nextKeyID
	}
	kr.dataKeys[dk.KeyId] = dk

	if kr.opt.InMemory {
		return dk.KeyId, nil
	}
	// Persist the datakey to disk.
	return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
}

// Close closes the key registry.
func (kr *KeyRegistry) Close() error {
if !(kr.opt.ReadOnly || kr.opt.InMemory) {
Expand All @@ -377,38 +392,33 @@ func (kr *KeyRegistry) Close() error {
}

// storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
func storeDataKey(w io.Writer, storageKey []byte, key *pb.DataKey) error {
// xor will encrypt the IV and xor with the given data.
// It'll used for both encryption and decryption.
xor := func() error {
if len(storageKey) == 0 {
return nil
}
var err error
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
return err
}

// In memory datakey will be plain text so encrypting before storing to the disk.
var err error
if err = xor(); err != nil {
if err := xor(); err != nil {
return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
}
var data []byte
if data, err = proto.Marshal(k); err != nil {
err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
var err2 error
// decrypting the datakey back.
if err2 = xor(); err2 != nil {
return y.Wrapf(err,
y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
}
return err

data, err := proto.Marshal(key)
if err != nil {
return y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
}

var lenCrcBuf [8]byte
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
y.Check2(buf.Write(lenCrcBuf[:]))
y.Check2(buf.Write(data))
// Decrypting the datakey back since we're using the pointer.
return xor()
y.Check2(w.Write(lenCrcBuf[:]))
y.Check2(w.Write(data))
return nil
}
36 changes: 31 additions & 5 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
return maxVs, decr()
}

// appendIterators appends iterators to an array of iterators, for merging.
// iterators returns an array of iterators, for merging.
// Note: This obtains references for the table handlers. Remember to close these iterators.
func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
s.RLock()
defer s.RUnlock()

Expand All @@ -313,14 +313,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
out = append(out, t)
}
}
return appendIteratorsReversed(iters, out, topt)
return iteratorsReversed(out, topt)
}

tables := opt.pickTables(s.tables)
if len(tables) == 0 {
return iters
return nil
}
return append(iters, table.NewConcatIterator(tables, topt))
return []y.Iterator{table.NewConcatIterator(tables, topt)}
}

// getTables returns the tables of this level that are relevant to opt,
// taking a reference on each table returned.
// Reverse iteration is not supported by this helper.
// NOTE(review): callers presumably must DecrRef the returned tables — confirm.
func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
	if opt.Reverse {
		panic("Invalid option for getTables")
	}

	s.RLock()
	defer s.RUnlock()

	// Level 0: filter each table individually via pickTable.
	if s.level == 0 {
		var picked []*table.Table
		for _, tbl := range s.tables {
			if !opt.pickTable(tbl) {
				continue
			}
			tbl.IncrRef()
			picked = append(picked, tbl)
		}
		return picked
	}

	// Levels >= 1: let pickTables narrow the sorted table list in one pass.
	selected := opt.pickTables(s.tables)
	for _, tbl := range selected {
		tbl.IncrRef()
	}
	return selected
}

type levelHandlerRLocked struct{}
Expand Down
Loading
Loading