Skip to content

Commit 23f26f2

Browse files
committed
[factory] add parent to workingset
1 parent 1a4069f commit 23f26f2

File tree

4 files changed

+137
-29
lines changed

4 files changed

+137
-29
lines changed

state/factory/factory.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,11 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
269269
if err != nil {
270270
return nil, err
271271
}
272-
return sf.createSfWorkingSet(ctx, height, store)
272+
var parent *workingSet
273+
if height > 0 {
274+
parent = getWorkingSetByHeight(sf.workingsets, height-1)
275+
}
276+
return sf.createSfWorkingSet(ctx, height, store, parent)
273277
}
274278

275279
func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) {
@@ -290,10 +294,10 @@ func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*w
290294
if err != nil {
291295
return nil, err
292296
}
293-
return sf.createSfWorkingSet(ctx, height, store)
297+
return sf.createSfWorkingSet(ctx, height, store, nil)
294298
}
295299

296-
func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) {
300+
func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore, parent *workingSet) (*workingSet, error) {
297301
if err := store.Start(ctx); err != nil {
298302
return nil, err
299303
}
@@ -308,7 +312,7 @@ func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store
308312
}
309313
}
310314
}
311-
return newWorkingSet(height, store), nil
315+
return newWorkingSet(height, store, parent), nil
312316
}
313317

314318
func (sf *factory) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
@@ -357,6 +361,7 @@ func (sf *factory) Validate(ctx context.Context, blk *block.Block) error {
357361
return errors.Wrap(err, "failed to validate block with workingset in factory")
358362
}
359363
sf.workingsets.Add(key, ws)
364+
sf.workingsets.Add(ws.height, ws)
360365
}
361366
receipts, err := ws.Receipts()
362367
if err != nil {
@@ -399,6 +404,7 @@ func (sf *factory) NewBlockBuilder(
399404
blkCtx := protocol.MustGetBlockCtx(ctx)
400405
key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String())
401406
sf.workingsets.Add(key, ws)
407+
sf.workingsets.Add(ws.height, ws)
402408
return blkBuilder, nil
403409
}
404410

@@ -434,19 +440,35 @@ func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64, preact
434440
}
435441

436442
// PutBlock persists all changes in RunActions() into the DB
437-
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
443+
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) (err error) {
438444
timer := sf.timerFactory.NewTimer("Commit")
439-
defer timer.End()
445+
var (
446+
ws *workingSet
447+
isExist bool
448+
)
449+
defer func() {
450+
timer.End()
451+
if err != nil {
452+
// abandon current workingset, and all pending workingsets beyond current height
453+
ws.abandon()
454+
h, _ := ws.Height()
455+
abandonWorkingSets(sf.workingsets, h)
456+
}
457+
}()
440458
producer := blk.PublicKey().Address()
441459
if producer == nil {
442460
return errors.New("failed to get address")
443461
}
444462
ctx = protocol.WithRegistry(ctx, sf.registry)
445463
key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress())
446-
ws, isExist, err := sf.getFromWorkingSets(ctx, key)
464+
ws, isExist, err = sf.getFromWorkingSets(ctx, key)
447465
if err != nil {
448-
return err
466+
return
449467
}
468+
if err = ws.verifyParent(); err != nil {
469+
return
470+
}
471+
ws.detachParent()
450472
if !isExist {
451473
// regenerate workingset
452474
if !sf.skipBlockValidationOnPut {
@@ -456,14 +478,14 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
456478
}
457479
if err != nil {
458480
log.L().Error("Failed to update state.", zap.Error(err))
459-
return err
481+
return
460482
}
461483
}
462484
sf.mutex.Lock()
463485
defer sf.mutex.Unlock()
464486
receipts, err := ws.Receipts()
465487
if err != nil {
466-
return err
488+
return
467489
}
468490
blk.Receipts = receipts
469491
h, _ := ws.Height()
@@ -475,18 +497,17 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
475497
)
476498
}
477499

478-
if err := ws.Commit(ctx); err != nil {
479-
return err
500+
if err = ws.Commit(ctx); err != nil {
501+
return
480502
}
481503
rh, err := sf.dao.Get(ArchiveTrieNamespace, []byte(ArchiveTrieRootKey))
482504
if err != nil {
483-
return err
505+
return
484506
}
485-
if err := sf.twoLayerTrie.SetRootHash(rh); err != nil {
486-
return err
507+
if err = sf.twoLayerTrie.SetRootHash(rh); err != nil {
508+
return
487509
}
488510
sf.currentChainHeight = h
489-
490511
return nil
491512
}
492513

state/factory/statedb.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,11 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
197197
if err := store.Start(ctx); err != nil {
198198
return nil, err
199199
}
200-
201-
return newWorkingSet(height, store), nil
200+
var parent *workingSet
201+
if height > 0 {
202+
parent = getWorkingSetByHeight(sdb.workingsets, height-1)
203+
}
204+
return newWorkingSet(height, store, parent), nil
202205
}
203206

204207
func (sdb *stateDB) Register(p protocol.Protocol) error {
@@ -217,6 +220,7 @@ func (sdb *stateDB) Validate(ctx context.Context, blk *block.Block) error {
217220
return errors.Wrap(err, "failed to validate block with workingset in statedb")
218221
}
219222
sdb.workingsets.Add(key, ws)
223+
sdb.workingsets.Add(ws.height, ws)
220224
}
221225
receipts, err := ws.Receipts()
222226
if err != nil {
@@ -260,6 +264,7 @@ func (sdb *stateDB) NewBlockBuilder(
260264
blkCtx := protocol.MustGetBlockCtx(ctx)
261265
key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String())
262266
sdb.workingsets.Add(key, ws)
267+
sdb.workingsets.Add(ws.height, ws)
263268
return blkBuilder, nil
264269
}
265270

@@ -287,21 +292,37 @@ func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64, preac
287292
}
288293

289294
// PutBlock persists all changes in RunActions() into the DB
290-
func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
295+
func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) (err error) {
291296
sdb.mutex.Lock()
292297
timer := sdb.timerFactory.NewTimer("Commit")
293298
sdb.mutex.Unlock()
294-
defer timer.End()
299+
var (
300+
ws *workingSet
301+
isExist bool
302+
)
303+
defer func() {
304+
timer.End()
305+
if err != nil {
306+
// abandon current workingset, and all pending workingsets beyond current height
307+
ws.abandon()
308+
h, _ := ws.Height()
309+
abandonWorkingSets(sdb.workingsets, h)
310+
}
311+
}()
295312
producer := blk.PublicKey().Address()
296313
if producer == nil {
297314
return errors.New("failed to get address")
298315
}
299316
ctx = protocol.WithRegistry(ctx, sdb.registry)
300317
key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress())
301-
ws, isExist, err := sdb.getFromWorkingSets(ctx, key)
318+
ws, isExist, err = sdb.getFromWorkingSets(ctx, key)
302319
if err != nil {
303-
return err
320+
return
304321
}
322+
if err = ws.verifyParent(); err != nil {
323+
return
324+
}
325+
ws.detachParent()
305326
if !isExist {
306327
if !sdb.skipBlockValidationOnPut {
307328
err = ws.ValidateBlock(ctx, blk)
@@ -310,14 +331,14 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
310331
}
311332
if err != nil {
312333
log.L().Error("Failed to update state.", zap.Error(err))
313-
return err
334+
return
314335
}
315336
}
316337
sdb.mutex.Lock()
317338
defer sdb.mutex.Unlock()
318339
receipts, err := ws.Receipts()
319340
if err != nil {
320-
return err
341+
return
321342
}
322343
blk.Receipts = receipts
323344
h, _ := ws.Height()
@@ -329,8 +350,8 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
329350
)
330351
}
331352

332-
if err := ws.Commit(ctx); err != nil {
333-
return err
353+
if err = ws.Commit(ctx); err != nil {
354+
return
334355
}
335356
sdb.currentChainHeight = h
336357
return nil
@@ -440,6 +461,6 @@ func (sdb *stateDB) getFromWorkingSets(ctx context.Context, key hash.Hash256) (*
440461
sdb.mutex.RLock()
441462
currHeight := sdb.currentChainHeight
442463
sdb.mutex.RUnlock()
443-
tx, err := sdb.newWorkingSet(ctx, currHeight+1)
444-
return tx, false, err
464+
ws, err := sdb.newWorkingSet(ctx, currHeight+1)
465+
return ws, false, err
445466
}

state/factory/util.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"github.com/iotexproject/go-pkgs/bloom"
12+
"github.com/iotexproject/go-pkgs/cache"
1213
"github.com/iotexproject/go-pkgs/crypto"
1314
"github.com/iotexproject/go-pkgs/hash"
1415
"github.com/pkg/errors"
@@ -96,6 +97,28 @@ func generateWorkingSetCacheKey(blkHeader block.Header, producerAddr string) has
9697
return hash.Hash256b(sum)
9798
}
9899

100+
func getWorkingSetByHeight(c cache.LRUCache, h uint64) *workingSet {
101+
if h == 0 {
102+
return nil
103+
}
104+
if data, ok := c.Get(h); ok {
105+
if ws, ok := data.(*workingSet); ok {
106+
return ws
107+
}
108+
}
109+
return nil
110+
}
111+
112+
func abandonWorkingSets(c cache.LRUCache, h uint64) {
113+
for ; ; h++ {
114+
if ws := getWorkingSetByHeight(c, h); ws != nil {
115+
c.Remove(h)
116+
} else {
117+
break
118+
}
119+
}
120+
}
121+
99122
func protocolPreCommit(ctx context.Context, sr protocol.StateManager) error {
100123
if reg, ok := protocol.GetRegistry(ctx); ok {
101124
for _, p := range reg.All() {

state/factory/workingset.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"math/big"
1111
"sort"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/ethereum/go-ethereum/common"
@@ -64,17 +65,20 @@ type (
6465
height uint64
6566
store workingSetStore
6667
finalized bool
68+
abandoned atomic.Bool
6769
dock protocol.Dock
6870
txValidator *protocol.GenericValidator
6971
receipts []*action.Receipt
72+
parent *workingSet
7073
}
7174
)
7275

73-
func newWorkingSet(height uint64, store workingSetStore) *workingSet {
76+
func newWorkingSet(height uint64, store workingSetStore, parent *workingSet) *workingSet {
7477
ws := &workingSet{
7578
height: height,
7679
store: store,
7780
dock: protocol.NewDock(),
81+
parent: parent,
7882
}
7983
ws.txValidator = protocol.NewGenericValidator(ws, accountutil.AccountState)
8084
return ws
@@ -114,6 +118,26 @@ func (ws *workingSet) validate(ctx context.Context) error {
114118
return nil
115119
}
116120

121+
func (ws *workingSet) isAbandoned() bool {
122+
return ws.abandoned.Load()
123+
}
124+
125+
func (ws *workingSet) abandon() {
126+
ws.abandoned.Store(true)
127+
}
128+
129+
func (ws *workingSet) verifyParent() error {
130+
if ws.parent != nil && ws.parent.isAbandoned() {
131+
ws.abandon()
132+
return errors.New("workingset abandoned")
133+
}
134+
return nil
135+
}
136+
137+
func (ws *workingSet) detachParent() {
138+
ws.parent = nil
139+
}
140+
117141
func withActionCtx(ctx context.Context, selp *action.SealedEnvelope) (context.Context, error) {
118142
var actionCtx protocol.ActionCtx
119143
var err error
@@ -292,8 +316,15 @@ func (ws *workingSet) freshAccountConversion(ctx context.Context, actCtx *protoc
292316
return nil
293317
}
294318

319+
func (ws *workingSet) getDirty(ns string, key []byte) ([]byte, bool) {
320+
return ws.store.GetDirty(ns, key)
321+
}
322+
295323
// Commit persists all changes in RunActions() into the DB
296324
func (ws *workingSet) Commit(ctx context.Context) error {
325+
if err := ws.verifyParent(); err != nil {
326+
return err
327+
}
297328
if err := protocolPreCommit(ctx, ws); err != nil {
298329
return err
299330
}
@@ -318,6 +349,14 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64
318349
if cfg.Keys != nil {
319350
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
320351
}
352+
if ws.parent != nil {
353+
if value, dirty := ws.getDirty(cfg.Namespace, cfg.Key); dirty {
354+
return ws.height, state.Deserialize(s, value)
355+
}
356+
if value, dirty := ws.parent.getDirty(cfg.Namespace, cfg.Key); dirty {
357+
return ws.height, state.Deserialize(s, value)
358+
}
359+
}
321360
value, err := ws.store.Get(cfg.Namespace, cfg.Key)
322361
if err != nil {
323362
return ws.height, err
@@ -333,6 +372,7 @@ func (ws *workingSet) States(opts ...protocol.StateOption) (uint64, state.Iterat
333372
if cfg.Key != nil {
334373
return 0, nil, errors.Wrap(ErrNotSupported, "Read states with key option has not been implemented yet")
335374
}
375+
// TODO: check parent
336376
keys, values, err := ws.store.States(cfg.Namespace, cfg.Keys)
337377
if err != nil {
338378
return 0, nil, err
@@ -480,6 +520,9 @@ func (ws *workingSet) Process(ctx context.Context, actions []*action.SealedEnvel
480520
}
481521

482522
func (ws *workingSet) processWithCorrectOrder(ctx context.Context, actions []*action.SealedEnvelope) error {
523+
if err := ws.verifyParent(); err != nil {
524+
return err
525+
}
483526
if err := ws.validate(ctx); err != nil {
484527
return err
485528
}

0 commit comments

Comments
 (0)