Skip to content

Commit 382457c

Browse files
committed
WIP
1 parent d7c5033 commit 382457c

File tree

19 files changed

+165
-58
lines changed

19 files changed

+165
-58
lines changed

damocles-manager/core/api.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ type SealerAPI interface {
8484
AchieveUnsealSector(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error)
8585
AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error)
8686

87-
StoreUri(ctx context.Context, storeName string, resource string) (string, error)
87+
StoreUri(ctx context.Context, storeName, resource string, minerID uint64) (string, error)
8888
}
8989

9090
type SealerCliAPI interface {

damocles-manager/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ require (
2727
github.com/golang/mock v1.6.0
2828
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
2929
github.com/hashicorp/go-multierror v1.1.1
30-
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230901024916-a92456f84db2
30+
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230905053730-d7c5033eb6eb
3131
github.com/ipfs-force-community/venus-cluster-assets v0.1.0
3232
github.com/ipfs/boxo v0.10.1
3333
github.com/ipfs/go-cid v0.4.1

damocles-manager/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -572,8 +572,8 @@ github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lTo
572572
github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA=
573573
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
574574
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
575-
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230901024916-a92456f84db2 h1:iarakGdwdvuaGJv8cbV3NHv7RseaeuDD/PzPoeajy60=
576-
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230901024916-a92456f84db2/go.mod h1:me1u2cl7qdxBCZiVL0laDop8uBHDdUwlUNnQ7KkHF64=
575+
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230905053730-d7c5033eb6eb h1:GPWQEuzTnwDjaTeZxzM9sxsMFiHGA10jR3ZNNOoZOnA=
576+
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230905053730-d7c5033eb6eb/go.mod h1:EpGeK7b251iv7L5TnHl1PJGFH4KbliE03ctYt5thy6c=
577577
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4 h1:iu/3irYevdNpdc0B/gRi1vuS3+lRn+6Ro9G0FeBiAfE=
578578
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
579579
github.com/ipfs-force-community/venus-cluster-assets v0.1.0 h1:K/0+OV9Jm7HjSa7O9MAtgfLDIudQYZUTymhJsp8rGXg=

damocles-manager/modules/impl/mock/sealer.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
chainAPI "github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
2121
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
2222
"github.com/ipfs-force-community/damocles/damocles-manager/ver"
23+
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
2324
)
2425

2526
var _ core.SealerAPI = (*Sealer)(nil)
@@ -320,7 +321,10 @@ func (s *Sealer) Version(context.Context) (string, error) {
320321
return ver.VersionStr(), nil
321322
}
322323

323-
func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) {
324+
func (s *Sealer) StoreUri(ctx context.Context, storeName, resource string, minerID uint64) (string, error) {
325+
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
326+
ID: minerID,
327+
})
324328
store, err := s.persistedStoreManager.GetInstance(ctx, storeName)
325329
if err != nil {
326330
return "", err

damocles-manager/modules/impl/sectors/proving.go

+4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/ipfs-force-community/damocles/damocles-manager/modules"
1616
chainAPI "github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
1717
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
18+
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
1819
"github.com/ipfs/go-cid"
1920
)
2021

@@ -45,6 +46,9 @@ type Proving struct {
4546
}
4647

4748
func (p *Proving) SingleProvable(ctx context.Context, postProofType abi.RegisteredPoStProof, sref core.SectorRef, upgrade bool, locator core.SectorLocator, strict, stateCheck bool) error {
49+
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
50+
ID: uint64(sref.ID.Miner),
51+
})
4852
ssize, err := sref.ProofType.SectorSize()
4953
if err != nil {
5054
return fmt.Errorf("get sector size: %w", err)

damocles-manager/modules/impl/sectors/snapup_commit.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
2828
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/messager"
2929
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
30+
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
3031
)
3132

3233
func NewSnapUpCommitter(
@@ -591,8 +592,10 @@ func (h *snapupCommitHandler) cleanupForSector() error {
591592
ID: h.state.ID,
592593
ProofType: h.state.SectorType,
593594
}
594-
595-
privateInfo, err := h.committer.tracker.SinglePrivateInfo(h.committer.ctx, sref, false, nil)
595+
ctx := storeMiner.NewContext(h.committer.ctx, &storeMiner.MinerMeta{
596+
ID: uint64(sref.ID.Miner),
597+
})
598+
privateInfo, err := h.committer.tracker.SinglePrivateInfo(ctx, sref, false, nil)
596599
if err != nil {
597600
return fmt.Errorf("get private info from tracker: %w", err)
598601
}
@@ -613,7 +616,7 @@ func (h *snapupCommitHandler) cleanupForSector() error {
613616

614617
for ti := range cleanupTargets {
615618
storeInstance := cleanupTargets[ti].storeInstance
616-
store, err := h.committer.indexer.StoreMgr().GetInstance(h.committer.ctx, storeInstance)
619+
store, err := h.committer.indexer.StoreMgr().GetInstance(ctx, storeInstance)
617620
if err != nil {
618621
return fmt.Errorf("get store instance %s: %w", storeInstance, err)
619622
}
@@ -623,7 +626,7 @@ func (h *snapupCommitHandler) cleanupForSector() error {
623626
for fi := range fileURIs {
624627
uri := fileURIs[fi]
625628
errwg.Go(func() error {
626-
delErr := store.Del(h.committer.ctx, uri)
629+
delErr := store.Del(ctx, uri)
627630
if delErr == nil {
628631
log.Debugf("CC data cleaned: %s, store: %s", uri, storeInstance)
629632
return nil

damocles-manager/modules/impl/sectors/tracker.go

+4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/ipfs-force-community/damocles/damocles-manager/core"
1212
"github.com/ipfs-force-community/damocles/damocles-manager/modules/util"
1313
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
14+
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
1415
)
1516

1617
var _ core.SectorTracker = (*Tracker)(nil)
@@ -41,6 +42,9 @@ func (t *Tracker) SinglePubToPrivateInfo(ctx context.Context, mid abi.ActorID, s
4142
}
4243

4344
func (t *Tracker) getPrivateInfo(ctx context.Context, sref core.SectorRef, upgrade bool, locator core.SectorLocator) (*sectorStoreInstances, core.PrivateSectorInfo, error) {
45+
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
46+
ID: uint64(sref.ID.Miner),
47+
})
4448
objins, err := t.getObjInstanceForSector(ctx, sref.ID, locator, upgrade)
4549
if err != nil {
4650
return nil, core.PrivateSectorInfo{}, fmt.Errorf("get location for %s: %w", util.FormatSectorID(sref.ID), err)

damocles-manager/modules/sealer/sealer.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/logging"
2525
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
2626
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/piecestore"
27+
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
2728
)
2829

2930
var (
@@ -834,7 +835,10 @@ func (s *Sealer) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceC
834835
return s.unseal.AcquireDest(ctx, sid, pieceCid)
835836
}
836837

837-
func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) {
838+
func (s *Sealer) StoreUri(ctx context.Context, storeName, resource string, minerID uint64) (string, error) {
839+
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
840+
ID: minerID,
841+
})
838842
store, err := s.sectorIdxer.StoreMgr().GetInstance(ctx, storeName)
839843
if err != nil {
840844
return "", err

damocles-manager/modules/sealer/sealer_cli.go

+5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/kvstore"
2626
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
2727
"github.com/ipfs-force-community/damocles/damocles-manager/ver"
28+
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
2829
)
2930

3031
func (s *Sealer) ListSectors(ctx context.Context, ws core.SectorWorkerState, job core.SectorWorkerJob) ([]*core.SectorState, error) {
@@ -225,6 +226,10 @@ func (s *Sealer) RemoveSector(ctx context.Context, sid abi.SectorID) error {
225226
}
226227
}
227228

229+
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
230+
ID: uint64(sid.Miner),
231+
})
232+
228233
dest := s.sectorIdxer.Normal()
229234
if state.Upgraded {
230235
dest = s.sectorIdxer.Upgrade()

damocles-worker/src/infra/objstore/filestore.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,26 @@ impl ObjectStore for FileStore {
6969
self.instance.clone()
7070
}
7171

72-
fn uri(&self, resource_name: &str) -> ObjResult<String> {
73-
let uri = call_rpc! {
74-
self.rpc=>store_uri(
72+
fn uris(
73+
&self,
74+
miner_id: u64,
75+
resources: Vec<String>,
76+
) -> ObjResult<HashMap<String, String>> {
77+
let uris = call_rpc! {
78+
self.rpc=>store_uris(
7579
self.instance(),
76-
resource_name.to_string(),
80+
miner_id,
81+
resources.to_vec(),
7782
)
7883
}
7984
.map_err(|e| ObjectStoreError::Other(e.1))?;
80-
Ok(self.local_path.join(uri).display().to_string())
85+
86+
Ok(uris
87+
.into_iter()
88+
.map(|(resource, uri)| {
89+
(resource, self.local_path.join(uri).display().to_string())
90+
})
91+
.collect())
8192
}
8293

8394
fn readonly(&self) -> bool {

damocles-worker/src/infra/objstore/mod.rs

+24-3
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,31 @@ pub trait ObjectStore: Send + Sync {
7171
/// for fs-like stores, this should return an abs path.
7272
/// for other stores, this may return a url, or path part of a url.
7373
///
74-
/// the resource value looks like this:
75-
/// - "cache/sc-02-data-tree-r-last.dat"
76-
fn uri(&self, resource: &str) -> ObjResult<String>;
74+
/// the resource value looks like these:
75+
/// - "cache/s-t010001-101/sc-02-data-tree-r-last-0.dat"
76+
/// - "update-cache/s-t010001-101/sc-02-data-tree-r-last-0.dat"
77+
/// - "sealed/s-t010001-101"
78+
/// - "update/s-t010001-101"
79+
fn uris(
80+
&self,
81+
miner_id: u64,
82+
resources: Vec<String>,
83+
) -> ObjResult<HashMap<String, String>>;
7784

7885
/// if this instance is read-only
7986
fn readonly(&self) -> bool;
8087
}
88+
89+
/// Extension methods of Objstore
90+
pub trait ObjectStoreExt {
91+
/// Get a single uri
92+
fn uri(&self, miner_id: u64, resource: &str) -> ObjResult<String>;
93+
}
94+
95+
impl<T: ObjectStore + ?Sized> ObjectStoreExt for T {
96+
fn uri(&self, miner_id: u64, resource: &str) -> ObjResult<String> {
97+
let mut res = self.uris(miner_id, vec![resource.to_string()])?;
98+
res.remove(resource)
99+
.ok_or_else(|| ObjectStoreError::IO(io::ErrorKind::NotFound.into()))
100+
}
101+
}

damocles-worker/src/rpc/sealer/mod.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -617,10 +617,11 @@ pub trait Sealer {
617617
error_reason: String,
618618
) -> Result<()>;
619619

620-
#[rpc(name = "Venus.StoreUri")]
621-
fn store_uri(
620+
#[rpc(name = "Venus.StoreUris")]
621+
fn store_uris(
622622
&self,
623623
store_name: String,
624-
resources: String,
625-
) -> Result<String>;
624+
miner_id: u64,
625+
resources: Vec<String>,
626+
) -> Result<HashMap<String, String>>;
626627
}

damocles-worker/src/sealing/sealing_thread/planner/common/sealing.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
},
3232
},
3333
types::SIZE_32G,
34-
SealProof,
34+
SealProof, objstore::ObjectStoreExt,
3535
};
3636

3737
use super::task::Task;
@@ -582,6 +582,8 @@ pub(crate) fn persist_sector_files(
582582
.map(|fname| cache_dir.join(fname)),
583583
);
584584

585+
let miner_id = sector_id.miner;
586+
585587
let transfer_routes = wanted
586588
.into_iter()
587589
.map(|p| {
@@ -597,7 +599,7 @@ pub(crate) fn persist_sector_files(
597599
dest: TransferItem::Store {
598600
store: ins_name.clone(),
599601
uri: persist_store
600-
.uri(rel_path)
602+
.uri(miner_id, rel_path)
601603
.with_context(|| format!("get uri for {:?}", rel_path))
602604
.perm()?,
603605
resource_name: rel_path.to_string(),

damocles-worker/src/sealing/sealing_thread/planner/snapup.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use super::{
1212
},
1313
plan, PlannerTrait, PLANNER_NAME_SNAPUP,
1414
};
15+
use crate::infra::objstore::ObjectStoreExt;
1516
use crate::logging::{debug, warn};
1617
use crate::rpc::sealer::{
1718
AcquireDealsSpec, AllocateSectorSpec, AllocateSnapUpSpec,
@@ -221,8 +222,9 @@ impl<'t> SnapUp<'t> {
221222
})
222223
.crit()?;
223224

224-
let cache_dir = self.task.cache_dir(sector_id);
225+
let miner_id = sector_id.miner;
225226

227+
let cache_dir = self.task.cache_dir(sector_id);
226228
let cached_file_routes = cached_filenames_for_sector(proof_type.into())
227229
.into_iter()
228230
.map(|fname| {
@@ -242,7 +244,7 @@ impl<'t> SnapUp<'t> {
242244
src: TransferItem::Store {
243245
store: access_instance.clone(),
244246
uri: access_store
245-
.uri(cached_rel)
247+
.uri(miner_id, cached_rel)
246248
.with_context(|| {
247249
format!(
248250
"get uri for cache dir {:?} in {}",
@@ -265,7 +267,7 @@ impl<'t> SnapUp<'t> {
265267
src: TransferItem::Store {
266268
store: access_instance.clone(),
267269
uri: access_store
268-
.uri(sealed_rel)
270+
.uri(miner_id, sealed_rel)
269271
.with_context(|| {
270272
format!(
271273
"get uri for sealed file {:?} in {}",

damocles-worker/src/sealing/sealing_thread/planner/unseal.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use super::{
33
common::{event::Event, sector::State, task::Task},
44
plan, PlannerTrait, PLANNER_NAME_UNSEAL,
55
};
6-
use crate::logging::warn;
6+
use crate::{logging::warn, objstore::ObjectStoreExt};
77
use crate::rpc::sealer::AllocateSectorSpec;
88
use crate::sealing::failure::*;
99
use anyhow::{anyhow, Context, Result};
@@ -178,8 +178,9 @@ impl<'t> Unseal<'t> {
178178
})
179179
.crit()?;
180180

181+
let miner_id = sector_id.miner;
181182
let sealed_path = instance
182-
.uri(sealed_rel)
183+
.uri(miner_id, sealed_rel)
183184
.with_context(|| {
184185
format!(
185186
"get uri for sealed file {:?} in {}",
@@ -188,7 +189,7 @@ impl<'t> Unseal<'t> {
188189
})
189190
.perm()?;
190191
let cache_path = instance
191-
.uri(cache_rel)
192+
.uri(miner_id, cache_rel)
192193
.with_context(|| {
193194
format!(
194195
"get uri for cache file {:?} in {}",
@@ -416,7 +417,7 @@ impl<'t> Unseal<'t> {
416417
),
417418
dest: TransferItem::Store {
418419
store: ins_name.clone(),
419-
uri: access_store.uri(des_path).perm()?,
420+
uri: access_store.uri(sector_id.miner, des_path).perm()?,
420421
resource_name: des_path.to_string(),
421422
},
422423
opt: None,

0 commit comments

Comments
 (0)