Skip to content

Commit 4fac806

Browse files
manishrjain and harshil-goel
authored and committed
feat(Skiplist): Introduce a way to hand over skiplists to Badger (#1696)
In Dgraph, we already use Raft write-ahead log. Also, when we commit transactions, we update tens of thousands of keys in one go. To optimize this write path, this PR introduces a way to directly hand over Skiplist to Badger, short circuiting Badger's Value Log and WAL. This feature allows Dgraph to generate Skiplists while processing mutations and just hand them over to Badger during commits. It also accepts a callback which can be run when Skiplist is written to disk. This is useful for determining when to create a snapshot in Dgraph.
1 parent 4e7c745 commit 4fac806

File tree

13 files changed

+215
-50
lines changed

13 files changed

+215
-50
lines changed

backup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func TestBackupLoadIncremental(t *testing.T) {
435435
if err := txn.SetEntry(entry); err != nil {
436436
return err
437437
}
438-
updates[i] = bitDiscardEarlierVersions
438+
updates[i] = BitDiscardEarlierVersions
439439
}
440440
return nil
441441
})

db.go

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -783,16 +783,9 @@ var requestPool = sync.Pool{
783783
}
784784

785785
func (db *DB) writeToLSM(b *request) error {
786-
// We should check the length of b.Prts and b.Entries only when badger is not
787-
// running in InMemory mode. In InMemory mode, we don't write anything to the
788-
// value log and that's why the length of b.Ptrs will always be zero.
789-
if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
790-
return errors.Errorf("Ptrs and Entries don't match: %+v", b)
791-
}
792-
793786
for i, entry := range b.Entries {
794787
var err error
795-
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
788+
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
796789
// Will include deletion / tombstone case.
797790
err = db.mt.Put(entry.Key,
798791
y.ValueStruct{
@@ -838,10 +831,13 @@ func (db *DB) writeRequests(reqs []*request) error {
838831
}
839832
}
840833
db.opt.Debugf("writeRequests called. Writing to value log")
841-
err := db.vlog.write(reqs)
842-
if err != nil {
843-
done(err)
844-
return err
834+
if !db.opt.managedTxns {
835+
// Don't do value log writes in managed mode.
836+
err := db.vlog.write(reqs)
837+
if err != nil {
838+
done(err)
839+
return err
840+
}
845841
}
846842

847843
db.opt.Debugf("Writing to memtable")
@@ -852,6 +848,7 @@ func (db *DB) writeRequests(reqs []*request) error {
852848
}
853849
count += len(b.Entries)
854850
var i uint64
851+
var err error
855852
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
856853
i++
857854
if i%100 == 0 {
@@ -1035,16 +1032,61 @@ func (db *DB) ensureRoomForWrite() error {
10351032
}
10361033
}
10371034

1035+
func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
1036+
if !db.opt.managedTxns {
1037+
panic("Handover Skiplist is only available in managed mode.")
1038+
}
1039+
db.lock.Lock()
1040+
defer db.lock.Unlock()
1041+
1042+
// If we have some data in db.mt, we should push that first, so the ordering of writes is
1043+
// maintained.
1044+
if !db.mt.sl.Empty() {
1045+
sz := db.mt.sl.MemSize()
1046+
db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
1047+
var err error
1048+
select {
1049+
case db.flushChan <- flushTask{mt: db.mt}:
1050+
db.imm = append(db.imm, db.mt)
1051+
db.mt, err = db.newMemTable()
1052+
if err != nil {
1053+
return y.Wrapf(err, "cannot push current memtable")
1054+
}
1055+
default:
1056+
return errNoRoom
1057+
}
1058+
}
1059+
1060+
mt := &memTable{sl: skl}
1061+
select {
1062+
case db.flushChan <- flushTask{mt: mt, cb: callback}:
1063+
db.imm = append(db.imm, mt)
1064+
return nil
1065+
default:
1066+
return errNoRoom
1067+
}
1068+
}
1069+
10381070
func arenaSize(opt Options) int64 {
10391071
return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
10401072
}
10411073

1074+
func (db *DB) NewSkiplist() *skl.Skiplist {
1075+
return skl.NewSkiplist(arenaSize(db.opt))
1076+
}
1077+
10421078
// buildL0Table builds a new table from the memtable.
10431079
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
1044-
iter := ft.mt.sl.NewIterator()
1080+
var iter y.Iterator
1081+
if ft.itr != nil {
1082+
iter = ft.itr
1083+
} else {
1084+
iter = ft.mt.sl.NewUniIterator(false)
1085+
}
10451086
defer iter.Close()
1087+
10461088
b := table.NewTableBuilder(bopts)
1047-
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
1089+
for iter.Rewind(); iter.Valid(); iter.Next() {
10481090
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
10491091
continue
10501092
}
@@ -1060,16 +1102,14 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10601102

10611103
type flushTask struct {
10621104
mt *memTable
1105+
cb func()
1106+
itr y.Iterator
10631107
dropPrefixes [][]byte
10641108
}
10651109

10661110
// handleFlushTask must be run serially.
10671111
func (db *DB) handleFlushTask(ft flushTask) error {
1068-
// There can be a scenario, when empty memtable is flushed.
1069-
if ft.mt.sl.Empty() {
1070-
return nil
1071-
}
1072-
1112+
// ft.mt could be nil with ft.itr being the valid field.
10731113
bopts := buildTableOptions(db)
10741114
builder := buildL0Table(ft, bopts)
10751115
defer builder.Close()
@@ -1105,11 +1145,52 @@ func (db *DB) handleFlushTask(ft flushTask) error {
11051145
func (db *DB) flushMemtable(lc *z.Closer) error {
11061146
defer lc.Done()
11071147

1148+
var sz int64
1149+
var itrs []y.Iterator
1150+
var mts []*memTable
1151+
var cbs []func()
1152+
slurp := func() {
1153+
for {
1154+
select {
1155+
case more := <-db.flushChan:
1156+
if more.mt == nil {
1157+
return
1158+
}
1159+
sl := more.mt.sl
1160+
itrs = append(itrs, sl.NewUniIterator(false))
1161+
mts = append(mts, more.mt)
1162+
cbs = append(cbs, more.cb)
1163+
1164+
sz += sl.MemSize()
1165+
if sz > db.opt.MemTableSize {
1166+
return
1167+
}
1168+
default:
1169+
return
1170+
}
1171+
}
1172+
}
1173+
11081174
for ft := range db.flushChan {
11091175
if ft.mt == nil {
11101176
// We close db.flushChan now, instead of sending a nil ft.mt.
11111177
continue
11121178
}
1179+
sz = ft.mt.sl.MemSize()
1180+
// Reset of itrs, mts etc. is being done below.
1181+
y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
1182+
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
1183+
mts = append(mts, ft.mt)
1184+
cbs = append(cbs, ft.cb)
1185+
1186+
// Pick more memtables, so we can really fill up the L0 table.
1187+
slurp()
1188+
1189+
// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1190+
ft.mt = nil
1191+
ft.itr = table.NewMergeIterator(itrs, false)
1192+
ft.cb = nil
1193+
11131194
for {
11141195
err := db.handleFlushTask(ft)
11151196
if err == nil {
@@ -1120,17 +1201,26 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
11201201
// which would arrive here would match db.imm[0], because we acquire a
11211202
// lock over DB when pushing to flushChan.
11221203
// TODO: This logic is dirty AF. Any change and this could easily break.
1123-
y.AssertTrue(ft.mt == db.imm[0])
1124-
db.imm = db.imm[1:]
1125-
ft.mt.DecrRef() // Return memory.
1204+
for _, mt := range mts {
1205+
y.AssertTrue(mt == db.imm[0])
1206+
db.imm = db.imm[1:]
1207+
mt.DecrRef() // Return memory.
1208+
}
11261209
db.lock.Unlock()
11271210

1211+
for _, cb := range cbs {
1212+
if cb != nil {
1213+
cb()
1214+
}
1215+
}
11281216
break
11291217
}
11301218
// Encountered error. Retry indefinitely.
11311219
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
11321220
time.Sleep(time.Second)
11331221
}
1222+
// Reset everything.
1223+
itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
11341224
}
11351225
return nil
11361226
}

iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (item *Item) IsDeletedOrExpired() bool {
133133
// DiscardEarlierVersions returns whether the item was created with the
134134
// option to discard earlier versions of a key when multiple are available.
135135
func (item *Item) DiscardEarlierVersions() bool {
136-
return item.meta&bitDiscardEarlierVersions > 0
136+
return item.meta&BitDiscardEarlierVersions > 0
137137
}
138138

139139
func (item *Item) yieldItemValue() ([]byte, func(), error) {

levels.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
724724
}
725725
lastKey = y.SafeCopy(lastKey, it.Key())
726726
numVersions = 0
727-
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
727+
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0
728728

729729
if len(tableKr.left) == 0 {
730730
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
@@ -761,7 +761,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
761761
// - The `discardEarlierVersions` bit is set OR
762762
// - We've already processed `NumVersionsToKeep` number of versions
763763
// (including the current item being processed)
764-
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
764+
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
765765
numVersions == s.kv.opt.NumVersionsToKeep
766766

767767
if isExpired || lastValidVersion {

levels_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -696,11 +696,11 @@ func TestDiscardFirstVersion(t *testing.T) {
696696

697697
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
698698
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
699-
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
699+
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
700700
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
701701
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
702702
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
703-
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
703+
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}
704704

705705
// Level 0 has all the tables.
706706
createAndOpen(db, l0, 0)
@@ -731,11 +731,11 @@ func TestDiscardFirstVersion(t *testing.T) {
731731
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
732732
// marker so IT WILL BE REMOVED.
733733
ExpectedKeys := []keyValVersion{
734-
{"foo", "bar", 10, bitDiscardEarlierVersions},
734+
{"foo", "bar", 10, BitDiscardEarlierVersions},
735735
{"foo", "bar", 9, 0},
736736
{"foo", "bar", 4, 0},
737737
{"foo", "bar", 3, 0},
738-
{"foo", "bar", 2, bitDiscardEarlierVersions}}
738+
{"foo", "bar", 2, BitDiscardEarlierVersions}}
739739

740740
getAllAndCheck(t, db, ExpectedKeys)
741741
})
@@ -1049,15 +1049,15 @@ func TestSameLevel(t *testing.T) {
10491049
opt.LmaxCompaction = true
10501050
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
10511051
l6 := []keyValVersion{
1052-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1052+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10531053
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
10541054
}
10551055
l61 := []keyValVersion{
1056-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1056+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
10571057
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
10581058
}
10591059
l62 := []keyValVersion{
1060-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1060+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
10611061
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
10621062
}
10631063
createAndOpen(db, l6, 6)
@@ -1066,11 +1066,11 @@ func TestSameLevel(t *testing.T) {
10661066
require.NoError(t, db.lc.validate())
10671067

10681068
getAllAndCheck(t, db, []keyValVersion{
1069-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1069+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10701070
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
1071-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1071+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
10721072
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
1073-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1073+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
10741074
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
10751075
})
10761076

@@ -1086,11 +1086,11 @@ func TestSameLevel(t *testing.T) {
10861086
db.SetDiscardTs(3)
10871087
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
10881088
getAllAndCheck(t, db, []keyValVersion{
1089-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1089+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10901090
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
1091-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1091+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
10921092
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
1093-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1093+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
10941094
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
10951095
})
10961096

@@ -1107,9 +1107,9 @@ func TestSameLevel(t *testing.T) {
11071107
cdef.t.baseLevel = 1
11081108
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
11091109
getAllAndCheck(t, db, []keyValVersion{
1110-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
1111-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
1112-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
1110+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
1111+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
1112+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
11131113
require.NoError(t, db.lc.validate())
11141114
})
11151115
}
@@ -1255,7 +1255,7 @@ func TestStaleDataCleanup(t *testing.T) {
12551255
for i := count; i > 0; i-- {
12561256
var meta byte
12571257
if i == 0 {
1258-
meta = bitDiscardEarlierVersions
1258+
meta = BitDiscardEarlierVersions
12591259
}
12601260
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
12611261
}

0 commit comments

Comments (0)