Skip to content

Commit 318a7d1

Browse files
authored
Merge branch 'master' into leo/db-ops-migrate-en-results-to-pebble
2 parents f5efc4a + 418a168 commit 318a7d1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+3291
-757
lines changed

admin/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,9 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
125125
```
126126
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "create-chunk-data-packs-checkpoint" }'
127127
```
128+
129+
### Trigger pebble protocol database checkpoints
130+
Useful for reading protocol state data from the checkpoints using the read-badger util without stopping the node process.
131+
```
132+
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "create-pebble-checkpoint" }'
133+
```

admin/commands/storage/chunk_data_packs_db_checkpoint.go

-57
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"path"
7+
"time"
8+
9+
"github.com/cockroachdb/pebble"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/onflow/flow-go/admin"
13+
"github.com/onflow/flow-go/admin/commands"
14+
)
15+
16+
var _ commands.AdminCommand = (*PebbleDBCheckpointCommand)(nil)
17+
18+
// PebbleDBCheckpointCommand creates a checkpoint for pebble database for querying the data
19+
// while keeping the node alive.
20+
type PebbleDBCheckpointCommand struct {
21+
checkpointDir string
22+
dbname string // dbname is for logging purposes only
23+
pebbleDB *pebble.DB
24+
}
25+
26+
func NewPebbleDBCheckpointCommand(checkpointDir string, dbname string, pebbleDB *pebble.DB) *PebbleDBCheckpointCommand {
27+
return &PebbleDBCheckpointCommand{
28+
checkpointDir: checkpointDir,
29+
dbname: dbname,
30+
pebbleDB: pebbleDB,
31+
}
32+
}
33+
34+
func (c *PebbleDBCheckpointCommand) Handler(ctx context.Context, req *admin.CommandRequest) (interface{}, error) {
35+
log.Info().Msgf("admintool: creating %v database checkpoint", c.dbname)
36+
37+
targetDir := nextTmpFolder(c.checkpointDir)
38+
39+
log.Info().Msgf("admintool: creating %v database checkpoint at: %v", c.dbname, targetDir)
40+
41+
err := c.pebbleDB.Checkpoint(targetDir)
42+
if err != nil {
43+
return nil, admin.NewInvalidAdminReqErrorf("failed to create %v pebbledb checkpoint at %v: %w", c.dbname, targetDir, err)
44+
}
45+
46+
log.Info().Msgf("admintool: successfully created %v database checkpoint at: %v", c.dbname, targetDir)
47+
48+
return fmt.Sprintf("successfully created %v db checkpoint at %v", c.dbname, targetDir), nil
49+
}
50+
51+
func (c *PebbleDBCheckpointCommand) Validator(req *admin.CommandRequest) error {
52+
return nil
53+
}
54+
55+
func nextTmpFolder(dir string) string {
56+
// use timestamp as folder name
57+
folderName := time.Now().Format("2006-01-02_15-04-05")
58+
return path.Join(dir, folderName)
59+
}

cmd/access/node_builder/access_node_builder.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -1458,11 +1458,25 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
14581458
defaultConfig.registerDBPruneThreshold,
14591459
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))
14601460

1461-
flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
1461+
// websockets config
1462+
flags.DurationVar(
1463+
&builder.rpcConf.WebSocketConfig.InactivityTimeout,
14621464
"websocket-inactivity-timeout",
14631465
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
1464-
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
1465-
1466+
"the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed",
1467+
)
1468+
flags.Uint64Var(
1469+
&builder.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
1470+
"websocket-max-subscriptions-per-connection",
1471+
defaultConfig.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
1472+
"the maximum number of active WebSocket subscriptions allowed per connection",
1473+
)
1474+
flags.Float64Var(
1475+
&builder.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
1476+
"websocket-max-responses-per-second",
1477+
defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
1478+
fmt.Sprintf("the maximum number of responses that can be sent to a single client per second. Default: %f. if set to 0, no limit is applied to the number of responses per second.", defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond),
1479+
)
14661480
flags.BoolVar(
14671481
&builder.rpcConf.EnableWebSocketsStreamAPI,
14681482
"experimental-enable-websockets-stream-api",

cmd/execution_builder.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
239239
// Load the admin tool when chunk data packs db are initialized in execution state
240240
AdminCommand("create-chunk-data-packs-checkpoint", func(config *NodeConfig) commands.AdminCommand {
241241
// by default checkpoints will be created under "/data/chunk_data_packs_checkpoints_dir"
242-
return storageCommands.NewChunksCheckpointCommand(exeNode.exeConf.chunkDataPackCheckpointsDir,
243-
exeNode.chunkDataPackDB)
242+
return storageCommands.NewPebbleDBCheckpointCommand(exeNode.exeConf.chunkDataPackCheckpointsDir,
243+
"chunk_data_pack", exeNode.chunkDataPackDB)
244244
}).
245245
Component("stop control", exeNode.LoadStopControl).
246246
Component("execution state ledger WAL compactor", exeNode.LoadExecutionStateLedgerWALCompactor).

cmd/node_builder.go

+1
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ type BaseConfig struct {
155155
DynamicStartupSleepInterval time.Duration
156156
datadir string
157157
pebbleDir string
158+
pebbleCheckpointsDir string
158159
dbops string
159160
badgerDB *badger.DB
160161
pebbleDB *pebble.DB

cmd/observer/node_builder/observer_builder.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -820,11 +820,25 @@ func (builder *ObserverServiceBuilder) extraFlags() {
820820
defaultConfig.registerDBPruneThreshold,
821821
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))
822822

823-
flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
823+
// websockets config
824+
flags.DurationVar(
825+
&builder.rpcConf.WebSocketConfig.InactivityTimeout,
824826
"websocket-inactivity-timeout",
825827
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
826-
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
827-
828+
"the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed",
829+
)
830+
flags.Uint64Var(
831+
&builder.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
832+
"websocket-max-subscriptions-per-connection",
833+
defaultConfig.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
834+
"the maximum number of active WebSocket subscriptions allowed per connection",
835+
)
836+
flags.Float64Var(
837+
&builder.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
838+
"websocket-max-responses-per-second",
839+
defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
840+
fmt.Sprintf("the maximum number of responses that can be sent to a single client per second. Default: %f. if set to 0, no limit is applied to the number of responses per second.", defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond),
841+
)
828842
flags.BoolVar(
829843
&builder.rpcConf.EnableWebSocketsStreamAPI,
830844
"experimental-enable-websockets-stream-api",

cmd/scaffold.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
171171
fnb.flags.StringVar(&fnb.BaseConfig.BindAddr, "bind", defaultConfig.BindAddr, "address to bind on")
172172
fnb.flags.StringVarP(&fnb.BaseConfig.BootstrapDir, "bootstrapdir", "b", defaultConfig.BootstrapDir, "path to the bootstrap directory")
173173
fnb.flags.StringVarP(&fnb.BaseConfig.datadir, "datadir", "d", defaultConfig.datadir, "directory to store the public database (protocol state)")
174-
fnb.flags.StringVar(&fnb.BaseConfig.pebbleDir, "pebbledir", defaultConfig.pebbleDir, "directory to store the public pebble database (protocol state)")
174+
fnb.flags.StringVar(&fnb.BaseConfig.pebbleCheckpointsDir, "pebble-checkpoints-dir", defaultConfig.pebbleCheckpointsDir, "directory to store the checkpoints for the public pebble database (protocol state)")
175+
fnb.flags.StringVar(&fnb.BaseConfig.pebbleDir, "pebble-dir", defaultConfig.pebbleDir, "directory to store the public pebble database (protocol state)")
175176
fnb.flags.StringVar(&fnb.BaseConfig.secretsdir, "secretsdir", defaultConfig.secretsdir, "directory to store private database (secrets)")
176177
fnb.flags.StringVar(&fnb.BaseConfig.dbops, "dbops", defaultConfig.dbops, "database operations to use (badger-transaction, batch-update, pebble-update)")
177178
fnb.flags.StringVarP(&fnb.BaseConfig.level, "loglevel", "l", defaultConfig.level, "level for logging output")
@@ -2102,6 +2103,9 @@ func (fnb *FlowNodeBuilder) RegisterDefaultAdminCommands() {
21022103
return storageCommands.NewReadSealsCommand(config.State, config.Storage.Seals, config.Storage.Index)
21032104
}).AdminCommand("get-latest-identity", func(config *NodeConfig) commands.AdminCommand {
21042105
return common.NewGetIdentityCommand(config.IdentityProvider)
2106+
}).AdminCommand("create-pebble-checkpoint", func(config *NodeConfig) commands.AdminCommand {
2107+
// by default checkpoints will be created under "/data/protocol_pebble_checkpoints"
2108+
return storageCommands.NewPebbleDBCheckpointCommand(config.pebbleCheckpointsDir, "protocol", config.PebbleDB)
21052109
})
21062110
}
21072111

cmd/scaffold/pebble_db.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func InitPebbleDB(logger zerolog.Logger, dir string) (*pebble.DB, io.Closer, err
1717
// since we've set an default directory for the pebble DB, this check
1818
// is not necessary, but rather a sanity check
1919
if dir == "not set" {
20-
return nil, nil, fmt.Errorf("missing required flag '--pebbledir'")
20+
return nil, nil, fmt.Errorf("missing required flag '--pebble-dir'")
2121
}
2222

2323
// Pre-create DB path
+23-16
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package cmd
22

33
import (
4+
"fmt"
5+
46
"github.com/rs/zerolog/log"
57
"github.com/spf13/cobra"
68

7-
"github.com/onflow/flow-go/cmd/util/cmd/common"
89
findBlockByCommits "github.com/onflow/flow-go/cmd/util/cmd/read-badger/cmd/find-block-by-commits"
910
"github.com/onflow/flow-go/model/flow"
11+
"github.com/onflow/flow-go/module/metrics"
12+
"github.com/onflow/flow-go/storage"
13+
"github.com/onflow/flow-go/storage/store"
1014
)
1115

1216
func init() {
@@ -22,25 +26,28 @@ var commitsCmd = &cobra.Command{
2226
Use: "commits",
2327
Short: "get commit by block ID",
2428
Run: func(cmd *cobra.Command, args []string) {
25-
_, db := InitStorages()
26-
defer db.Close()
29+
err := WithStorage(func(db storage.DB) error {
2730

28-
commits := common.InitExecutionStorages(db).Commits
31+
commits := store.NewCommits(metrics.NewNoopCollector(), db)
2932

30-
log.Info().Msgf("got flag block id: %s", flagBlockID)
31-
blockID, err := flow.HexStringToIdentifier(flagBlockID)
32-
if err != nil {
33-
log.Error().Err(err).Msg("malformed block id")
34-
return
35-
}
33+
log.Info().Msgf("got flag block id: %s", flagBlockID)
34+
blockID, err := flow.HexStringToIdentifier(flagBlockID)
35+
if err != nil {
36+
return fmt.Errorf("malformed block id: %w", err)
37+
}
38+
39+
log.Info().Msgf("getting commit by block id: %v", blockID)
40+
commit, err := commits.ByBlockID(blockID)
41+
if err != nil {
42+
return fmt.Errorf("could not get commit for block id: %v: %w", blockID, err)
43+
}
44+
45+
log.Info().Msgf("commit: %x", commit)
46+
return nil
47+
})
3648

37-
log.Info().Msgf("getting commit by block id: %v", blockID)
38-
commit, err := commits.ByBlockID(blockID)
3949
if err != nil {
40-
log.Error().Err(err).Msgf("could not get commit for block id: %v", blockID)
41-
return
50+
log.Error().Err(err).Msg("could not get events")
4251
}
43-
44-
log.Info().Msgf("commit: %x", commit)
4552
},
4653
}

0 commit comments

Comments
 (0)