Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/0x5459/store minor refactor #930

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -2035,18 +2035,18 @@ var utilSealerSectorsExportFilesCmd = &cli.Command{
<-throttle
}()

for _, dir := range []string{loc.Private.SealedSectorPath, loc.Private.CacheDirPath} {
for _, dir := range []string{loc.Private.SealedFullPath, loc.Private.CacheDirFullPath} {
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

if !info.IsDir() && !strings.Contains(info.Name(), ".json") {
destPath := ""
if dir == loc.Private.SealedSectorPath {
destPath = filepath.Join(absPath, loc.Private.SealedSectorURI)
if dir == loc.Private.SealedFullPath {
destPath = filepath.Join(absPath, loc.Private.SealedSubPath)
} else {
destPath = filepath.Join(absPath, loc.Private.CacheDirURI, info.Name())
destPath = filepath.Join(absPath, loc.Private.CacheDirSubPath, info.Name())
}

if reserve {
Expand Down Expand Up @@ -2080,7 +2080,7 @@ var utilSealerSectorsExportFilesCmd = &cli.Command{
}
}
} else {
if err := os.MkdirAll(filepath.Join(absPath, loc.Private.CacheDirURI), 0755); err != nil {
if err := os.MkdirAll(filepath.Join(absPath, loc.Private.CacheDirSubPath), 0755); err != nil {
return err
}
}
Expand Down
49 changes: 34 additions & 15 deletions damocles-manager/cmd/damocles-manager/internal/util_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/dep"
"github.com/ipfs-force-community/damocles/damocles-manager/modules/util"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore"
filestorebuiltin "github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore/builtin"
filestoreplugin "github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore/plugin"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/logging"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore/filestore"
managerplugin "github.com/ipfs-force-community/damocles/manager-plugin"

"github.com/urfave/cli/v2"
)

Expand All @@ -45,6 +48,10 @@ var utilStorageAttachCmd = &cli.Command{
&cli.StringFlag{
Name: "name",
},
&cli.StringFlag{
Name: "plugin-name",
Usage: "the filestore plugin name, the plugin file should exist in 'Plugin.Dir'",
},
&cli.BoolFlag{
Name: "strict",
},
Expand Down Expand Up @@ -80,27 +87,20 @@ var utilStorageAttachCmd = &cli.Command{
}

verbose := cctx.Bool("verbose")
name := cctx.String("name")
strict := cctx.Bool("strict")
name := cctx.String("name")
readOnly := cctx.Bool("read-only")
allowSplitted := cctx.Bool("allow-splitted")
pattern := cctx.String("pattern")
pluginName := cctx.String("plugin-name")

scfg := objstore.DefaultConfig(abs, readOnly)
scfg := filestore.DefaultConfig(abs, readOnly)
scfg.Name = name
scfg.Strict = &strict

store, err := filestore.Open(scfg, false)
if err != nil {
return fmt.Errorf("open file store: %w", err)
}

name = store.Instance(gctx)
logger := Log.With("name", name, "strict", strict, "read-only", readOnly, "splitted", allowSplitted)

cfgExample := struct {
Common struct {
PersistStores []objstore.Config
PersistStores []filestore.Config
}
}{}

Expand All @@ -116,14 +116,15 @@ var utilStorageAttachCmd = &cli.Command{

var indexer core.SectorIndexer
var chainAPI chain.API
var loadedPlugins *managerplugin.LoadedPlugins

stopper, err := dix.New(
gctx,
DepsFromCLICtx(cctx),
dep.Product(),
dix.Override(new(dep.GlobalContext), gctx),
dix.Override(new(dep.ListenAddress), dep.ListenAddress(cctx.String(SealerListenFlag.Name))),
dix.Populate(dep.InvokePopulate, &indexer, &chainAPI),
dix.Populate(dep.InvokePopulate, &indexer, &chainAPI, &loadedPlugins),
)

if err != nil {
Expand All @@ -132,6 +133,24 @@ var utilStorageAttachCmd = &cli.Command{

defer stopper(gctx) // nolint:errcheck

var store filestore.Store
if pluginName == "" {
// use builtin fs filestore
store, err = filestorebuiltin.New(scfg)
if err != nil {
return fmt.Errorf("open builtin file store: %w", err)
}
} else {
// use plugin filestore
store, err = filestoreplugin.OpenPluginFileStore(pluginName, scfg, loadedPlugins)
if err != nil {
return fmt.Errorf("open plugin file store: %w", err)
}
}

name = store.Instance(gctx)
logger := Log.With("name", name, "strict", strict, "read-only", readOnly, "splitted", allowSplitted)

cacheInfo := &cachedInfoForScanning{
capi: chainAPI,
ssizes: make(map[abi.ActorID]abi.SectorSize),
Expand Down Expand Up @@ -408,7 +427,7 @@ var utilStorageFindCmd = &cli.Command{
continue
}

if errors.Is(err, objstore.ErrObjectStoreInstanceNotFound) {
if errors.Is(err, filestore.ErrFileStoreInstanceNotFound) {
iLog.Warn("store instance not found, check your config file")
continue
}
Expand Down
4 changes: 4 additions & 0 deletions damocles-manager/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs-force-community/damocles/damocles-manager/modules"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/extproc/stage"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore"
)

//go:generate go run gen.go -interface=SealerAPI,SealerCliAPI,RandomnessAPI,MinerAPI,WorkerWdPoStAPI
Expand Down Expand Up @@ -83,6 +84,9 @@ type SealerAPI interface {
AllocateUnsealSector(ctx context.Context, spec AllocateSectorSpec) (*SectorUnsealInfo, error)
AchieveUnsealSector(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error)
AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error)

StoreSectorSubPaths(ctx context.Context, storeName string, pathType filestore.PathType, minerID uint64, sectorNumbers []abi.SectorNumber) ([]string, error)
StoreCustomSubPath(ctx context.Context, storeName string, custom string) (string, error)
}

type SealerCliAPI interface {
Expand Down
4 changes: 4 additions & 0 deletions damocles-manager/core/client_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type SealerAPIClient struct {
AllocateUnsealSector func(ctx context.Context, spec AllocateSectorSpec) (*SectorUnsealInfo, error)
AchieveUnsealSector func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error)
AcquireUnsealDest func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error)
StoreUri func(ctx context.Context, storeName, resource []string) (map[string]string, error)
}

var UnavailableSealerAPIClient = SealerAPIClient{
Expand Down Expand Up @@ -108,6 +109,9 @@ var UnavailableSealerAPIClient = SealerAPIClient{
AcquireUnsealDest: func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) {
panic("SealerAPI client unavailable")
},
StoreUri: func(ctx context.Context, storeName, resource []string) (map[string]string, error) {
panic("SealerAPI client unavailable")
},
}

// SealerCliAPIClient is generated client for SealerCliAPI interface.
Expand Down
4 changes: 2 additions & 2 deletions damocles-manager/core/ifaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/filecoin-project/venus/venus-shared/actors/builtin"
gtypes "github.com/filecoin-project/venus/venus-shared/types/gateway"

"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore"
)

type SectorManager interface {
Expand Down Expand Up @@ -57,7 +57,7 @@ type SectorTypedIndexer interface {
type SectorIndexer interface {
Normal() SectorTypedIndexer
Upgrade() SectorTypedIndexer
StoreMgr() objstore.Manager
StoreMgr() filestore.Manager
}

type SectorTracker interface {
Expand Down
12 changes: 6 additions & 6 deletions damocles-manager/core/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ type SectorOnChainInfo = miner.SectorOnChainInfo

type PrivateSectorInfo struct {
Accesses SectorAccessStores
CacheDirPath string
SealedSectorPath string
CacheDirFullPath string
SealedFullPath string

CacheDirURI string
SealedSectorURI string
CacheDirSubPath string
SealedSubPath string
}

func (p PrivateSectorInfo) ToFFI(sector SectorInfo, proofType abi.RegisteredPoStProof) FFIPrivateSectorInfo {
return FFIPrivateSectorInfo{
SectorInfo: sector,
CacheDirPath: p.CacheDirPath,
CacheDirPath: p.CacheDirFullPath,
PoStProofType: proofType,
SealedSectorPath: p.SealedSectorPath,
SealedSectorPath: p.SealedFullPath,
}
}

Expand Down
4 changes: 2 additions & 2 deletions damocles-manager/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
vtypes "github.com/filecoin-project/venus/venus-shared/types"
gtypes "github.com/filecoin-project/venus/venus-shared/types/gateway"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore"
"github.com/ipfs/go-cid"
)

Expand Down Expand Up @@ -346,7 +346,7 @@ type StoreDetailedInfo struct {
ReservedBy []ReservedItem
}

type ReservedItem = objstore.StoreReserved
type ReservedItem = filestore.StoreReserved

type RebuildOptions struct {
PiecesAvailable bool
Expand Down
2 changes: 1 addition & 1 deletion damocles-manager/dep/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Product() dix.Option {
dix.Override(new(core.CommitmentManager), BuildCommitmentManager),
dix.Override(new(messager.API), BuildMessagerClient),
dix.Override(new(chain.API), BuildChainClient),
dix.Override(new(PersistedObjectStoreManager), BuildPersistedFileStoreMgr),
dix.Override(new(PersistedFileStoreManager), BuildPersistedFileStoreMgr),
dix.Override(new(core.SectorIndexer), BuildSectorIndexer),
dix.Override(new(*chain.EventBus), BuildChainEventBus),

Expand Down
64 changes: 33 additions & 31 deletions damocles-manager/dep/sealer_constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,28 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/modules/policy"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/confmgr"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore"
filestorebuiltin "github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore/builtin"
filestoreplugin "github.com/ipfs-force-community/damocles/damocles-manager/pkg/filestore/plugin"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/homedir"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/kvstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/market"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/messager"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore/filestore"
objstoreplugin "github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore/plugin"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/piecestore"
)

type (
UnderlyingDB kvstore.DB
OnlineMetaStore kvstore.KVStore
OfflineMetaStore kvstore.KVStore
PersistedObjectStoreManager objstore.Manager
SectorIndexMetaStore kvstore.KVStore
SnapUpMetaStore kvstore.KVStore
ListenAddress string
ProxyAddress string
WorkerMetaStore kvstore.KVStore
ConfDirPath string
CommonMetaStore kvstore.KVStore
UnderlyingDB kvstore.DB
OnlineMetaStore kvstore.KVStore
OfflineMetaStore kvstore.KVStore
PersistedFileStoreManager filestore.Manager
SectorIndexMetaStore kvstore.KVStore
SnapUpMetaStore kvstore.KVStore
ListenAddress string
ProxyAddress string
WorkerMetaStore kvstore.KVStore
ConfDirPath string
CommonMetaStore kvstore.KVStore
)

func BuildLocalSectorManager(scfg *modules.SafeConfig, mapi core.MinerAPI, numAlloc core.SectorNumberAllocator) (core.SectorManager, error) {
Expand Down Expand Up @@ -444,38 +444,40 @@ func BuildSnapUpMetaStore(gctx GlobalContext, db UnderlyingDB) (SnapUpMetaStore,
return db.OpenCollection(gctx, "snapup")
}

func openObjStore(cfg objstore.Config, pluginName string, loadedPlugins *managerplugin.LoadedPlugins) (st objstore.Store, err error) {
func openFileStore(cfg filestore.Config, pluginName string, loadedPlugins *managerplugin.LoadedPlugins) (st filestore.Ext, err error) {
if cfg.Name == "" {
cfg.Name = cfg.Path
}

var innerStore filestore.Store
if pluginName == "" {
// use embed fs objstore
st, err = filestore.Open(cfg, false)
// use embed fs filestore
innerStore, err = filestorebuiltin.New(cfg)
} else {
// use plugin objstore
st, err = objstoreplugin.OpenPluginObjStore(pluginName, cfg, loadedPlugins)
// use plugin filestore
innerStore, err = filestoreplugin.OpenPluginFileStore(pluginName, cfg, loadedPlugins)
}

if err != nil {
return
}
st = filestore.NewExt(innerStore)
log.Infow("store constructed", "type", st.Type(), "ver", st.Version(), "instance", st.Instance(context.Background()))

return
}

func BuildPersistedFileStoreMgr(scfg *modules.SafeConfig, globalStore CommonMetaStore, loadedPlugins *managerplugin.LoadedPlugins) (PersistedObjectStoreManager, error) {
func BuildPersistedFileStoreMgr(scfg *modules.SafeConfig, globalStore CommonMetaStore, loadedPlugins *managerplugin.LoadedPlugins) (PersistedFileStoreManager, error) {
persistCfg := scfg.MustCommonConfig().GetPersistStores()

stores := make([]objstore.Store, 0, len(persistCfg))
policy := map[string]objstore.StoreSelectPolicy{}
stores := make([]filestore.Ext, 0, len(persistCfg))
policy := map[string]filestore.StoreSelectPolicy{}
for pi := range persistCfg {
// For compatibility with v0.5
if persistCfg[pi].PluginName == "" && persistCfg[pi].Plugin != "" {
persistCfg[pi].PluginName = persistCfg[pi].Plugin
}
st, err := openObjStore(persistCfg[pi].Config, persistCfg[pi].PluginName, loadedPlugins)
st, err := openFileStore(persistCfg[pi].Config, persistCfg[pi].PluginName, loadedPlugins)
if err != nil {
return nil, fmt.Errorf("construct #%d persist store: %w", pi, err)
}
Expand All @@ -486,13 +488,13 @@ func BuildPersistedFileStoreMgr(scfg *modules.SafeConfig, globalStore CommonMeta

wrapped, err := kvstore.NewWrappedKVStore([]byte("objstore"), globalStore)
if err != nil {
return nil, fmt.Errorf("construct wrapped kv store for objstore: %w", err)
return nil, fmt.Errorf("construct wrapped kv store for filestore: %w", err)
}

return objstore.NewStoreManager(stores, policy, wrapped)
return filestore.NewStoreManager(stores, policy, wrapped)
}

func BuildSectorIndexer(storeMgr PersistedObjectStoreManager, kv SectorIndexMetaStore) (core.SectorIndexer, error) {
func BuildSectorIndexer(storeMgr PersistedFileStoreManager, kv SectorIndexMetaStore) (core.SectorIndexer, error) {
upgrade, err := kvstore.NewWrappedKVStore([]byte("sector-upgrade"), kv)
if err != nil {
return nil, fmt.Errorf("wrap kvstore for sector-upgrade: %w", err)
Expand All @@ -501,7 +503,7 @@ func BuildSectorIndexer(storeMgr PersistedObjectStoreManager, kv SectorIndexMeta
return sectors.NewIndexer(storeMgr, kv, upgrade)
}

func BuildSectorProving(tracker core.SectorTracker, state core.SectorStateManager, storeMgr PersistedObjectStoreManager, prover core.Prover, capi chain.API, scfg *modules.SafeConfig) (core.SectorProving, error) {
func BuildSectorProving(tracker core.SectorTracker, state core.SectorStateManager, storeMgr PersistedFileStoreManager, prover core.Prover, capi chain.API, scfg *modules.SafeConfig) (core.SectorProving, error) {
return sectors.NewProving(tracker, state, storeMgr, prover, capi, scfg.MustCommonConfig().Proving)
}

Expand Down Expand Up @@ -568,10 +570,10 @@ func BuildMarketAPIRelated(gctx GlobalContext, lc fx.Lifecycle, scfg *modules.Sa
pieceStoreCfg := scfg.Common.PieceStores
scfg.Unlock()

stores := make([]objstore.Store, 0, len(pieceStoreCfg))
stores := make([]filestore.Ext, 0, len(pieceStoreCfg))
for pi := range pieceStoreCfg {
pcfg := pieceStoreCfg[pi]
cfg := objstore.Config{
cfg := filestore.Config{
Name: pcfg.Name,
Path: pcfg.Path,
Meta: pcfg.Meta,
Expand All @@ -581,7 +583,7 @@ func BuildMarketAPIRelated(gctx GlobalContext, lc fx.Lifecycle, scfg *modules.Sa
if pcfg.PluginName == "" && pcfg.Plugin != "" {
pcfg.PluginName = pcfg.Plugin
}
st, err := openObjStore(cfg, pcfg.PluginName, loadedPlugins)
st, err := openFileStore(cfg, pcfg.PluginName, loadedPlugins)
if err != nil {
return MarketAPIRelatedComponents{}, fmt.Errorf("construct #%d piece store: %w", pi, err)
}
Expand Down Expand Up @@ -688,7 +690,7 @@ func BuildWorkerManager(meta WorkerMetaStore) (core.WorkerManager, error) {
return worker.NewManager(meta)
}

func BuildProxiedSectorIndex(client *core.SealerCliAPIClient, storeMgr PersistedObjectStoreManager) (core.SectorIndexer, error) {
func BuildProxiedSectorIndex(client *core.SealerCliAPIClient, storeMgr PersistedFileStoreManager) (core.SectorIndexer, error) {
log.Debug("build proxied sector indexer")
return sectors.NewProxiedIndexer(client, storeMgr)
}
Expand Down
Loading