Skip to content

Commit 212135d

Browse files
authored
Merge pull request #1126 from shiv-amz/concurrent_worker
Enable concurrent workers for tagging controller
2 parents 3bfc188 + b586c99 commit 212135d

File tree

4 files changed

+21
-8
lines changed

4 files changed

+21
-8
lines changed

pkg/controllers/options/tagging_controller.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import (
2121
// TaggingControllerOptions contains the inputs that can
2222
// be used in the tagging controller
2323
type TaggingControllerOptions struct {
24-
Tags map[string]string
25-
Resources []string
26-
RateLimit float64
27-
BurstLimit int
24+
Tags map[string]string
25+
Resources []string
26+
RateLimit float64
27+
BurstLimit int
28+
WorkerCount int
2829
}
2930

3031
// AddFlags add the additional flags for the controller
@@ -35,6 +36,8 @@ func (o *TaggingControllerOptions) AddFlags(fs *pflag.FlagSet) {
3536
"Steady-state rate limit (per sec) at which the controller processes items in its queue. A value of zero (default) disables rate limiting.")
3637
fs.IntVar(&o.BurstLimit, "tagging-controller-burst-limit", o.BurstLimit,
3738
"Burst limit at which the controller processes items in its queue. A value of zero (default) disables rate limiting.")
39+
fs.IntVar(&o.WorkerCount, "tagging-controller-concurrent-node-syncs", 1,
40+
"The number of workers concurrently synchronizing nodes")
3841
}
3942

4043
// Validate checks for errors from user input
@@ -55,6 +58,10 @@ func (o *TaggingControllerOptions) Validate() error {
5558
return fmt.Errorf("--tagging-controller-burst-limit should not be less than zero")
5659
}
5760

61+
if o.WorkerCount <= 0 {
62+
return fmt.Errorf("--tagging-controller-concurrent-node-syncs must be a positive number")
63+
}
64+
5865
for _, r := range o.Resources {
5966
for _, resource := range SupportedResources {
6067
if r != resource {

pkg/controllers/tagging/tagging_controller.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ type Controller struct {
101101
resources []string
102102

103103
rateLimitEnabled bool
104+
workerCount int
104105
}
105106

106107
// NewTaggingController creates a NewTaggingController object
@@ -112,7 +113,8 @@ func NewTaggingController(
112113
tags map[string]string,
113114
resources []string,
114115
rateLimit float64,
115-
burstLimit int) (*Controller, error) {
116+
burstLimit int,
117+
workerCount int) (*Controller, error) {
116118

117119
awsCloud, ok := cloud.(*awsv1.Cloud)
118120
if !ok {
@@ -149,6 +151,7 @@ func NewTaggingController(
149151
nodesSynced: nodeInformer.Informer().HasSynced,
150152
nodeMonitorPeriod: nodeMonitorPeriod,
151153
rateLimitEnabled: rateLimitEnabled,
154+
workerCount: workerCount,
152155
}
153156

154157
// Use shared informer to listen to add/update/delete of nodes. Note that any nodes
@@ -194,7 +197,9 @@ func (tc *Controller) Run(stopCh <-chan struct{}) {
194197
}
195198

196199
klog.Infof("Starting the tagging controller")
197-
go wait.Until(tc.work, tc.nodeMonitorPeriod, stopCh)
200+
for i := 0; i < tc.workerCount; i++ {
201+
go wait.Until(tc.work, tc.nodeMonitorPeriod, stopCh)
202+
}
198203

199204
<-stopCh
200205
}

pkg/controllers/tagging/tagging_controller_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func TestMultipleEnqueues(t *testing.T) {
301301
t.Errorf("unexpected error: %v", err)
302302
}
303303

304-
tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
304+
tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0, 10)
305305
if err != nil {
306306
t.Errorf("unexpected error: %v", err)
307307
}

pkg/controllers/tagging/tagging_controller_wrapper.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ func (tc *ControllerWrapper) startTaggingController(ctx context.Context, initCon
4848
tc.Options.Tags,
4949
tc.Options.Resources,
5050
tc.Options.RateLimit,
51-
tc.Options.BurstLimit)
51+
tc.Options.BurstLimit,
52+
tc.Options.WorkerCount)
5253

5354
if err != nil {
5455
klog.Warningf("failed to start tagging controller: %s", err)

0 commit comments

Comments
 (0)