Skip to content

Commit 8ce37ff

Browse files
committed
feat(scaling): introduce biased-to-scaleout
1 parent 629d811 commit 8ce37ff

File tree

10 files changed

+418
-135
lines changed

10 files changed

+418
-135
lines changed

src/autoscaler/api/policyvalidator/policy_json.schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@
5858
"type": "integer",
5959
"title": "Maximum how many instances of application can be provisioned as part of application scaling"
6060
},
61+
"scaling_rule_evaluation": {
62+
"type": "string",
63+
"enum": ["first_matching", "biased_to_scale_out"],
64+
"default": "first_matching",
65+
"title": "The Scaling Rule Evaluation Schema"
66+
},
6167
"scaling_rules": {
6268
"$id": "#/properties/scaling_rules",
6369
"type": "array",

src/autoscaler/eventgenerator/aggregator/metric_poller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ var _ = Describe("MetricPoller", func() {
5353
BeforeEach(func() {
5454
metricFetcher, err := metric.NewLogCacheFetcherFactory(metric.StandardLogCacheFetcherCreator).CreateFetcher(logger, config.Config{
5555
MetricCollector: config.MetricCollectorConfig{
56-
MetricCollectorURL: "this.endpoint.does.not.exist:1234",
56+
MetricCollectorURL: "this.endpoint.is.invalid:1234",
5757
},
5858
})
5959
Expect(err).ToNot(HaveOccurred())

src/autoscaler/eventgenerator/cmd/eventgenerator/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func main() {
6666

6767
appManager := aggregator.NewAppManager(logger, egClock, conf.Aggregator.PolicyPollerInterval, len(conf.Server.NodeAddrs), conf.Server.NodeIndex, conf.Aggregator.MetricCacheSizePerApp, policyDb, appMetricDB)
6868

69-
triggersChan := make(chan []*models.Trigger, conf.Evaluator.TriggerArrayChannelSize)
69+
triggersChan := make(chan *models.DynamicScalingRules, conf.Evaluator.TriggerArrayChannelSize)
7070

7171
evaluationManager, err := generator.NewAppEvaluationManager(logger, conf.Evaluator.EvaluationManagerInterval, egClock, triggersChan, appManager.GetPolicies, conf.CircuitBreaker)
7272
if err != nil {
@@ -190,7 +190,7 @@ func loadConfig(path string) (*config.Config, error) {
190190
return conf, nil
191191
}
192192

193-
func createEvaluators(logger lager.Logger, conf *config.Config, triggersChan chan []*models.Trigger, queryMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) ([]*generator.Evaluator, error) {
193+
func createEvaluators(logger lager.Logger, conf *config.Config, triggersChan chan *models.DynamicScalingRules, queryMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) ([]*generator.Evaluator, error) {
194194
count := conf.Evaluator.EvaluatorCount
195195

196196
seClient, err := helpers.CreateHTTPSClient(&conf.ScalingEngine.TLSClientCerts, helpers.DefaultClientConfig(), logger.Session("scaling_client"))

src/autoscaler/eventgenerator/generator/appEvaluationManager.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ import (
1414
circuit "github.com/rubyist/circuitbreaker"
1515
)
1616

17-
type ConsumeAppMonitorMap func(map[string][]*models.Trigger, chan []*models.Trigger)
17+
type ConsumeAppMonitorMap func(map[string]*models.DynamicScalingRules, chan *models.DynamicScalingRules)
1818

1919
type AppEvaluationManager struct {
2020
evaluateInterval time.Duration
2121
logger lager.Logger
2222
emClock clock.Clock
2323
doneChan chan bool
24-
triggerChan chan []*models.Trigger
24+
triggerChan chan *models.DynamicScalingRules
2525
getPolicies aggregator.GetPoliciesFunc
2626
breakerConfig config.CircuitBreakerConfig
2727
breakers map[string]*circuit.Breaker
@@ -31,7 +31,7 @@ type AppEvaluationManager struct {
3131
}
3232

3333
func NewAppEvaluationManager(logger lager.Logger, evaluateInterval time.Duration, emClock clock.Clock,
34-
triggerChan chan []*models.Trigger, getPolicies aggregator.GetPoliciesFunc,
34+
triggerChan chan *models.DynamicScalingRules, getPolicies aggregator.GetPoliciesFunc,
3535
breakerConfig config.CircuitBreakerConfig) (*AppEvaluationManager, error) {
3636
return &AppEvaluationManager{
3737
evaluateInterval: evaluateInterval,
@@ -47,11 +47,11 @@ func NewAppEvaluationManager(logger lager.Logger, evaluateInterval time.Duration
4747
}, nil
4848
}
4949

50-
func (a *AppEvaluationManager) getTriggers(policyMap map[string]*models.AppPolicy) map[string][]*models.Trigger {
50+
func (a *AppEvaluationManager) getTriggers(policyMap map[string]*models.AppPolicy) map[string]*models.DynamicScalingRules {
5151
if policyMap == nil {
5252
return nil
5353
}
54-
triggersByApp := make(map[string][]*models.Trigger)
54+
triggersByApp := make(map[string]*models.DynamicScalingRules)
5555
for appID, policy := range policyMap {
5656
now := a.emClock.Now().UnixNano()
5757
a.cooldownLock.RLock()
@@ -74,7 +74,10 @@ func (a *AppEvaluationManager) getTriggers(policyMap map[string]*models.AppPolic
7474
Adjustment: rule.Adjustment,
7575
})
7676
}
77-
triggersByApp[appID] = triggers
77+
triggersByApp[appID] = &models.DynamicScalingRules{
78+
Triggers: triggers,
79+
ScalingRuleEvaluation: policy.ScalingPolicy.ScalingRuleEvaluation,
80+
}
7881
}
7982
return triggersByApp
8083
}
@@ -122,8 +125,8 @@ func (a *AppEvaluationManager) doEvaluate() {
122125
a.breakerLock.Unlock()
123126

124127
triggers := a.getTriggers(policies)
125-
for _, triggerArray := range triggers {
126-
a.triggerChan <- triggerArray
128+
for _, dynamicScalingRules := range triggers {
129+
a.triggerChan <- dynamicScalingRules
127130
}
128131
}
129132
}

src/autoscaler/eventgenerator/generator/appEvaluationManager_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ var _ = Describe("AppEvaluationManager", func() {
2525
fclock *fakeclock.FakeClock
2626
manager *AppEvaluationManager
2727
testEvaluateInterval time.Duration
28-
triggerArrayChan chan []*models.Trigger
28+
triggerArrayChan chan *models.DynamicScalingRules
2929
testAppId1 = "testAppId1"
3030
testAppId2 = "testAppId2"
3131
testMetricName = "Test-Metric-Name"
@@ -73,7 +73,7 @@ var _ = Describe("AppEvaluationManager", func() {
7373
fclock = fakeclock.NewFakeClock(fakeTime)
7474
testEvaluateInterval = 1 * time.Second
7575
logger = lagertest.NewTestLogger("ApplicationManager-test")
76-
triggerArrayChan = make(chan []*models.Trigger, 10)
76+
triggerArrayChan = make(chan *models.DynamicScalingRules, 10)
7777
})
7878

7979
Describe("Start", func() {
@@ -103,32 +103,32 @@ var _ = Describe("AppEvaluationManager", func() {
103103

104104
It("should add triggers to evaluate", func() {
105105
fclock.Increment(10 * testEvaluateInterval)
106-
var arr []*models.Trigger
107-
var triggerArray = [][]*models.Trigger{}
106+
var arr *models.DynamicScalingRules
107+
var triggerArray = []*models.DynamicScalingRules{}
108108
Eventually(triggerArrayChan).Should(Receive(&arr))
109109
triggerArray = append(triggerArray, arr)
110110
Eventually(triggerArrayChan).Should(Receive(&arr))
111111
triggerArray = append(triggerArray, arr)
112112
Expect(triggerArray).Should(ContainElement(
113-
[]*models.Trigger{{
113+
&models.DynamicScalingRules{Triggers: []*models.Trigger{{
114114
AppId: testAppId1,
115115
MetricType: testMetricName,
116116
BreachDurationSeconds: 200,
117117
CoolDownSeconds: 200,
118118
Threshold: 80,
119119
Operator: ">=",
120120
Adjustment: "1",
121-
}}))
121+
}}}))
122122
Expect(triggerArray).Should(ContainElement(
123-
[]*models.Trigger{{
123+
&models.DynamicScalingRules{Triggers: []*models.Trigger{{
124124
AppId: testAppId2,
125125
MetricType: testMetricName,
126126
BreachDurationSeconds: 300,
127127
CoolDownSeconds: 300,
128128
Threshold: 20,
129129
Operator: "<=",
130130
Adjustment: "-1",
131-
}}))
131+
}}}))
132132
})
133133
})
134134

@@ -147,8 +147,8 @@ var _ = Describe("AppEvaluationManager", func() {
147147
})
148148

149149
It("should add triggers to evaluate after cooldown expired", func() {
150-
var arr []*models.Trigger
151-
var triggerArray = [][]*models.Trigger{}
150+
var arr *models.DynamicScalingRules
151+
var triggerArray = []*models.DynamicScalingRules{}
152152
fclock.Increment(10 * testEvaluateInterval)
153153
Consistently(triggerArrayChan).ShouldNot(Receive())
154154

@@ -160,15 +160,15 @@ var _ = Describe("AppEvaluationManager", func() {
160160

161161
triggerArray = append(triggerArray, arr)
162162
Expect(triggerArray).Should(ContainElement(
163-
[]*models.Trigger{{
163+
&models.DynamicScalingRules{Triggers: []*models.Trigger{{
164164
AppId: testAppId2,
165165
MetricType: testMetricName,
166166
BreachDurationSeconds: 300,
167167
CoolDownSeconds: 300,
168168
Threshold: 20,
169169
Operator: "<=",
170170
Adjustment: "-1",
171-
}}))
171+
}}}))
172172
})
173173
})
174174
})

src/autoscaler/eventgenerator/generator/evaluator.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9+
"slices"
910
"strconv"
1011
"time"
1112

@@ -24,15 +25,15 @@ type Evaluator struct {
2425
logger lager.Logger
2526
httpClient *http.Client
2627
scalingEngineUrl string
27-
triggerChan chan []*models.Trigger
28+
triggerChan chan *models.DynamicScalingRules
2829
doneChan chan bool
2930
defaultBreachDurationSecs int
3031
queryAppMetrics aggregator.QueryAppMetricsFunc
3132
getBreaker func(string) *circuit.Breaker
3233
setCoolDownExpired func(string, int64)
3334
}
3435

35-
func NewEvaluator(logger lager.Logger, httpClient *http.Client, scalingEngineUrl string, triggerChan chan []*models.Trigger,
36+
func NewEvaluator(logger lager.Logger, httpClient *http.Client, scalingEngineUrl string, triggerChan chan *models.DynamicScalingRules,
3637
defaultBreachDurationSecs int, queryAppMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) *Evaluator {
3738
return &Evaluator{
3839
logger: logger.Session("Evaluator"),
@@ -57,8 +58,8 @@ func (e *Evaluator) start() {
5758
select {
5859
case <-e.doneChan:
5960
return
60-
case triggerArray := <-e.triggerChan:
61-
e.doEvaluate(triggerArray)
61+
case dynamicScalingRules := <-e.triggerChan:
62+
e.doEvaluate(dynamicScalingRules)
6263
}
6364
}
6465
}
@@ -69,24 +70,45 @@ func (e *Evaluator) Stop() {
6970
e.logger.Info("stopped")
7071
}
7172

72-
func (e *Evaluator) filterOutInvalidTriggers(triggers []*models.Trigger) []*models.Trigger {
73-
result := make([]*models.Trigger, len(triggers))
74-
for _, trigger := range triggers {
73+
func (e *Evaluator) filterOutInvalidTriggers(rules *models.DynamicScalingRules) {
74+
validTriggers := rules.Triggers[:0]
75+
for _, trigger := range rules.Triggers {
7576
if trigger.BreachDurationSeconds <= 0 {
7677
trigger.BreachDurationSeconds = e.defaultBreachDurationSecs
7778
}
7879
if !e.hasValidOperator(trigger) {
7980
e.logger.Error("operator-is-invalid", nil, lager.Data{"trigger": trigger})
8081
continue
8182
}
82-
result = append(result, trigger)
83+
validTriggers = append(validTriggers, trigger)
8384
}
84-
return result
85+
rules.Triggers = validTriggers
8586
}
8687

87-
func (e *Evaluator) doEvaluate(triggerArray []*models.Trigger) {
88-
triggers := e.filterOutInvalidTriggers(triggerArray)
89-
for _, trigger := range triggers {
88+
func (e *Evaluator) doEvaluate(rules *models.DynamicScalingRules) {
89+
e.filterOutInvalidTriggers(rules)
90+
if rules.ScalingRuleEvaluation == models.BiasedToScaleOut {
91+
slices.SortStableFunc(rules.Triggers, func(a, b *models.Trigger) int {
92+
adj1, err1 := models.ParseAdjustment(a.Adjustment)
93+
if err1 != nil {
94+
e.logger.Error("do-evaluate", err1)
95+
}
96+
97+
adj2, err2 := models.ParseAdjustment(b.Adjustment)
98+
if err2 != nil {
99+
e.logger.Error("do-evaluate", err2)
100+
}
101+
102+
// sort in descending order, so that the biggest scale out adjustment is evaluated first
103+
return -models.CompareAdjustments(adj1, adj2)
104+
})
105+
}
106+
107+
e.evaluateFirstMatching(rules)
108+
}
109+
110+
func (e *Evaluator) evaluateFirstMatching(rules *models.DynamicScalingRules) {
111+
for _, trigger := range rules.Triggers {
90112
appMetricList, err := e.retrieveAppMetrics(trigger)
91113
if err != nil {
92114
continue

0 commit comments

Comments
 (0)