Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support cache among blocks #2575

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,8 @@ type txLookup struct {
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
Expand Down Expand Up @@ -314,6 +313,9 @@ type BlockChain struct {
vmConfig vm.Config
pipeCommit bool

// Cache among blocks
cacheAmongBlocks *state.CacheAmongBlocks

// monitor
doubleSignMonitor *monitor.DoubleSignMonitor
}
Expand Down Expand Up @@ -545,6 +547,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
return nil, err
}
}

// Initialise cache among blocks
// Start future block processor.
bc.wg.Add(1)
go bc.updateFutureBlocks()
Expand Down Expand Up @@ -579,6 +583,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}

return bc, nil
}

Expand Down Expand Up @@ -2233,7 +2238,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}

statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if bc.cacheAmongBlocks == nil {
bc.cacheAmongBlocks = state.NewCacheAmongBlocks(parent.Root)
}
// Check whether the cache pool among blocks can be used, if parent root is the same, use it
// Else drop and reset the cache.
if parent.Root != bc.cacheAmongBlocks.GetRoot() {
log.Info("root is not same with cache root", "parent root:", parent.Root,
"cache root", bc.cacheAmongBlocks.GetRoot())
bc.cacheAmongBlocks.Reset()
}

//log.Info("new state db with cache", "cache root", bc.cacheAmongBlocks.GetRoot())
statedb, err := state.NewWithCacheAmongBlocks(parent.Root, bc.stateCache, bc.snaps, bc.cacheAmongBlocks)
if err != nil {
return it.index, err
}
Expand Down
8 changes: 8 additions & 0 deletions core/state/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@ var (
slotDeletionCount = metrics.NewRegisteredMeter("state/delete/storage/slot", nil)
slotDeletionSize = metrics.NewRegisteredMeter("state/delete/storage/size", nil)
slotDeletionSkip = metrics.NewRegisteredGauge("state/delete/storage/skip", nil)

SnapshotBlockCacheAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/account/miss", nil)
SnapshotBlockCacheAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/account/hit", nil)
BlockCacheAccountTimer = metrics.NewRegisteredResettingTimer("state/cacheblock/account/timer", nil)
BlockCacheStorageTimer = metrics.NewRegisteredResettingTimer("state/cacheblock/storage/timer", nil)
SnapshotBlockCacheStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/storage/miss", nil)
SnapshotBlockCacheStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/storage/hit", nil)
SnapshotBlockCacheStoragePurge = metrics.NewRegisteredMeter("state/snapshot/cacheblock/storage/purge", nil)
)
76 changes: 76 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@ package state
import (
"sync"

"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"

"github.com/ethereum/go-ethereum/common"
)

const (
AccountCacheSize = 10
StorageCacheSize = 100
)

// StoragePool is used to store maps of originStorage of stateObjects
type StoragePool struct {
sync.RWMutex
Expand Down Expand Up @@ -37,3 +46,70 @@ func (s *StoragePool) getStorage(address common.Address) *sync.Map {
}
return storageMap
}

// CacheAmongBlocks is used to store difflayer data in a flat cache,
// it only stores the latest version of the data
type CacheAmongBlocks struct {
cacheRoot common.Hash
// sMux sync.Mutex // TODO use mutex to update the cache if pipeline used the cache
accountsCache *fastcache.Cache
storagesCache *fastcache.Cache
}

func NewCacheAmongBlocks(cacheRoot common.Hash) *CacheAmongBlocks {
return &CacheAmongBlocks{
cacheRoot: cacheRoot,
accountsCache: fastcache.New(AccountCacheSize * 1024 * 1024),
storagesCache: fastcache.New(StorageCacheSize * 1024 * 1024),
}
}

func (c *CacheAmongBlocks) GetRoot() common.Hash {
return c.cacheRoot
}

func (c *CacheAmongBlocks) PurgeStorageCache() {
c.storagesCache.Reset()
}

func (c *CacheAmongBlocks) Reset() {
c.accountsCache.Reset()
c.storagesCache.Reset()
c.cacheRoot = types.EmptyRootHash
}

func (c *CacheAmongBlocks) SetRoot(root common.Hash) {
c.cacheRoot = root
}

func (c *CacheAmongBlocks) GetAccount(key common.Hash) (*types.SlimAccount, bool) {
if blob, found := c.accountsCache.HasGet(nil, key[:]); found {
if len(blob) == 0 { // can be both nil and []byte{}
return nil, true
}
account := new(types.SlimAccount)
if err := rlp.DecodeBytes(blob, account); err != nil {
panic(err)
} else {
return account, true
}
}
return nil, false
}

func (c *CacheAmongBlocks) GetStorage(accountHash common.Hash, storageKey common.Hash) ([]byte, bool) {
key := append(accountHash.Bytes(), storageKey.Bytes()...)
if blob, found := c.storagesCache.HasGet(nil, key); found {
return blob, true
}
return nil, false
}

func (c *CacheAmongBlocks) SetAccount(key common.Hash, account []byte) {
c.accountsCache.Set(key[:], account)
}

func (c *CacheAmongBlocks) SetStorage(accountHash common.Hash, storageKey common.Hash, value []byte) {
key := append(accountHash.Bytes(), storageKey.Bytes()...)
c.storagesCache.Set(key, value)
}
5 changes: 4 additions & 1 deletion core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
bloomfilter "github.com/holiman/bloomfilter/v2"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -145,7 +146,8 @@ func storageBloomHash(h0, h1 common.Hash) uint64 {

// newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low
// level persistent database or a hierarchical diff already.
func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{},
accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
// Create the new layer with some pre-allocated data segments
dl := &diffLayer{
parent: parent,
Expand Down Expand Up @@ -451,6 +453,7 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([
// If the account is known locally, but deleted, return an empty slot
if _, ok := dl.destructSet[accountHash]; ok {
snapshotDirtyStorageHitMeter.Mark(1)
log.Info("difflayer hit ", "account ", accountHash, "destruct", "true")
//snapshotDirtyStorageHitDepthHist.Update(int64(depth))
snapshotDirtyStorageInexMeter.Mark(1)
snapshotBloomStorageTrueHitMeter.Mark(1)
Expand Down
1 change: 1 addition & 0 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, cache
if err := batch.Write(); err != nil {
log.Crit("Failed to write initialized state marker", "err", err)
}
log.Info("use fast cache", "cache size", cache)
base := &diskLayer{
diskdb: diskdb,
triedb: triedb,
Expand Down
14 changes: 10 additions & 4 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,14 @@ var (
snapshotBloomIndexTimer = metrics.NewRegisteredResettingTimer("state/snapshot/bloom/index", nil)
snapshotBloomErrorGauge = metrics.NewRegisteredGaugeFloat64("state/snapshot/bloom/error", nil)

snapshotBloomAccountTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/truehit", nil)
snapshotBloomAccountFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/falsehit", nil)
snapshotBloomAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/miss", nil)
snapshotBloomAccountTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/truehit", nil)
snapshotBloomAccountFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/falsehit", nil)
snapshotBloomAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/miss", nil)
SnapshotBlockCacheAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/account/miss", nil)
SnapshotBlockCacheAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/account/hit", nil)

SnapshotBlockCacheStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/storage/miss", nil)
SnapshotBlockCacheStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/cacheblock/storage/hit", nil)

snapshotBloomStorageTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/truehit", nil)
snapshotBloomStorageFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/falsehit", nil)
Expand Down Expand Up @@ -389,7 +394,7 @@ func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs m
defer t.lock.Unlock()

t.layers[snap.root] = snap
log.Debug("Snapshot updated", "blockRoot", blockRoot)
//log.Info("Snapshot updated", "blockRoot", blockRoot)
return nil
}

Expand Down Expand Up @@ -433,6 +438,7 @@ func (t *Tree) Cap(root common.Hash, layers int) error {
if layers == 0 {
// If full commit was requested, flatten the diffs and merge onto disk
diff.lock.RLock()
log.Info("diff layer faltten happen")
base := diffToDisk(diff.flatten().(*diffLayer))
diff.lock.RUnlock()

Expand Down
20 changes: 17 additions & 3 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,26 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
enc []byte
err error
value common.Hash
exist bool
)

if s.db.snap != nil {
start := time.Now()
enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
if metrics.EnabledExpensive {
s.db.SnapshotStorageReads += time.Since(start)
storageKey := crypto.Keccak256Hash(key.Bytes())
// Try to get from cache among blocks if the cache root is the pre-state root
if s.db.cacheAmongBlocks != nil && s.db.cacheAmongBlocks.GetRoot() == s.db.originalRoot {
enc, exist = s.db.cacheAmongBlocks.GetStorage(s.addrHash, storageKey)
if exist {
SnapshotBlockCacheStorageHitMeter.Mark(1)
} else {
SnapshotBlockCacheStorageMissMeter.Mark(1)
}
}
if !exist {
enc, err = s.db.snap.Storage(s.addrHash, storageKey)
if metrics.EnabledExpensive {
s.db.SnapshotStorageReads += time.Since(start)
}
}
if len(enc) > 0 {
_, content, _, err := rlp.Split(enc)
Expand Down
Loading
Loading