Skip to content

Commit

Permalink
Merge pull request #1533 from 0chain/feat/recover-trie
Browse files Browse the repository at this point in the history
Recover trie
  • Loading branch information
dabasov authored Feb 1, 2025
2 parents 06767cc + 2d21861 commit d150b9b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 0 deletions.
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobber/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
httpsKeyFile string
httpsCertFile string
hostUrl string
recoverTrie bool
)

func init() {
Expand All @@ -45,6 +46,7 @@ func init() {
flag.StringVar(&hostUrl, "hosturl", "", "register url on blockchain instead of [schema://hostname+port] if it has value")

flag.IntVar(&grpcPort, "grpc_port", 0, "grpc_port")
flag.BoolVar(&recoverTrie, "recover_trie", false, "recover_trie")
}

func parseFlags() {
Expand Down
6 changes: 6 additions & 0 deletions code/go/0chain.net/blobber/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
Expand Down Expand Up @@ -58,6 +59,11 @@ func main() {
panic(err)
}

if recoverTrie {
logging.Logger.Info("Recovering trie")
allocation.RecoverTrie()
}

// todo: activate this when gRPC functionalities are implemented
// go startGRPCServer()

Expand Down
53 changes: 53 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import (
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/common/core/encryption"
"github.com/0chain/common/core/util/wmpt"
"go.uber.org/zap"
"gorm.io/gorm/clause"

"gorm.io/gorm"
Expand Down Expand Up @@ -357,3 +361,52 @@ type ReadPoolRedeem struct {
PoolID string `json:"pool_id"` // read pool ID
Balance int64 `json:"balance"` // balance reduction
}

func (a *Allocation) recoverTrie() error {

trie := wmpt.New(nil, datastore.GetBlockStore())
//fetch all the files of the allocation to rebuild the trie
var offsetPath string
for {
var (
pRefs *[]reference.PaginatedRef
err error
)
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
pRefs, _, offsetPath, err = reference.GetRefs(ctx, a.ID, "/", offsetPath, reference.FILE, 0, 100)
return err
})
if err != nil {
logging.Logger.Error("recover_trie_fetch_refs", zap.Error(err))
return err
}
if len(*pRefs) == 0 {
break
}
trie.SaveRoot()
for _, ref := range *pRefs {
decodedKey, _ := hex.DecodeString(ref.LookupHash)
decodedValue, _ := hex.DecodeString(ref.FileMetaHash)
err = trie.Update(decodedKey, decodedValue, uint64(ref.NumBlocks))
if err != nil {
logging.Logger.Error("recover_trie_update", zap.Error(err))
return err
}
}
batcher, err := trie.Commit(filestore.COLLAPSE_DEPTH)
if err != nil {
logging.Logger.Error("recover_trie_commit", zap.Error(err))
return err
}
err = batcher.Commit(true)
if err != nil {
logging.Logger.Error("recover_trie_batcher_commit", zap.Error(err))
return err
}
_ = trie.DeleteNodes()
if len(*pRefs) < 100 {
break
}
}
return nil
}
31 changes: 31 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,34 @@ func deleteAllocation(ctx context.Context, a *Allocation) (err error) {
a.ID).Error
return err
}

func RecoverTrie() {
var (
allocs []*Allocation
err error
offset int64
)

for {
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
allocs, err = Repo.GetAllocations(ctx, offset)
return err
})
if err != nil {
logging.Logger.Error("recover_trie_fetch_alloc", zap.Error(err))
return
}
if len(allocs) == 0 {
return
}
offset += int64(len(allocs))
// recover trie of each allocation
for _, a := range allocs {
logging.Logger.Info("recover_trie", zap.String("allocation_id", a.ID))
err = a.recoverTrie()
if err != nil {
logging.Logger.Error("recover_trie", zap.Error(err))
}
}
}
}

0 comments on commit d150b9b

Please sign in to comment.