Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func (h *processInputHeap) Push(x interface{}) {
*h = append(*h, x.(*processInput))
}

func (s processInputHeap) Peek() (*processInput, bool) {
if len(s) > 0 {
return s[0], true
func (h processInputHeap) Peek() (*processInput, bool) {
if len(h) > 0 {
return h[0], true
}
return nil, false
}
Expand Down
6 changes: 2 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ type OrderedOutput struct {
}

// WorkFunction interface
type WorkFunction interface {
Run() interface{}
}
type WorkFunction func() interface{}

// Process processes work function based on input.
// It Accepts an WorkFunction read channel, work function and concurrent go routine pool size.
Expand Down Expand Up @@ -69,7 +67,7 @@ func Process(inputChan <-chan WorkFunction, options *Options) <-chan OrderedOutp
poolWg.Done()
}()
for input := range processChan {
input.value = input.workFn.Run()
input.value = input.workFn()
input.workFn = nil
aggregatorChan <- input
}
Expand Down
38 changes: 19 additions & 19 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
"time"
)

type zeroLoadWorker int

func (w zeroLoadWorker) Run() interface{} {
return w * 2
func zeroLoadWorker(w int) func() interface{} {
return func() interface{} {
return w * 2
}
}

type loadWorker int

func (w loadWorker) Run() interface{} {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
return w * 2
func loadWorker(w int) func() interface{} {
return func() interface{} {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
return w * 2
}
}

func Test1(t *testing.T) {
Expand All @@ -31,7 +31,7 @@ func Test1(t *testing.T) {
counter := 0
go func(t *testing.T) {
for out := range outChan {
if _, ok := out.Value.(loadWorker); !ok {
if _, ok := out.Value.(int); !ok {
t.Error("Invalid output")
} else {
counter++
Expand Down Expand Up @@ -64,7 +64,7 @@ func Test2(t *testing.T) {
counter := 0
go func(t *testing.T) {
for out := range outChan {
if _, ok := out.Value.(loadWorker); !ok {
if _, ok := out.Value.(int); !ok {
t.Error("Invalid output")
} else {
counter++
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test3(t *testing.T) {
counter := 0
go func(t *testing.T) {
for out := range outChan {
if _, ok := out.Value.(zeroLoadWorker); !ok {
if _, ok := out.Value.(int); !ok {
t.Error("Invalid output")
} else {
counter++
Expand Down Expand Up @@ -134,7 +134,7 @@ func Test4(t *testing.T) {
}()
counter := 0
for out := range output {
if _, ok := out.Value.(zeroLoadWorker); !ok {
if _, ok := out.Value.(int); !ok {
t.Error("Invalid output")
} else {
counter++
Expand All @@ -158,9 +158,9 @@ func TestSortedData(t *testing.T) {
}
close(inputChan)
}()
var res []loadWorker
var res []int
for out := range output {
res = append(res, out.Value.(loadWorker))
res = append(res, out.Value.(int))
}
isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
return res[i] < res[j]
Expand All @@ -184,9 +184,9 @@ func TestSortedDataMultiple(t *testing.T) {
}
close(inputChan)
}()
var res []loadWorker
var res []int
for out := range output {
res = append(res, out.Value.(loadWorker))
res = append(res, out.Value.(int))
}
isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
return res[i] < res[j]
Expand Down Expand Up @@ -222,11 +222,11 @@ func TestStreamingInput(t *testing.T) {
}
}()

var res []zeroLoadWorker
var res []int

go func() {
for out := range output {
res = append(res, out.Value.(zeroLoadWorker))
res = append(res, out.Value.(int))
wg.Done()
}
}()
Expand Down