Skip to content

Commit 6e6c6cd

Browse files
authored
Merge pull request #7092 from onflow/leo/util-read-results-receipts
[Util] update util to read results from pebble
2 parents 3ca6409 + b59a7d2 commit 6e6c6cd

11 files changed

+288
-241
lines changed

admin/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,9 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
125125
```
126126
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "create-chunk-data-packs-checkpoint" }'
127127
```
128+
129+
### Trigger pebble protocol database checkpoints
130+
Useful for reading protocol state data from the checkpoints using the read-badger util without stopping the node process.
131+
```
132+
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "create-pebble-checkpoint" }'
133+
```

admin/commands/storage/chunk_data_packs_db_checkpoint.go

-57
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"path"
7+
"time"
8+
9+
"github.com/cockroachdb/pebble"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/onflow/flow-go/admin"
13+
"github.com/onflow/flow-go/admin/commands"
14+
)
15+
16+
var _ commands.AdminCommand = (*PebbleDBCheckpointCommand)(nil)
17+
18+
// PebbleDBCheckpointCommand creates a checkpoint for pebble database for querying the data
19+
// while keeping the node alive.
20+
type PebbleDBCheckpointCommand struct {
21+
checkpointDir string
22+
dbname string // dbname is for logging purposes only
23+
pebbleDB *pebble.DB
24+
}
25+
26+
func NewPebbleDBCheckpointCommand(checkpointDir string, dbname string, pebbleDB *pebble.DB) *PebbleDBCheckpointCommand {
27+
return &PebbleDBCheckpointCommand{
28+
checkpointDir: checkpointDir,
29+
dbname: dbname,
30+
pebbleDB: pebbleDB,
31+
}
32+
}
33+
34+
func (c *PebbleDBCheckpointCommand) Handler(ctx context.Context, req *admin.CommandRequest) (interface{}, error) {
35+
log.Info().Msgf("admintool: creating %v database checkpoint", c.dbname)
36+
37+
targetDir := nextTmpFolder(c.checkpointDir)
38+
39+
log.Info().Msgf("admintool: creating %v database checkpoint at: %v", c.dbname, targetDir)
40+
41+
err := c.pebbleDB.Checkpoint(targetDir)
42+
if err != nil {
43+
return nil, admin.NewInvalidAdminReqErrorf("failed to create %v pebbledb checkpoint at %v: %w", c.dbname, targetDir, err)
44+
}
45+
46+
log.Info().Msgf("admintool: successfully created %v database checkpoint at: %v", c.dbname, targetDir)
47+
48+
return fmt.Sprintf("successfully created %v db checkpoint at %v", c.dbname, targetDir), nil
49+
}
50+
51+
func (c *PebbleDBCheckpointCommand) Validator(req *admin.CommandRequest) error {
52+
return nil
53+
}
54+
55+
func nextTmpFolder(dir string) string {
56+
// use timestamp as folder name
57+
folderName := time.Now().Format("2006-01-02_15-04-05")
58+
return path.Join(dir, folderName)
59+
}

cmd/execution_builder.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,8 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
231231
// Load the admin tool when chunk data packs db are initialized in execution state
232232
AdminCommand("create-chunk-data-packs-checkpoint", func(config *NodeConfig) commands.AdminCommand {
233233
// by default checkpoints will be created under "/data/chunk_data_packs_checkpoints_dir"
234-
return storageCommands.NewChunksCheckpointCommand(exeNode.exeConf.chunkDataPackCheckpointsDir,
235-
exeNode.chunkDataPackDB)
234+
return storageCommands.NewPebbleDBCheckpointCommand(exeNode.exeConf.chunkDataPackCheckpointsDir,
235+
"chunk_data_pack", exeNode.chunkDataPackDB)
236236
}).
237237
Component("stop control", exeNode.LoadStopControl).
238238
Component("execution state ledger WAL compactor", exeNode.LoadExecutionStateLedgerWALCompactor).

cmd/node_builder.go

+1
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ type BaseConfig struct {
155155
DynamicStartupSleepInterval time.Duration
156156
datadir string
157157
pebbleDir string
158+
pebbleCheckpointsDir string
158159
dbops string
159160
badgerDB *badger.DB
160161
pebbleDB *pebble.DB

cmd/scaffold.go

+4
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
171171
fnb.flags.StringVar(&fnb.BaseConfig.BindAddr, "bind", defaultConfig.BindAddr, "address to bind on")
172172
fnb.flags.StringVarP(&fnb.BaseConfig.BootstrapDir, "bootstrapdir", "b", defaultConfig.BootstrapDir, "path to the bootstrap directory")
173173
fnb.flags.StringVarP(&fnb.BaseConfig.datadir, "datadir", "d", defaultConfig.datadir, "directory to store the public database (protocol state)")
174+
fnb.flags.StringVar(&fnb.BaseConfig.pebbleCheckpointsDir, "pebble-checkpoints-dir", defaultConfig.pebbleCheckpointsDir, "directory to store the checkpoints for the public pebble database (protocol state)")
174175
fnb.flags.StringVar(&fnb.BaseConfig.pebbleDir, "pebble-dir", defaultConfig.pebbleDir, "directory to store the public pebble database (protocol state)")
175176
fnb.flags.StringVar(&fnb.BaseConfig.secretsdir, "secretsdir", defaultConfig.secretsdir, "directory to store private database (secrets)")
176177
fnb.flags.StringVar(&fnb.BaseConfig.dbops, "dbops", defaultConfig.dbops, "database operations to use (badger-transaction, batch-update, pebble-update)")
@@ -2102,6 +2103,9 @@ func (fnb *FlowNodeBuilder) RegisterDefaultAdminCommands() {
21022103
return storageCommands.NewReadSealsCommand(config.State, config.Storage.Seals, config.Storage.Index)
21032104
}).AdminCommand("get-latest-identity", func(config *NodeConfig) commands.AdminCommand {
21042105
return common.NewGetIdentityCommand(config.IdentityProvider)
2106+
}).AdminCommand("create-pebble-checkpoint", func(config *NodeConfig) commands.AdminCommand {
2107+
// by default checkpoints will be created under "/data/protocol_pebble_checkpoints"
2108+
return storageCommands.NewPebbleDBCheckpointCommand(config.pebbleCheckpointsDir, "protocol", config.PebbleDB)
21052109
})
21062110
}
21072111

+23-16
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package cmd
22

33
import (
4+
"fmt"
5+
46
"github.com/rs/zerolog/log"
57
"github.com/spf13/cobra"
68

7-
"github.com/onflow/flow-go/cmd/util/cmd/common"
89
findBlockByCommits "github.com/onflow/flow-go/cmd/util/cmd/read-badger/cmd/find-block-by-commits"
910
"github.com/onflow/flow-go/model/flow"
11+
"github.com/onflow/flow-go/module/metrics"
12+
"github.com/onflow/flow-go/storage"
13+
"github.com/onflow/flow-go/storage/store"
1014
)
1115

1216
func init() {
@@ -22,25 +26,28 @@ var commitsCmd = &cobra.Command{
2226
Use: "commits",
2327
Short: "get commit by block ID",
2428
Run: func(cmd *cobra.Command, args []string) {
25-
_, db := InitStorages()
26-
defer db.Close()
29+
err := WithStorage(func(db storage.DB) error {
2730

28-
commits := common.InitExecutionStorages(db).Commits
31+
commits := store.NewCommits(metrics.NewNoopCollector(), db)
2932

30-
log.Info().Msgf("got flag block id: %s", flagBlockID)
31-
blockID, err := flow.HexStringToIdentifier(flagBlockID)
32-
if err != nil {
33-
log.Error().Err(err).Msg("malformed block id")
34-
return
35-
}
33+
log.Info().Msgf("got flag block id: %s", flagBlockID)
34+
blockID, err := flow.HexStringToIdentifier(flagBlockID)
35+
if err != nil {
36+
return fmt.Errorf("malformed block id: %w", err)
37+
}
38+
39+
log.Info().Msgf("getting commit by block id: %v", blockID)
40+
commit, err := commits.ByBlockID(blockID)
41+
if err != nil {
42+
return fmt.Errorf("could not get commit for block id: %v: %w", blockID, err)
43+
}
44+
45+
log.Info().Msgf("commit: %x", commit)
46+
return nil
47+
})
3648

37-
log.Info().Msgf("getting commit by block id: %v", blockID)
38-
commit, err := commits.ByBlockID(blockID)
3949
if err != nil {
40-
log.Error().Err(err).Msgf("could not get commit for block id: %v", blockID)
41-
return
50+
log.Error().Err(err).Msg("could not get events")
4251
}
43-
44-
log.Info().Msgf("commit: %x", commit)
4552
},
4653
}
+60-57
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package cmd
22

33
import (
4+
"fmt"
5+
46
"github.com/rs/zerolog/log"
57
"github.com/spf13/cobra"
68

79
"github.com/onflow/flow-go/cmd/util/cmd/common"
810
"github.com/onflow/flow-go/model/flow"
11+
"github.com/onflow/flow-go/module/metrics"
12+
"github.com/onflow/flow-go/storage"
13+
"github.com/onflow/flow-go/storage/store"
914
)
1015

1116
var flagEventType string
@@ -23,81 +28,79 @@ func init() {
2328

2429
var eventsCmd = &cobra.Command{
2530
Use: "events",
26-
Short: "Read events from badger",
31+
Short: "Read events",
2732
Run: func(cmd *cobra.Command, args []string) {
28-
_, db := InitStorages()
29-
defer db.Close()
30-
31-
events := common.InitExecutionStorages(db).Events
32-
33-
if flagEventType != "" && flagTransactionID != "" {
34-
log.Error().Msg("provide only one of --transaction-id or --event-type")
35-
return
36-
}
37-
38-
log.Info().Msgf("got flag block id: %s", flagBlockID)
39-
blockID, err := flow.HexStringToIdentifier(flagBlockID)
40-
if err != nil {
41-
log.Error().Err(err).Msg("malformed block id")
42-
return
43-
}
33+
err := WithStorage(func(db storage.DB) error {
34+
events := store.NewEvents(metrics.NewNoopCollector(), db)
4435

45-
if flagTransactionID != "" {
46-
log.Info().Msgf("got flag transaction id: %s", flagTransactionID)
47-
transactionID, err := flow.HexStringToIdentifier(flagTransactionID)
48-
if err != nil {
49-
log.Error().Err(err).Msg("malformed transaction id")
50-
return
36+
if flagEventType != "" && flagTransactionID != "" {
37+
return fmt.Errorf("provide only one of --transaction-id or --event-type")
5138
}
5239

53-
log.Info().Msgf("getting events for block id: %v, transaction id: %v", blockID, transactionID)
54-
events, err := events.ByBlockIDTransactionID(blockID, transactionID)
40+
log.Info().Msgf("got flag block id: %s", flagBlockID)
41+
blockID, err := flow.HexStringToIdentifier(flagBlockID)
5542
if err != nil {
56-
log.Error().Err(err).Msgf("could not get events for block id: %v, transaction id: %v", blockID, transactionID)
57-
return
43+
return fmt.Errorf("malformed block id: %w", err)
5844
}
5945

60-
for _, event := range events {
61-
common.PrettyPrint(event)
62-
}
63-
return
64-
}
46+
if flagTransactionID != "" {
47+
log.Info().Msgf("got flag transaction id: %s", flagTransactionID)
48+
transactionID, err := flow.HexStringToIdentifier(flagTransactionID)
49+
if err != nil {
50+
return fmt.Errorf("malformed traansaction id: %w", err)
51+
}
6552

66-
if flagEventType != "" {
67-
validEvents := map[string]bool{
68-
"flow.AccountCreated": true,
69-
"flow.AccountUpdated": true,
70-
"flow.EpochCommit": true,
71-
"flow.EpochSetup": true,
72-
}
73-
if _, ok := validEvents[flagEventType]; ok {
74-
log.Info().Msgf("getting events for block id: %v, event type: %s", blockID, flagEventType)
75-
events, err := events.ByBlockIDEventType(blockID, flow.EventType(flagEventType))
53+
log.Info().Msgf("getting events for block id: %v, transaction id: %v", blockID, transactionID)
54+
events, err := events.ByBlockIDTransactionID(blockID, transactionID)
7655
if err != nil {
77-
log.Error().Err(err).Msgf("could not get events for block id: %v, event type: %s", blockID, flagEventType)
78-
return
56+
return fmt.Errorf("could not get events for block id: %v, transaction id: %v: %w", blockID, transactionID, err)
7957
}
8058

8159
for _, event := range events {
8260
common.PrettyPrint(event)
8361
}
84-
return
62+
return nil
8563
}
8664

87-
log.Fatal().Msgf("not a valid event type: %s", flagEventType)
88-
return
89-
}
65+
if flagEventType != "" {
66+
validEvents := map[string]bool{
67+
"flow.AccountCreated": true,
68+
"flow.AccountUpdated": true,
69+
"flow.EpochCommit": true,
70+
"flow.EpochSetup": true,
71+
}
72+
if _, ok := validEvents[flagEventType]; ok {
73+
log.Info().Msgf("getting events for block id: %v, event type: %s", blockID, flagEventType)
74+
events, err := events.ByBlockIDEventType(blockID, flow.EventType(flagEventType))
75+
if err != nil {
76+
return fmt.Errorf("could not get events for block id: %v, event type: %s, %w", blockID, flagEventType, err)
77+
}
78+
79+
for _, event := range events {
80+
common.PrettyPrint(event)
81+
}
82+
return nil
83+
}
9084

91-
// just fetch events for block
92-
log.Info().Msgf("getting events for block id: %v", blockID)
93-
evts, err := events.ByBlockID(blockID)
94-
if err != nil {
95-
log.Error().Err(err).Msgf("could not get events for block id: %v", blockID)
96-
return
97-
}
85+
return fmt.Errorf("not a valid event type: %s", flagEventType)
86+
}
87+
88+
// just fetch events for block
89+
log.Info().Msgf("getting events for block id: %v", blockID)
90+
evts, err := events.ByBlockID(blockID)
91+
if err != nil {
92+
return fmt.Errorf("could not get events for block id: %v: %w", blockID, err)
93+
}
9894

99-
for _, event := range evts {
100-
common.PrettyPrint(event)
95+
for _, event := range evts {
96+
common.PrettyPrint(event)
97+
}
98+
99+
return nil
100+
})
101+
102+
if err != nil {
103+
log.Error().Err(err).Msg("could not get events")
101104
}
102105
},
103106
}

0 commit comments

Comments
 (0)