Skip to content

Commit 24ff37a

Browse files
committed
[staking] reduce candidates bucket indexer storage
1 parent a0a08d4 commit 24ff37a

File tree

7 files changed

+532
-0
lines changed

7 files changed

+532
-0
lines changed

action/protocol/staking/read_state.go

+10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ import (
1616
"github.com/iotexproject/iotex-core/v2/state"
1717
)
1818

19+
func ToIoTeXTypesVoteBucketList(sr protocol.StateReader, buckets []*VoteBucket) (*iotextypes.VoteBucketList, error) {
20+
// TODO: change toIoTeXTypesVoteBucketList() to this name
21+
return nil, nil
22+
}
23+
1924
func toIoTeXTypesVoteBucketList(sr protocol.StateReader, buckets []*VoteBucket) (*iotextypes.VoteBucketList, error) {
2025
esr := NewEndorsementStateReader(sr)
2126
res := iotextypes.VoteBucketList{
@@ -98,6 +103,11 @@ func toIoTeXTypesCandidateV2(csr CandidateStateReader, cand *Candidate, featureC
98103
return c, nil
99104
}
100105

106+
func ToIoTeXTypesCandidateListV2(csr CandidateStateReader, candidates CandidateList, featureCtx protocol.FeatureCtx) (*iotextypes.CandidateListV2, error) {
107+
// TODO: change toIoTeXTypesCandidateListV2 to this name
108+
return nil, nil
109+
}
110+
101111
func toIoTeXTypesCandidateListV2(csr CandidateStateReader, candidates CandidateList, featureCtx protocol.FeatureCtx) (*iotextypes.CandidateListV2, error) {
102112
res := iotextypes.CandidateListV2{
103113
Candidates: make([]*iotextypes.CandidateV2, 0, len(candidates)),
+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) 2025 IoTeX Foundation
2+
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
3+
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
4+
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.
5+
6+
package stakingindex
7+
8+
import (
9+
"google.golang.org/protobuf/proto"
10+
11+
"github.com/iotexproject/iotex-core/v2/blockindex/nativestaking/stakingpb"
12+
)
13+
14+
type bucketList struct {
15+
maxBucket uint64
16+
deleted []uint64
17+
}
18+
19+
func (bl *bucketList) serialize() ([]byte, error) {
20+
return proto.Marshal(bl.toProto())
21+
}
22+
23+
func (bl *bucketList) toProto() *stakingpb.BucketList {
24+
return &stakingpb.BucketList{
25+
MaxBucket: bl.maxBucket,
26+
Deleted: bl.deleted,
27+
}
28+
}
29+
30+
func fromProtoBucketList(pb *stakingpb.BucketList) *bucketList {
31+
return &bucketList{
32+
maxBucket: pb.MaxBucket,
33+
deleted: pb.Deleted,
34+
}
35+
}
36+
37+
func deserializeBucketList(buf []byte) (*bucketList, error) {
38+
pb := stakingpb.BucketList{}
39+
if err := proto.Unmarshal(buf, &pb); err != nil {
40+
return nil, err
41+
}
42+
return fromProtoBucketList(&pb), nil
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
// Copyright (c) 2025 IoTeX Foundation
2+
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
3+
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
4+
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.
5+
6+
package stakingindex
7+
8+
import (
9+
"context"
10+
"fmt"
11+
12+
"github.com/pkg/errors"
13+
"google.golang.org/protobuf/proto"
14+
15+
"github.com/iotexproject/iotex-address/address"
16+
17+
"github.com/iotexproject/iotex-core/v2/action/protocol"
18+
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
19+
"github.com/iotexproject/iotex-core/v2/blockchain/block"
20+
"github.com/iotexproject/iotex-core/v2/db"
21+
"github.com/iotexproject/iotex-core/v2/db/batch"
22+
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
23+
)
24+
25+
const (
26+
// StakingCandidatesNamespace is a namespace to store candidates with epoch start height
27+
StakingCandidatesNamespace = "stakingCandidates"
28+
// StakingBucketsNamespace is a namespace to store vote buckets with epoch start height
29+
StakingBucketsNamespace = "stakingBuckets"
30+
)
31+
32+
var (
33+
_currentHeightKey = []byte("crh")
34+
_bucketListKey = []byte("blt")
35+
)
36+
37+
// CandBucketsIndexer is an indexer to store buckets and candidates by given height
38+
type CandBucketsIndexer struct {
39+
currentHeight uint64
40+
deleteList *bucketList
41+
kvBase db.KVStore
42+
kvVersioned db.KvVersioned
43+
stateReader protocol.StateReader
44+
}
45+
46+
// NewCandBucketsIndexer creates a new indexer
47+
func NewCandBucketsIndexer(kv db.KvVersioned) (*CandBucketsIndexer, error) {
48+
if kv == nil {
49+
return nil, errors.New("kvStore is nil")
50+
}
51+
return &CandBucketsIndexer{
52+
kvBase: kv.Base(),
53+
kvVersioned: kv,
54+
}, nil
55+
}
56+
57+
// Start starts the indexer
58+
func (cbi *CandBucketsIndexer) Start(ctx context.Context) error {
59+
if err := cbi.kvVersioned.Start(ctx); err != nil {
60+
return err
61+
}
62+
ret, err := cbi.kvBase.Get(db.MetadataNamespace, _currentHeightKey)
63+
switch errors.Cause(err) {
64+
case nil:
65+
cbi.currentHeight = byteutil.BytesToUint64BigEndian(ret)
66+
case db.ErrNotExist:
67+
cbi.currentHeight = 0
68+
default:
69+
return err
70+
}
71+
return nil
72+
}
73+
74+
// Stop stops the indexer
75+
func (cbi *CandBucketsIndexer) Stop(ctx context.Context) error {
76+
return cbi.kvVersioned.Stop(ctx)
77+
}
78+
79+
func (cbi *CandBucketsIndexer) candBucketFromBlock(blk *block.Block) (staking.CandidateList, []*staking.VoteBucket, []uint64, error) {
80+
// TODO: extract affected buckets and candidates from tx in block
81+
return nil, nil, nil, nil
82+
}
83+
84+
func (cbi *CandBucketsIndexer) PutBlock(ctx context.Context, blk *block.Block) error {
85+
cands, changedBuckets, deletedBuckets, err := cbi.candBucketFromBlock(blk)
86+
if err != nil {
87+
return err
88+
}
89+
csr, err := staking.ConstructBaseView(cbi.stateReader)
90+
if err != nil {
91+
return err
92+
}
93+
candidateList, err := staking.ToIoTeXTypesCandidateListV2(csr, cands, protocol.MustGetFeatureCtx(ctx))
94+
if err != nil {
95+
return err
96+
}
97+
bucketList, err := staking.ToIoTeXTypesVoteBucketList(cbi.stateReader, changedBuckets)
98+
if err != nil {
99+
return err
100+
}
101+
b := batch.NewBatch()
102+
for _, c := range candidateList.Candidates {
103+
addr, err := address.FromString(c.Id)
104+
if err != nil {
105+
return err
106+
}
107+
cand, err := proto.Marshal(c)
108+
if err != nil {
109+
return err
110+
}
111+
b.Put(StakingCandidatesNamespace, addr.Bytes(), cand, fmt.Sprintf("failed to write cand = %x\n", cand))
112+
}
113+
for _, bucket := range bucketList.Buckets {
114+
cb, err := proto.Marshal(bucket)
115+
if err != nil {
116+
return err
117+
}
118+
b.Put(StakingBucketsNamespace, byteutil.Uint64ToBytesBigEndian(bucket.Index), cb, fmt.Sprintf("failed to write bucket = %x\n", cb))
119+
}
120+
// update deleted bucket list
121+
var (
122+
newBucket uint64
123+
h = blk.Height()
124+
)
125+
for _, v := range changedBuckets {
126+
if v.Index > cbi.deleteList.maxBucket {
127+
newBucket = v.Index
128+
break
129+
}
130+
}
131+
if newBucket > 0 || len(deletedBuckets) > 0 {
132+
if newBucket > 0 {
133+
cbi.deleteList.maxBucket = newBucket
134+
}
135+
if len(deletedBuckets) > 0 {
136+
cbi.deleteList.deleted = append(cbi.deleteList.deleted, deletedBuckets...)
137+
}
138+
buf, err := cbi.deleteList.serialize()
139+
if err != nil {
140+
return err
141+
}
142+
b.Put(db.MetadataNamespace, _bucketListKey, buf, fmt.Sprintf("failed to write deleted bucket list = %d\n", h))
143+
}
144+
// update height
145+
b.Put(db.MetadataNamespace, _currentHeightKey, byteutil.Uint64ToBytesBigEndian(h), fmt.Sprintf("failed to write height = %d\n", h))
146+
return cbi.kvVersioned.SetVersion(h).WriteBatch(b)
147+
}
148+
149+
/*
150+
// PutCandidates puts candidates into indexer
151+
func (cbi *CandBucketsIndexer) PutCandidates(height uint64, candidates *iotextypes.CandidateListV2) error {
152+
candidatesBytes, err := proto.Marshal(candidates)
153+
if err != nil {
154+
return err
155+
}
156+
157+
if err := cbi.putToIndexer(StakingCandidatesNamespace, height, candidatesBytes); err != nil {
158+
return err
159+
}
160+
cbi.currentHeight = height
161+
return nil
162+
}
163+
164+
// GetCandidates gets candidates from indexer given epoch start height
165+
func (cbi *CandBucketsIndexer) GetCandidates(height uint64, offset, limit uint32) (*iotextypes.CandidateListV2, uint64, error) {
166+
if height > cbi.currentHeight {
167+
height = cbi.currentHeight
168+
}
169+
candidateList := &iotextypes.CandidateListV2{}
170+
ret, err := getFromIndexer(cbi.kvVersioned, StakingCandidatesNamespace, height)
171+
cause := errors.Cause(err)
172+
if cause == db.ErrNotExist || cause == db.ErrBucketNotExist {
173+
return candidateList, height, nil
174+
}
175+
if err != nil {
176+
return nil, height, err
177+
}
178+
if err := proto.Unmarshal(ret, candidateList); err != nil {
179+
return nil, height, err
180+
}
181+
length := uint32(len(candidateList.Candidates))
182+
if offset >= length {
183+
return &iotextypes.CandidateListV2{}, height, nil
184+
}
185+
end := offset + limit
186+
if end > uint32(len(candidateList.Candidates)) {
187+
end = uint32(len(candidateList.Candidates))
188+
}
189+
candidateList.Candidates = candidateList.Candidates[offset:end]
190+
// fill id if it's empty for backward compatibility
191+
for i := range candidateList.Candidates {
192+
if candidateList.Candidates[i].Id == "" {
193+
candidateList.Candidates[i].Id = candidateList.Candidates[i].OwnerAddress
194+
}
195+
}
196+
return candidateList, height, nil
197+
}
198+
199+
// PutBuckets puts vote buckets into indexer
200+
func (cbi *CandBucketsIndexer) PutBuckets(height uint64, buckets *iotextypes.VoteBucketList) error {
201+
bucketsBytes, err := proto.Marshal(buckets)
202+
if err != nil {
203+
return err
204+
}
205+
206+
if err := cbi.putToIndexer(StakingBucketsNamespace, height, bucketsBytes); err != nil {
207+
return err
208+
}
209+
cbi.latestBucketsHeight = height
210+
return nil
211+
}
212+
213+
// GetBuckets gets vote buckets from indexer given epoch start height
214+
func (cbi *CandBucketsIndexer) GetBuckets(height uint64, offset, limit uint32) (*iotextypes.VoteBucketList, uint64, error) {
215+
if height > cbi.latestBucketsHeight {
216+
height = cbi.latestBucketsHeight
217+
}
218+
buckets := &iotextypes.VoteBucketList{}
219+
ret, err := getFromIndexer(cbi.kvVersioned, StakingBucketsNamespace, height)
220+
cause := errors.Cause(err)
221+
if cause == db.ErrNotExist || cause == db.ErrBucketNotExist {
222+
return buckets, height, nil
223+
}
224+
if err != nil {
225+
return nil, height, err
226+
}
227+
if err := proto.Unmarshal(ret, buckets); err != nil {
228+
return nil, height, err
229+
}
230+
length := uint32(len(buckets.Buckets))
231+
if offset >= length {
232+
return &iotextypes.VoteBucketList{}, height, nil
233+
}
234+
end := offset + limit
235+
if end > uint32(len(buckets.Buckets)) {
236+
end = uint32(len(buckets.Buckets))
237+
}
238+
buckets.Buckets = buckets.Buckets[offset:end]
239+
return buckets, height, nil
240+
}
241+
242+
func (cbi *CandBucketsIndexer) putToIndexer(ns string, height uint64, data []byte) error {
243+
var (
244+
h = hash.Hash160b(data)
245+
dataExist bool
246+
heightKey []byte
247+
latestHash []byte
248+
)
249+
switch ns {
250+
case StakingCandidatesNamespace:
251+
dataExist = (h == cbi.latestCandidatesHash)
252+
heightKey = _candHeightKey
253+
latestHash = _latestCandidatesHash
254+
case StakingBucketsNamespace:
255+
dataExist = (h == cbi.latestBucketsHash)
256+
heightKey = _bucketHeightKey
257+
latestHash = _latestBucketsHash
258+
default:
259+
return ErrTypeAssertion
260+
}
261+
262+
heightBytes := byteutil.Uint64ToBytesBigEndian(height)
263+
if dataExist {
264+
// same bytes already exist, do nothing
265+
return cbi.kvVersioned.Put(StakingMetaNamespace, heightKey, heightBytes)
266+
}
267+
268+
// update latest height
269+
b := batch.NewBatch()
270+
b.Put(ns, heightBytes, data, "failed to write data bytes")
271+
b.Put(StakingMetaNamespace, heightKey, heightBytes, "failed to update indexer height")
272+
b.Put(StakingMetaNamespace, latestHash, h[:], "failed to update latest hash")
273+
if err := cbi.kvVersioned.WriteBatch(b); err != nil {
274+
return err
275+
}
276+
// update latest hash
277+
if ns == StakingCandidatesNamespace {
278+
cbi.latestCandidatesHash = h
279+
} else {
280+
cbi.latestBucketsHash = h
281+
}
282+
return nil
283+
}
284+
285+
func getFromIndexer(kv db.KVStoreForRangeIndex, ns string, height uint64) ([]byte, error) {
286+
b, err := kv.Get(ns, byteutil.Uint64ToBytesBigEndian(height))
287+
switch errors.Cause(err) {
288+
case nil:
289+
return b, nil
290+
case db.ErrNotExist:
291+
// height does not exist, fallback to previous height
292+
return kv.SeekPrev([]byte(ns), height)
293+
default:
294+
return nil, err
295+
}
296+
}
297+
*/

0 commit comments

Comments
 (0)