Skip to content

Commit 009c352

Browse files
committed
use lock instead for transactions
1 parent 334c86f commit 009c352

File tree

3 files changed

+52
-38
lines changed

3 files changed

+52
-38
lines changed

Diff for: internal/common/metrics/constants.go

+3
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,7 @@ const (
114114
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
115115
ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"
116116
PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage"
117+
118+
// Concurrency Auto Scaler
119+
ConcurrencyAutoScalerScope = CadenceMetricsPrefix + "concurrency-auto-scaler"
117120
)

Diff for: internal/worker/concurrency_auto_scaler.go

+47-36
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,18 @@ package worker
2323
import (
2424
"math"
2525
"sync"
26-
"sync/atomic"
2726
"time"
2827

2928
"github.com/jonboulle/clockwork"
3029
"github.com/uber-go/tally"
3130
"go.uber.org/zap"
3231

3332
"go.uber.org/cadence/.gen/go/shared"
33+
"go.uber.org/cadence/internal/common/metrics"
3434
)
3535

3636
const (
3737
defaultAutoScalerUpdateTick = time.Second
38-
// concurrencyAutoScalerObservabilityTick = time.Millisecond * 500
3938
targetPollerWaitTimeInMsLog2 = 4 // 16 ms
4039
numberOfPollsInRollingAverage = 20
4140

@@ -50,6 +49,11 @@ const (
5049
autoScalerEventStop = "stop"
5150
autoScalerEventLogMsg string = "concurrency auto scaler event"
5251
testTimeFormat string = "15:04:05"
52+
53+
metricsEnabled = "enabled"
54+
metricsDisabled = "disabled"
55+
metricsPollerQuota = "poller-quota"
56+
metricsPollerWaitTime = "poller-wait-time"
5357
)
5458

5559
type (
@@ -64,14 +68,15 @@ type (
6468
cooldown time.Duration
6569
updateTick time.Duration
6670

67-
// enable auto scaler on concurrency or not
68-
enable atomic.Bool
71+
// state of autoscaler
72+
lock sync.RWMutex
73+
enabled bool
6974

7075
// poller
7176
pollerInitCount int
7277
pollerMaxCount int
7378
pollerMinCount int
74-
pollerWaitTimeInMsLog2 *rollingAverage // log2(pollerWaitTimeInMs+1) for smoothing (ideal value is 0)
79+
pollerWaitTime *rollingAverage[time.Duration]
7580
pollerPermitLastUpdate time.Time
7681
}
7782

@@ -98,15 +103,15 @@ func NewConcurrencyAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAuto
98103
shutdownChan: make(chan struct{}),
99104
concurrency: input.Concurrency,
100105
cooldown: input.Cooldown,
101-
log: input.Logger,
102-
scope: input.Scope,
106+
log: input.Logger.Named(metrics.ConcurrencyAutoScalerScope),
107+
scope: input.Scope.SubScope(metrics.ConcurrencyAutoScalerScope),
103108
clock: input.Clock,
104109
updateTick: tick,
105-
enable: atomic.Bool{}, // initial value should be false and is only turned on from auto config hint
110+
enabled: false, // initial value should be false and is only turned on from auto config hint
106111
pollerInitCount: input.Concurrency.PollerPermit.Quota(),
107112
pollerMaxCount: input.PollerMaxCount,
108113
pollerMinCount: input.PollerMinCount,
109-
pollerWaitTimeInMsLog2: newRollingAverage(numberOfPollsInRollingAverage),
114+
pollerWaitTime: newRollingAverage[time.Duration](numberOfPollsInRollingAverage),
110115
pollerPermitLastUpdate: input.Clock.Now(),
111116
}
112117
}
@@ -126,7 +131,9 @@ func (c *ConcurrencyAutoScaler) Start() {
126131
return
127132
case <-ticker.Chan():
128133
c.logEvent(autoScalerEventMetrics)
134+
c.lock.Lock()
129135
c.updatePollerPermit()
136+
c.lock.Unlock()
130137
}
131138
}
132139
}()
@@ -138,27 +145,27 @@ func (c *ConcurrencyAutoScaler) Stop() {
138145
c.logEvent(autoScalerEventStop)
139146
}
140147

141-
// ProcessPollerHint reads the poller response hint and take actions
148+
// ProcessPollerHint reads the poller response hint and takes the following actions in a transactional way (under the lock):
142149
// 1. update poller wait time
143150
// 2. enable/disable auto scaler
144151
func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
152+
c.lock.Lock()
153+
defer c.lock.Unlock()
154+
145155
if hint == nil {
146-
c.log.Warn("auto config hint is nil, this results in no action")
147156
return
148157
}
149158
if hint.PollerWaitTimeInMs != nil {
150159
waitTimeInMs := *hint.PollerWaitTimeInMs
151-
c.pollerWaitTimeInMsLog2.Add(math.Log2(float64(waitTimeInMs + 1)))
160+
c.pollerWaitTime.Add(time.Millisecond*time.Duration(waitTimeInMs))
152161
}
153162

154-
/*
155-
Atomically compare and switch the auto scaler enable flag. If auto scaler is turned off, IMMEDIATELY reset the concurrency limits.
156-
*/
157163
var shouldEnable bool
158164
if hint.EnableAutoConfig != nil && *hint.EnableAutoConfig {
159165
shouldEnable = true
160166
}
161-
if switched := c.enable.CompareAndSwap(!shouldEnable, shouldEnable); switched {
167+
if shouldEnable != c.enabled { // flag switched
168+
c.enabled = shouldEnable
162169
if shouldEnable {
163170
c.logEvent(autoScalerEventEnable)
164171
} else {
@@ -174,25 +181,23 @@ func (c *ConcurrencyAutoScaler) resetConcurrency() {
174181
}
175182

176183
func (c *ConcurrencyAutoScaler) logEvent(event autoScalerEvent) {
177-
if c.enable.Load() {
178-
c.scope.Counter("concurrency_auto_scaler.enabled").Inc(1)
184+
if c.enabled {
185+
c.scope.Counter(metricsEnabled).Inc(1)
179186
} else {
180-
c.scope.Counter("concurrency_auto_scaler.disabled").Inc(1)
187+
c.scope.Counter(metricsDisabled).Inc(1)
181188
}
182-
c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Count()))
183-
c.scope.Gauge("poller_quota").Update(float64(c.concurrency.PollerPermit.Quota()))
184-
c.scope.Gauge("poller_wait_time").Update(math.Exp2(c.pollerWaitTimeInMsLog2.Average()))
189+
c.scope.Gauge(metricsPollerQuota).Update(float64(c.concurrency.PollerPermit.Quota()))
190+
c.scope.Timer(metricsPollerWaitTime).Record(c.pollerWaitTime.Average())
185191
c.log.Debug(autoScalerEventLogMsg,
186192
zap.Time("time", c.clock.Now()),
187193
zap.String("event", string(event)),
188-
zap.Bool("enabled", c.enable.Load()),
194+
zap.Bool("enabled", c.enabled),
189195
zap.Int("poller_quota", c.concurrency.PollerPermit.Quota()),
190-
zap.Int("poller_in_action", c.concurrency.PollerPermit.Count()),
191196
)
192197
}
193198

194199
func (c *ConcurrencyAutoScaler) updatePollerPermit() {
195-
if !c.enable.Load() { // skip update if auto scaler is disabled
200+
if !c.enabled { // skip update if auto scaler is disabled
196201
c.logEvent(autoScalerEventPollerSkipUpdateNotEnabled)
197202
return
198203
}
@@ -202,7 +207,9 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
202207
return
203208
}
204209
currentQuota := c.concurrency.PollerPermit.Quota()
205-
newQuota := int(math.Round(float64(currentQuota) * targetPollerWaitTimeInMsLog2 / c.pollerWaitTimeInMsLog2.Average() ))
210+
// smooth the scaling by comparing log2 of the average poller wait time (in ms, plus 1) against the log2 target
211+
newQuota := int(math.Round(float64(currentQuota) * targetPollerWaitTimeInMsLog2 / math.Log2(
212+
1+float64(c.pollerWaitTime.Average()/time.Millisecond)) ))
206213
if newQuota < c.pollerMinCount {
207214
newQuota = c.pollerMinCount
208215
}
@@ -218,22 +225,26 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
218225
c.logEvent(autoScalerEventPollerUpdate)
219226
}
220227

221-
type rollingAverage struct {
228+
type number interface {
229+
int64 | float64 | time.Duration
230+
}
231+
232+
type rollingAverage[T number] struct {
222233
mu sync.RWMutex
223-
window []float64
234+
window []T
224235
index int
225-
sum float64
236+
sum T
226237
count int
227238
}
228239

229-
func newRollingAverage(capacity int) *rollingAverage {
230-
return &rollingAverage{
231-
window: make([]float64, capacity),
240+
func newRollingAverage[T number](capacity int) *rollingAverage[T] {
241+
return &rollingAverage[T]{
242+
window: make([]T, capacity),
232243
}
233244
}
234245

235246
// Add records a value in the rolling window; callers should always add positive numbers
236-
func (r *rollingAverage) Add(value float64) {
247+
func (r *rollingAverage[T]) Add(value T) {
237248
r.mu.Lock()
238249
defer r.mu.Unlock()
239250

@@ -243,21 +254,21 @@ func (r *rollingAverage) Add(value float64) {
243254
}
244255

245256
// replace the old value with the new value
246-
r.index %= len(r.window)
247257
r.sum += value - r.window[r.index]
248258
r.window[r.index] = value
249259
r.index++
260+
r.index %= len(r.window)
250261

251262
if r.count < len(r.window) {
252263
r.count++
253264
}
254265
}
255266

256-
func (r *rollingAverage) Average() float64 {
267+
func (r *rollingAverage[T]) Average() T {
257268
r.mu.RLock()
258269
defer r.mu.RUnlock()
259270
if r.count == 0 {
260271
return 0
261272
}
262-
return r.sum / float64(r.count)
273+
return r.sum / T(r.count)
263274
}

Diff for: internal/worker/concurrency_auto_scaler_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func TestRollingAverage(t *testing.T) {
290290
} {
291291
t.Run(tt.name, func(t *testing.T) {
292292
defer goleak.VerifyNone(t)
293-
r := newRollingAverage(tt.cap)
293+
r := newRollingAverage[float64](tt.cap)
294294
for i := range tt.input {
295295
r.Add(tt.input[i])
296296
assert.Equal(t, tt.expected[i], r.Average())
@@ -301,7 +301,7 @@ func TestRollingAverage(t *testing.T) {
301301

302302
func TestRollingAverage_Race(t *testing.T) {
303303
total := 100000
304-
r := newRollingAverage(total)
304+
r := newRollingAverage[float64](total)
305305
trueSum := atomic.NewFloat64(0)
306306
var wg sync.WaitGroup
307307
for i := 0; i < total; i++ {

0 commit comments

Comments
 (0)