Skip to content

Commit 88c37d9

Browse files
committed
opt(stream): add option to directly copy over tables from lower levels (#1700)
Also takes a bug fix from PR #1712, commit 58d0674 This PR adds FullCopy option in Stream. This allows sending the table entirely to the writer. If this option is set to true we directly copy over the tables from the last 2 levels. This option increases the stream speed while also lowering the memory consumption on the DB that is streaming the KVs. For 71GB, compressed and encrypted DB we observed 3x improvement in speed. The DB contained ~65GB in the last 2 levels while remaining in the above levels. To use this option, the following options should be set in Stream. stream.KeyToList = nil stream.ChooseKey = nil stream.SinceTs = 0 db.managedTxns = true If we use stream writer for receiving the KVs, the encryption mode has to be the same in sender and receiver. This will restrict db.StreamDB() to use the same encryption mode in both input and output DB. Added TODO for allowing different encryption modes.
1 parent 22ff83d commit 88c37d9

16 files changed

+789
-196
lines changed

.golangci.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
version: "2"
2+
linters:
3+
default: none
4+
enable:
5+
- errcheck
6+
- gosec
7+
- govet
8+
- ineffassign
9+
- lll
10+
- staticcheck
11+
- unconvert
12+
- unused
13+
settings:
14+
gosec:
15+
excludes:
16+
- G114
17+
- G204
18+
- G306
19+
- G404
20+
- G115
21+
lll:
22+
line-length: 120
23+
staticcheck:
24+
checks:
25+
- all
26+
- -SA1019
27+
exclusions:
28+
generated: lax
29+
presets:
30+
- comments
31+
- common-false-positives
32+
- legacy
33+
- std-error-handling
34+
paths:
35+
- third_party$
36+
- builtin$
37+
- examples$
38+
formatters:
39+
enable:
40+
- gofmt
41+
- goimports
42+
exclusions:
43+
generated: lax
44+
paths:
45+
- third_party$
46+
- builtin$
47+
- examples$

.trunk/trunk.yaml

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,20 @@
33
version: 0.1
44

55
cli:
6-
version: 1.22.10
6+
version: 1.24.0
77

88
# Trunk provides extensibility via plugins. (https://docs.trunk.io/plugins)
99
plugins:
1010
sources:
1111
- id: trunk
12-
ref: v1.6.7
12+
ref: v1.7.1
1313
uri: https://github.com/trunk-io/plugins
1414

1515
# Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes)
1616
runtimes:
1717
enabled:
1818
19-
- node@18.20.5
19+
- node@22.16.0
2020
2121

2222
# This is the section where you manage your linters. (https://docs.trunk.io/check/configuration)
@@ -26,23 +26,23 @@ lint:
2626
paths:
2727
- "*.pb.go"
2828
enabled:
29-
30-
29+
30+
31+
3132
32-
33+
3334
- git-diff-check
3435
35-
36-
37-
38-
39-
36+
37+
38+
39+
4040
4141
42-
- svgo@3.3.2
42+
- svgo@4.0.0
4343
44-
- trufflehog@3.88.7
45-
- yamllint@1.35.1
44+
- trufflehog@3.89.2
45+
- yamllint@1.37.1
4646
actions:
4747
enabled:
4848
- trunk-announce

db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1993,6 +1993,7 @@ func (db *DB) StreamDB(outOptions Options) error {
19931993
// Stream contents of DB to the output DB.
19941994
stream := db.NewStreamAt(math.MaxUint64)
19951995
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
1996+
stream.FullCopy = true
19961997

19971998
stream.Send = func(buf *z.Buffer) error {
19981999
return writer.Write(buf)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/dustin/go-humanize v1.0.1
1111
github.com/google/flatbuffers v25.2.10+incompatible
1212
github.com/klauspost/compress v1.18.0
13+
github.com/pkg/errors v0.9.1
1314
github.com/spf13/cobra v1.9.1
1415
github.com/stretchr/testify v1.10.0
1516
go.opentelemetry.io/contrib/zpages v0.61.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
2828
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
2929
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3030
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
31+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
32+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
3133
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3234
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3335
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=

iterator.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -355,17 +355,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
355355
// that the tables are sorted in the right order.
356356
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
357357
filterTables := func(tables []*table.Table) []*table.Table {
358-
if opt.SinceTs > 0 {
359-
tmp := tables[:0]
360-
for _, t := range tables {
361-
if t.MaxVersion() < opt.SinceTs {
362-
continue
363-
}
364-
tmp = append(tmp, t)
358+
if opt.SinceTs == 0 {
359+
return tables
360+
}
361+
out := tables[:0]
362+
for _, t := range tables {
363+
if t.MaxVersion() < opt.SinceTs {
364+
continue
365365
}
366-
tables = tmp
366+
out = append(out, t)
367367
}
368-
return tables
368+
return out
369369
}
370370

371371
if len(opt.Prefix) == 0 {
@@ -480,7 +480,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
480480
for i := 0; i < len(tables); i++ {
481481
iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
482482
}
483-
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
483+
iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
484484
res := &Iterator{
485485
txn: txn,
486486
iitr: table.NewMergeIterator(iters, opt.Reverse),

key_registry.go

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"crypto/aes"
1111
"crypto/rand"
1212
"encoding/binary"
13+
"errors"
1314
"hash/crc32"
1415
"io"
1516
"os"
@@ -328,17 +329,16 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
328329
defer kr.Unlock()
329330
// Key might have generated by another go routine. So,
330331
// checking once again.
331-
key, valid = validKey()
332-
if valid {
332+
if key, valid := validKey(); valid {
333333
return key, nil
334334
}
335335
k := make([]byte, len(kr.opt.EncryptionKey))
336336
iv, err := y.GenerateIV()
337337
if err != nil {
338338
return nil, err
339339
}
340-
_, err = rand.Read(k)
341-
if err != nil {
340+
341+
if _, err := rand.Read(k); err != nil {
342342
return nil, err
343343
}
344344
// Otherwise Increment the KeyID and generate new datakey.
@@ -349,25 +349,40 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
349349
CreatedAt: time.Now().Unix(),
350350
Iv: iv,
351351
}
352-
// Don't store the datakey on file if badger is running in InMemory mode.
353-
if !kr.opt.InMemory {
354-
// Store the datekey.
355-
buf := &bytes.Buffer{}
356-
if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
357-
return nil, err
358-
}
359-
// Persist the datakey to the disk
360-
if _, err = kr.fp.Write(buf.Bytes()); err != nil {
361-
return nil, err
362-
}
363-
}
364-
// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
365-
dk.Data = k
366352
kr.lastCreated = dk.CreatedAt
367353
kr.dataKeys[kr.nextKeyID] = dk
354+
// Don't store the datakey on file if badger is running in InMemory mode.
355+
if kr.opt.InMemory {
356+
return dk, nil
357+
358+
}
359+
// Store the datekey.
360+
if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
361+
return nil, err
362+
}
368363
return dk, nil
369364
}
370365

366+
func (kr *KeyRegistry) AddKey(dk *pb.DataKey) (uint64, error) {
367+
// If we don't have a encryption key, we cannot store the datakey.
368+
if len(kr.opt.EncryptionKey) == 0 {
369+
return 0, errors.New("No encryption key found. Cannot add data key")
370+
}
371+
372+
if _, ok := kr.dataKeys[dk.KeyId]; !ok {
373+
// If KeyId does not exists already, then use the next available KeyId to store data key.
374+
kr.nextKeyID++
375+
dk.KeyId = kr.nextKeyID
376+
}
377+
kr.dataKeys[dk.KeyId] = dk
378+
379+
if kr.opt.InMemory {
380+
return dk.KeyId, nil
381+
}
382+
// Store the datakey.
383+
return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
384+
}
385+
371386
// Close closes the key registry.
372387
func (kr *KeyRegistry) Close() error {
373388
if !(kr.opt.ReadOnly || kr.opt.InMemory) {
@@ -377,38 +392,33 @@ func (kr *KeyRegistry) Close() error {
377392
}
378393

379394
// storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
380-
func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
395+
// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
396+
func storeDataKey(w io.Writer, storageKey []byte, key *pb.DataKey) error {
381397
// xor will encrypt the IV and xor with the given data.
382398
// It'll used for both encryption and decryption.
383399
xor := func() error {
384400
if len(storageKey) == 0 {
385401
return nil
386402
}
387403
var err error
388-
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
404+
key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
389405
return err
390406
}
407+
391408
// In memory datakey will be plain text so encrypting before storing to the disk.
392-
var err error
393-
if err = xor(); err != nil {
409+
if err := xor(); err != nil {
394410
return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
395411
}
396-
var data []byte
397-
if data, err = proto.Marshal(k); err != nil {
398-
err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
399-
var err2 error
400-
// decrypting the datakey back.
401-
if err2 = xor(); err2 != nil {
402-
return y.Wrapf(err,
403-
y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
404-
}
405-
return err
412+
413+
data, err := proto.Marshal(key)
414+
if err != nil {
415+
return y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
406416
}
417+
407418
var lenCrcBuf [8]byte
408419
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
409420
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
410-
y.Check2(buf.Write(lenCrcBuf[:]))
411-
y.Check2(buf.Write(data))
412-
// Decrypting the datakey back since we're using the pointer.
413-
return xor()
421+
y.Check2(w.Write(lenCrcBuf[:]))
422+
y.Check2(w.Write(data))
423+
return nil
414424
}

level_handler.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
293293
return maxVs, decr()
294294
}
295295

296-
// appendIterators appends iterators to an array of iterators, for merging.
296+
// iterators returns an array of iterators, for merging.
297297
// Note: This obtains references for the table handlers. Remember to close these iterators.
298-
func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
298+
func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
299299
s.RLock()
300300
defer s.RUnlock()
301301

@@ -313,14 +313,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
313313
out = append(out, t)
314314
}
315315
}
316-
return appendIteratorsReversed(iters, out, topt)
316+
return iteratorsReversed(out, topt)
317317
}
318318

319319
tables := opt.pickTables(s.tables)
320320
if len(tables) == 0 {
321-
return iters
321+
return nil
322322
}
323-
return append(iters, table.NewConcatIterator(tables, topt))
323+
return []y.Iterator{table.NewConcatIterator(tables, topt)}
324+
}
325+
326+
func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
327+
if opt.Reverse {
328+
panic("Invalid option for getTables")
329+
}
330+
331+
s.RLock()
332+
defer s.RUnlock()
333+
334+
if s.level == 0 {
335+
var out []*table.Table
336+
for _, t := range s.tables {
337+
if opt.pickTable(t) {
338+
t.IncrRef()
339+
out = append(out, t)
340+
}
341+
}
342+
return out
343+
}
344+
345+
tables := opt.pickTables(s.tables)
346+
for _, t := range tables {
347+
t.IncrRef()
348+
}
349+
return tables
324350
}
325351

326352
type levelHandlerRLocked struct{}

0 commit comments

Comments
 (0)