Commit 5f3c79b

use batch instead of channel
Signed-off-by: jsvisa <[email protected]>
1 parent eee21b7 commit 5f3c79b

File tree

triedb/pathdb/lookup.go
triedb/pathdb/lookup_test.go

2 files changed: +74 −39 lines changed

triedb/pathdb/lookup.go

Lines changed: 67 additions & 37 deletions
@@ -38,9 +38,21 @@ func storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte {
 	return key
 }
 
-// trienodeKey returns a key for uniquely identifying the trie node.
-func trienodeKey(accountHash common.Hash, path string) string {
-	return accountHash.Hex() + path
+// trienodeKey uses a fixed-size byte array instead of string to avoid string allocations.
+type trienodeKey [96]byte // 32 bytes for hash + up to 64 bytes for path
+
+// makeTrienodeKey returns a key for uniquely identifying the trie node.
+func makeTrienodeKey(accountHash common.Hash, path string) trienodeKey {
+	var key trienodeKey
+	copy(key[:32], accountHash[:])
+	copy(key[32:], path)
+	return key
+}
+
+// shardTask is used to batch tasks by shard to minimize lock contention.
+type shardTask struct {
+	accountHash common.Hash
+	path        string
 }
 
 // lookup is an internal structure used to efficiently determine the layer in
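
The hunk above replaces a heap-allocating string key (accountHash.Hex() + path built a fresh string on every lookup) with a comparable fixed-size array that can be built on the stack. Below is a standalone sketch of the same technique with stand-in types (hash, arrayKey, and the nibble path are illustrative, not pathdb code). Note that the key stores no explicit path length, so a path ending in zero bytes would share a key with its shorter prefix; whether that can occur depends on the path encoding.

// Standalone sketch (stand-in types): a comparable fixed-size array key
// versus a concatenated string key.
package main

import "fmt"

type hash [32]byte // stand-in for common.Hash

// arrayKey packs the 32-byte hash plus a path of at most 64 bytes into
// one comparable value. Building it is two copies into a stack-allocated
// array; no heap allocation, unlike h.Hex() + path.
type arrayKey [96]byte

func makeArrayKey(h hash, path string) arrayKey {
	var k arrayKey
	copy(k[:32], h[:])
	copy(k[32:], path) // silently truncates paths longer than 64 bytes
	return k
}

func main() {
	index := make(map[arrayKey][]uint64)

	var h hash
	h[0] = 0xab

	k := makeArrayKey(h, "\x02\x07\x0f") // hypothetical nibble path
	index[k] = append(index[k], 42)
	fmt.Println(index[makeArrayKey(h, "\x02\x07\x0f")]) // [42]
}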
@@ -69,7 +81,7 @@ type lookup struct {
 	// The key is the account address hash and the trie path of the node,
 	// the value is a slice of **diff layer** IDs indicating where the
 	// slot was modified, with the order from oldest to newest.
-	storageNodes [storageNodesShardCount]map[string][]common.Hash
+	storageNodes [storageNodesShardCount]map[trienodeKey][]common.Hash
 
 	// descendant is the callback indicating whether the layer with
 	// given root is a descendant of the one specified by `ancestor`.
@@ -103,7 +115,7 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha
 	}
 	// Initialize all 16 storage node shards
 	for i := 0; i < storageNodesShardCount; i++ {
-		l.storageNodes[i] = make(map[string][]common.Hash)
+		l.storageNodes[i] = make(map[trienodeKey][]common.Hash)
 	}
 
 	// Apply the diff layers from bottom to top
@@ -216,7 +228,7 @@ func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Ha
 		list = l.accountNodes[path]
 	} else {
 		shardIndex := getStorageShardIndex(path) // Use only path for sharding
-		list = l.storageNodes[shardIndex][trienodeKey(accountHash, path)]
+		list = l.storageNodes[shardIndex][makeTrienodeKey(accountHash, path)]
 	}
 	for i := len(list) - 1; i >= 0; i-- {
 		// If the current state matches the stateID, or the requested state is a
@@ -323,38 +335,47 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st
 
 	var (
 		wg    sync.WaitGroup
-		tasks = make([]chan string, storageNodesShardCount)
+		locks [storageNodesShardCount]sync.Mutex
+		tasks = make([][]shardTask, storageNodesShardCount)
 	)
-	wg.Add(storageNodesShardCount)
-	for i := 0; i < storageNodesShardCount; i++ {
-		tasks[i] = make(chan string, 10) // Buffer to avoid blocking
+
+	// Pre-allocate work lists
+	for accountHash, slots := range nodes {
+		for path := range slots {
+			shardIndex := getStorageShardIndex(path)
+			tasks[shardIndex] = append(tasks[shardIndex], shardTask{
+				accountHash: accountHash,
+				path:        path,
+			})
+		}
 	}
+
 	// Start all workers, each handling its own shard
+	wg.Add(storageNodesShardCount)
 	for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ {
 		go func(shardIdx int) {
 			defer wg.Done()
 
+			taskList := tasks[shardIdx]
+			if len(taskList) == 0 {
+				return
+			}
+
+			locks[shardIdx].Lock()
+			defer locks[shardIdx].Unlock()
+
 			shard := l.storageNodes[shardIdx]
-			for key := range tasks[shardIdx] {
+			for _, task := range taskList {
+				key := makeTrienodeKey(task.accountHash, task.path)
 				list, exists := shard[key]
 				if !exists {
 					list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool
 				}
 				list = append(list, state)
 				shard[key] = list
 			}
-		}(shardIndex)
-	}
 
-	for accountHash, slots := range nodes {
-		for path := range slots {
-			shardIndex := getStorageShardIndex(path)
-			tasks[shardIndex] <- trienodeKey(accountHash, path)
-		}
-	}
-	// Close all channels to signal workers to finish
-	for i := 0; i < storageNodesShardCount; i++ {
-		close(tasks[i])
+		}(shardIndex)
 	}
 	wg.Wait()
 }
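
This hunk carries the commit's main idea: rather than one consumer goroutine per shard fed through buffered channels, all (accountHash, path) tasks are bucketed into per-shard slices up front, and each goroutine then drains only its own bucket. A reduced, self-contained sketch of the pattern follows; the four-shard count, the toy sharding function, and the string tasks are placeholders, not the commit's code.

package main

import (
	"fmt"
	"sync"
)

const shardCount = 4

// shardIndex is a toy sharding function; pathdb's getStorageShardIndex
// derives the shard from the trie path instead.
func shardIndex(task string) int {
	return len(task) % shardCount
}

func main() {
	work := []string{"alpha", "beta", "gamma", "delta"}

	// Phase 1: bucket tasks by shard. Single-threaded, so no locks and
	// no channel handoff are needed while building the work lists.
	tasks := make([][]string, shardCount)
	for _, t := range work {
		i := shardIndex(t)
		tasks[i] = append(tasks[i], t)
	}

	// Phase 2: one worker per shard. Each goroutine reads only its own
	// bucket and writes only its own shard map, so workers never contend.
	shards := make([]map[string]int, shardCount)
	var wg sync.WaitGroup
	wg.Add(shardCount)
	for i := 0; i < shardCount; i++ {
		go func(idx int) {
			defer wg.Done()
			shard := make(map[string]int)
			for _, t := range tasks[idx] {
				shard[t]++
			}
			shards[idx] = shard
		}(i)
	}
	wg.Wait()
	fmt.Println(shards)
}

One observation: since exactly one goroutine owns each shard here, the per-shard mutexes in the hunk appear redundant within a single call; presumably they exist to serialize overlapping calls that touch the same shard, though the diff itself does not say.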
@@ -456,20 +477,39 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map
 
 	var (
 		eg    errgroup.Group
-		tasks = make([]chan string, storageNodesShardCount)
+		locks [storageNodesShardCount]sync.Mutex
+		tasks = make([][]shardTask, storageNodesShardCount)
 	)
-	for i := 0; i < storageNodesShardCount; i++ {
-		tasks[i] = make(chan string, 10) // Buffer to avoid blocking
+
+	// Pre-allocate work lists
+	for accountHash, slots := range nodes {
+		for path := range slots {
+			shardIndex := getStorageShardIndex(path)
+			tasks[shardIndex] = append(tasks[shardIndex], shardTask{
+				accountHash: accountHash,
+				path:        path,
+			})
+		}
 	}
+
 	// Start all workers, each handling its own shard
 	for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ {
 		shardIdx := shardIndex // Capture the variable
 		eg.Go(func() error {
+			taskList := tasks[shardIdx]
+			if len(taskList) == 0 {
+				return nil
+			}
+
+			locks[shardIdx].Lock()
+			defer locks[shardIdx].Unlock()
+
 			shard := l.storageNodes[shardIdx]
-			for key := range tasks[shardIdx] {
+			for _, task := range taskList {
+				key := makeTrienodeKey(task.accountHash, task.path)
 				found, list := removeFromList(shard[key], state)
 				if !found {
-					return fmt.Errorf("storage lookup is not found, key: %s, state: %x", key, state)
+					return fmt.Errorf("storage lookup is not found, key: %x, state: %x", key, state)
 				}
 				if len(list) != 0 {
 					shard[key] = list
@@ -480,15 +520,5 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map
 			return nil
 		})
 	}
-
-	for accountHash, slots := range nodes {
-		for path := range slots {
-			shardIndex := getStorageShardIndex(path)
-			tasks[shardIndex] <- trienodeKey(accountHash, path)
-		}
-	}
-	for i := 0; i < storageNodesShardCount; i++ {
-		close(tasks[i])
-	}
 	return eg.Wait()
 }
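
removeStorageNodes gets the same restructuring, but with errgroup.Group instead of sync.WaitGroup, since a missing lookup entry must surface an error. A minimal sketch of that error-propagation shape (the failing shard is invented for illustration):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	for shard := 0; shard < 4; shard++ {
		shard := shard // capture for the closure (pre-Go 1.22 loop semantics)
		eg.Go(func() error {
			if shard == 2 {
				return fmt.Errorf("shard %d: lookup not found", shard)
			}
			return nil
		})
	}
	// Wait blocks until every worker returns, then yields the first
	// non-nil error, mirroring the `return eg.Wait()` above.
	if err := eg.Wait(); err != nil {
		fmt.Println("remove failed:", err)
	}
}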

triedb/pathdb/lookup_test.go

Lines changed: 7 additions & 2 deletions
@@ -63,6 +63,11 @@ func BenchmarkAddNodes(b *testing.B) {
 			accountNodeCount: 2000,
 			nodesPerAccount:  40,
 		},
+		{
+			name:             "XLarge-5000-accounts-50-nodes",
+			accountNodeCount: 5000,
+			nodesPerAccount:  50,
+		},
 	}
 
 	for _, tc := range tests {
@@ -75,7 +80,7 @@ func BenchmarkAddNodes(b *testing.B) {
 
 			// Initialize all 16 storage node shards
 			for i := 0; i < storageNodesShardCount; i++ {
-				lookup.storageNodes[i] = make(map[string][]common.Hash)
+				lookup.storageNodes[i] = make(map[trienodeKey][]common.Hash)
 			}
 
 			var state common.Hash
@@ -87,7 +92,7 @@ func BenchmarkAddNodes(b *testing.B) {
 			for i := 0; i < b.N; i++ {
 				// Reset the lookup instance for each benchmark iteration
 				for j := 0; j < storageNodesShardCount; j++ {
-					lookup.storageNodes[j] = make(map[string][]common.Hash)
+					lookup.storageNodes[j] = make(map[trienodeKey][]common.Hash)
 				}
 
 				lookup.addStorageNodes(state, storageNodes)
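
The added XLarge case exercises the batch path with 5000 accounts times 50 nodes, i.e. 250,000 storage nodes per iteration. The commit does not include measurements; the usual way to compare would be to run go test -run NONE -bench BenchmarkAddNodes -benchmem ./triedb/pathdb on the parent commit and on this one, then summarize the difference with benchstat.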
