refactor metrics event processing #456

Merged
merged 8 commits into from
Aug 14, 2025
44 changes: 40 additions & 4 deletions cmd/metrics/event_frame.go
@@ -6,6 +6,7 @@ package metrics
// Helper functions for parsing and processing Linux perf event output, i.e., the output of 'perf stat'

import (
"bytes"
"encoding/json"
"fmt"
"log/slog"
@@ -233,9 +234,6 @@ func coalesceEvents(allEvents []Event, scope string, granularity string, metadat
}
} else {
numCPUs = metadata.SocketCount * metadata.CoresPerSocket * metadata.ThreadsPerCore
if err != nil {
return nil, fmt.Errorf("failed to parse cpu range: %w", err)
}
cpuMap = make(map[int]int, numCPUs)
for i := 0; i < numCPUs; i++ {
cpuMap[i] = i
@@ -404,7 +402,7 @@ func collapseUncoreGroups(inGroups []EventGroup, firstIdx int, count int) (outGr
func parseEventJSON(rawEvent []byte) (Event, error) {
var event Event
if err := json.Unmarshal(rawEvent, &event); err != nil {
err = fmt.Errorf("unrecognized event format: \"%s\"", rawEvent)
err = fmt.Errorf("unrecognized event format")
return event, err
}
if !strings.Contains(event.CounterValue, "not counted") && !strings.Contains(event.CounterValue, "not supported") {
@@ -416,3 +414,41 @@ func parseEventJSON(rawEvent []byte) (Event, error) {
}
return event, nil
}

// extractInterval parses the interval value from a JSON perf event line
// Returns the interval as a float64, or -1 if parsing fails
func extractInterval(line []byte) float64 {
// Look for the interval field in the JSON: "interval" : 5.005073756
intervalPattern := []byte(`"interval" : `)
intervalStart := bytes.Index(line, intervalPattern)
if intervalStart == -1 {
return -1
}

// Move to the start of the number
intervalStart += len(intervalPattern)
if intervalStart >= len(line) {
return -1
}

// Find the end of the number (comma, space, or closing brace)
intervalEnd := intervalStart
for intervalEnd < len(line) {
ch := line[intervalEnd]
if ch == ',' || ch == ' ' || ch == '}' {
break
}
intervalEnd++
}
if intervalEnd == intervalStart {
return -1
}

// Parse the number directly from bytes
interval, err := strconv.ParseFloat(string(line[intervalStart:intervalEnd]), 64)
if err != nil {
return -1
}

return interval
}
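
Note on the approach: extractInterval scans the raw bytes instead of unmarshalling the line, presumably to avoid per-line allocations, since perf can emit thousands of event lines per interval. For comparison, a hypothetical alternative (not part of this PR) that leans on encoding/json would look like the sketch below; it is functionally equivalent for well-formed lines, but it allocates a map and boxes every field on each call.

```go
package metrics

import "encoding/json"

// extractIntervalViaJSON is a hypothetical alternative (not in the PR): it pulls
// the interval out by unmarshalling the whole line. Simpler, but it allocates a
// map and boxes every field for each event line perf emits.
func extractIntervalViaJSON(line []byte) float64 {
	var fields map[string]any
	if err := json.Unmarshal(line, &fields); err != nil {
		return -1
	}
	if v, ok := fields["interval"].(float64); ok {
		return v
	}
	return -1
}
```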
59 changes: 59 additions & 0 deletions cmd/metrics/event_frame_test.go
@@ -0,0 +1,59 @@
// Copyright (C) 2021-2025 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause

package metrics

import "testing"

func TestExtractInterval(t *testing.T) {
tests := []struct {
name string
line []byte
want float64
}{
{
name: "ValidJSON",
line: []byte(`{"interval" : 5.005073756, "cpu": "0"}`),
want: 5.005073756,
},
{
name: "ValidJSONWithSpaces",
line: []byte(`{ "interval" : 42.12345 }`),
want: 42.12345,
},
{
name: "MissingInterval",
line: []byte(`{"cpu": "0"}`),
want: -1,
},
{
name: "EmptyLine",
line: []byte(``),
want: -1,
},
{
name: "InvalidNumber",
line: []byte(`{"interval" : not_a_number, "cpu": "0"}`),
want: -1,
},
{
name: "IntervalAtEnd",
line: []byte(`{"interval" : 123.456}`),
want: 123.456,
},
{
name: "IntervalWithTrailingSpace",
line: []byte(`{"interval" : 77.88 , "cpu": "0"}`),
want: 77.88,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := extractInterval(tt.line)
if got != tt.want {
t.Errorf("extractInterval() = %v, want %v", got, tt.want)
}
})
}
}
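
If a doc-style example is wanted alongside the table-driven test, something like the following (hypothetical, not part of this PR) would also exercise extractInterval; only the interval field matters to the parser, the other fields are illustrative stand-ins for real perf stat output.

```go
package metrics

import "fmt"

// Example_extractInterval is a hypothetical example test; `go test` checks the
// printed lines against the Output comment below.
func Example_extractInterval() {
	fmt.Println(extractInterval([]byte(`{"interval" : 5.005073756, "counter-value" : "1234567", "event" : "instructions"}`)))
	fmt.Println(extractInterval([]byte(`{"event" : "instructions"}`))) // no interval field
	// Output:
	// 5.005073756
	// -1
}
```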
162 changes: 106 additions & 56 deletions cmd/metrics/metrics.go
@@ -1383,8 +1383,8 @@ func collectOnTarget(targetContext *targetContext, localTempDir string, localOut
func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec.Cmd, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, metadata Metadata, localTempDir string, outputDir string, frameChannel chan []MetricFrame, errorChannel chan error) {
// start perf
perfCommand := strings.Join(cmd.Args, " ")
stdoutChannel := make(chan string)
stderrChannel := make(chan string)
stdoutChannel := make(chan []byte)
stderrChannel := make(chan []byte)
exitcodeChannel := make(chan int)
scriptErrorChannel := make(chan error)
cmdChannel := make(chan *exec.Cmd)
@@ -1421,14 +1421,9 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
}
}
// Start a goroutine to wait for and then process perf output
// Use a timer to determine when we received an entire frame of events from perf
// The timer will expire when no lines (events) have been received from perf for more than 100ms. This
// works because perf writes the events to stderr in a burst every collection interval, e.g., 5 seconds.
// When the timer expires, this code assumes that perf is done writing events to stderr.
const perfEventWaitTime = time.Duration(100 * time.Millisecond) // 100ms is somewhat arbitrary, but is long enough for perf to print a frame of events
perfOutputTimer := time.NewTimer(time.Duration(2 * flagPerfPrintInterval * 1000)) // #nosec G115
const perfEventWaitTime = time.Duration(100 * time.Millisecond) // fallback timer for final interval
perfProcessingContext, cancelPerfProcessing := context.WithCancel(context.Background())
outputLines := make([][]byte, 0)
intervalBatchChannel := make(chan [][]byte, 10) // channel to send interval batches (all events for a particular interval) for processing
donePerfProcessingChannel := make(chan struct{}) // channel to wait for processPerfOutput to finish
go processPerfOutput(
perfProcessingContext,
@@ -1440,32 +1435,65 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
processes,
cgroupTimeout,
startPerfTimestamp,
perfOutputTimer,
&outputLines,
intervalBatchChannel,
frameChannel,
donePerfProcessingChannel,
)
// receive perf output

// receive perf output and batch by interval
var currentInterval float64 = -1
const initialBatchSize = 1000 // initial size of the batch
currentBatch := make([][]byte, 0, initialBatchSize)
done := false
for !done {
select {
case line := <-stderrChannel: // perf output comes in on this channel, one line at a time
perfOutputTimer.Stop()
perfOutputTimer.Reset(perfEventWaitTime)
// accumulate the lines, they will be processed in the goroutine when the timer expires
outputLines = append(outputLines, []byte(line))
case lineBytes := <-stderrChannel: // perf output comes in on this channel, one line at a time
interval := extractInterval(lineBytes)
if interval < 0 {
// A negative interval means this is not a valid perf event line; skip it
slog.Warn("skipping invalid perf event line", slog.String("line", string(lineBytes)))
continue
}
// Handle interval change or first line
if interval != currentInterval {
// If we have accumulated lines from a previous interval, send them for processing
if len(currentBatch) > 0 {
// Send the batch and allocate a fresh slice so the producer never appends to a backing array the consumer is still reading
batchToSend := currentBatch
currentBatch = make([][]byte, 0, len(currentBatch)) // batch sizes are typically the same
select {
case intervalBatchChannel <- batchToSend:
case <-perfProcessingContext.Done():
done = true
continue
}
}
// Start a new batch for the new interval
currentInterval = interval
}
// Add the line to the current batch
currentBatch = append(currentBatch, lineBytes)
case exitCode := <-exitcodeChannel: // when perf exits, the exit code comes to this channel
slog.Debug("perf exited", slog.Int("exit code", exitCode))
time.Sleep(perfEventWaitTime) // wait for timer to expire so that last events can be processed
done = true // exit the loop
// Send the final batch if we have accumulated lines
if len(currentBatch) > 0 {
batchToSend := currentBatch
currentBatch = nil // clear reference since we're done
select {
case intervalBatchChannel <- batchToSend:
case <-time.After(perfEventWaitTime): // timeout to avoid blocking
}
}
done = true // exit the loop
case err := <-scriptErrorChannel: // if there is an error running perf, it comes here
if err != nil {
slog.Error("error from perf", slog.String("error", err.Error()))
}
done = true // exit the loop
}
}
perfOutputTimer.Stop()
// Close the interval batch channel to signal no more batches
close(intervalBatchChannel)
// cancel the context to stop processPerfOutput
cancelPerfProcessing()
// wait for processPerfOutput to finish
@@ -1489,8 +1517,7 @@ func processPerfOutput(
processes []Process,
cgroupTimeout int,
startPerfTimestamp time.Time,
perfOutputTimer *time.Timer,
outputLines *[][]byte,
intervalBatchChannel chan [][]byte,
frameChannel chan []MetricFrame,
doneChannel chan struct{},
) {
@@ -1501,44 +1528,47 @@ func processPerfOutput(
const maxConsecutiveProcessEventErrors = 2
for !contextCancelled {
select {
case <-perfOutputTimer.C: // waits for the timer to expire, then processes the events in outputLines
case <-ctx.Done(): // context cancellation
contextCancelled = true // exit the loop after one more pass
}
if contextCancelled {
break
}
if len(*outputLines) != 0 {
// write the events to a file
if flagWriteEventsToFile {
if err := writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.jsonl", *outputLines); err != nil {
slog.Error("failed to write events to file", slog.String("error", err.Error()))
}
case batchLines, ok := <-intervalBatchChannel:
if !ok {
// Channel closed, no more batches
contextCancelled = true
break
}
// process the events
var metricFrames []MetricFrame
var err error
metricFrames, frameTimestamp, err = ProcessEvents(*outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata)
if err != nil {
slog.Error(err.Error())
numConsecutiveProcessEventErrors++
if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors {
slog.Error("too many consecutive errors processing events, killing perf", slog.Int("max errors", maxConsecutiveProcessEventErrors))
// signaling self with SIGUSR1 will signal child processes to exit, which will cancel the context and let this function exit
err := util.SignalSelf(syscall.SIGUSR1)
if err != nil {
slog.Error("failed to signal self", slog.String("error", err.Error()))
// Process the interval batch
if len(batchLines) > 0 {
// write the events to a file
if flagWriteEventsToFile {
if err := writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.jsonl", batchLines); err != nil {
slog.Error("failed to write events to file", slog.String("error", err.Error()))
}
}
*outputLines = [][]byte{} // empty it
} else {
// send the metrics frames out to be printed
frameChannel <- metricFrames
// empty the outputLines
*outputLines = [][]byte{}
// reset the error count
numConsecutiveProcessEventErrors = 0
// process the events
var metricFrames []MetricFrame
var err error
metricFrames, frameTimestamp, err = ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata)
if err != nil {
slog.Error(err.Error())
numConsecutiveProcessEventErrors++
if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors {
slog.Error("too many consecutive errors processing events, killing perf", slog.Int("max errors", maxConsecutiveProcessEventErrors))
// signaling self with SIGUSR1 will signal child processes to exit, which will cancel the context and let this function exit
err := util.SignalSelf(syscall.SIGUSR1)
if err != nil {
slog.Error("failed to signal self", slog.String("error", err.Error()))
}
}
} else {
// send the metrics frames out to be printed
frameChannel <- metricFrames
// reset the error count
numConsecutiveProcessEventErrors = 0
}
}
case <-ctx.Done(): // context cancellation
contextCancelled = true
}
if contextCancelled {
break
}
// for cgroup scope, terminate perf if refresh timeout is reached
if flagScope == scopeCgroup && cgroupTimeout != 0 {
@@ -1552,4 +1582,24 @@ func processPerfOutput(
}
}
}
// Drain any remaining batches from the channel
for {
select {
case batchLines, ok := <-intervalBatchChannel:
if !ok {
return // Channel closed and drained
}
// Process final batches if they exist
if len(batchLines) > 0 {
if metricFrames, _, err := ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata); err == nil {
select {
case frameChannel <- metricFrames:
default: // Don't block if frameChannel is full
}
}
}
default:
return // No more batches
}
}
}
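
Stripped of the perf-specific details, the new runPerf/processPerfOutput flow is a producer/consumer pair keyed on the interval value: the reader groups consecutive lines that share an interval into a batch, ships each completed batch over a channel, and flushes the final batch when perf exits. A self-contained sketch of that shape (hypothetical names, simplified error handling, not the PR's code):

```go
package main

import (
	"fmt"
	"strings"
)

// batchByKey groups consecutive lines that share a key and sends each completed
// group on out; it mirrors the interval-batching loop in runPerf.
func batchByKey(lines []string, key func(string) string, out chan<- []string) {
	current := ""
	var batch []string
	for _, line := range lines {
		k := key(line)
		if k != current {
			if len(batch) > 0 {
				out <- batch
			}
			batch = nil
			current = k
		}
		batch = append(batch, line)
	}
	if len(batch) > 0 {
		out <- batch // final batch, analogous to the flush when perf exits
	}
	close(out)
}

func main() {
	lines := []string{"5.0 a", "5.0 b", "10.0 a", "10.0 b"}
	out := make(chan []string, 4)
	go batchByKey(lines, func(s string) string { return strings.Fields(s)[0] }, out)
	for b := range out {
		fmt.Println(b) // consumer: one batch (frame) per interval
	}
}
```

The real code adds the pieces the sketch omits: context cancellation, a timeout on the final send, and the consecutive-error accounting around ProcessEvents.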
2 changes: 1 addition & 1 deletion internal/script/script.go
@@ -164,7 +164,7 @@ func RunScripts(myTarget target.Target, scripts []ScriptDefinition, ignoreScript
}

// RunScriptStream runs a script on the specified target and streams the output to the specified channels.
func RunScriptStream(myTarget target.Target, script ScriptDefinition, localTempDir string, stdoutChannel chan string, stderrChannel chan string, exitcodeChannel chan int, errorChannel chan error, cmdChannel chan *exec.Cmd) {
func RunScriptStream(myTarget target.Target, script ScriptDefinition, localTempDir string, stdoutChannel chan []byte, stderrChannel chan []byte, exitcodeChannel chan int, errorChannel chan error, cmdChannel chan *exec.Cmd) {
targetArchitecture, err := myTarget.GetArchitecture()
if err != nil {
err = fmt.Errorf("error getting target architecture: %v", err)
14 changes: 9 additions & 5 deletions internal/target/helpers.go
@@ -128,7 +128,7 @@ func runLocalCommandWithInputWithTimeout(cmd *exec.Cmd, input string, timeout in
//
// Returns:
// - err: An error if the command fails to start or if there are issues with pipes.
func runLocalCommandWithInputWithTimeoutAsync(cmd *exec.Cmd, stdoutChannel chan string, stderrChannel chan string, exitcodeChannel chan int, input string, timeout int) (err error) {
func runLocalCommandWithInputWithTimeoutAsync(cmd *exec.Cmd, stdoutChannel chan []byte, stderrChannel chan []byte, exitcodeChannel chan int, input string, timeout int) (err error) {
logInput := ""
if input != "" {
logInput = "******"
@@ -163,14 +163,18 @@ func runLocalCommandWithInputWithTimeoutAsync(cmd *exec.Cmd, stdoutChannel chan
}
go func() {
for stdoutScanner.Scan() {
text := stdoutScanner.Text()
stdoutChannel <- text
// Bytes() returns a slice of the scanner's internal buffer, which may be
// overwritten by the next call to Scan, so copy it before sending
internalBuf := stdoutScanner.Bytes()
sendBuf := make([]byte, len(internalBuf))
copy(sendBuf, internalBuf)
stdoutChannel <- sendBuf
}
}()
go func() {
for stderrScanner.Scan() {
text := stderrScanner.Text()
stderrChannel <- text
// copy for the same reason as above: the scanner reuses its internal buffer
internalBuf := stderrScanner.Bytes()
sendBuf := make([]byte, len(internalBuf))
copy(sendBuf, internalBuf)
stderrChannel <- sendBuf
}
}()
err = cmd.Wait()
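
The chan string to chan []byte change in helpers.go only stays correct because each line is copied out of the scanner before it is sent: bufio.Scanner.Bytes() returns a slice of the scanner's internal buffer, and that buffer may be overwritten by the next call to Scan. A minimal standalone illustration of the pattern (not from the repo):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	s := bufio.NewScanner(strings.NewReader("first line\nsecond line\n"))

	var lines [][]byte
	for s.Scan() {
		// s.Bytes() aliases the scanner's internal buffer; per the bufio docs it
		// may be overwritten by a subsequent Scan, so copy it before retaining
		// it or sending it on a channel.
		buf := make([]byte, len(s.Bytes()))
		copy(buf, s.Bytes())
		lines = append(lines, buf)
	}

	for _, l := range lines {
		fmt.Println(string(l))
	}
}
```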