Skip to content

introduce GoForEach functions #150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
go-bench:
strategy:
matrix:
go-version: [ '1.20', 'stable' ]
go-version: [ '1.23', 'stable' ]
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
build:
strategy:
matrix:
go-version: ['1.20', 'stable']
go-version: ['1.23', 'stable']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
go-bench:
strategy:
matrix:
go-version: [ '1.20', 'stable' ]
go-version: [ '1.23', 'stable' ]
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/sourcegraph/conc

go 1.20
go 1.23

require github.com/stretchr/testify v1.8.1

Expand Down
10 changes: 10 additions & 0 deletions pool/context_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"iter"
)

// ContextPool is a pool that runs tasks that take a context.
Expand Down Expand Up @@ -49,6 +50,15 @@ func (p *ContextPool) Go(f func(ctx context.Context) error) {
})
}

// GoForEach executes the given function concurrently for each element in the iterator.
// It maintains the order of the input iterator in the execution. This method is useful
// for parallel processing of iterable data structures while preserving their original sequence.
func (p *ContextPool) GoForEach(seq iter.Seq[func(context.Context) error]) {
for f := range seq {
p.Go(f)
}
}

// Wait cleans up all spawned goroutines, propagates any panics, and
// returns an error if any of the tasks errored.
func (p *ContextPool) Wait() error {
Expand Down
29 changes: 29 additions & 0 deletions pool/context_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"strconv"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -36,6 +37,34 @@ func ExampleContextPool_WithCancelOnError() {
// I will cancel all other tasks!
}

func ExampleContextPool_GoForEach() {
seq := func() iter.Seq[func(context.Context) error] {
return func(yield func(func(context.Context) error) bool) {
for i := 0; i < 3; i++ {
if !yield(func(ctx context.Context) error {
if i == 2 {
return errors.New("I will cancel all other tasks!")
}
<-ctx.Done()
return nil
}) {
return
}
}
}
}

p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError()
p.GoForEach(seq())
err := p.Wait()
fmt.Println(err)
// Output:
// I will cancel all other tasks!
}

func TestContextPool(t *testing.T) {
t.Parallel()

Expand Down
10 changes: 10 additions & 0 deletions pool/error_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pool
import (
"context"
"errors"
"iter"
"sync"
)

Expand Down Expand Up @@ -30,6 +31,15 @@ func (p *ErrorPool) Go(f func() error) {
})
}

// GoForEach executes the given function concurrently for each element in the iterator.
// It maintains the order of the input iterator in the execution. This method is useful
// for parallel processing of iterable data structures while preserving their original sequence.
func (p *ErrorPool) GoForEach(seq iter.Seq[func() error]) {
for f := range seq {
p.Go(f)
}
}

// Wait cleans up any spawned goroutines, propagating any panics and
// returning any errors from tasks.
func (p *ErrorPool) Wait() error {
Expand Down
25 changes: 25 additions & 0 deletions pool/error_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pool_test
import (
"errors"
"fmt"
"iter"
"strconv"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -30,6 +31,30 @@ func ExampleErrorPool() {
// oh no!
}

func ExampleErrorPool_GoForEach() {
seq := func() iter.Seq[func() error] {
return func(yield func(func() error) bool) {
for i := 0; i < 3; i++ {
if !yield(func() error {
if i == 2 {
return errors.New("oh no!")
}
return nil
}) {
return
}
}
}
}

p := pool.New().WithErrors()
p.GoForEach(seq())
err := p.Wait()
fmt.Println(err)
// Output:
// oh no!
}

func TestErrorPool(t *testing.T) {
t.Parallel()

Expand Down
25 changes: 25 additions & 0 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool_test

import (
"fmt"
"iter"
"strconv"
"sync/atomic"
"testing"
Expand All @@ -28,6 +29,30 @@ func ExamplePool() {
// conc
}

func ExamplePool_GoForEach() {
seq := func(count int) iter.Seq[func()] {
return func(yield func(func()) bool) {
for i := 0; i < count; i++ {
if !yield(func() {
fmt.Println("conc")
}) {
return
}
}
}
}

p := pool.New().WithMaxGoroutines(3)
p.GoForEach(seq(5))
p.Wait()
// Output:
// conc
// conc
// conc
// conc
// conc
}

func TestPool(t *testing.T) {
t.Parallel()

Expand Down
10 changes: 10 additions & 0 deletions pool/result_context_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"iter"
)

// ResultContextPool is a pool that runs tasks that take a context and return a
Expand All @@ -28,6 +29,15 @@ func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
})
}

// GoForEach executes the given function concurrently for each element in the iterator.
// It maintains the order of the input iterator in the execution. This method is useful
// for parallel processing of iterable data structures while preserving their original sequence.
func (p *ResultContextPool[T]) GoForEach(seq iter.Seq[func(context.Context) (T, error)]) {
for f := range seq {
p.Go(f)
}
}

// Wait cleans up all spawned goroutines, propagates any panics, and
// returns an error if any of the tasks errored.
func (p *ResultContextPool[T]) Wait() ([]T, error) {
Expand Down
25 changes: 25 additions & 0 deletions pool/result_context_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"strconv"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -244,4 +245,28 @@ func TestResultContextPool(t *testing.T) {
require.ErrorIs(t, errs2, err2)
require.NotErrorIs(t, errs2, err1)
})

t.Run("GoForEach", func(t *testing.T) {
t.Parallel()
seq := func() iter.Seq[func(context.Context) (int, error)] {
return func(yield func(func(context.Context) (int, error)) bool) {
if !yield(func(context.Context) (int, error) { return 0, err1 }) {
return
}
if !yield(func(context.Context) (int, error) { return 0, nil }) {
return
}
if !yield(func(context.Context) (int, error) { return 0, err2 }) {
return
}
}
}

g := pool.NewWithResults[int]().WithErrors().WithContext(context.Background())
g.GoForEach(seq())
res, err := g.Wait()
require.Len(t, res, 1)
require.ErrorIs(t, err, err1)
require.ErrorIs(t, err, err2)
})
}
10 changes: 10 additions & 0 deletions pool/result_error_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"iter"
)

// ResultErrorPool is a pool that executes tasks that return a generic result
Expand Down Expand Up @@ -30,6 +31,15 @@ func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
})
}

// GoForEach executes the given function concurrently for each element in the iterator.
// It maintains the order of the input iterator in the execution. This method is useful
// for parallel processing of iterable data structures while preserving their original sequence.
func (p *ResultErrorPool[T]) GoForEach(seq iter.Seq[func() (T, error)]) {
for f := range seq {
p.Go(f)
}
}

// Wait cleans up any spawned goroutines, propagating any panics and
// returning the results and any errors from tasks.
func (p *ResultErrorPool[T]) Wait() ([]T, error) {
Expand Down
10 changes: 10 additions & 0 deletions pool/result_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"iter"
"sort"
"sync"
)
Expand Down Expand Up @@ -36,6 +37,15 @@ func (p *ResultPool[T]) Go(f func() T) {
})
}

// GoForEach executes the given function concurrently for each element in the iterator.
// It maintains the order of the input iterator in the execution. This method is useful
// for parallel processing of iterable data structures while preserving their original sequence.
func (p *ResultPool[T]) GoForEach(seq iter.Seq[func() T]) {
for f := range seq {
p.Go(f)
}
}

// Wait cleans up all spawned goroutines, propagating any panics, and returning
// a slice of results from tasks that did not panic.
func (p *ResultPool[T]) Wait() []T {
Expand Down
22 changes: 22 additions & 0 deletions pool/result_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool_test

import (
"fmt"
"iter"
"math/rand"
"strconv"
"sync/atomic"
Expand All @@ -28,6 +29,27 @@ func ExampleResultPool() {
// [0 2 4 6 8 10 12 14 16 18]
}

func ExampleResultPool_GoForEach() {
seq := func() iter.Seq[func() int] {
return func(yield func(func() int) bool) {
for i := 0; i < 10; i++ {
if !yield(func() int {
return i * 2
}) {
return
}
}
}
}
p := pool.NewWithResults[int]()
p.GoForEach(seq())
res := p.Wait()
fmt.Println(res)

// Output:
// [0 2 4 6 8 10 12 14 16 18]
}

func TestResultGroup(t *testing.T) {
t.Parallel()

Expand Down
10 changes: 10 additions & 0 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package stream

import (
"iter"
"sync"

"github.com/sourcegraph/conc"
Expand Down Expand Up @@ -86,6 +87,15 @@ func (s *Stream) Go(f Task) {
})
}

// GoForEach executes the given function concurrently for each element in the iterator.
// It maintains the order of the input iterator in the execution. This method is useful
// for parallel processing of iterable data structures while preserving their original sequence.
func (s *Stream) GoForEach(seq iter.Seq[Task]) {
for f := range seq {
s.Go(f)
}
}

// Wait signals to the stream that all tasks have been submitted. Wait will
// not return until all tasks and callbacks have been run.
func (s *Stream) Wait() {
Expand Down
Loading