Commit 092026b

Merge pull request #7131 from onflow/leo/util-read-storage-stats
[Util] Read storage stats
2 parents 6e6c6cd + 2bd9bc7 commit 092026b

3 files changed, +335 -0 lines changed

cmd/util/cmd/read-badger/cmd/stats.go

+58
@@ -0,0 +1,58 @@
package cmd

import (
	"fmt"
	"runtime"

	"github.com/rs/zerolog/log"
	"github.com/spf13/cobra"

	"github.com/onflow/flow-go/cmd/util/cmd/common"
	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation"
	"github.com/onflow/flow-go/storage/operation/badgerimpl"
	"github.com/onflow/flow-go/storage/operation/pebbleimpl"
	"github.com/onflow/flow-go/storage/pebble"
)

var flagDBType string

func init() {
	rootCmd.AddCommand(statsCmd)
	statsCmd.Flags().StringVar(&flagDBType, "dbtype", "badger", "database type to use (badger or pebble)")
}

var statsCmd = &cobra.Command{
	Use:   "stats",
	Short: "get stats for the database, such as key count, total value size, min/max value size, etc.",
	RunE: func(cmd *cobra.Command, args []string) error {
		var sdb storage.DB
		if flagDBType == "badger" {
			db := common.InitStorage(flagDatadir)
			defer db.Close()
			sdb = badgerimpl.ToDB(db)
		} else if flagDBType == "pebble" {
			pdb, err := pebble.MustOpenDefaultPebbleDB(log.Logger, flagPebbleDir)
			if err != nil {
				return fmt.Errorf("failed to open pebble db: %w", err)
			}
			defer pdb.Close()
			sdb = pebbleimpl.ToDB(pdb)
		} else {
			return fmt.Errorf("invalid db type: %q", flagDBType)
		}

		// Use one worker per CPU, capped at 256 (one worker per possible key prefix).
		numWorkers := runtime.NumCPU()
		if numWorkers > 256 {
			numWorkers = 256
		}
		log.Info().Msgf("getting stats for %s db at %s with %v workers", flagDBType, flagDatadir, numWorkers)

		stats, err := operation.SummarizeKeysByFirstByteConcurrent(log.Logger, sdb.Reader(), numWorkers)
		if err != nil {
			return fmt.Errorf("failed to get stats: %w", err)
		}

		operation.PrintStats(log.Logger, stats)
		return nil
	},
}
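For reference, the same path the command wires up can also be driven programmatically. The following standalone sketch is not part of this commit: it reuses only the packages imported above, and the data directory and the choice of 16 workers are illustrative assumptions rather than values taken from the PR.

package main

import (
	"fmt"

	"github.com/rs/zerolog/log"

	"github.com/onflow/flow-go/cmd/util/cmd/common"
	"github.com/onflow/flow-go/storage/operation"
	"github.com/onflow/flow-go/storage/operation/badgerimpl"
)

func main() {
	// Illustrative path; point this at a real protocol database directory.
	dataDir := "/var/flow/data/protocol"

	// Open the Badger-backed store and wrap it as a storage.DB, exactly as the command does.
	db := common.InitStorage(dataDir)
	defer db.Close()
	sdb := badgerimpl.ToDB(db)

	// Summarize all keys grouped by their first byte, using 16 workers (illustrative).
	stats, err := operation.SummarizeKeysByFirstByteConcurrent(log.Logger, sdb.Reader(), 16)
	if err != nil {
		fmt.Println("failed to get stats:", err)
		return
	}

	operation.PrintStats(log.Logger, stats)
}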

storage/operation/stats.go

+202
@@ -0,0 +1,202 @@
package operation

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"sort"
	"sync"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module/util"
	"github.com/onflow/flow-go/storage"
)

// Stats holds statistics for a single prefix group.
type Stats struct {
	Count       int     `json:"count"`
	MinSize     int     `json:"min_size"`
	MaxSize     int     `json:"max_size"`
	TotalSize   int     `json:"total_size"`
	AverageSize float64 `json:"avg_size"`
}

// SummarizeKeysByFirstByteConcurrent iterates over all prefixes [0x00..0xFF] in parallel
// using nWorker goroutines. Each worker handles one prefix at a time until all are processed.
//
// The storage.Reader must be able to create multiple iterators concurrently.
func SummarizeKeysByFirstByteConcurrent(log zerolog.Logger, r storage.Reader, nWorker int) (map[byte]Stats, error) {
	// We'll have at most 256 possible prefixes (0x00..0xFF).
	// Create tasks (one per prefix), a results channel, and a wait group.
	taskChan := make(chan byte, 256)
	resultChan := make(chan struct {
		prefix byte
		stats  Stats
		err    error
	}, 256)

	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start nWorker goroutines.
	for i := 0; i < nWorker; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // Stop immediately on cancellation
				case prefix, ok := <-taskChan:
					if !ok {
						return // Stop if taskChan is closed
					}

					st, err := processPrefix(r, prefix)
					resultChan <- struct {
						prefix byte
						stats  Stats
						err    error
					}{
						prefix: prefix,
						stats:  st,
						err:    err,
					}
				}
			}
		}()
	}

	progress := util.LogProgress(log,
		util.DefaultLogProgressConfig(
			"Summarizing keys by first byte",
			256,
		))

	// Send all prefixes [0..255] to taskChan.
	for p := 0; p < 256; p++ {
		taskChan <- byte(p)
	}
	close(taskChan)

	// Once all workers finish, close the result channel.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Gather results. We'll accumulate them in a map[prefix]Stats.
	finalStats := make(map[byte]Stats, 256)

	var err error
	// If we encounter an error, we will return it immediately.
	for res := range resultChan {
		if res.err != nil {
			cancel() // Cancel running goroutines
			err = res.err
			break
		}
		finalStats[res.prefix] = res.stats
		log.Info().
			Int("prefix", int(res.prefix)).
			Int("total", res.stats.TotalSize).
			Int("count", res.stats.Count).
			Int("min", res.stats.MinSize).
			Int("max", res.stats.MaxSize).
			Msg("Processed prefix")
		progress(1) // log the progress
	}

	if err != nil {
		return nil, err
	}
	return finalStats, nil
}

// processPrefix does the actual iteration and statistic calculation for a single prefix.
// It returns the Stats for that prefix, or an error if iteration fails.
func processPrefix(r storage.Reader, prefix byte) (Stats, error) {
	var s Stats
	// We use MinSize = math.MaxInt as a sentinel so the first real size will become the new minimum.
	s.MinSize = math.MaxInt

	// Iterator range is [prefix, prefix] (inclusive).
	start, end := []byte{prefix}, []byte{prefix}
	it, err := r.NewIter(start, end, storage.IteratorOption{BadgerIterateKeyOnly: true})
	if err != nil {
		return s, fmt.Errorf("failed to create iterator for prefix 0x%X: %w", prefix, err)
	}
	defer it.Close()

	for it.First(); it.Valid(); it.Next() {
		item := it.IterItem()

		// item.Value invokes the callback with the raw value, whose length we measure.
		err := item.Value(func(val []byte) error {
			size := len(val)
			s.Count++
			s.TotalSize += size
			if size < s.MinSize {
				s.MinSize = size
			}
			if size > s.MaxSize {
				s.MaxSize = size
			}
			return nil
		})

		if err != nil {
			return s, fmt.Errorf("failed to process value for prefix 0x%X: %w", prefix, err)
		}
	}

	// If we found no keys for this prefix, reset MinSize to 0 to avoid confusion.
	if s.Count == 0 {
		s.MinSize = 0
	} else {
		// Compute average size.
		s.AverageSize = float64(s.TotalSize) / float64(s.Count)
	}

	return s, nil
}

// PrintStats logs the statistics for each prefix, sorted by total size in ascending order.
// Each prefix is shown as an integer, along with count, min, max, total, and average value sizes.
func PrintStats(log zerolog.Logger, stats map[byte]Stats) {
	if len(stats) == 0 {
		log.Info().Msg("No stats to print (map is empty).")
		return
	}

	// Convert map to a slice of key-value pairs
	statList := make([]struct {
		Prefix int   `json:"prefix"`
		Stats  Stats `json:"stats"`
	}, 0, len(stats))

	for p, s := range stats {
		statList = append(statList, struct {
			Prefix int   `json:"prefix"`
			Stats  Stats `json:"stats"`
		}{Prefix: int(p), Stats: s})
	}

	// Sort by TotalSize in ascending order
	sort.Slice(statList, func(i, j int) bool {
		return statList[i].Stats.TotalSize < statList[j].Stats.TotalSize
	})

	// Convert sorted stats to JSON
	jsonData, err := json.MarshalIndent(statList, "", "  ")
	if err != nil {
		log.Error().Err(err).Msg("Failed to marshal stats to JSON")
		return
	}

	// Log the JSON
	log.Info().RawJSON("stats", jsonData).Msg("Sorted prefix stats")
}
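SummarizeKeysByFirstByteConcurrent returns one Stats entry per first byte, so a caller that wants whole-database totals has to fold the per-prefix entries together itself. Below is a minimal sketch of such a fold, not part of this commit: the AggregateStats name is hypothetical, and only the Stats type defined above is assumed.

// AggregateStats (hypothetical helper, not in the PR) folds the per-prefix
// results of SummarizeKeysByFirstByteConcurrent into whole-database totals.
func AggregateStats(perPrefix map[byte]Stats) Stats {
	var total Stats
	first := true
	for _, s := range perPrefix {
		// Empty prefixes have MinSize reset to 0, so skip them to keep the minimum meaningful.
		if s.Count == 0 {
			continue
		}
		total.Count += s.Count
		total.TotalSize += s.TotalSize
		if first || s.MinSize < total.MinSize {
			total.MinSize = s.MinSize
		}
		if s.MaxSize > total.MaxSize {
			total.MaxSize = s.MaxSize
		}
		first = false
	}
	if total.Count > 0 {
		total.AverageSize = float64(total.TotalSize) / float64(total.Count)
	}
	return total
}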

storage/operation/stats_test.go

+75
@@ -0,0 +1,75 @@
package operation_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation"
	"github.com/onflow/flow-go/storage/operation/dbtest"
	"github.com/onflow/flow-go/utils/unittest"
)

func TestSummarizeKeysByFirstByteConcurrent(t *testing.T) {
	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {

		err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
			// insert random events
			b := unittest.IdentifierFixture()
			events := unittest.EventsFixture(30)
			for _, evt := range events {
				err := operation.InsertEvent(rw.Writer(), b, evt)
				if err != nil {
					return err
				}
			}

			// insert 100 chunk data packs
			for i := 0; i < 100; i++ {
				collectionID := unittest.IdentifierFixture()
				cdp := &storage.StoredChunkDataPack{
					ChunkID:      unittest.IdentifierFixture(),
					StartState:   unittest.StateCommitmentFixture(),
					Proof:        []byte{'p'},
					CollectionID: collectionID,
				}
				err := operation.InsertChunkDataPack(rw.Writer(), cdp)
				if err != nil {
					return err
				}
			}

			// insert 20 results
			for i := 0; i < 20; i++ {
				result := unittest.ExecutionResultFixture()
				err := operation.InsertExecutionResult(rw.Writer(), result)
				if err != nil {
					return err
				}
			}

			return nil
		})
		require.NoError(t, err)

		// summarize keys by first byte
		stats, err := operation.SummarizeKeysByFirstByteConcurrent(unittest.Logger(), db.Reader(), 10)
		require.NoError(t, err)

		// print
		operation.PrintStats(unittest.Logger(), stats)

		for i := 0; i < 256; i++ {
			count := 0
			if i == 102 { // events
				count = 30
			} else if i == 100 { // CDP
				count = 100
			} else if i == 36 { // results
				count = 20
			}
			require.Equal(t, count, stats[byte(i)].Count, "byte %d", i)
		}
	})
}
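Because the loop above pins the count for every prefix (30 events under byte 102, 100 chunk data packs under byte 100, 20 execution results under byte 36, zero elsewhere), the per-prefix counts should also sum to 150. As a hedged extra check that is not part of this commit, the following lines could be appended inside the same RunWithDB callback:

		// Not in the PR: sanity-check that the per-prefix counts sum to the
		// 30 + 100 + 20 = 150 records inserted above.
		totalCount := 0
		for _, s := range stats {
			totalCount += s.Count
		}
		require.Equal(t, 150, totalCount)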
