vyzo's review of the dagstore #93

Draft · wants to merge 1 commit into base: master
12 changes: 12 additions & 0 deletions dagstore.go
@@ -35,6 +35,8 @@ const (
// RecoverOnAcquire will automatically queue a recovery for a failed shard
// on the first acquire attempt, and will park that acquire while recovery
// is in progress.
// vyzo: I would call this RecoverLazy because that's the standard terminology
// for this mode of operation.
RecoverOnAcquire

// RecoverNow will eagerly trigger a recovery for all failed shards
@@ -258,6 +260,7 @@ func (d *DAGStore) Start(ctx context.Context) error {
case ShardStateErrored:
switch d.config.RecoverOnStart {
case DoNotRecover:
// vyzo: this should be logged at warn level, not info
log.Infow("start: skipping recovery of shard in errored state", "shard", s.key, "error", s.err)
case RecoverOnAcquire:
log.Infow("start: failed shard will recover on next acquire", "shard", s.key, "error", s.err)
@@ -490,6 +493,9 @@ func (d *DAGStore) AllShardsInfo() AllShardsInfo {
//
// GC runs with exclusivity from the event loop.
func (d *DAGStore) GC(ctx context.Context) (*GCResult, error) {
// vyzo: this needs to be buffered with len=1 so that GC can send a response back without blocking
// Note: upon reading the GC code, it also short-circuits on a (presumably the same) context.
// But making this buffered anyway would spare me the guessing and the need to read the GC code.
ch := make(chan *GCResult)
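// a minimal sketch of the buffered variant vyzo suggests above (not part of this diff):
//
//	ch := make(chan *GCResult, 1) // len=1: the GC goroutine's send never blocks,
//	                              // even if this caller has already returned on ctx.Done()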
select {
case d.gcCh <- ch:
@@ -500,6 +506,7 @@
select {
case res := <-ch:
return res, nil
// vyzo: here we can short-circuit the receive, hence the channel should be buffered.
case <-ctx.Done():
return nil, ctx.Err()
}
@@ -512,6 +519,7 @@ func (d *DAGStore) Close() error {
return nil
}

// vyzo: _en_queueTask or pushTask is the standard terminology; just queue is not a verb :)
func (d *DAGStore) queueTask(tsk *task, ch chan<- *task) error {
select {
case <-d.ctx.Done():
@@ -539,6 +547,10 @@ func (d *DAGStore) restoreState() error {

log.Debugw("restored shard state on dagstore startup", "shard", s.key, "shard state", s.state, "shard error", s.err,
"shard lazy", s.lazy)
// vyzo: is this lock protected? does it have to be?
// add some comment here; it seems this method is only called on start, but what about
// concurrent calls _while we are starting_?
// I think it should be lock protected.
d.shards[s.key] = s
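// a rough sketch of the lock-protected variant vyzo asks about, assuming d.lk is the
// mutex guarding d.shards as in the destroy path further down (not part of this diff):
//
//	d.lk.Lock()
//	d.shards[s.key] = s
//	d.lk.Unlock()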
}
}
5 changes: 5 additions & 0 deletions dagstore_async.go
@@ -23,9 +23,11 @@ func (d *DAGStore) acquireAsync(ctx context.Context, w *waiter, s *Shard, mnt mo
log.Warnw("acquire: failed to fetch from mount upgrader", "shard", s.key, "error", err)

// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
// vyzo: swallowed error
_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)
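// one possible way to surface the swallowed error (sketch, not part of this diff);
// the same pattern would apply to the other swallowed queueTask/failShard calls below:
//
//	if err := d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh); err != nil {
//		log.Warnw("acquire: failed to queue shard release", "shard", s.key, "error", err)
//	}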

// fail the shard
// vyzo: swallowed error
_ = d.failShard(s, d.completionCh, "failed to acquire reader of mount so we can return the accessor: %w", err)

// send the shard error to the caller.
@@ -43,9 +45,11 @@ }
}

// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
// vyzo: swallowed error
_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)

// fail the shard
// vyzo: swallowed error
_ = d.failShard(s, d.completionCh, "failed to recover index for shard %s: %w", k, err)

// send the shard error to the caller.
@@ -95,5 +99,6 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun
return
}

// vyzo: swallowed error
_ = d.queueTask(&task{op: OpShardMakeAvailable, shard: s}, d.completionCh)
}
21 changes: 21 additions & 0 deletions dagstore_control.go
@@ -47,6 +47,8 @@ func (d *DAGStore) control() {
if err == context.Canceled {
log.Infow("dagstore closed")
} else {
// vyzo: will a user actually see this? maybe it is appropriate to panic here instead
// of leaving a dangling process that can't make progress.
log.Errorw("consuming next task failed; aborted event loop; dagstore unoperational", "error", err)
}
return
@@ -61,6 +63,13 @@ }
s := tsk.shard
log.Debugw("processing task", "op", tsk.op, "shard", tsk.shard.key, "error", tsk.err)

// vyzo: this lock is very long-lived; I would make the following logic a method
// and lock/defer unlock in there.
// Extending this with regard to the queueTask dance (and the swallowed errors),
// I think that the code would be much cleaner if the task processing
// is abstracted in a method that returns optionally a next task (for avoiding the
// internalCh indirection) and an error to break the loop.
// This will also allow us to remove the huge lock scope without defer smell.
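// a rough shape of that refactor (hypothetical processTask name; sketch, not part of this diff):
//
//	func (d *DAGStore) processTask(tsk *task) (next *task, err error) {
//		s := tsk.shard
//		s.lk.Lock()
//		defer s.lk.Unlock()
//		switch tsk.op {
//		// ... per-op handling; return a follow-up task instead of sending to internalCh
//		}
//		return next, nil
//	}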
s.lk.Lock()
prevState := s.state

@@ -86,6 +95,11 @@

// otherwise, park the registration channel and queue the init.
s.wRegister = tsk.waiter
// vyzo: why is the error swallowed? shouldn't we catch+log+exit if this fails?
// also, note: I don't like the bit with having an internalCh here, as it is error prone.
// the code is fine with the way it processes the internal channel, but it would be
// much simpler to make this a method that returns the next event instead of going
// through a channel.
_ = d.queueTask(&task{op: OpShardInitialize, shard: s, waiter: tsk.waiter}, d.internalCh)

case OpShardInitialize:
@@ -94,6 +108,7 @@
// if we already have the index for this shard, there's nothing to do here.
if istat, err := d.indices.StatFullIndex(s.key); err == nil && istat.Exists {
log.Debugw("already have an index for shard being initialized, nothing to do", "shard", s.key)
// vyzo: same here, swallowed error, internalCh
_ = d.queueTask(&task{op: OpShardMakeAvailable, shard: s}, d.internalCh)
break
}
@@ -147,6 +162,7 @@ func (d *DAGStore) control() {
// to avoid the first context cancellation interrupting the
// recovery that may be blocking other acquirers with longer
// contexts.
// vyzo: swallowed error, internalCh
_ = d.queueTask(&task{op: OpShardRecover, shard: s, waiter: &waiter{ctx: d.ctx}}, d.internalCh)
} else {
err := fmt.Errorf("shard is in errored state; err: %w", s.err)
@@ -171,6 +187,7 @@
// if the first one cancels, the entire job would be cancelled.
w := *tsk.waiter
w.ctx = context.Background()
// vyzo: swallowed error, internalCh
_ = d.queueTask(&task{op: OpShardInitialize, shard: s, waiter: &w}, d.internalCh)
}

@@ -252,6 +269,7 @@ func (d *DAGStore) control() {
res := &ShardResult{Key: s.key, Error: s.err}
d.dispatchFailuresCh <- &dispatch{res: res, w: wFailure}
}
// vyzo: log it if not; this is swallowed here and it has to at least be in the logs.
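// e.g. (sketch, assuming the guard above checks whether a failure channel is registered;
// not part of this diff):
//
//	} else {
//		log.Warnw("no failure channel registered; dropping shard failure", "shard", s.key, "error", s.err)
//	}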

case OpShardRecover:
if s.state != ShardStateErrored {
@@ -274,12 +292,14 @@
// transient to "" always.
if err := s.mount.DeleteTransient(); err != nil {
log.Warnw("recovery: failed to delete transient", "shard", s.key, "error", err)
// vyzo: is it safe to proceed if this happens? should we break?
}

// attempt to drop the index.
dropped, err := d.indices.DropFullIndex(s.key)
if err != nil {
log.Warnw("recovery: failed to drop index for shard", "shard", s.key, "error", err)
// vyzo: is it safe to initialize if this happens? should we break?
} else if !dropped {
log.Debugw("recovery: no index dropped for shard", "shard", s.key)
}
@@ -299,6 +319,7 @@
delete(d.shards, s.key)
d.lk.Unlock()
// TODO are we guaranteed that there are no queued items for this shard?
// vyzo: no, doesn't look like it by reading this code.

default:
panic(fmt.Sprintf("unrecognized shard operation: %d", tsk.op))
4 changes: 4 additions & 0 deletions dispatcher.go
@@ -23,6 +23,10 @@ func (d *DAGStore) dispatchResult(res *ShardResult, waiters ...*waiter) {
// no return channel; skip.
continue
}
// vyzo: this can block and take the event loop down with it; it can happen if the context
// is already done by the time we dispatch.
// this needs the context to select on, and return an error if it is done so that
// the event loop can exit
d.dispatchResultsCh <- &dispatch{w: w, res: res}
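// a minimal sketch of the context-aware send vyzo describes (it would mean changing
// dispatchResult to return an error; not part of this diff):
//
//	select {
//	case d.dispatchResultsCh <- &dispatch{w: w, res: res}:
//	case <-d.ctx.Done():
//		return d.ctx.Err()
//	}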
}
}