18 changes: 18 additions & 0 deletions triedb/pathdb/layertree.go
@@ -339,3 +339,21 @@ func (tree *layerTree) lookupStorage(accountHash common.Hash, slotHash common.Ha
}
return l, nil
}

// lookupNode returns the layer that is guaranteed to contain the trie node
// data for the given account and path at the specified state root.
func (tree *layerTree) lookupNode(accountHash common.Hash, path string, state common.Hash) (layer, error) {
// Hold the read lock to prevent unexpected layer changes
tree.lock.RLock()
defer tree.lock.RUnlock()

tip := tree.lookup.nodeTip(accountHash, path, state, tree.base.root)
if tip == (common.Hash{}) {
return nil, fmt.Errorf("[%#x] %w", state, errSnapshotStale)
}
l := tree.layers[tip]
if l == nil {
return nil, fmt.Errorf("triedb layer [%#x] missing", tip)
}
return l, nil
}
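For context, a reader would consume lookupNode by resolving the owning layer first and then fetching the node from it. The sketch below is hypothetical wiring, assuming a node accessor on the layer interface; it is not part of this diff.

```go
// Hypothetical caller sketch (assumed API shape, not part of this diff):
// the lookup index pins down the owning layer, which then serves the node.
func (db *Database) readNode(root, accountHash common.Hash, path []byte) ([]byte, error) {
	l, err := db.tree.lookupNode(accountHash, string(path), root)
	if err != nil {
		return nil, err
	}
	// node(owner, path, depth) is assumed to mirror the existing layer
	// accessors; only the raw blob is used here.
	blob, _, _, err := l.node(accountHash, path, 0)
	return blob, err
}
```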
252 changes: 249 additions & 3 deletions triedb/pathdb/lookup.go
@@ -22,9 +22,14 @@
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/trienode"
"golang.org/x/sync/errgroup"
)

// storageNodesShardCount is the number of shards used for storage nodes.
const storageNodesShardCount = 16

// storageKey returns a key for uniquely identifying the storage slot.
func storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte {
var key [64]byte
@@ -33,6 +38,23 @@
return key
}

// trienodeKey uses a fixed-size byte array instead of a string to avoid
// string allocations. The path length is encoded explicitly so that paths
// differing only by trailing zero nibbles cannot collide once zero-padded.
type trienodeKey [97]byte // 32 bytes for hash + 1 length byte + up to 64 bytes for path

// makeTrienodeKey returns a key uniquely identifying the trie node.
func makeTrienodeKey(accountHash common.Hash, path string) trienodeKey {
var key trienodeKey
copy(key[:32], accountHash[:])
key[32] = byte(len(path))
copy(key[33:], path)
return key
}
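The point of the fixed-size key is that it is a comparable value type: it can be built on the stack, while a string key would allocate on every insert and lookup. A minimal, self-contained illustration with toy types (not the production code):

```go
package main

import "fmt"

type hash [32]byte
type trienodeKey [97]byte

func makeTrienodeKey(accountHash hash, path string) trienodeKey {
	var key trienodeKey
	copy(key[:32], accountHash[:])
	key[32] = byte(len(path)) // length byte keeps zero-padded paths distinct
	copy(key[33:], path)
	return key
}

func main() {
	var acct hash
	acct[31] = 0xaa

	// Value-type key: comparable, built on the stack, no allocation
	// per map operation.
	m1 := map[trienodeKey]struct{}{}
	m1[makeTrienodeKey(acct, "\x01\x02")] = struct{}{}

	// String key: the concatenation allocates a fresh string every time.
	m2 := map[string]struct{}{}
	m2[string(acct[:])+"\x01\x02"] = struct{}{}

	fmt.Println(len(m1), len(m2)) // 1 1
}
```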

// shardTask is used to batch tasks by shard to minimize lock contention.
type shardTask struct {
accountHash common.Hash
path string
}

// lookup is an internal structure used to efficiently determine the layer in
// which a state entry resides.
type lookup struct {
@@ -48,11 +70,33 @@
// where the slot was modified, with the order from oldest to newest.
storages map[[64]byte][]common.Hash

// accountNodes represents the mutation history for account trie nodes.
// The key is the trie path of the node, and the value is a slice of
// **diff layer** IDs indicating where the node was modified, ordered
// from oldest to newest.
accountNodes map[string][]common.Hash

// storageNodes represents the mutation history for storage trie nodes,
// distributed across 16 shards to reduce lock contention. The key
// combines the account address hash with the trie path of the node;
// the value is a slice of **diff layer** IDs indicating where the node
// was modified, ordered from oldest to newest.
storageNodes [storageNodesShardCount]map[trienodeKey][]common.Hash

// descendant is the callback indicating whether the layer with
// given root is a descendant of the one specified by `ancestor`.
descendant func(state common.Hash, ancestor common.Hash) bool
}

// getStorageShardIndex returns the shard index for the given path.
func getStorageShardIndex(path string) int {
if len(path) == 0 {
return 0
}
// Use the first byte of the path to determine the shard index
return int(path[0]) % storageNodesShardCount
}
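Two details make this scheme cheap and well-balanced: pathdb node paths are hex-nibble encoded, so every path byte holds a value in the range 0x00-0x0f and the first nibble maps one-to-one onto the 16 shards; the empty path (the trie root node) is routed to shard 0. A quick self-contained illustration:

```go
package main

import "fmt"

const storageNodesShardCount = 16

func getStorageShardIndex(path string) int {
	if len(path) == 0 {
		return 0 // trie root node
	}
	return int(path[0]) % storageNodesShardCount
}

func main() {
	// Hex-nibble paths: each byte is 0x00..0x0f, so the first nibble
	// selects the shard directly and all 16 shards are reachable.
	for _, path := range []string{"", "\x00", "\x07\x0a", "\x0f\x01"} {
		fmt.Printf("path %x -> shard %d\n", path, getStorageShardIndex(path))
	}
}
```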

// newLookup initializes the lookup structure.
func newLookup(head layer, descendant func(state common.Hash, ancestor common.Hash) bool) *lookup {
var (
@@ -64,10 +108,16 @@
current = current.parentLayer()
}
l := &lookup{
accounts: make(map[common.Hash][]common.Hash),
storages: make(map[[64]byte][]common.Hash),
descendant: descendant,
accounts: make(map[common.Hash][]common.Hash),
storages: make(map[[64]byte][]common.Hash),
accountNodes: make(map[string][]common.Hash),
descendant: descendant,
}
// Initialize all 16 storage node shards
for i := 0; i < storageNodesShardCount; i++ {
l.storageNodes[i] = make(map[trienodeKey][]common.Hash)
}

// Apply the diff layers from bottom to top
for i := len(layers) - 1; i >= 0; i-- {
switch diff := layers[i].(type) {
@@ -161,6 +211,44 @@
return common.Hash{}
}

// nodeTip traverses the layer list associated with the given account and
// path in reverse order to locate the first entry that either matches the
// specified stateID or is an ancestor of it (i.e. the requested state
// descends from that entry).
//
// If found, the trie node data corresponding to the supplied stateID resides
// in that layer. Otherwise, two scenarios are possible:
//
// (a) the trie node remains unmodified from the current disk layer up to
// the state layer specified by the stateID: fall back to the disk layer for
// data retrieval; or (b) the layer specified by the stateID is stale: reject
// the data retrieval.
func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Hash, base common.Hash) common.Hash {
var list []common.Hash
if accountHash == (common.Hash{}) {
list = l.accountNodes[path]
} else {
shardIndex := getStorageShardIndex(path) // Use only path for sharding
list = l.storageNodes[shardIndex][makeTrienodeKey(accountHash, path)]
}
for i := len(list) - 1; i >= 0; i-- {
// If the current state matches the stateID, or the requested state is a
// descendant of it, return the current state as the most recent one
// containing the modified data. Otherwise, the current state may be ahead
// of the requested one or belong to a different branch.
if list[i] == stateID || l.descendant(stateID, list[i]) {
return list[i]
}
}
// No layer matching the stateID or its descendants was found. Use the
// current disk layer as a fallback.
if base == stateID || l.descendant(stateID, base) {
return base
}
// The layer associated with 'stateID' is not the descendant of the current
// disk layer, it's already stale, return nothing.
return common.Hash{}
}
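A toy model makes the walk concrete. The sketch below assumes a strictly linear chain where layer IDs are plain ints and descendant(state, ancestor) reduces to state > ancestor; real state roots form a tree, but the selection logic is identical.

```go
package main

import "fmt"

// nodeTipToy mirrors the walk in nodeTip over a toy linear chain: layer IDs
// are ints and a state is a descendant of an ancestor iff it is newer.
func nodeTipToy(list []int, stateID, base int) int {
	descendant := func(state, ancestor int) bool { return state > ancestor }
	for i := len(list) - 1; i >= 0; i-- {
		if list[i] == stateID || descendant(stateID, list[i]) {
			return list[i]
		}
	}
	if base == stateID || descendant(stateID, base) {
		return base
	}
	return -1 // stale: stateID does not descend from the disk layer
}

func main() {
	// The node was modified in layers 3, 5 and 9; the disk layer is 2.
	list, base := []int{3, 5, 9}, 2
	fmt.Println(nodeTipToy(list, 9, base)) // 9: exact match
	fmt.Println(nodeTipToy(list, 7, base)) // 5: newest layer at or below 7
	fmt.Println(nodeTipToy(list, 2, base)) // 2: unmodified since disk layer
	fmt.Println(nodeTipToy(list, 1, base)) // -1: requested state is stale
}
```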

// addLayer traverses the state data retained in the specified diff layer and
// integrates it into the lookup set.
//
@@ -170,6 +258,7 @@
func (l *lookup) addLayer(diff *diffLayer) {
defer func(now time.Time) {
lookupAddLayerTimer.UpdateSince(now)
log.Debug("PathDB lookup add layer", "id", diff.id, "block", diff.block, "elapsed", time.Since(now))
}(time.Now())

var (
@@ -204,6 +293,90 @@
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for path := range diff.nodes.accountNodes {
list, exists := l.accountNodes[path]
if !exists {
list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool
}
list = append(list, state)
l.accountNodes[path] = list
}
}()

wg.Add(1)
go func() {
defer wg.Done()
l.addStorageNodes(state, diff.nodes.storageNodes)
}()

states := len(diff.states.accountData)
for _, slots := range diff.states.storageData {
states += len(slots)
}
lookupStateMeter.Mark(int64(states))

trienodes := len(diff.nodes.accountNodes)
for _, nodes := range diff.nodes.storageNodes {
trienodes += len(nodes)
}
lookupTrienodeMeter.Mark(int64(trienodes))

wg.Wait()
}

func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) {
defer func(start time.Time) {
lookupAddTrienodeLayerTimer.UpdateSince(start)
}(time.Now())

var (
wg sync.WaitGroup
locks [storageNodesShardCount]sync.Mutex
tasks = make([][]shardTask, storageNodesShardCount)
)

// Partition the tasks by shard so each worker owns exactly one shard
for accountHash, slots := range nodes {
for path := range slots {
shardIndex := getStorageShardIndex(path)
tasks[shardIndex] = append(tasks[shardIndex], shardTask{
accountHash: accountHash,
path: path,
})
}
}

// Start all workers, each handling its own shard
wg.Add(storageNodesShardCount)
for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ {
go func(shardIdx int) {
defer wg.Done()

taskList := tasks[shardIdx]
if len(taskList) == 0 {
return
}

locks[shardIdx].Lock()
defer locks[shardIdx].Unlock()

shard := l.storageNodes[shardIdx]
for _, task := range taskList {
key := makeTrienodeKey(task.accountHash, task.path)
list, exists := shard[key]
if !exists {
list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool
}
list = append(list, state)
shard[key] = list
}

}(shardIndex)
}
wg.Wait()
}
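One design note: because the tasks are partitioned up front and each worker owns exactly one shard, no two goroutines ever write the same map here, so the per-shard mutexes are arguably redundant in this function; they are cheap and keep the helper correct if the partitioning ever changes. The ownership pattern in isolation (toy sketch, illustration only):

```go
package main

import (
	"fmt"
	"sync"
)

const shardCount = 16

func main() {
	// Partition work up front so each shard is owned by exactly one
	// goroutine; exclusive ownership, not locking, prevents data races.
	var shards [shardCount]map[string]int
	tasks := make([][]string, shardCount)
	for i := range shards {
		shards[i] = make(map[string]int)
	}
	for _, key := range []string{"\x00a", "\x01b", "\x01c", "\x0fd"} {
		idx := int(key[0]) % shardCount
		tasks[idx] = append(tasks[idx], key)
	}

	var wg sync.WaitGroup
	wg.Add(shardCount)
	for i := 0; i < shardCount; i++ {
		go func(idx int) {
			defer wg.Done()
			for _, key := range tasks[idx] {
				shards[idx][key]++ // sole writer of this shard
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(len(shards[1])) // 2
}
```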

@@ -236,6 +409,7 @@
func (l *lookup) removeLayer(diff *diffLayer) error {
defer func(now time.Time) {
lookupRemoveLayerTimer.UpdateSince(now)
log.Debug("PathDB lookup remove layer", "id", diff.id, "block", diff.block, "elapsed", time.Since(now))
}(time.Now())

var (
@@ -274,5 +448,77 @@
}
return nil
})

eg.Go(func() error {
for path := range diff.nodes.accountNodes {
found, list := removeFromList(l.accountNodes[path], state)
if !found {
return fmt.Errorf("account lookup is not found, %x, state: %x", path, state)
}
if len(list) != 0 {
l.accountNodes[path] = list
} else {
delete(l.accountNodes, path)
}
}
return nil
})

eg.Go(func() error {
return l.removeStorageNodes(state, diff.nodes.storageNodes)
})
return eg.Wait()
}

func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) error {
defer func(start time.Time) {
lookupRemoveTrienodeLayerTimer.UpdateSince(start)
}(time.Now())

var (
eg errgroup.Group
locks [storageNodesShardCount]sync.Mutex
tasks = make([][]shardTask, storageNodesShardCount)
)

// Partition the tasks by shard so each worker owns exactly one shard
for accountHash, slots := range nodes {
for path := range slots {
shardIndex := getStorageShardIndex(path)
tasks[shardIndex] = append(tasks[shardIndex], shardTask{
accountHash: accountHash,
path: path,
})
}
}

// Start all workers, each handling its own shard
for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ {
shardIdx := shardIndex // Capture the variable
eg.Go(func() error {
taskList := tasks[shardIdx]
if len(taskList) == 0 {
return nil
}

locks[shardIdx].Lock()
defer locks[shardIdx].Unlock()

shard := l.storageNodes[shardIdx]
for _, task := range taskList {
key := makeTrienodeKey(task.accountHash, task.path)
found, list := removeFromList(shard[key], state)
if !found {
return fmt.Errorf("storage lookup is not found, key: %x, state: %x", key, state)
}
if len(list) != 0 {
shard[key] = list
} else {
delete(shard, key)
}
}
return nil
})
}
return eg.Wait()
}
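removeFromList is an existing helper in lookup.go and is not part of this diff; for reference, a minimal sketch of its assumed contract (scan the list, drop one occurrence of the given state, report whether it was present):

```go
// Assumed shape of the existing removeFromList helper (sketch only).
func removeFromList(list []common.Hash, state common.Hash) (bool, []common.Hash) {
	for i := 0; i < len(list); i++ {
		if list[i] == state {
			return true, append(list[:i], list[i+1:]...)
		}
	}
	return false, list
}
```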