Create separate worker usage data collection and move hardware emit there #1293
base: master
Changes from 7 commits
@@ -0,0 +1,126 @@
package internal

import (
	"context"
	"github.com/shirou/gopsutil/cpu"
	"github.com/uber-go/tally"
	"go.uber.org/cadence/internal/common/metrics"
	"go.uber.org/zap"
	"runtime"
	"sync"
	"time"
)

type (
	workerUsageCollector struct {
		workerType   string
		cooldownTime time.Duration
		logger       *zap.Logger
		ctx          context.Context
		wg           *sync.WaitGroup // graceful stop
		cancel       context.CancelFunc
		metricsScope tally.Scope
	}

	workerUsageCollectorOptions struct {
		Enabled      bool
		Cooldown     time.Duration
		MetricsScope tally.Scope
		WorkerType   string
	}

	hardwareUsage struct {
		NumCPUCores     int
		CPUPercent      float64
		NumGoRoutines   int
		TotalMemory     float64
		MemoryUsedHeap  float64
		MemoryUsedStack float64
	}
)

func newWorkerUsageCollector(
	options workerUsageCollectorOptions,
	logger *zap.Logger,
) *workerUsageCollector {
	if !options.Enabled {
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &workerUsageCollector{
		workerType:   options.WorkerType,
		cooldownTime: options.Cooldown,
		metricsScope: options.MetricsScope,
		logger:       logger,
		ctx:          ctx,
		cancel:       cancel,
		wg:           &sync.WaitGroup{},
	}
}

func (w *workerUsageCollector) Start() {
	w.wg.Add(1)
	go func() {
Review comment: Do we need to spawn a goroutine per worker? Why not ensure only one is running?
Reply: Only the hardware emitting is once per host; all other metrics will be worker-specific (e.g. activity poll response vs. decision poll response).
Reply: For now I see only w.collectHardwareUsage(), which will just spawn a bunch of data into the same scope. I would suggest separating the hardware emitter from the worker-specific metrics.
Reply: That's the current design: for each type of metric, based on its origin, I will create a separate goroutine, but they would all be contained under a single workerUsageCollector so that their results can be collected and sent in one place.
		defer func() {
			if p := recover(); p != nil {
				w.logger.Error("Unhandled panic in workerUsageCollector.")
				w.logger.Error(p.(error).Error())
timl3136 marked this conversation as resolved.
			}
		}()
		defer w.wg.Done()
Review comment: There are a few things problematic about this goroutine closure …
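The comment above is cut off in this capture, but one concrete problem with the closure as written is that `p.(error)` will itself panic whenever the recovered value is not an `error` (for example `panic("some string")`). A minimal sketch of a more defensive recover helper, shown only as an illustration of that concern and not as part of the diff; the name `safeRecover` and the log message are made up:

```go
package internal

import (
	"fmt"

	"go.uber.org/zap"
)

// safeRecover logs a recovered panic whether or not the panic value is an error.
// Intended to be installed directly as a deferred call: defer safeRecover(w.logger)
func safeRecover(logger *zap.Logger) {
	if p := recover(); p != nil {
		err, ok := p.(error)
		if !ok {
			// panic values can be strings, ints, etc.; wrap them so zap.Error works
			err = fmt.Errorf("panic: %v", p)
		}
		logger.Error("Unhandled panic in workerUsageCollector goroutine", zap.Error(err))
	}
}
```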
		ticker := time.NewTicker(w.cooldownTime)

timl3136 marked this conversation as resolved.

		for {
			select {
			case <-w.ctx.Done():
				return
			case <-ticker.C:
				// Given that decision worker and activity worker are running in the same host, we only need to collect
				// hardware usage from one of them.
				if w.workerType == "DecisionWorker" {
Review comment: This might not be future proof, and if a customer is running separate processes for decision and activity workers, then we will not have the hardware usage of the hosts that only run activity workers. We should also not create no-op …
Reply: We tried sync.Once before, but that would cause issues with unit testing, as it will just wait indefinitely for this routine to stop while blocking all other goroutines from closing.
Reply: You can override it in the unit tests: in typical startup this would be set to …, and in test code you can initialize this to a fake implementation.
Reply: Thank you for your suggestion. I have implemented that in the latest commit.
Review comment: I don't see …
Reply: In our use case, only the hardware info is collected once per host. Other worker types (decision worker and activity worker) should have different workerUsageCollectors, as they track different task-type behaviors.
Review comment: What type of information are you planning to collect on a per-worker basis in this …
Reply: Tasklist backlog/poll response, since decision and activity workers have their own pollers and those need to be scaled independently.
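One way to read the "fake implementation" suggestion above is a package-level hook that production code leaves at its default and unit tests swap out. The sketch below assumes the `hardwareUsage` type from this diff; the names `collectHardwareUsageFn` and `defaultCollectHardwareUsage` are hypothetical and not taken from the PR:

```go
// Sketch only: a package-level hook for hardware collection.
// Production keeps the default; tests can replace it so nothing depends on
// real gopsutil calls or on which worker type happens to run on the host.
var collectHardwareUsageFn = defaultCollectHardwareUsage

func defaultCollectHardwareUsage() hardwareUsage {
	// would call cpu.Percent / cpu.Counts / runtime.ReadMemStats as in this diff
	return hardwareUsage{}
}
```

A test could then set `collectHardwareUsageFn = func() hardwareUsage { return hardwareUsage{NumCPUCores: 1} }` and assert on the emitted gauges without waiting on the real collection loop.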
					hardwareUsageData := w.collectHardwareUsage()
					if w.metricsScope != nil {
						w.emitHardwareUsage(hardwareUsageData)
					}
				}
			}
		}
	}()
	return
}

func (w *workerUsageCollector) Stop() {
	w.cancel()
	w.wg.Wait()
}

func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage {
	cpuPercent, err := cpu.Percent(0, false)
	if err != nil {
		w.logger.Warn("Failed to get cpu percent", zap.Error(err))
	}
	cpuCores, err := cpu.Counts(false)
	if err != nil {
		w.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
	}

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	return hardwareUsage{
		NumCPUCores:     cpuCores,
		CPUPercent:      cpuPercent[0],
		NumGoRoutines:   runtime.NumGoroutine(),
		TotalMemory:     float64(memStats.Sys),
		MemoryUsedHeap:  float64(memStats.HeapAlloc),
		MemoryUsedStack: float64(memStats.StackInuse),
	}
}

// emitHardwareUsage emits collected hardware usage metrics to metrics scope
func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) {
	w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores))
	w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent)
	w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines))
	w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory))
	w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap))
	w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack))
}
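For orientation, a minimal sketch of how a caller inside this package might wire the collector up, based only on the constructor and lifecycle methods in this diff; the interval, the use of tally.NoopScope, and zap.NewNop are illustrative choices, not taken from the PR:

```go
opts := workerUsageCollectorOptions{
	Enabled:      true,
	Cooldown:     30 * time.Second, // illustrative interval
	MetricsScope: tally.NoopScope,  // a real worker would pass its tagged scope
	WorkerType:   "DecisionWorker",
}
collector := newWorkerUsageCollector(opts, zap.NewNop())
if collector != nil { // newWorkerUsageCollector returns nil when Enabled is false
	collector.Start()
	defer collector.Stop() // cancels the context and waits for the goroutine to exit
}
```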