-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfig.go
159 lines (133 loc) · 4.05 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package mapper
import (
"time"
)
// Config holds the settings that control how the mapper runs.
type Config struct {
	// Path is where the server is mounted.
	Path string

	// DatastorePrefix is prepended to the name of every mapreduce
	// collection.
	DatastorePrefix string

	// DefaultQueue names the default queue used for mapreduce tasks.
	DefaultQueue string

	// Shards is the default shard count.
	Shards int

	// Oversampling multiplies the number of scatter samples taken, which
	// helps spread 'clumpy' data more evenly across shards.
	Oversampling int

	// LeaseDuration is the length of time a worker holds a lock.
	LeaseDuration time.Duration

	// LeaseTimeout is the length of time after which a lease is treated
	// as timed out.
	LeaseTimeout time.Duration

	// TaskTimeout is how long a single task is executed for.
	// Frontend instances are limited to 10 minutes.
	TaskTimeout time.Duration

	// CursorTimeout is how long a cursor is used before the query is
	// issued again. The default limit is 50 seconds.
	CursorTimeout time.Duration

	// Retries caps the number of times a failing task is retried.
	Retries int

	// LogVerbose switches verbose log output on or off.
	LogVerbose bool

	// Host, when non-empty, is set as the host header on tasks.
	Host string
}

// Option is the function signature shared by all mapper configuration
// options: it receives the Config to modify and may report an error.
type Option func(*Config) error
// newConfig returns a freshly allocated Config populated with the
// package's default settings.
func newConfig() *Config {
	// TODO: use config as default, allow setting some values per-job
	// and prevent config changes affecting already-running tasks
	const (
		halfMinute = 30 * time.Second
		tenMinutes = 10 * time.Minute
	)

	cfg := new(Config)

	cfg.Path = DefaultPath
	cfg.DatastorePrefix = "MP_"
	cfg.DefaultQueue = ""

	cfg.Shards = 8
	cfg.Oversampling = 32

	cfg.LeaseDuration = halfMinute
	cfg.LeaseTimeout = tenMinutes + halfMinute // 10m30s
	cfg.TaskTimeout = tenMinutes - halfMinute  // 9m30s
	cfg.CursorTimeout = 50 * time.Second

	cfg.Retries = 31
	cfg.LogVerbose = false
	cfg.Host = ""

	return cfg
}
// DatastorePrefix returns an option that stores prefix as the
// Config.DatastorePrefix used for mapper datastore collections.
// The returned option never fails.
func DatastorePrefix(prefix string) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.DatastorePrefix = prefix
		return nil
	}
	return apply
}
// DefaultQueue returns an option that records queue as the default
// taskqueue for scheduling mapper tasks (Config.DefaultQueue).
// The returned option never fails.
func DefaultQueue(queue string) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.DefaultQueue = queue
		return nil
	}
	return apply
}
// Shards returns an option that stores shards as the default target
// shard count (Config.Shards). The returned option never fails.
func Shards(shards int) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.Shards = shards
		return nil
	}
	return apply
}
// Oversampling returns an option that stores factor as the sampling
// multiplier used to even out sampling (Config.Oversampling).
// The returned option never fails.
func Oversampling(factor int) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.Oversampling = factor
		return nil
	}
	return apply
}
// LeaseDuration returns an option that stores duration as the length of
// time a worker holds a lock (Config.LeaseDuration).
// The returned option never fails.
func LeaseDuration(duration time.Duration) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.LeaseDuration = duration
		return nil
	}
	return apply
}
// LeaseTimeout returns an option that stores duration as the time after
// which a lock is considered timed out (Config.LeaseTimeout).
// The returned option never fails.
func LeaseTimeout(duration time.Duration) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.LeaseTimeout = duration
		return nil
	}
	return apply
}
// TaskTimeout returns an option that stores duration as the time a task
// is allowed to execute (Config.TaskTimeout).
// The returned option never fails.
func TaskTimeout(duration time.Duration) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.TaskTimeout = duration
		return nil
	}
	return apply
}
// CursorTimeout returns an option that stores duration as the time a
// datastore cursor is allowed to run (Config.CursorTimeout).
// The returned option never fails.
func CursorTimeout(duration time.Duration) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.CursorTimeout = duration
		return nil
	}
	return apply
}
// Retries returns an option that stores retries as the number of times
// a failing task is retried (Config.Retries).
// The returned option never fails.
func Retries(retries int) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.Retries = retries
		return nil
	}
	return apply
}
// LogVerbose is an option that turns verbose logging on by setting
// Config.LogVerbose to true. It takes the Config directly, so it is
// passed as a value (LogVerbose) rather than called to obtain an option.
// It always returns nil.
func LogVerbose(c *Config) error {
	const verbose = true
	c.LogVerbose = verbose
	return nil
}
// Host returns an option that stores host as the task host header
// value (Config.Host). The returned option never fails.
func Host(host string) func(*Config) error {
	apply := func(cfg *Config) error {
		cfg.Host = host
		return nil
	}
	return apply
}