-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtransform.go
132 lines (119 loc) · 3.57 KB
/
transform.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package jpipe
import (
"time"
"github.com/junitechnology/jpipe/item"
"github.com/junitechnology/jpipe/options"
)
// Map applies mapper to each input value and sends each result to the output channel.
// opts are split into worker-pool options and pipeline-node options.
//
// Example:
//
// output := Map(input, func(i int) int { return i + 10 })
//
// input : 0--1--2--3--4--5--X
// output: 10-11-12-13-14-15-X
func Map[T any, R any](input *Channel[T], mapper func(T) R, opts ...options.MapOption) *Channel[R] {
	// Adapt mapper to the processor shape; the bool result is always true here.
	// NOTE(review): presumably true means "forward this value" — confirm against processor.
	var mapOne processor[T, R] = func(v T) (R, bool) {
		return mapper(v), true
	}

	pooledOpts := getPooledWorkerOptions(opts)
	pooled := mapOne.PooledWorker(pooledOpts...)

	nodeOpts := getNodeOptions(opts)
	_, out := newLinearPipelineNode("Map", input, pooled, nodeOpts...)
	return out
}
// FlatMap maps every input value to a Channel and forwards all values of that Channel
// to the output channel.
//
// The worker handling an input value drains its mapped Channel completely. If a send
// downstream fails, the mapped Channel is unsubscribed and input processing stops.
//
// Example:
//
// output := FlatMap(input, func(i int) *Channel[int] { return FromSlice([]int{i, i + 10}) })
//
// input : 0------1------2------3------4------5------X
// output: 0-10---1-11---2-12---3-13---4-14---5-15---X
func FlatMap[T any, R any](input *Channel[T], mapper func(T) *Channel[R], opts ...options.FlatMapOption) *Channel[R] {
	// emitAll forwards every value produced by mapper(value). It reports false when
	// a send fails, after releasing the mapped Channel so it is not left dangling.
	emitAll := func(node workerNode[T, R], value T) bool {
		inner := mapper(value)
		for v := range inner.getChannel() {
			if sent := node.Send(v); !sent {
				inner.unsubscribe()
				return false
			}
		}
		return true
	}

	var flatten worker[T, R] = func(node workerNode[T, R]) {
		node.LoopInput(0, func(value T) bool {
			return emitAll(node, value)
		})
	}
	flatten = flatten.Pooled(getPooledWorkerOptions(opts)...)

	_, out := newLinearPipelineNode("FlatMap", input, flatten, getNodeOptions(opts)...)
	return out
}
// Batch groups input values into slices and sends those slices to the output channel.
//
// A pending batch is flushed in any of these cases:
//   - it reaches size values;
//   - timeout elapses since the previous flush;
//   - the input is closed;
//   - the node receives its quit signal.
//
// A size or timeout of 0 (or less) disables that limit. Empty batches are never sent.
//
// Example:
//
// output := Batch(input, 3, 0)
//
// input : 0--1----2----------3------4--5----------6--7----X
// output: --------{0-1-2}--------------{3-4-5}-------{6-7}X
func Batch[T any](input *Channel[T], size int, timeout time.Duration) *Channel[[]T] {
	// newTimer returns a channel that fires once after timeout. When no timeout is
	// configured it returns a nil channel, which is never ready in a select.
	newTimer := func() <-chan time.Time {
		if timeout > 0 {
			return time.After(timeout)
		}
		return nil
	}

	worker := func(node workerNode[T, []T]) {
		var batch []T
		timer := newTimer()
		for {
			var flush, done bool
			select {
			case <-node.QuitSignal(): // the nested select gives priority to the quit signal, so we always exit early if needed
				flush, done = true, true
			default:
				select {
				case <-node.QuitSignal():
					flush, done = true, true
				case value, open := <-node.Inputs()[0].getChannel():
					if !open {
						flush, done = true, true
						break
					}
					batch = append(batch, value)
					flush = len(batch) == size
				case <-timer:
					flush = true
				}
			}

			if flush {
				// Without this guard, these cases would emit an empty slice downstream:
				//   - a timeout tick with no new values;
				//   - a close or quit right after a size-triggered flush.
				if len(batch) > 0 {
					if !node.Send(batch) {
						return
					}
					batch = nil
				}
				// Re-arm on every flush attempt: a time.After channel fires only once.
				timer = newTimer()
			}
			if done {
				return
			}
		}
	}

	_, output := newLinearPipelineNode("Batch", input, worker)
	return output
}
// Wrap places every input value T into the Value field of an Item[T] and sends it to the
// output channel.
// Item[T] mostly represents items that hold either a value or an error. It can also carry
// a Context that successive operators enrich.
func Wrap[T any](input *Channel[T]) *Channel[item.Item[T]] {
	wrapOne := func(v T) item.Item[T] {
		return item.Item[T]{Value: v}
	}

	_, out := newLinearPipelineNode("Wrap", input, func(node workerNode[T, item.Item[T]]) {
		node.LoopInput(0, func(v T) bool {
			return node.Send(wrapOne(v))
		})
	})
	return out
}