Skip to content

Commit 89dfb91

Browse files
authored
Merge pull request #7117 from onflow/leo/db-ops-migrate-en-results-to-pebble
[Execution] Migrate last executed block from badger to pebble
2 parents 418a168 + 318a7d1 commit 89dfb91

17 files changed

+1090
-54
lines changed

cmd/execution_builder.go

+64-18
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
5555
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
5656
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
57+
"github.com/onflow/flow-go/engine/execution/migration"
5758
exeprovider "github.com/onflow/flow-go/engine/execution/provider"
5859
exepruner "github.com/onflow/flow-go/engine/execution/pruner"
5960
"github.com/onflow/flow-go/engine/execution/rpc"
@@ -92,10 +93,13 @@ import (
9293
"github.com/onflow/flow-go/state/protocol/blocktimer"
9394
storageerr "github.com/onflow/flow-go/storage"
9495
storage "github.com/onflow/flow-go/storage/badger"
96+
"github.com/onflow/flow-go/storage/dbops"
9597
"github.com/onflow/flow-go/storage/operation"
98+
"github.com/onflow/flow-go/storage/operation/badgerimpl"
9699
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
97100
storagepebble "github.com/onflow/flow-go/storage/pebble"
98101
"github.com/onflow/flow-go/storage/store"
102+
"github.com/onflow/flow-go/storage/store/chained"
99103
)
100104

101105
const (
@@ -136,13 +140,17 @@ type ExecutionNode struct {
136140
registerStore *storehouse.RegisterStore
137141

138142
// storage
139-
events storageerr.Events
140-
serviceEvents storageerr.ServiceEvents
141-
txResults storageerr.TransactionResults
142-
results storageerr.ExecutionResults
143-
receipts storageerr.ExecutionReceipts
144-
myReceipts storageerr.MyExecutionReceipts
145-
commits storageerr.Commits
143+
events storageerr.Events
144+
eventsReader storageerr.EventsReader
145+
serviceEvents storageerr.ServiceEvents
146+
txResults storageerr.TransactionResults
147+
txResultsReader storageerr.TransactionResultsReader
148+
results storageerr.ExecutionResults
149+
resultsReader storageerr.ExecutionResultsReader
150+
receipts storageerr.ExecutionReceipts
151+
myReceipts storageerr.MyExecutionReceipts
152+
commits storageerr.Commits
153+
commitsReader storageerr.CommitsReader
146154

147155
chunkDataPackDB *pebble.DB
148156
chunkDataPacks storageerr.ChunkDataPacks
@@ -216,11 +224,11 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
216224
Module("blobservice peer manager dependencies", exeNode.LoadBlobservicePeerManagerDependencies).
217225
Module("bootstrap", exeNode.LoadBootstrapper).
218226
Module("register store", exeNode.LoadRegisterStore).
227+
Module("migrate last executed block", exeNode.MigrateLastSealedExecutedResultToPebble).
219228
AdminCommand("get-transactions", func(conf *NodeConfig) commands.AdminCommand {
220229
return storageCommands.NewGetTransactionsCommand(conf.State, conf.Storage.Payloads, exeNode.collections)
221230
}).
222231
Component("execution state ledger", exeNode.LoadExecutionStateLedger).
223-
224232
// TODO: Modules should be able to depend on components
225233
// Because all modules are always bootstrapped first, before components,
226234
// it's not possible to have a module depending on a Component.
@@ -331,15 +339,32 @@ func (exeNode *ExecutionNode) LoadExecutionStorage(
331339
node *NodeConfig,
332340
) error {
333341
db := node.ProtocolDB
342+
343+
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
344+
exeNode.serviceEvents = store.NewServiceEvents(node.Metrics.Cache, db)
334345
exeNode.commits = store.NewCommits(node.Metrics.Cache, db)
335346
exeNode.results = store.NewExecutionResults(node.Metrics.Cache, db)
336347
exeNode.receipts = store.NewExecutionReceipts(node.Metrics.Cache, db, exeNode.results, storage.DefaultCacheSize)
337348
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)
338-
339-
// Needed for gRPC server, make sure to assign to main scoped vars
340-
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
341-
exeNode.serviceEvents = store.NewServiceEvents(node.Metrics.Cache, db)
342349
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
350+
351+
if dbops.IsBadgerBased(node.dbops) {
352+
// if data are stored in badger, we can use the same storage for all data
353+
exeNode.eventsReader = exeNode.events
354+
exeNode.commitsReader = exeNode.commits
355+
exeNode.resultsReader = exeNode.results
356+
exeNode.txResultsReader = exeNode.txResults
357+
} else if dbops.IsPebbleBatch(node.dbops) {
358+
// when data are stored in pebble, we need to use chained storage to query data from
359+
// both pebble and badger
360+
// note the pebble storage is the first argument, and badger storage is the second, so
361+
// the data will be queried from pebble first, then badger
362+
badgerDB := badgerimpl.ToDB(node.DB)
363+
exeNode.eventsReader = chained.NewEvents(exeNode.events, store.NewEvents(node.Metrics.Cache, badgerDB))
364+
exeNode.commitsReader = chained.NewCommits(exeNode.commits, store.NewCommits(node.Metrics.Cache, badgerDB))
365+
exeNode.resultsReader = chained.NewExecutionResults(exeNode.results, store.NewExecutionResults(node.Metrics.Cache, badgerDB))
366+
exeNode.txResultsReader = chained.NewTransactionResults(exeNode.txResults, store.NewTransactionResults(node.Metrics.Cache, badgerDB, exeNode.exeConf.transactionResultsCacheSize))
367+
}
343368
return nil
344369
}
345370

@@ -470,6 +495,7 @@ func (exeNode *ExecutionNode) LoadGCPBlockDataUploader(
470495
)
471496

472497
// Setting up RetryableUploader for GCP uploader
498+
// deprecated
473499
retryableUploader := uploader.NewBadgerRetryableUploaderWrapper(
474500
asyncUploader,
475501
node.Storage.Blocks,
@@ -730,6 +756,16 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
730756
return nil
731757
}
732758

759+
func (exeNode *ExecutionNode) MigrateLastSealedExecutedResultToPebble(node *NodeConfig) error {
760+
// Migrate the last sealed executed
761+
err := migration.MigrateLastSealedExecutedResultToPebble(node.Logger, node.DB, node.PebbleDB, node.State, node.RootSeal)
762+
if err != nil {
763+
return fmt.Errorf("could not migrate last sealed executed result to pebble: %w", err)
764+
}
765+
766+
return nil
767+
}
768+
733769
func (exeNode *ExecutionNode) LoadExecutionState(
734770
node *NodeConfig,
735771
) (
@@ -765,6 +801,8 @@ func (exeNode *ExecutionNode) LoadExecutionState(
765801
exeNode.chunkDataPackDB = chunkDataPackDB
766802
exeNode.chunkDataPacks = chunkDataPacks
767803

804+
// migrate execution data for last sealed and executed block
805+
768806
exeNode.executionState = state.NewExecutionState(
769807
exeNode.ledgerStorage,
770808
exeNode.commits,
@@ -1353,10 +1391,10 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
13531391
exeNode.scriptsEng,
13541392
node.Storage.Headers,
13551393
node.State,
1356-
exeNode.events,
1357-
exeNode.results,
1358-
exeNode.txResults,
1359-
exeNode.commits,
1394+
exeNode.eventsReader,
1395+
exeNode.resultsReader,
1396+
exeNode.txResultsReader,
1397+
exeNode.commitsReader,
13601398
exeNode.metricsProvider,
13611399
node.RootChainID,
13621400
signature.NewBlockSignerDecoder(exeNode.committee),
@@ -1370,7 +1408,10 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
13701408
// check if the execution database already exists
13711409
bootstrapper := bootstrap.NewBootstrapper(node.Logger)
13721410

1373-
commit, bootstrapped, err := bootstrapper.IsBootstrapped(node.ProtocolDB)
1411+
// in order to support switching from badger to pebble in the middle of the spork,
1412+
// we will check if the execution database has been bootstrapped by reading the state from badger db.
1413+
// and if not, bootstrap both badger and pebble db.
1414+
commit, bootstrapped, err := bootstrapper.IsBootstrapped(badgerimpl.ToDB(node.DB))
13741415
if err != nil {
13751416
return fmt.Errorf("could not query database to know whether database has been bootstrapped: %w", err)
13761417
}
@@ -1395,7 +1436,12 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
13951436
return fmt.Errorf("could not load bootstrap state from checkpoint file: %w", err)
13961437
}
13971438

1398-
err = bootstrapper.BootstrapExecutionDatabase(node.ProtocolDB, node.RootSeal)
1439+
err = bootstrapper.BootstrapExecutionDatabase(badgerimpl.ToDB(node.DB), node.RootSeal)
1440+
if err != nil {
1441+
return fmt.Errorf("could not bootstrap execution database: %w", err)
1442+
}
1443+
1444+
err = bootstrapper.BootstrapExecutionDatabase(pebbleimpl.ToDB(node.PebbleDB), node.RootSeal)
13991445
if err != nil {
14001446
return fmt.Errorf("could not bootstrap execution database: %w", err)
14011447
}
+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package migration
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/cockroachdb/pebble"
8+
"github.com/dgraph-io/badger/v2"
9+
"github.com/rs/zerolog"
10+
11+
"github.com/onflow/flow-go/engine/execution/state/bootstrap"
12+
"github.com/onflow/flow-go/model/flow"
13+
"github.com/onflow/flow-go/module/block_iterator/latest"
14+
"github.com/onflow/flow-go/module/metrics"
15+
"github.com/onflow/flow-go/state/protocol"
16+
"github.com/onflow/flow-go/storage"
17+
"github.com/onflow/flow-go/storage/operation"
18+
"github.com/onflow/flow-go/storage/operation/badgerimpl"
19+
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
20+
"github.com/onflow/flow-go/storage/store"
21+
)
22+
23+
var (
24+
mainnet26SporkID flow.Identifier
25+
testnet52SporkID flow.Identifier
26+
)
27+
28+
func init() {
29+
var err error
30+
31+
mainnet26SporkID, err = flow.HexStringToIdentifier("45894bde3f45dbfd89cab12be84e9172385da5079d40bf63979ca8a6a7ede741")
32+
if err != nil {
33+
panic(fmt.Sprintf("failed to parse Mainnet26SporkID: %v", err))
34+
}
35+
36+
testnet52SporkID, err = flow.HexStringToIdentifier("5b88b81cfce2619305213489c2137f98e6efa0ec333dab2c31042b743388a3ce")
37+
if err != nil {
38+
panic(fmt.Sprintf("failed to parse Testnet52SporkID: %v", err))
39+
}
40+
}
41+
42+
// MigrateLastSealedExecutedResultToPebble run the migration to the pebble database, so that
43+
// it has necessary data to be able execute the next block.
44+
// the migration includes the following operations:
45+
// 1. bootstrap the pebble database
46+
// 2. copy execution data of the last sealed and executed block from badger to pebble.
47+
// the execution data includes the execution result and statecommitment, which is the minimum data needed from the database
48+
// to be able to continue executing the next block
49+
func MigrateLastSealedExecutedResultToPebble(logger zerolog.Logger, badgerDB *badger.DB, pebbleDB *pebble.DB, state protocol.State, rootSeal *flow.Seal) error {
50+
// only run the migration for mainnet26 and testnet52
51+
sporkID := state.Params().SporkID()
52+
chainID := state.Params().ChainID()
53+
if chainID == flow.Mainnet || chainID == flow.Testnet {
54+
if sporkID != mainnet26SporkID && sporkID != testnet52SporkID {
55+
logger.Warn().Msgf("spork ID %v is not Mainnet26SporkID %v or Testnet52SporkID %v, skip migration",
56+
sporkID, mainnet26SporkID, testnet52SporkID)
57+
return nil
58+
}
59+
}
60+
61+
bdb := badgerimpl.ToDB(badgerDB)
62+
pdb := pebbleimpl.ToDB(pebbleDB)
63+
lg := logger.With().Str("module", "badger-pebble-migration").Logger()
64+
65+
// bootstrap pebble database
66+
bootstrapper := bootstrap.NewBootstrapper(logger)
67+
commit, bootstrapped, err := bootstrapper.IsBootstrapped(pdb)
68+
if err != nil {
69+
return fmt.Errorf("could not query database to know whether database has been bootstrapped: %w", err)
70+
}
71+
72+
if !bootstrapped {
73+
err = bootstrapper.BootstrapExecutionDatabase(pdb, rootSeal)
74+
if err != nil {
75+
return fmt.Errorf("could not bootstrap pebble execution database: %w", err)
76+
}
77+
}
78+
79+
// get last sealed and executed block in badger
80+
lastExecutedSealedHeightInBadger, err := latest.LatestSealedAndExecutedHeight(state, bdb)
81+
if err != nil {
82+
return fmt.Errorf("failed to get last executed sealed block: %w", err)
83+
}
84+
85+
// read all the data and save to pebble
86+
header, err := state.AtHeight(lastExecutedSealedHeightInBadger).Head()
87+
if err != nil {
88+
return fmt.Errorf("failed to get block at height %d: %w", lastExecutedSealedHeightInBadger, err)
89+
}
90+
91+
blockID := header.ID()
92+
93+
lg.Info().Msgf(
94+
"migrating last executed and sealed block %v (%v) from badger to pebble",
95+
header.Height, blockID)
96+
97+
// create badger storage modules
98+
badgerResults, badgerCommits := createStores(bdb)
99+
// read data from badger
100+
result, commit, err := readResultsForBlock(blockID, badgerResults, badgerCommits)
101+
102+
if err != nil {
103+
return fmt.Errorf("failed to read data from badger: %w", err)
104+
}
105+
106+
// create pebble storage modules
107+
pebbleResults, pebbleCommits := createStores(pdb)
108+
109+
var existingExecuted flow.Identifier
110+
err = operation.RetrieveExecutedBlock(pdb.Reader(), &existingExecuted)
111+
if err == nil {
112+
// there is an executed block in pebble, compare if it's newer than the badger one,
113+
// if newer, it means EN is storing new results in pebble, in this case, we don't
114+
// want to update the executed block with the badger one.
115+
116+
header, err := state.AtBlockID(existingExecuted).Head()
117+
if err != nil {
118+
return fmt.Errorf("failed to get block at height %d from badger: %w", lastExecutedSealedHeightInBadger, err)
119+
}
120+
121+
if header.Height > lastExecutedSealedHeightInBadger {
122+
// existing executed in pebble is higher than badger, no need to store anything
123+
// why?
124+
// because the migration only copy the last sealed and executed block from badger to pebble,
125+
// if EN is still storing new results in badger, then the existingExecuted in pebble will be the same as
126+
// badger not higher.
127+
// if EN is storing new results in pebble, then the existingExecuted in pebble will be higher than badger,
128+
// in this case, we don't need to update the executed block in pebble.
129+
lg.Info().Msgf("existing executed block %v in pebble is newer than %v in badger, skip update",
130+
header.Height, lastExecutedSealedHeightInBadger)
131+
return nil
132+
}
133+
134+
// otherwise continue to update last executed block in pebble
135+
lg.Info().Msgf("existing executed block %v in pebble is older than %v in badger, update executed block",
136+
header.Height, lastExecutedSealedHeightInBadger,
137+
)
138+
} else if !errors.Is(err, storage.ErrNotFound) {
139+
// exception
140+
return fmt.Errorf("failed to retrieve executed block from pebble: %w", err)
141+
}
142+
143+
// store data to pebble in a batch update
144+
err = pdb.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
145+
if err := pebbleResults.BatchStore(result, batch); err != nil {
146+
return fmt.Errorf("failed to store receipt for block %s: %w", blockID, err)
147+
}
148+
149+
if err := pebbleResults.BatchIndex(blockID, result.ID(), batch); err != nil {
150+
return fmt.Errorf("failed to index result for block %s: %w", blockID, err)
151+
}
152+
153+
if err := pebbleCommits.BatchStore(blockID, commit, batch); err != nil {
154+
return fmt.Errorf("failed to store commit for block %s: %w", blockID, err)
155+
}
156+
157+
// two cases here:
158+
// 1. no executed block in pebble
159+
// in this case: set this block as last executed block
160+
// 2. badger has newer executed block than pebble
161+
// in this case: set this block as last executed block
162+
if err := operation.UpdateExecutedBlock(batch.Writer(), blockID); err != nil {
163+
return fmt.Errorf("failed to update executed block in pebble: %w", err)
164+
}
165+
166+
return nil
167+
})
168+
169+
if err != nil {
170+
return fmt.Errorf("failed to write data to pebble: %w", err)
171+
}
172+
173+
lg.Info().Msgf("migrated last executed and sealed block %v (%v) from badger to pebble",
174+
header.Height, blockID)
175+
176+
return nil
177+
}
178+
179+
func readResultsForBlock(
180+
blockID flow.Identifier,
181+
resultsStore storage.ExecutionResults,
182+
commitsStore storage.Commits,
183+
) (*flow.ExecutionResult, flow.StateCommitment, error) {
184+
result, err := resultsStore.ByBlockID(blockID)
185+
if err != nil {
186+
return nil, flow.DummyStateCommitment, fmt.Errorf("failed to get receipt for block %s: %w", blockID, err)
187+
}
188+
189+
commit, err := commitsStore.ByBlockID(blockID)
190+
if err != nil {
191+
return nil, flow.DummyStateCommitment, fmt.Errorf("failed to get commit for block %s: %w", blockID, err)
192+
}
193+
return result, commit, nil
194+
}
195+
196+
func createStores(db storage.DB) (storage.ExecutionResults, storage.Commits) {
197+
noop := metrics.NewNoopCollector()
198+
results := store.NewExecutionResults(noop, db)
199+
commits := store.NewCommits(noop, db)
200+
return results, commits
201+
}

0 commit comments

Comments
 (0)