Skip to content

Commit

Permalink
sentinel error for group done so we can still cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
mumbleskates committed Sep 23, 2024
1 parent 467247c commit fa0441e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
22 changes: 16 additions & 6 deletions collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,16 @@ func FeedWithErrs[T any](executor Executor, receiver func(context.Context, T) er
return making
}

// groupError returns the error associated with a group's context; if the error
// was errGroupDone, that doesn't count as an error and nil is returned instead.
func groupError(ctx context.Context) error {
err := context.Cause(ctx)
if err == errGroupDone {
return nil
}
return err
}

var _ ErrGroupExecutor = &errGroup{}

type errGroup struct {
Expand All @@ -242,7 +252,7 @@ func (eg *errGroup) Go(op func(context.Context) error) {
func (eg *errGroup) Wait() error {
eg.g.Wait()
ctx, _ := eg.g.getContext()
return context.Cause(ctx)
return groupError(ctx)
}

func makePipeGroup[T any, R any](executor Executor) *pipeGroup[T, R] {
Expand Down Expand Up @@ -329,7 +339,7 @@ func (cg collectingGroup[T]) Go(op func(context.Context) (T, error)) {
func (cg collectingGroup[T]) Wait() ([]T, error) {
cg.doWait()
ctx, _ := cg.g.getContext()
if err := context.Cause(ctx); err != nil {
if err := groupError(ctx); err != nil {
// We have an error; return it
return nil, err
}
Expand Down Expand Up @@ -358,7 +368,7 @@ func (fg feedingGroup[T]) Go(op func(context.Context) (T, error)) {
func (fg feedingGroup[T]) Wait() error {
fg.doWait()
ctx, _ := fg.g.getContext()
return context.Cause(ctx)
return groupError(ctx)
}

var _ AllErrsExecutor = multiErrGroup{}
Expand All @@ -381,7 +391,7 @@ func (meg multiErrGroup) Wait() MultiError {
meg.doWait()
err := CombineErrors(*meg.res...)
ctx, _ := meg.g.getContext()
if cause := context.Cause(ctx); cause != nil {
if cause := groupError(ctx); cause != nil {
return CombineErrors(cause, err)
}
return err
Expand Down Expand Up @@ -415,7 +425,7 @@ func (ceg collectingMultiErrGroup[T]) Wait() ([]T, MultiError) {
ceg.doWait()
res, err := ceg.res.values, CombineErrors(ceg.res.errs...)
ctx, _ := ceg.g.getContext()
if cause := context.Cause(ctx); cause != nil {
if cause := groupError(ctx); cause != nil {
return res, CombineErrors(cause, err)
}
return res, err
Expand All @@ -439,7 +449,7 @@ func (feg feedingMultiErrGroup[T]) Wait() MultiError {
feg.doWait()
err := CombineErrors(*feg.res...)
ctx, _ := feg.g.getContext()
if cause := context.Cause(ctx); cause != nil {
if cause := groupError(ctx); cause != nil {
return CombineErrors(cause, err)
}
return err
Expand Down
19 changes: 13 additions & 6 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ const bufferSize = 8

const misuseMessage = "parallel executor misuse: don't reuse executors"

var errPanicked = errors.New("panicked")
var (
errPanicked = errors.New("panicked")
errGroupDone = errors.New("executor done")
errGroupAbandoned = errors.New("executor abandoned")

// Contexts are canceled with this error when executors are awaited.
GroupDoneError = errGroupDone
)

// NOTE: If you want to really get crazy with it, it IS permissible and safe to
// call Go(...) from multiple threads without additional synchronization, on
Expand Down Expand Up @@ -63,7 +70,7 @@ func Limited(ctx context.Context, maxGoroutines int) Executor {
gctx, cancel := context.WithCancelCause(ctx)
g := &runner{ctx: gctx, cancel: cancel}
runtime.SetFinalizer(g, func(doomed *runner) {
doomed.cancel(nil)
doomed.cancel(errGroupAbandoned)
})
return g
}
Expand All @@ -74,7 +81,7 @@ func Limited(ctx context.Context, maxGoroutines int) Executor {
}
runtime.SetFinalizer(making, func(doomed *limitedGroup) {
close(doomed.ops)
doomed.g.cancel(nil)
doomed.g.cancel(errGroupAbandoned)
})
return making
}
Expand Down Expand Up @@ -102,7 +109,7 @@ func (n *runner) Wait() {
n.awaited.Store(true)
// We are ending the group's lifetime within this function call; defer the
// cancelation and unset our finalizer.
n.cancel(nil)
n.cancel(errGroupDone)
runtime.SetFinalizer(n, nil)
}

Expand All @@ -113,7 +120,7 @@ func (n *runner) getContext() (context.Context, context.CancelCauseFunc) {
func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group {
g := &group{runner: runner{ctx: ctx, cancel: cancel}}
runtime.SetFinalizer(g, func(doomed *group) {
doomed.cancel(nil)
doomed.cancel(errGroupAbandoned)
})
return g
}
Expand Down Expand Up @@ -161,7 +168,7 @@ func (g *group) Wait() {
g.awaited.Store(true)
// We are ending the group's lifetime within this function call; defer the
// cancelation and unset our finalizer.
defer g.cancel(nil)
defer g.cancel(errGroupDone)
runtime.SetFinalizer(g, nil)
g.wg.Wait()
g.checkPanic()
Expand Down

0 comments on commit fa0441e

Please sign in to comment.