Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Bug Fixes

- [#1007](https://github.com/cosmos/iavl/pull/1007) Add the extra check for the reformatted root node in `GetNode`
- [#1106](https://github.com/cosmos/iavl/pull/1106) Fix unlock of unlocked mutex panic.
- [#1107](https://github.com/cosmos/iavl/pull/1107) nodedb latest version should not set to a partial committed version.

### Improvements

Expand Down
20 changes: 10 additions & 10 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
return err
}
if batchSizeAfter > b.flushThreshold {
b.mtx.Unlock()
if err := b.Write(); err != nil {
if err := b.write(); err != nil {
return err
}
b.mtx.Lock()
}
return b.batch.Set(key, value)
}
Expand All @@ -79,19 +77,14 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
return err
}
if batchSizeAfter > b.flushThreshold {
b.mtx.Unlock()
if err := b.Write(); err != nil {
if err := b.write(); err != nil {
return err
}
b.mtx.Lock()
}
return b.batch.Delete(key)
}

func (b *BatchWithFlusher) Write() error {
b.mtx.Lock()
defer b.mtx.Unlock()

func (b *BatchWithFlusher) write() error {
if err := b.batch.Write(); err != nil {
return err
}
Expand All @@ -102,6 +95,13 @@ func (b *BatchWithFlusher) Write() error {
return nil
}

func (b *BatchWithFlusher) Write() error {
b.mtx.Lock()
defer b.mtx.Unlock()

return b.write()
}

func (b *BatchWithFlusher) WriteSync() error {
b.mtx.Lock()
defer b.mtx.Unlock()
Expand Down
86 changes: 86 additions & 0 deletions batch_flusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package iavl

import (
"fmt"
"math/rand"
"testing"

corestore "cosmossdk.io/core/store"
dbm "github.com/cosmos/iavl/db"
"github.com/stretchr/testify/require"
)

type MockDBBatch struct {
corestore.Batch

// simulate low level system error
err error
}

func (b *MockDBBatch) Write() error {
if b.err != nil {
return b.err
}
return b.Batch.Write()
}

func (b *MockDBBatch) WriteSync() error {
if b.err != nil {
return b.err
}
return b.Batch.WriteSync()
}

type MockDB struct {
dbm.DB

batchIndex int
// simulate low level system error at i-th batch write
errors map[int]error
}

func (db *MockDB) NewBatch() corestore.Batch {
err, _ := db.errors[db.batchIndex]
batch := &MockDBBatch{Batch: db.DB.NewBatch(), err: err}
db.batchIndex++
return batch
}

func (db *MockDB) NewBatchWithSize(size int) corestore.Batch {
return db.NewBatch()
}

func TestBatchFlusher(t *testing.T) {
db := &MockDB{DB: dbm.NewMemDB()}

{
tree := NewMutableTree(db, 10000, true, NewNopLogger())
v, err := tree.Load()
require.NoError(t, err)
require.Equal(t, int64(0), v)

// the batch size exceeds the threshold, and persist into db in two batches
for i := 0; i < 1000; i++ {
// random key value pairs
key := []byte(fmt.Sprintf("key-%064d", rand.Intn(100000000)))
value := []byte(fmt.Sprintf("value-%064d", rand.Intn(100000000)))
_, err := tree.Set(key, value)
require.NoError(t, err)
}

// the first batch write will success,
// the second batch write will fail
db.errors = map[int]error{
1: fmt.Errorf("filesystem failure"),
}
_, _, err = tree.SaveVersion()
require.Error(t, err)
}

{
tree := NewMutableTree(db, 10000, true, NewNopLogger())
v, err := tree.Load()
require.NoError(t, err)
require.Equal(t, int64(0), v)
}
}
12 changes: 8 additions & 4 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,13 +913,17 @@ func (ndb *nodeDB) getLatestVersion() (bool, int64, error) {
}
defer itr.Close()

if itr.Valid() {
for ; itr.Valid(); itr.Next() {
k := itr.Key()
var nk []byte
nodeKeyFormat.Scan(k, &nk)
latestVersion = GetNodeKey(nk).version
ndb.resetLatestVersion(latestVersion)
return true, latestVersion, nil
nodeKey := GetNodeKey(nk)
if bytes.Equal(nk, GetRootKey(nodeKey.version)) {
// only find the latest version when we find the root node
latestVersion = nodeKey.version
ndb.resetLatestVersion(latestVersion)
return true, latestVersion, nil
}
}

if err := itr.Error(); err != nil {
Expand Down