Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make shard store an interface #121

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
carindex "github.com/ipld/go-car/v2/index"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"

Expand Down Expand Up @@ -72,7 +70,7 @@ type DAGStore struct {
shards map[shard.Key]*Shard
config Config
indices index.FullIndexRepo
store ds.Datastore
store PersistStore

// TopLevelIndex is the top level (cid -> []shards) index that maps a cid to all the shards it is present in.
TopLevelIndex index.Inverted
Expand Down Expand Up @@ -149,7 +147,7 @@ type Config struct {
TopLevelIndex index.Inverted

// Datastore is the datastore where shard state will be persisted.
Datastore ds.Datastore
Datastore PersistStore

// MountRegistry contains the set of recognized mount types.
MountRegistry *mount.Registry
Expand Down Expand Up @@ -211,12 +209,9 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
// handle the datastore.
if cfg.Datastore == nil {
log.Warnf("no datastore provided; falling back to in-mem datastore; shard state will not survive restarts")
cfg.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) // TODO can probably remove mutex wrap, since access is single-threaded
cfg.Datastore = NewDsPersistStore(dssync.MutexWrap(ds.NewMapDatastore())) // TODO can probably remove mutex wrap, since access is single-threaded
}

// namespace all store operations.
cfg.Datastore = namespace.Wrap(cfg.Datastore, StoreNamespace)

if cfg.MountRegistry == nil {
cfg.MountRegistry = mount.NewRegistry()
}
Expand Down Expand Up @@ -544,7 +539,7 @@ func (d *DAGStore) GC(ctx context.Context) (*GCResult, error) {
func (d *DAGStore) Close() error {
d.cancelFn()
d.wg.Wait()
_ = d.store.Sync(context.TODO(), ds.Key{})
_ = d.store.Close(context.TODO())
return nil
}

Expand All @@ -558,25 +553,22 @@ func (d *DAGStore) queueTask(tsk *task, ch chan<- *task) error {
}

func (d *DAGStore) restoreState() error {
results, err := d.store.Query(d.ctx, query.Query{})
results, err := d.store.List(d.ctx)
if err != nil {
return fmt.Errorf("failed to recover dagstore state from store: %w", err)
}
for {
res, ok := results.NextSync()
if !ok {
return nil
}
s := &Shard{d: d}
if err := s.UnmarshalJSON(res.Value); err != nil {
log.Warnf("failed to recover state of shard %s: %s; skipping", shard.KeyFromString(res.Key), err)
for _, persistedShard := range results {
s, err := fromPersistedShard(d.ctx, d, persistedShard)
if err != nil {
log.Warnf("failed to recover state of shard %s: %s; skipping", shard.KeyFromString(persistedShard.Key), err)
continue
}

log.Debugw("restored shard state on dagstore startup", "shard", s.key, "shard state", s.state, "shard error", s.err,
"shard lazy", s.lazy)
d.shards[s.key] = s
}
return nil
}

// ensureDir checks whether the specified path is a directory, and if not it
Expand Down
32 changes: 16 additions & 16 deletions dagstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestRegisterUsingExistingTransient(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: ds,
Datastore: NewDsPersistStore(ds),
})
require.NoError(t, err)

Expand All @@ -67,7 +67,7 @@ func TestRegisterWithaNilResponseChannel(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: ds,
Datastore: NewDsPersistStore(ds),
})
require.NoError(t, err)

Expand Down Expand Up @@ -102,7 +102,7 @@ func TestRegisterCarV1(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: ds,
Datastore: NewDsPersistStore(ds),
})
require.NoError(t, err)

Expand Down Expand Up @@ -137,7 +137,7 @@ func TestRegisterCarV2(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: datastore.NewMapDatastore(),
Datastore: NewDsPersistStore(datastore.NewMapDatastore()),
})
require.NoError(t, err)

Expand Down Expand Up @@ -192,7 +192,7 @@ func TestRegisterConcurrentShards(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: store,
Datastore: NewDsPersistStore(store),
})
require.NoError(t, err)

Expand All @@ -214,7 +214,7 @@ func TestAcquireInexistentShard(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: datastore.NewMapDatastore(),
Datastore: NewDsPersistStore(datastore.NewMapDatastore()),
})
require.NoError(t, err)

Expand All @@ -231,7 +231,7 @@ func TestAcquireAfterRegisterWait(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: datastore.NewMapDatastore(),
Datastore: NewDsPersistStore(datastore.NewMapDatastore()),
})
require.NoError(t, err)

Expand Down Expand Up @@ -307,7 +307,7 @@ func TestRestartRestoresState(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: dir,
Datastore: store,
Datastore: NewDsPersistStore(store),
IndexRepo: idx,
})
require.NoError(t, err)
Expand All @@ -334,7 +334,7 @@ func TestRestartRestoresState(t *testing.T) {
dagst, err = NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: dir,
Datastore: store,
Datastore: NewDsPersistStore(store),
IndexRepo: idx,
})
require.NoError(t, err)
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestRestartResumesRegistration(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: r,
TransientsDir: dir,
Datastore: store,
Datastore: NewDsPersistStore(store),
TraceCh: sink,
})
require.NoError(t, err)
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestRestartResumesRegistration(t *testing.T) {
dagst, err = NewDAGStore(Config{
MountRegistry: r,
TransientsDir: dir,
Datastore: store,
Datastore: NewDsPersistStore(store),
TraceCh: sink,
})
require.NoError(t, err)
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestLazyInitialization(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: dir,
Datastore: store,
Datastore: NewDsPersistStore(store),
TraceCh: sink,
})
require.NoError(t, err)
Expand Down Expand Up @@ -947,7 +947,7 @@ func TestRecoveryOnStart(t *testing.T) {
TransientsDir: dir,
TraceCh: sink,
FailureCh: failures,
Datastore: ds,
Datastore: NewDsPersistStore(ds),
}
dagst, err := NewDAGStore(config)
require.NoError(t, err)
Expand Down Expand Up @@ -1175,7 +1175,7 @@ func TestTransientReusedOnRestart(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: r,
TransientsDir: dir,
Datastore: ds,
Datastore: NewDsPersistStore(ds),
IndexRepo: idx,
})
require.NoError(t, err)
Expand All @@ -1199,7 +1199,7 @@ func TestTransientReusedOnRestart(t *testing.T) {
dagst, err = NewDAGStore(Config{
MountRegistry: r,
TransientsDir: dir,
Datastore: ds,
Datastore: NewDsPersistStore(ds),
IndexRepo: idx,
})
require.NoError(t, err)
Expand Down Expand Up @@ -1232,7 +1232,7 @@ func TestAcquireFailsWhenIndexGone(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: r,
TransientsDir: dir,
Datastore: ds,
Datastore: NewDsPersistStore(ds),
IndexRepo: idx,
})
require.NoError(t, err)
Expand Down
126 changes: 84 additions & 42 deletions shard_persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
)

// PersistedShard is the persistent representation of the Shard.
Expand All @@ -22,13 +24,83 @@ type PersistedShard struct {
Error string `json:"e"`
}

// MarshalJSON returns a serialized representation of the state. It must be
// called with a shard lock (read, at least), such as from inside the event
// loop, as it accesses mutable state.
func (s *Shard) MarshalJSON() ([]byte, error) {
// PersistStore abstracts the persistence of shard state, so that callers can
// supply alternative backends to the default datastore-backed implementation.
type PersistStore interface {
	// Save persists the supplied shard state under its key.
	Save(context.Context, PersistedShard) error
	// Get retrieves the persisted state of the shard with the given key.
	Get(context.Context, shard.Key) (*PersistedShard, error)
	// List returns the persisted state of all shards known to the store.
	List(context.Context) ([]*PersistedShard, error)
	// Close releases the store; implementations may flush pending writes here.
	Close(context.Context) error
}

// Compile-time assertion that *DsPersistStore satisfies PersistStore.
var _ PersistStore = (*DsPersistStore)(nil)

// DsPersistStore is a PersistStore implementation backed by a go-datastore.
type DsPersistStore struct {
	// store holds the (namespaced) datastore where shard state is kept.
	store ds.Datastore
}

// NewDsPersistStore wraps the supplied datastore in a PersistStore.
// All operations are namespaced under StoreNamespace so that shard state
// does not collide with other data in a shared datastore.
//
// The parameter is named dstore rather than ds to avoid shadowing the
// ds package import inside the function body.
func NewDsPersistStore(dstore ds.Batching) PersistStore {
	return &DsPersistStore{namespace.Wrap(dstore, StoreNamespace)}
}

// Save serializes the shard state to JSON and writes it into the underlying
// datastore, syncing afterwards so the entry is flushed to durable storage.
func (p *DsPersistStore) Save(ctx context.Context, ps PersistedShard) error {
	// Marshal first: nothing touches the store if serialization fails.
	data, err := json.Marshal(ps)
	if err != nil {
		return fmt.Errorf("failed to serialize shard state: %w", err)
	}
	// The store was namespaced by the constructor, so the raw key is used here.
	if err = p.store.Put(ctx, ds.NewKey(ps.Key), data); err != nil {
		return fmt.Errorf("failed to put shard state: %w", err)
	}
	// Sync the whole namespace (empty key) to persist the write.
	if err = p.store.Sync(ctx, ds.Key{}); err != nil {
		return fmt.Errorf("failed to sync shard state to store: %w", err)
	}
	return nil
}

// Get loads and deserializes the persisted state of the shard identified by
// key, failing if the entry is absent or cannot be decoded.
func (p *DsPersistStore) Get(ctx context.Context, key shard.Key) (*PersistedShard, error) {
	raw, err := p.store.Get(ctx, ds.NewKey(key.String()))
	if err != nil {
		return nil, fmt.Errorf("unable to get shard of key %s %w", key, err)
	}
	var ps PersistedShard
	if err := json.Unmarshal(raw, &ps); err != nil {
		return nil, fmt.Errorf("unable to unmarshal shard %w", err)
	}
	return &ps, nil
}

// List returns the persisted state of every shard found in the store.
// Entries that fail to decode are logged and skipped, so one corrupt record
// does not prevent recovery of the rest.
func (p *DsPersistStore) List(ctx context.Context) ([]*PersistedShard, error) {
	res, err := p.store.Query(ctx, query.Query{})
	if err != nil {
		return nil, fmt.Errorf("failed to recover dagstore state from store: %w", err)
	}
	var shards []*PersistedShard
	// Drain the results synchronously; NextSync reports false when exhausted.
	for entry, ok := res.NextSync(); ok; entry, ok = res.NextSync() {
		ps := new(PersistedShard)
		if err := json.Unmarshal(entry.Value, ps); err != nil {
			log.Warnf("failed to load shard %s: %s; skipping", shard.KeyFromString(entry.Key), err)
			continue
		}
		shards = append(shards, ps)
	}
	return shards, nil
}

// Close flushes pending writes by syncing the whole namespace. It does not
// close the underlying datastore, whose lifecycle belongs to the caller.
func (p *DsPersistStore) Close(ctx context.Context) error {
	return p.store.Sync(ctx, ds.Key{})
}

// persist persists the shard's state into the supplied Datastore. It calls
// MarshalJSON, which requires holding a shard lock to be safe.
func (s *Shard) persist(ctx context.Context, store PersistStore) error {
u, err := s.d.mounts.Represent(s.mount)
if err != nil {
return nil, fmt.Errorf("failed to encode mount: %w", err)
return fmt.Errorf("failed to encode mount: %w", err)
}
ps := PersistedShard{
Key: s.key.String(),
Expand All @@ -40,22 +112,11 @@ func (s *Shard) MarshalJSON() ([]byte, error) {
if s.err != nil {
ps.Error = s.err.Error()
}

return json.Marshal(ps)
// TODO maybe switch to CBOR, as it's probably faster.
// var b bytes.Buffer
// if err := ps.MarshalCBOR(&b); err != nil {
// return nil, err
// }
// return b.Bytes(), nil
return store.Save(ctx, ps)
}

func (s *Shard) UnmarshalJSON(b []byte) error {
var ps PersistedShard // TODO try to avoid this alloc by marshalling/unmarshalling directly.
if err := json.Unmarshal(b, &ps); err != nil {
return err
}

func fromPersistedShard(ctx context.Context, d *DAGStore, ps *PersistedShard) (*Shard, error) {
s := &Shard{d: d}
// restore basics.
s.key = shard.KeyFromString(ps.Key)
s.state = ps.State
Expand All @@ -67,34 +128,15 @@ func (s *Shard) UnmarshalJSON(b []byte) error {
// restore mount.
u, err := url.Parse(ps.URL)
if err != nil {
return fmt.Errorf("failed to parse mount URL: %w", err)
return nil, fmt.Errorf("failed to parse mount URL: %w", err)
}
mnt, err := s.d.mounts.Instantiate(u)
if err != nil {
return fmt.Errorf("failed to instantiate mount from URL: %w", err)
return nil, fmt.Errorf("failed to instantiate mount from URL: %w", err)
}
s.mount, err = mount.Upgrade(mnt, s.d.throttleReaadyFetch, s.d.config.TransientsDir, s.key.String(), ps.TransientPath)
if err != nil {
return fmt.Errorf("failed to apply mount upgrader: %w", err)
return nil, fmt.Errorf("failed to apply mount upgrader: %w", err)
}

return nil
}

// persist persists the shard's state into the supplied Datastore. It calls
// MarshalJSON, which requires holding a shard lock to be safe.
func (s *Shard) persist(ctx context.Context, store ds.Datastore) error {
ps, err := s.MarshalJSON()
if err != nil {
return fmt.Errorf("failed to serialize shard state: %w", err)
}
// assuming that the datastore is namespaced if need be.
k := ds.NewKey(s.key.String())
if err := store.Put(ctx, k, ps); err != nil {
return fmt.Errorf("failed to put shard state: %w", err)
}
if err := store.Sync(ctx, ds.Key{}); err != nil {
return fmt.Errorf("failed to sync shard state to store: %w", err)
}
return nil
return s, nil
}