Skip to content

Commit 69f7e3f

Browse files
committed
bug fix on get auto config hint
1 parent 009c352 commit 69f7e3f

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

internal/internal_worker_base.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -425,12 +425,12 @@ func (bw *baseWorker) Stop() {
425425

426426
func getAutoConfigHint(task interface{}) *shared.AutoConfigHint {
427427
switch t := task.(type) {
428-
case workflowTask:
428+
case *workflowTask:
429429
if t.task == nil {
430430
return nil
431431
}
432432
return t.task.AutoConfigHint
433-
case activityTask:
433+
case *activityTask:
434434
if t.task == nil {
435435
return nil
436436
}

internal/internal_worker_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -1498,3 +1498,39 @@ func TestTestValidateFnFormat_Workflow(t *testing.T) {
14981498
})
14991499
}
15001500
}
1501+
1502+
func TestGetTaskAutoConfigHint(t *testing.T) {
1503+
1504+
hint := shared.AutoConfigHint{
1505+
EnableAutoConfig: common.BoolPtr(true),
1506+
PollerWaitTimeInMs: common.Int64Ptr(100),
1507+
}
1508+
1509+
for _, tt := range []struct {
1510+
name string
1511+
task interface{}
1512+
want *shared.AutoConfigHint
1513+
}{
1514+
{
1515+
"decision task",
1516+
&workflowTask{
1517+
&shared.PollForDecisionTaskResponse{AutoConfigHint: &hint}, nil, nil, nil},
1518+
&hint,
1519+
},
1520+
{
1521+
"activity task",
1522+
&activityTask{
1523+
&shared.PollForActivityTaskResponse{AutoConfigHint: &hint}, time.Now()},
1524+
&hint,
1525+
},
1526+
{
1527+
"localactivity task",
1528+
&localActivityTask{},
1529+
nil,
1530+
},
1531+
} {
1532+
t.Run(tt.name, func(t *testing.T) {
1533+
assert.Equal(t, tt.want, getAutoConfigHint(tt.task))
1534+
})
1535+
}
1536+
}

internal/worker/concurrency_auto_scaler.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
)
3535

3636
const (
37-
defaultAutoScalerUpdateTick = time.Second
37+
defaultAutoScalerUpdateTick = time.Second
3838
targetPollerWaitTimeInMsLog2 = 4 // 16 ms
3939
numberOfPollsInRollingAverage = 20
4040

@@ -50,9 +50,9 @@ const (
5050
autoScalerEventLogMsg string = "concurrency auto scaler event"
5151
testTimeFormat string = "15:04:05"
5252

53-
metricsEnabled = "enabled"
54-
metricsDisabled = "disabled"
55-
metricsPollerQuota = "poller-quota"
53+
metricsEnabled = "enabled"
54+
metricsDisabled = "disabled"
55+
metricsPollerQuota = "poller-quota"
5656
metricsPollerWaitTime = "poller-wait-time"
5757
)
5858

@@ -69,14 +69,14 @@ type (
6969
updateTick time.Duration
7070

7171
// state of autoscaler
72-
lock sync.RWMutex
72+
lock sync.RWMutex
7373
enabled bool
7474

7575
// poller
7676
pollerInitCount int
7777
pollerMaxCount int
7878
pollerMinCount int
79-
pollerWaitTime *rollingAverage[time.Duration]
79+
pollerWaitTime *rollingAverage[time.Duration]
8080
pollerPermitLastUpdate time.Time
8181
}
8282

@@ -111,7 +111,7 @@ func NewConcurrencyAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAuto
111111
pollerInitCount: input.Concurrency.PollerPermit.Quota(),
112112
pollerMaxCount: input.PollerMaxCount,
113113
pollerMinCount: input.PollerMinCount,
114-
pollerWaitTime: newRollingAverage[time.Duration](numberOfPollsInRollingAverage),
114+
pollerWaitTime: newRollingAverage[time.Duration](numberOfPollsInRollingAverage),
115115
pollerPermitLastUpdate: input.Clock.Now(),
116116
}
117117
}
@@ -157,7 +157,7 @@ func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
157157
}
158158
if hint.PollerWaitTimeInMs != nil {
159159
waitTimeInMs := *hint.PollerWaitTimeInMs
160-
c.pollerWaitTime.Add(time.Millisecond*time.Duration(waitTimeInMs))
160+
c.pollerWaitTime.Add(time.Millisecond * time.Duration(waitTimeInMs))
161161
}
162162

163163
var shouldEnable bool
@@ -209,7 +209,7 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
209209
currentQuota := c.concurrency.PollerPermit.Quota()
210210
// smoothing the scaling through log2
211211
newQuota := int(math.Round(float64(currentQuota) * targetPollerWaitTimeInMsLog2 / math.Log2(
212-
1+float64(c.pollerWaitTime.Average()/time.Millisecond)) ))
212+
1+float64(c.pollerWaitTime.Average()/time.Millisecond))))
213213
if newQuota < c.pollerMinCount {
214214
newQuota = c.pollerMinCount
215215
}

0 commit comments

Comments
 (0)