
bias to scaleout #3676


Draft pull request: wants to merge 2 commits into base branch main.
6 changes: 6 additions & 0 deletions src/autoscaler/api/policyvalidator/policy_json.schema.json
@@ -58,6 +58,12 @@
"type": "integer",
"title": "Maximum how many instances of application can be provisioned as part of application scaling"
},
"scaling_rule_evaluation": {
"type": "string",
"enum": ["first_matching", "biased_to_scale_out"],
"default": "first_matching",
"title": "The Scaling Rule Evaluation Schema"
},
"scaling_rules": {
"$id": "#/properties/scaling_rules",
"type": "array",
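The new scaling_rule_evaluation property accepts two values: first_matching, which preserves today's behaviour and is the default, and biased_to_scale_out. Later in this diff the evaluator compares against models.BiasedToScaleOut, but the model-side constants are not part of the change set shown here. A minimal sketch of what they could look like, assuming a string-backed type in the models package:

```go
// Hypothetical sketch (not part of this diff): constants mirroring the
// "scaling_rule_evaluation" enum added to policy_json.schema.json.
package models

type ScalingRuleEvaluation string

const (
	// FirstMatching keeps the existing behaviour: rules are evaluated in
	// policy order and the first breached rule wins.
	FirstMatching ScalingRuleEvaluation = "first_matching"
	// BiasedToScaleOut reorders the rules so that the largest scale-out
	// adjustment is evaluated first.
	BiasedToScaleOut ScalingRuleEvaluation = "biased_to_scale_out"
)
```

Because the schema declares first_matching as the default, existing policies that omit the field keep their current semantics.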
@@ -53,7 +53,7 @@ var _ = Describe("MetricPoller", func() {
BeforeEach(func() {
metricFetcher, err := metric.NewLogCacheFetcherFactory(metric.StandardLogCacheFetcherCreator).CreateFetcher(logger, config.Config{
MetricCollector: config.MetricCollectorConfig{
MetricCollectorURL: "this.endpoint.does.not.exist:1234",
MetricCollectorURL: "this.endpoint.is.invalid:1234",
},
})
Expect(err).ToNot(HaveOccurred())
4 changes: 2 additions & 2 deletions src/autoscaler/eventgenerator/cmd/eventgenerator/main.go
@@ -66,7 +66,7 @@ func main() {

appManager := aggregator.NewAppManager(logger, egClock, conf.Aggregator.PolicyPollerInterval, len(conf.Server.NodeAddrs), conf.Server.NodeIndex, conf.Aggregator.MetricCacheSizePerApp, policyDb, appMetricDB)

triggersChan := make(chan []*models.Trigger, conf.Evaluator.TriggerArrayChannelSize)
triggersChan := make(chan *models.DynamicScalingRules, conf.Evaluator.TriggerArrayChannelSize)

evaluationManager, err := generator.NewAppEvaluationManager(logger, conf.Evaluator.EvaluationManagerInterval, egClock, triggersChan, appManager.GetPolicies, conf.CircuitBreaker)
if err != nil {
@@ -190,7 +190,7 @@ func loadConfig(path string) (*config.Config, error) {
return conf, nil
}

func createEvaluators(logger lager.Logger, conf *config.Config, triggersChan chan []*models.Trigger, queryMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) ([]*generator.Evaluator, error) {
func createEvaluators(logger lager.Logger, conf *config.Config, triggersChan chan *models.DynamicScalingRules, queryMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) ([]*generator.Evaluator, error) {
count := conf.Evaluator.EvaluatorCount

seClient, err := helpers.CreateHTTPSClient(&conf.ScalingEngine.TLSClientCerts, helpers.DefaultClientConfig(), logger.Session("scaling_client"))
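The eventgenerator now passes *models.DynamicScalingRules through the trigger channel instead of a bare []*models.Trigger, so the per-app evaluation mode travels together with the triggers. The struct itself is not shown in this diff; a rough sketch of the shape implied by its use in appEvaluationManager.go and evaluator.go, with field names taken from that usage and the field types being assumptions:

```go
// Hypothetical sketch (not part of this diff): the payload sent from the
// AppEvaluationManager to the Evaluators over triggerChan.
package models

type DynamicScalingRules struct {
	// Triggers holds the dynamic scaling rules of a single application,
	// expanded into Trigger values by AppEvaluationManager.getTriggers.
	Triggers []*Trigger
	// ScalingRuleEvaluation selects the evaluation strategy:
	// "first_matching" (default) or "biased_to_scale_out".
	ScalingRuleEvaluation ScalingRuleEvaluation
}
```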
19 changes: 11 additions & 8 deletions src/autoscaler/eventgenerator/generator/appEvaluationManager.go
@@ -14,14 +14,14 @@ import (
circuit "github.com/rubyist/circuitbreaker"
)

type ConsumeAppMonitorMap func(map[string][]*models.Trigger, chan []*models.Trigger)
type ConsumeAppMonitorMap func(map[string]*models.DynamicScalingRules, chan *models.DynamicScalingRules)

type AppEvaluationManager struct {
evaluateInterval time.Duration
logger lager.Logger
emClock clock.Clock
doneChan chan bool
triggerChan chan []*models.Trigger
triggerChan chan *models.DynamicScalingRules
getPolicies aggregator.GetPoliciesFunc
breakerConfig config.CircuitBreakerConfig
breakers map[string]*circuit.Breaker
@@ -31,7 +31,7 @@ type AppEvaluationManager struct {
}

func NewAppEvaluationManager(logger lager.Logger, evaluateInterval time.Duration, emClock clock.Clock,
triggerChan chan []*models.Trigger, getPolicies aggregator.GetPoliciesFunc,
triggerChan chan *models.DynamicScalingRules, getPolicies aggregator.GetPoliciesFunc,
breakerConfig config.CircuitBreakerConfig) (*AppEvaluationManager, error) {
return &AppEvaluationManager{
evaluateInterval: evaluateInterval,
@@ -47,11 +47,11 @@ func NewAppEvaluationManager(logger lager.Logger, evaluateInterval time.Duration
}, nil
}

func (a *AppEvaluationManager) getTriggers(policyMap map[string]*models.AppPolicy) map[string][]*models.Trigger {
func (a *AppEvaluationManager) getTriggers(policyMap map[string]*models.AppPolicy) map[string]*models.DynamicScalingRules {
if policyMap == nil {
return nil
}
triggersByApp := make(map[string][]*models.Trigger)
triggersByApp := make(map[string]*models.DynamicScalingRules)
for appID, policy := range policyMap {
now := a.emClock.Now().UnixNano()
a.cooldownLock.RLock()
@@ -74,7 +74,10 @@ func (a *AppEvaluationManager) getTriggers(policyMap map[string]*models.AppPolic
Adjustment: rule.Adjustment,
})
}
triggersByApp[appID] = triggers
triggersByApp[appID] = &models.DynamicScalingRules{
Triggers: triggers,
ScalingRuleEvaluation: policy.ScalingPolicy.ScalingRuleEvaluation,
}
}
return triggersByApp
}
@@ -122,8 +125,8 @@ func (a *AppEvaluationManager) doEvaluate() {
a.breakerLock.Unlock()

triggers := a.getTriggers(policies)
for _, triggerArray := range triggers {
a.triggerChan <- triggerArray
for _, dynamicScalingRules := range triggers {
a.triggerChan <- dynamicScalingRules
}
}
}
@@ -25,7 +25,7 @@ var _ = Describe("AppEvaluationManager", func() {
fclock *fakeclock.FakeClock
manager *AppEvaluationManager
testEvaluateInterval time.Duration
triggerArrayChan chan []*models.Trigger
triggerArrayChan chan *models.DynamicScalingRules
testAppId1 = "testAppId1"
testAppId2 = "testAppId2"
testMetricName = "Test-Metric-Name"
@@ -73,7 +73,7 @@ var _ = Describe("AppEvaluationManager", func() {
fclock = fakeclock.NewFakeClock(fakeTime)
testEvaluateInterval = 1 * time.Second
logger = lagertest.NewTestLogger("ApplicationManager-test")
triggerArrayChan = make(chan []*models.Trigger, 10)
triggerArrayChan = make(chan *models.DynamicScalingRules, 10)
})

Describe("Start", func() {
@@ -103,32 +103,32 @@

It("should add triggers to evaluate", func() {
fclock.Increment(10 * testEvaluateInterval)
var arr []*models.Trigger
var triggerArray = [][]*models.Trigger{}
var arr *models.DynamicScalingRules
var triggerArray = []*models.DynamicScalingRules{}
Eventually(triggerArrayChan).Should(Receive(&arr))
triggerArray = append(triggerArray, arr)
Eventually(triggerArrayChan).Should(Receive(&arr))
triggerArray = append(triggerArray, arr)
Expect(triggerArray).Should(ContainElement(
[]*models.Trigger{{
&models.DynamicScalingRules{Triggers: []*models.Trigger{{
AppId: testAppId1,
MetricType: testMetricName,
BreachDurationSeconds: 200,
CoolDownSeconds: 200,
Threshold: 80,
Operator: ">=",
Adjustment: "1",
}}))
}}}))
Expect(triggerArray).Should(ContainElement(
[]*models.Trigger{{
&models.DynamicScalingRules{Triggers: []*models.Trigger{{
AppId: testAppId2,
MetricType: testMetricName,
BreachDurationSeconds: 300,
CoolDownSeconds: 300,
Threshold: 20,
Operator: "<=",
Adjustment: "-1",
}}))
}}}))
})
})

@@ -147,8 +147,8 @@
})

It("should add triggers to evaluate after cooldown expired", func() {
var arr []*models.Trigger
var triggerArray = [][]*models.Trigger{}
var arr *models.DynamicScalingRules
var triggerArray = []*models.DynamicScalingRules{}
fclock.Increment(10 * testEvaluateInterval)
Consistently(triggerArrayChan).ShouldNot(Receive())

@@ -160,15 +160,15 @@

triggerArray = append(triggerArray, arr)
Expect(triggerArray).Should(ContainElement(
[]*models.Trigger{{
&models.DynamicScalingRules{Triggers: []*models.Trigger{{
AppId: testAppId2,
MetricType: testMetricName,
BreachDurationSeconds: 300,
CoolDownSeconds: 300,
Threshold: 20,
Operator: "<=",
Adjustment: "-1",
}}))
}}}))
})
})
})
110 changes: 71 additions & 39 deletions src/autoscaler/eventgenerator/generator/evaluator.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"slices"
"strconv"
"time"

@@ -24,15 +25,15 @@ type Evaluator struct {
logger lager.Logger
httpClient *http.Client
scalingEngineUrl string
triggerChan chan []*models.Trigger
triggerChan chan *models.DynamicScalingRules
doneChan chan bool
defaultBreachDurationSecs int
queryAppMetrics aggregator.QueryAppMetricsFunc
getBreaker func(string) *circuit.Breaker
setCoolDownExpired func(string, int64)
}

func NewEvaluator(logger lager.Logger, httpClient *http.Client, scalingEngineUrl string, triggerChan chan []*models.Trigger,
func NewEvaluator(logger lager.Logger, httpClient *http.Client, scalingEngineUrl string, triggerChan chan *models.DynamicScalingRules,
defaultBreachDurationSecs int, queryAppMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) *Evaluator {
return &Evaluator{
logger: logger.Session("Evaluator"),
@@ -57,8 +58,8 @@ func (e *Evaluator) start() {
select {
case <-e.doneChan:
return
case triggerArray := <-e.triggerChan:
e.doEvaluate(triggerArray)
case dynamicScalingRules := <-e.triggerChan:
e.doEvaluate(dynamicScalingRules)
}
}
}
@@ -69,18 +70,45 @@ func (e *Evaluator) Stop() {
e.logger.Info("stopped")
}

func (e *Evaluator) doEvaluate(triggerArray []*models.Trigger) {
for _, trigger := range triggerArray {
func (e *Evaluator) filterOutInvalidTriggers(rules *models.DynamicScalingRules) {
validTriggers := rules.Triggers[:0]
for _, trigger := range rules.Triggers {
if trigger.BreachDurationSeconds <= 0 {
trigger.BreachDurationSeconds = e.defaultBreachDurationSecs
}
threshold := trigger.Threshold
operator := trigger.Operator
if !e.isValidOperator(operator) {
if !e.hasValidOperator(trigger) {
e.logger.Error("operator-is-invalid", nil, lager.Data{"trigger": trigger})
continue
}
validTriggers = append(validTriggers, trigger)
}
rules.Triggers = validTriggers
}

func (e *Evaluator) doEvaluate(rules *models.DynamicScalingRules) {
e.filterOutInvalidTriggers(rules)
if rules.ScalingRuleEvaluation == models.BiasedToScaleOut {
slices.SortStableFunc(rules.Triggers, func(a, b *models.Trigger) int {
adj1, err1 := models.ParseAdjustment(a.Adjustment)
if err1 != nil {
e.logger.Error("do-evaluate", err1)
}

adj2, err2 := models.ParseAdjustment(b.Adjustment)
if err2 != nil {
e.logger.Error("do-evaluate", err2)
}

// sort in descending order, so that the biggest scale out adjustment is evaluated first
return -models.CompareAdjustments(adj1, adj2)
})
}

e.evaluateFirstMatching(rules)
}

func (e *Evaluator) evaluateFirstMatching(rules *models.DynamicScalingRules) {
for _, trigger := range rules.Triggers {
appMetricList, err := e.retrieveAppMetrics(trigger)
if err != nil {
continue
@@ -90,66 +118,70 @@ func (e *Evaluator) doEvaluate(triggerArray []*models.Trigger) {
continue
}

isBreached, appMetric := checkForBreach(appMetricList, e, trigger, operator, threshold)

if isBreached {
if e.isBreached(trigger, appMetricList) {
trigger.MetricUnit = appMetricList[0].Unit
e.logger.Info("send trigger alarm to scaling engine", lager.Data{"trigger": trigger, "last_metric": appMetric})

if appBreaker := e.getBreaker(trigger.AppId); appBreaker != nil {
if appBreaker.Tripped() {
e.logger.Info("circuit-tripped", lager.Data{"appId": trigger.AppId, "consecutiveFailures": appBreaker.ConsecFailures()})
}
err = appBreaker.Call(func() error { return e.sendTriggerAlarm(trigger) }, 0)
if err != nil {
e.logger.Error("circuit-alarm-failed", err, lager.Data{"appId": trigger.AppId})
}
} else {
err = e.sendTriggerAlarm(trigger)
if err != nil {
e.logger.Error("circuit-alarm-failed", err, lager.Data{"appId": trigger.AppId})
}
}
e.logger.Info("send trigger alarm to scaling engine", lager.Data{"trigger": trigger, "last_metric": appMetricList[len(appMetricList)-1]})
e.sendToScalingEngine(trigger)
return
}
}
}

func checkForBreach(appMetricList []*models.AppMetric, e *Evaluator, trigger *models.Trigger, operator string, threshold int64) (bool, *models.AppMetric) {
func (e *Evaluator) sendToScalingEngine(trigger *models.Trigger) {
if appBreaker := e.getBreaker(trigger.AppId); appBreaker != nil {
if appBreaker.Tripped() {
e.logger.Info("circuit-tripped", lager.Data{"appId": trigger.AppId, "consecutiveFailures": appBreaker.ConsecFailures()})
}
err := appBreaker.Call(func() error { return e.sendTriggerAlarm(trigger) }, 0)
if err != nil {
e.logger.Error("circuit-alarm-failed", err, lager.Data{"appId": trigger.AppId})
}
} else {
err := e.sendTriggerAlarm(trigger)
if err != nil {
e.logger.Error("circuit-alarm-failed", err, lager.Data{"appId": trigger.AppId})
}
}
}

func (e *Evaluator) isBreached(trigger *models.Trigger, appMetricList []*models.AppMetric) bool {
operator := trigger.Operator
threshold := trigger.Threshold

var appMetric *models.AppMetric
for _, appMetric = range appMetricList {
if appMetric.Value == "" {
e.logger.Debug("should not send trigger alarm to scaling engine because there is empty value metric", lager.Data{"trigger": trigger, "appMetric": appMetric})
return false, appMetric
return false
}
value, err := strconv.ParseInt(appMetric.Value, 10, 64)
if err != nil {
e.logger.Debug("should not send trigger alarm to scaling engine because parse metric value fails", lager.Data{"trigger": trigger, "appMetric": appMetric})
return false, appMetric
return false
}
if operator == ">" {
if value <= threshold {
e.logger.Debug("should not send trigger alarm to scaling engine", lager.Data{"trigger": trigger, "appMetric": appMetric})
return false, appMetric
return false
}
} else if operator == ">=" {
if value < threshold {
e.logger.Debug("should not send trigger alarm to scaling engine", lager.Data{"trigger": trigger, "appMetric": appMetric})
return false, appMetric
return false
}
} else if operator == "<" {
if value >= threshold {
e.logger.Debug("should not send trigger alarm to scaling engine", lager.Data{"trigger": trigger, "appMetric": appMetric})
return false, appMetric
return false
}
} else if operator == "<=" {
if value > threshold {
e.logger.Debug("should not send trigger alarm to scaling engine", lager.Data{"trigger": trigger, "appMetric": appMetric})
return false, appMetric
return false
}
}
}
return true, appMetric
return true
}

func (e *Evaluator) retrieveAppMetrics(trigger *models.Trigger) ([]*models.AppMetric, error) {
@@ -202,7 +234,7 @@ func (e *Evaluator) sendTriggerAlarm(trigger *models.Trigger) error {
return err
}

defer resp.Body.Close()
defer func() { _ = resp.Body.Close() }()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
@@ -227,9 +259,9 @@ func (e *Evaluator) sendTriggerAlarm(trigger *models.Trigger) error {
return err
}

func (e *Evaluator) isValidOperator(operator string) bool {
func (e *Evaluator) hasValidOperator(trigger *models.Trigger) bool {
for _, o := range validOperators {
if o == operator {
if o == trigger.Operator {
return true
}
}
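The biased_to_scale_out branch relies on models.ParseAdjustment and models.CompareAdjustments, neither of which appears in this diff. A rough sketch of plausible implementations, assuming adjustments are the usual autoscaler strings such as "+2", "-1" or "+10%", and that CompareAdjustments returns a positive value when its first argument scales out more strongly (the sort in doEvaluate negates the result to get descending order):

```go
// Hypothetical sketch (not part of this diff) of the adjustment helpers used
// by the biased_to_scale_out sort in evaluator.go.
package models

import (
	"strconv"
	"strings"
)

// Adjustment is a parsed scaling adjustment such as "+2", "-1" or "+10%".
// For simplicity this sketch ignores the difference between absolute and
// percentage adjustments when comparing.
type Adjustment struct {
	Value     int64
	IsPercent bool
}

// ParseAdjustment parses an adjustment string into an Adjustment value.
func ParseAdjustment(s string) (Adjustment, error) {
	a := Adjustment{}
	if strings.HasSuffix(s, "%") {
		a.IsPercent = true
		s = strings.TrimSuffix(s, "%")
	}
	v, err := strconv.ParseInt(s, 10, 64) // ParseInt accepts a leading "+" or "-"
	if err != nil {
		return Adjustment{}, err
	}
	a.Value = v
	return a, nil
}

// CompareAdjustments returns a positive number if a scales out more than b,
// a negative number if it scales out less, and 0 if both are equal.
func CompareAdjustments(a, b Adjustment) int {
	switch {
	case a.Value > b.Value:
		return 1
	case a.Value < b.Value:
		return -1
	default:
		return 0
	}
}
```

With that ordering, a policy containing a scale-in rule with adjustment "-1" and a scale-out rule with adjustment "+2" evaluates the "+2" rule first under biased_to_scale_out, whereas first_matching keeps the order in which the rules were written.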