Skip to content

Commit 908259a

Browse files
committed
fix(stream): improve incremental stream writer
This commit fails incremental stream writer if memtable has data. It also allows calling incremental stream writer more times than the number of levels we have. This is done by calling Flatten when we have want to write and L0 already has data.
1 parent d85287b commit 908259a

File tree

2 files changed

+96
-7
lines changed

2 files changed

+96
-7
lines changed

stream_writer.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ func (sw *StreamWriter) PrepareIncremental() error {
104104
}
105105
sw.done = func() { once.Do(done) }
106106

107+
mts, decr := sw.db.getMemTables()
108+
defer decr()
109+
for _, m := range mts {
110+
if !m.sl.Empty() {
111+
return fmt.Errorf("Unable to do incremental writes because MemTable has data")
112+
}
113+
}
114+
107115
isEmptyDB := true
108116
for _, level := range sw.db.Levels() {
109117
if level.NumTables > 0 {
@@ -117,7 +125,13 @@ func (sw *StreamWriter) PrepareIncremental() error {
117125
return nil
118126
}
119127
if sw.prevLevel == 0 {
120-
return fmt.Errorf("Unable to do incremental writes because L0 has data")
128+
// It seems that data is present in all levels from Lmax to L0. If we call flatten
129+
// on the tree, all the data will go to Lmax. All the levels above will be empty
130+
// after flatten call. Now, we should be able to use incremental stream writer again.
131+
if err := sw.db.Flatten(3); err != nil {
132+
return errors.Wrapf(err, "error during flatten in StreamWriter")
133+
}
134+
sw.prevLevel = len(sw.db.Levels()) - 1
121135
}
122136
return nil
123137
}

stream_writer_test.go

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
602602
}
603603

604604
func TestStreamWriterIncremental(t *testing.T) {
605-
addIncremtal := func(t *testing.T, db *DB, keys [][]byte) {
605+
addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
606606
buf := z.NewBuffer(10<<20, "test")
607607
defer func() { require.NoError(t, buf.Release()) }()
608608
for _, key := range keys {
@@ -633,7 +633,7 @@ func TestStreamWriterIncremental(t *testing.T) {
633633
require.NoError(t, sw.Write(buf), "sw.Write() failed")
634634
require.NoError(t, sw.Flush(), "sw.Flush() failed")
635635

636-
addIncremtal(t, db, [][]byte{[]byte("key-2")})
636+
addIncremental(t, db, [][]byte{[]byte("key-2")})
637637

638638
txn := db.NewTransaction(false)
639639
defer txn.Discard()
@@ -646,7 +646,7 @@ func TestStreamWriterIncremental(t *testing.T) {
646646

647647
t.Run("incremental on empty DB", func(t *testing.T) {
648648
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
649-
addIncremtal(t, db, [][]byte{[]byte("key-1")})
649+
addIncremental(t, db, [][]byte{[]byte("key-1")})
650650
txn := db.NewTransaction(false)
651651
defer txn.Discard()
652652
_, err := txn.Get([]byte("key-1"))
@@ -656,9 +656,9 @@ func TestStreamWriterIncremental(t *testing.T) {
656656

657657
t.Run("multiple incremental", func(t *testing.T) {
658658
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
659-
addIncremtal(t, db, [][]byte{[]byte("a1"), []byte("c1")})
660-
addIncremtal(t, db, [][]byte{[]byte("a2"), []byte("c2")})
661-
addIncremtal(t, db, [][]byte{[]byte("a3"), []byte("c3")})
659+
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
660+
addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
661+
addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
662662
txn := db.NewTransaction(false)
663663
defer txn.Discard()
664664
_, err := txn.Get([]byte("a1"))
@@ -675,4 +675,79 @@ func TestStreamWriterIncremental(t *testing.T) {
675675
require.NoError(t, err)
676676
})
677677
})
678+
679+
t.Run("write between incremental writes", func(t *testing.T) {
680+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
681+
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
682+
require.NoError(t, db.Update(func(txn *Txn) error {
683+
return txn.Set([]byte("a3"), []byte("c3"))
684+
}))
685+
686+
sw := db.NewStreamWriter()
687+
defer sw.Cancel()
688+
require.EqualError(t, sw.PrepareIncremental(), "Unable to do incremental writes because MemTable has data")
689+
690+
txn := db.NewTransaction(false)
691+
defer txn.Discard()
692+
_, err := txn.Get([]byte("a1"))
693+
require.NoError(t, err)
694+
_, err = txn.Get([]byte("c1"))
695+
require.NoError(t, err)
696+
_, err = txn.Get([]byte("a3"))
697+
require.NoError(t, err)
698+
})
699+
})
700+
701+
t.Run("incremental writes > #levels", func(t *testing.T) {
702+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
703+
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
704+
addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
705+
addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
706+
addIncremental(t, db, [][]byte{[]byte("a4"), []byte("c4")})
707+
addIncremental(t, db, [][]byte{[]byte("a5"), []byte("c5")})
708+
addIncremental(t, db, [][]byte{[]byte("a6"), []byte("c6")})
709+
addIncremental(t, db, [][]byte{[]byte("a7"), []byte("c7")})
710+
addIncremental(t, db, [][]byte{[]byte("a8"), []byte("c8")})
711+
addIncremental(t, db, [][]byte{[]byte("a9"), []byte("c9")})
712+
713+
txn := db.NewTransaction(false)
714+
defer txn.Discard()
715+
_, err := txn.Get([]byte("a1"))
716+
require.NoError(t, err)
717+
_, err = txn.Get([]byte("c1"))
718+
require.NoError(t, err)
719+
_, err = txn.Get([]byte("a2"))
720+
require.NoError(t, err)
721+
_, err = txn.Get([]byte("c2"))
722+
require.NoError(t, err)
723+
_, err = txn.Get([]byte("a3"))
724+
require.NoError(t, err)
725+
_, err = txn.Get([]byte("c3"))
726+
require.NoError(t, err)
727+
_, err = txn.Get([]byte("a4"))
728+
require.NoError(t, err)
729+
_, err = txn.Get([]byte("c4"))
730+
require.NoError(t, err)
731+
_, err = txn.Get([]byte("a5"))
732+
require.NoError(t, err)
733+
_, err = txn.Get([]byte("c5"))
734+
require.NoError(t, err)
735+
_, err = txn.Get([]byte("a6"))
736+
require.NoError(t, err)
737+
_, err = txn.Get([]byte("c6"))
738+
require.NoError(t, err)
739+
_, err = txn.Get([]byte("a7"))
740+
require.NoError(t, err)
741+
_, err = txn.Get([]byte("c7"))
742+
require.NoError(t, err)
743+
_, err = txn.Get([]byte("a8"))
744+
require.NoError(t, err)
745+
_, err = txn.Get([]byte("c8"))
746+
require.NoError(t, err)
747+
_, err = txn.Get([]byte("a9"))
748+
require.NoError(t, err)
749+
_, err = txn.Get([]byte("c9"))
750+
require.NoError(t, err)
751+
})
752+
})
678753
}

0 commit comments

Comments
 (0)