diff --git a/collect.go b/collect.go index 902341f..75a6c04 100644 --- a/collect.go +++ b/collect.go @@ -223,6 +223,16 @@ func FeedWithErrs[T any](executor Executor, receiver func(context.Context, T) er return making } +// groupError returns the error associated with a group's context; if the error +// was errGroupDone, that doesn't count as an error and nil is returned instead. +func groupError(ctx context.Context) error { + err := context.Cause(ctx) + if err == errGroupDone { + return nil + } + return err +} + var _ ErrGroupExecutor = &errGroup{} type errGroup struct { @@ -242,7 +252,7 @@ func (eg *errGroup) Go(op func(context.Context) error) { func (eg *errGroup) Wait() error { eg.g.Wait() ctx, _ := eg.g.getContext() - return context.Cause(ctx) + return groupError(ctx) } func makePipeGroup[T any, R any](executor Executor) *pipeGroup[T, R] { @@ -329,7 +339,7 @@ func (cg collectingGroup[T]) Go(op func(context.Context) (T, error)) { func (cg collectingGroup[T]) Wait() ([]T, error) { cg.doWait() ctx, _ := cg.g.getContext() - if err := context.Cause(ctx); err != nil { + if err := groupError(ctx); err != nil { // We have an error; return it return nil, err } @@ -358,7 +368,7 @@ func (fg feedingGroup[T]) Go(op func(context.Context) (T, error)) { func (fg feedingGroup[T]) Wait() error { fg.doWait() ctx, _ := fg.g.getContext() - return context.Cause(ctx) + return groupError(ctx) } var _ AllErrsExecutor = multiErrGroup{} @@ -381,7 +391,7 @@ func (meg multiErrGroup) Wait() MultiError { meg.doWait() err := CombineErrors(*meg.res...) ctx, _ := meg.g.getContext() - if cause := context.Cause(ctx); cause != nil { + if cause := groupError(ctx); cause != nil { return CombineErrors(cause, err) } return err @@ -415,7 +425,7 @@ func (ceg collectingMultiErrGroup[T]) Wait() ([]T, MultiError) { ceg.doWait() res, err := ceg.res.values, CombineErrors(ceg.res.errs...) ctx, _ := ceg.g.getContext() - if cause := context.Cause(ctx); cause != nil { + if cause := groupError(ctx); cause != nil { return res, CombineErrors(cause, err) } return res, err @@ -439,7 +449,7 @@ func (feg feedingMultiErrGroup[T]) Wait() MultiError { feg.doWait() err := CombineErrors(*feg.res...) ctx, _ := feg.g.getContext() - if cause := context.Cause(ctx); cause != nil { + if cause := groupError(ctx); cause != nil { return CombineErrors(cause, err) } return err diff --git a/group.go b/group.go index b155f7b..befcde0 100644 --- a/group.go +++ b/group.go @@ -15,7 +15,14 @@ const bufferSize = 8 const misuseMessage = "parallel executor misuse: don't reuse executors" -var errPanicked = errors.New("panicked") +var ( + errPanicked = errors.New("panicked") + errGroupDone = errors.New("executor done") + errGroupAbandoned = errors.New("executor abandoned") + + // Contexts are canceled with this error when executors are awaited. + GroupDoneError = errGroupDone +) // NOTE: If you want to really get crazy with it, it IS permissible and safe to // call Go(...) from multiple threads without additional synchronization, on @@ -63,7 +70,7 @@ func Limited(ctx context.Context, maxGoroutines int) Executor { gctx, cancel := context.WithCancelCause(ctx) g := &runner{ctx: gctx, cancel: cancel} runtime.SetFinalizer(g, func(doomed *runner) { - doomed.cancel(nil) + doomed.cancel(errGroupAbandoned) }) return g } @@ -74,7 +81,7 @@ func Limited(ctx context.Context, maxGoroutines int) Executor { } runtime.SetFinalizer(making, func(doomed *limitedGroup) { close(doomed.ops) - doomed.g.cancel(nil) + doomed.g.cancel(errGroupAbandoned) }) return making } @@ -102,7 +109,7 @@ func (n *runner) Wait() { n.awaited.Store(true) // We are ending the group's lifetime within this function call; defer the // cancelation and unset our finalizer. - n.cancel(nil) + n.cancel(errGroupDone) runtime.SetFinalizer(n, nil) } @@ -113,7 +120,7 @@ func (n *runner) getContext() (context.Context, context.CancelCauseFunc) { func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group { g := &group{runner: runner{ctx: ctx, cancel: cancel}} runtime.SetFinalizer(g, func(doomed *group) { - doomed.cancel(nil) + doomed.cancel(errGroupAbandoned) }) return g } @@ -161,7 +168,7 @@ func (g *group) Wait() { g.awaited.Store(true) // We are ending the group's lifetime within this function call; defer the // cancelation and unset our finalizer. - defer g.cancel(nil) + defer g.cancel(errGroupDone) runtime.SetFinalizer(g, nil) g.wg.Wait() g.checkPanic()