Skip to content

Commit 1447b40

Browse files
authored
Merge branch 'master' into fxamacker/close-pebbledb-batch
2 parents eac2189 + 89dfb91 commit 1447b40

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+3129
-501
lines changed

cmd/access/node_builder/access_node_builder.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -1458,11 +1458,25 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
14581458
defaultConfig.registerDBPruneThreshold,
14591459
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))
14601460

1461-
flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
1461+
// websockets config
1462+
flags.DurationVar(
1463+
&builder.rpcConf.WebSocketConfig.InactivityTimeout,
14621464
"websocket-inactivity-timeout",
14631465
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
1464-
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
1465-
1466+
"the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed",
1467+
)
1468+
flags.Uint64Var(
1469+
&builder.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
1470+
"websocket-max-subscriptions-per-connection",
1471+
defaultConfig.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
1472+
"the maximum number of active WebSocket subscriptions allowed per connection",
1473+
)
1474+
flags.Float64Var(
1475+
&builder.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
1476+
"websocket-max-responses-per-second",
1477+
defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
1478+
fmt.Sprintf("the maximum number of responses that can be sent to a single client per second. Default: %f. if set to 0, no limit is applied to the number of responses per second.", defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond),
1479+
)
14661480
flags.BoolVar(
14671481
&builder.rpcConf.EnableWebSocketsStreamAPI,
14681482
"experimental-enable-websockets-stream-api",

cmd/execution_builder.go

+64-18
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
5555
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
5656
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
57+
"github.com/onflow/flow-go/engine/execution/migration"
5758
exeprovider "github.com/onflow/flow-go/engine/execution/provider"
5859
exepruner "github.com/onflow/flow-go/engine/execution/pruner"
5960
"github.com/onflow/flow-go/engine/execution/rpc"
@@ -92,10 +93,13 @@ import (
9293
"github.com/onflow/flow-go/state/protocol/blocktimer"
9394
storageerr "github.com/onflow/flow-go/storage"
9495
storage "github.com/onflow/flow-go/storage/badger"
96+
"github.com/onflow/flow-go/storage/dbops"
9597
"github.com/onflow/flow-go/storage/operation"
98+
"github.com/onflow/flow-go/storage/operation/badgerimpl"
9699
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
97100
storagepebble "github.com/onflow/flow-go/storage/pebble"
98101
"github.com/onflow/flow-go/storage/store"
102+
"github.com/onflow/flow-go/storage/store/chained"
99103
)
100104

101105
const (
@@ -136,13 +140,17 @@ type ExecutionNode struct {
136140
registerStore *storehouse.RegisterStore
137141

138142
// storage
139-
events storageerr.Events
140-
serviceEvents storageerr.ServiceEvents
141-
txResults storageerr.TransactionResults
142-
results storageerr.ExecutionResults
143-
receipts storageerr.ExecutionReceipts
144-
myReceipts storageerr.MyExecutionReceipts
145-
commits storageerr.Commits
143+
events storageerr.Events
144+
eventsReader storageerr.EventsReader
145+
serviceEvents storageerr.ServiceEvents
146+
txResults storageerr.TransactionResults
147+
txResultsReader storageerr.TransactionResultsReader
148+
results storageerr.ExecutionResults
149+
resultsReader storageerr.ExecutionResultsReader
150+
receipts storageerr.ExecutionReceipts
151+
myReceipts storageerr.MyExecutionReceipts
152+
commits storageerr.Commits
153+
commitsReader storageerr.CommitsReader
146154

147155
chunkDataPackDB *pebble.DB
148156
chunkDataPacks storageerr.ChunkDataPacks
@@ -216,11 +224,11 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
216224
Module("blobservice peer manager dependencies", exeNode.LoadBlobservicePeerManagerDependencies).
217225
Module("bootstrap", exeNode.LoadBootstrapper).
218226
Module("register store", exeNode.LoadRegisterStore).
227+
Module("migrate last executed block", exeNode.MigrateLastSealedExecutedResultToPebble).
219228
AdminCommand("get-transactions", func(conf *NodeConfig) commands.AdminCommand {
220229
return storageCommands.NewGetTransactionsCommand(conf.State, conf.Storage.Payloads, exeNode.collections)
221230
}).
222231
Component("execution state ledger", exeNode.LoadExecutionStateLedger).
223-
224232
// TODO: Modules should be able to depends on components
225233
// Because all modules are always bootstrapped first, before components,
226234
// its not possible to have a module depending on a Component.
@@ -331,15 +339,32 @@ func (exeNode *ExecutionNode) LoadExecutionStorage(
331339
node *NodeConfig,
332340
) error {
333341
db := node.ProtocolDB
342+
343+
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
344+
exeNode.serviceEvents = store.NewServiceEvents(node.Metrics.Cache, db)
334345
exeNode.commits = store.NewCommits(node.Metrics.Cache, db)
335346
exeNode.results = store.NewExecutionResults(node.Metrics.Cache, db)
336347
exeNode.receipts = store.NewExecutionReceipts(node.Metrics.Cache, db, exeNode.results, storage.DefaultCacheSize)
337348
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)
338-
339-
// Needed for gRPC server, make sure to assign to main scoped vars
340-
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
341-
exeNode.serviceEvents = store.NewServiceEvents(node.Metrics.Cache, db)
342349
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
350+
351+
if dbops.IsBadgerBased(node.dbops) {
352+
// if data are stored in badger, we can use the same storage for all data
353+
exeNode.eventsReader = exeNode.events
354+
exeNode.commitsReader = exeNode.commits
355+
exeNode.resultsReader = exeNode.results
356+
exeNode.txResultsReader = exeNode.txResults
357+
} else if dbops.IsPebbleBatch(node.dbops) {
358+
// when data are stored in pebble, we need to use chained storage to query data from
359+
// both pebble and badger
360+
// note the pebble storage is the first argument, and badger storage is the second, so
361+
// the data will be queried from pebble first, then badger
362+
badgerDB := badgerimpl.ToDB(node.DB)
363+
exeNode.eventsReader = chained.NewEvents(exeNode.events, store.NewEvents(node.Metrics.Cache, badgerDB))
364+
exeNode.commitsReader = chained.NewCommits(exeNode.commits, store.NewCommits(node.Metrics.Cache, badgerDB))
365+
exeNode.resultsReader = chained.NewExecutionResults(exeNode.results, store.NewExecutionResults(node.Metrics.Cache, badgerDB))
366+
exeNode.txResultsReader = chained.NewTransactionResults(exeNode.txResults, store.NewTransactionResults(node.Metrics.Cache, badgerDB, exeNode.exeConf.transactionResultsCacheSize))
367+
}
343368
return nil
344369
}
345370

@@ -470,6 +495,7 @@ func (exeNode *ExecutionNode) LoadGCPBlockDataUploader(
470495
)
471496

472497
// Setting up RetryableUploader for GCP uploader
498+
// deprecated
473499
retryableUploader := uploader.NewBadgerRetryableUploaderWrapper(
474500
asyncUploader,
475501
node.Storage.Blocks,
@@ -730,6 +756,16 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
730756
return nil
731757
}
732758

759+
func (exeNode *ExecutionNode) MigrateLastSealedExecutedResultToPebble(node *NodeConfig) error {
760+
// Migrate the last sealed executed
761+
err := migration.MigrateLastSealedExecutedResultToPebble(node.Logger, node.DB, node.PebbleDB, node.State, node.RootSeal)
762+
if err != nil {
763+
return fmt.Errorf("could not migrate last sealed executed result to pebble: %w", err)
764+
}
765+
766+
return nil
767+
}
768+
733769
func (exeNode *ExecutionNode) LoadExecutionState(
734770
node *NodeConfig,
735771
) (
@@ -765,6 +801,8 @@ func (exeNode *ExecutionNode) LoadExecutionState(
765801
exeNode.chunkDataPackDB = chunkDataPackDB
766802
exeNode.chunkDataPacks = chunkDataPacks
767803

804+
// migrate execution data for last sealed and executed block
805+
768806
exeNode.executionState = state.NewExecutionState(
769807
exeNode.ledgerStorage,
770808
exeNode.commits,
@@ -1353,10 +1391,10 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
13531391
exeNode.scriptsEng,
13541392
node.Storage.Headers,
13551393
node.State,
1356-
exeNode.events,
1357-
exeNode.results,
1358-
exeNode.txResults,
1359-
exeNode.commits,
1394+
exeNode.eventsReader,
1395+
exeNode.resultsReader,
1396+
exeNode.txResultsReader,
1397+
exeNode.commitsReader,
13601398
exeNode.metricsProvider,
13611399
node.RootChainID,
13621400
signature.NewBlockSignerDecoder(exeNode.committee),
@@ -1370,7 +1408,10 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
13701408
// check if the execution database already exists
13711409
bootstrapper := bootstrap.NewBootstrapper(node.Logger)
13721410

1373-
commit, bootstrapped, err := bootstrapper.IsBootstrapped(node.ProtocolDB)
1411+
// in order to support switching from badger to pebble in the middle of the spork,
1412+
// we will check if the execution database has been bootstrapped by reading the state from badger db.
1413+
// and if not, bootstrap both badger and pebble db.
1414+
commit, bootstrapped, err := bootstrapper.IsBootstrapped(badgerimpl.ToDB(node.DB))
13741415
if err != nil {
13751416
return fmt.Errorf("could not query database to know whether database has been bootstrapped: %w", err)
13761417
}
@@ -1395,7 +1436,12 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
13951436
return fmt.Errorf("could not load bootstrap state from checkpoint file: %w", err)
13961437
}
13971438

1398-
err = bootstrapper.BootstrapExecutionDatabase(node.ProtocolDB, node.RootSeal)
1439+
err = bootstrapper.BootstrapExecutionDatabase(badgerimpl.ToDB(node.DB), node.RootSeal)
1440+
if err != nil {
1441+
return fmt.Errorf("could not bootstrap execution database: %w", err)
1442+
}
1443+
1444+
err = bootstrapper.BootstrapExecutionDatabase(pebbleimpl.ToDB(node.PebbleDB), node.RootSeal)
13991445
if err != nil {
14001446
return fmt.Errorf("could not bootstrap execution database: %w", err)
14011447
}

cmd/observer/node_builder/observer_builder.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -820,11 +820,25 @@ func (builder *ObserverServiceBuilder) extraFlags() {
820820
defaultConfig.registerDBPruneThreshold,
821821
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))
822822

823-
flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
823+
// websockets config
824+
flags.DurationVar(
825+
&builder.rpcConf.WebSocketConfig.InactivityTimeout,
824826
"websocket-inactivity-timeout",
825827
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
826-
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
827-
828+
"the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed",
829+
)
830+
flags.Uint64Var(
831+
&builder.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
832+
"websocket-max-subscriptions-per-connection",
833+
defaultConfig.rpcConf.WebSocketConfig.MaxSubscriptionsPerConnection,
834+
"the maximum number of active WebSocket subscriptions allowed per connection",
835+
)
836+
flags.Float64Var(
837+
&builder.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
838+
"websocket-max-responses-per-second",
839+
defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond,
840+
fmt.Sprintf("the maximum number of responses that can be sent to a single client per second. Default: %f. if set to 0, no limit is applied to the number of responses per second.", defaultConfig.rpcConf.WebSocketConfig.MaxResponsesPerSecond),
841+
)
828842
flags.BoolVar(
829843
&builder.rpcConf.EnableWebSocketsStreamAPI,
830844
"experimental-enable-websockets-stream-api",

cmd/util/cmd/read-badger/cmd/stats.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package cmd
2+
3+
import (
4+
"fmt"
5+
"runtime"
6+
7+
"github.com/rs/zerolog/log"
8+
"github.com/spf13/cobra"
9+
10+
"github.com/onflow/flow-go/cmd/util/cmd/common"
11+
"github.com/onflow/flow-go/storage"
12+
"github.com/onflow/flow-go/storage/operation"
13+
"github.com/onflow/flow-go/storage/operation/badgerimpl"
14+
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
15+
"github.com/onflow/flow-go/storage/pebble"
16+
)
17+
18+
var flagDBType string
19+
20+
func init() {
21+
rootCmd.AddCommand(statsCmd)
22+
statsCmd.Flags().StringVar(&flagDBType, "dbtype", "badger", "database type to use (badger or pebble)")
23+
}
24+
25+
var statsCmd = &cobra.Command{
26+
Use: "stats",
27+
Short: "get stats for the database, such as key count, total value size, min/max value size etc",
28+
RunE: func(cmd *cobra.Command, args []string) error {
29+
var sdb storage.DB
30+
if flagDBType == "badger" {
31+
db := common.InitStorage(flagDatadir)
32+
defer db.Close()
33+
sdb = badgerimpl.ToDB(db)
34+
} else if flagDBType == "pebble" {
35+
pdb, err := pebble.MustOpenDefaultPebbleDB(log.Logger, flagPebbleDir)
36+
if err != nil {
37+
return fmt.Errorf("failed to open pebble db: %w", err)
38+
}
39+
defer pdb.Close()
40+
sdb = pebbleimpl.ToDB(pdb)
41+
} else {
42+
return fmt.Errorf("invalid db type")
43+
}
44+
45+
numWorkers := runtime.NumCPU()
46+
if numWorkers > 256 {
47+
numWorkers = 256
48+
}
49+
log.Info().Msgf("getting stats for %s db at %s with %v workers0", flagDBType, flagDatadir, numWorkers)
50+
stats, err := operation.SummarizeKeysByFirstByteConcurrent(log.Logger, sdb.Reader(), numWorkers)
51+
if err != nil {
52+
return fmt.Errorf("failed to get stats: %w", err)
53+
}
54+
55+
operation.PrintStats(log.Logger, stats)
56+
return nil
57+
},
58+
}

engine/access/rest/http/request/address.go engine/access/rest/common/parser/address.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"fmt"

engine/access/rest/http/request/address_test.go engine/access/rest/common/parser/address_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"strings"

engine/access/rest/http/request/arguments.go engine/access/rest/common/parser/arguments.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"fmt"
@@ -7,6 +7,7 @@ import (
77
)
88

99
const maxArgumentsLength = 100
10+
const MaxAllowedScriptArguments = 100
1011

1112
type Arguments [][]byte
1213

@@ -25,7 +26,7 @@ func (a *Arguments) Parse(raw []string) error {
2526
}
2627

2728
if len(args) > maxArgumentsLength {
28-
return fmt.Errorf("too many arguments. Maximum arguments allowed: %d", maxAllowedScriptArguments)
29+
return fmt.Errorf("too many arguments. Maximum arguments allowed: %d", MaxAllowedScriptArguments)
2930
}
3031

3132
*a = args

engine/access/rest/http/request/arguments_test.go engine/access/rest/common/parser/arguments_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"fmt"
@@ -21,13 +21,13 @@ func TestArguments_InvalidParse(t *testing.T) {
2121
assert.EqualError(t, err, "invalid argument encoding: illegal base64 data at input byte 0", a)
2222
}
2323

24-
tooLong := make([]string, maxAllowedScriptArguments+1)
24+
tooLong := make([]string, MaxAllowedScriptArguments+1)
2525
for i := range tooLong {
2626
tooLong[i] = "dGVzdA=="
2727
}
2828

2929
err := arguments.Parse(tooLong)
30-
assert.EqualError(t, err, fmt.Sprintf("too many arguments. Maximum arguments allowed: %d", maxAllowedScriptArguments))
30+
assert.EqualError(t, err, fmt.Sprintf("too many arguments. Maximum arguments allowed: %d", MaxAllowedScriptArguments))
3131
}
3232

3333
func TestArguments_ValidParse(t *testing.T) {

engine/access/rest/http/request/proposal_key.go engine/access/rest/common/parser/proposal_key.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"fmt"

engine/access/rest/http/request/signature_test.go engine/access/rest/common/parser/signature_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"encoding/hex"

engine/access/rest/http/request/signatures.go engine/access/rest/common/parser/signatures.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"fmt"

0 commit comments

Comments
 (0)