18 changes: 15 additions & 3 deletions README.md
@@ -241,11 +241,16 @@ curl -s localhost:9464/metrics | grep oz_worker_

```bash
export OTEL_METRICS_EXPORTER=otlp
export OTEL_TRACES_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector.observability.svc:4318
oz-agent-worker --api-key "$WARP_API_KEY" --worker-id "my-worker"
```

Tracing is opt-in. Set `OTEL_TRACES_EXPORTER` to a non-`none` exporter such as
`otlp` to emit per-task spans and lifecycle events; leave it unset to export
metrics only.

### Helm

```bash
@@ -275,6 +280,8 @@ metrics:
enabled: true
exporter: otlp
extraEnv:
- name: OTEL_TRACES_EXPORTER
value: otlp
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: http://otel-collector.observability.svc:4318
```
@@ -293,17 +300,20 @@ shows up as a distinct series.
`count(oz_worker_tasks_active > 0)`.
- `oz_worker_tasks_max_concurrent` (gauge): configured concurrency limit
(`0` means unlimited).
- `oz_worker_tasks_claimed_total` (counter): total tasks accepted by the
worker since process start.
- `oz_worker_tasks_rejected_total{reason}` (counter): tasks the worker
declined, e.g. `reason="at_capacity"`.
- `oz_worker_tasks_completed_total{result}` (counter): completed tasks
labeled `result="succeeded"` or `result="failed"`. Success rate over 5m:
labeled `result="succeeded"`, `result="failed"`, or `result="cancelled"`.
Success rate over 5m:
`sum(rate(oz_worker_tasks_completed_total{result="succeeded"}[5m])) /
sum(rate(oz_worker_tasks_completed_total[5m]))`.
- `oz_worker_task_duration_seconds{result}` (histogram): wall-clock task
duration on the worker. p95: `histogram_quantile(0.95,
sum by (le) (rate(oz_worker_task_duration_seconds_bucket[5m])))`.
- `oz_worker_task_failures_total{phase,reason}` (counter): bounded
classification of task failures, e.g. `phase="backend"` with
`reason="image_pull"`, `reason="unschedulable"`, or
`reason="container_oom"`.
- `oz_worker_websocket_reconnects_total{reason}` (counter): reconnect
attempts; spikes indicate flapping workers.
- `oz_worker_info{version,backend,worker_id}` (gauge, value `1`): build and
@@ -321,6 +331,8 @@ Direct mappings for the questions enterprise operators most commonly ask:
sum(oz_worker_tasks_max_concurrent > 0)`
- **Failure rate:**
`sum(rate(oz_worker_tasks_completed_total{result="failed"}[5m]))`
- **Failure modes:**
`sum by (phase, reason) (rate(oz_worker_task_failures_total[5m]))`
- **Reconnect storms:**
`sum(rate(oz_worker_websocket_reconnects_total[5m])) > 0.1`

212 changes: 186 additions & 26 deletions internal/metrics/metrics.go
@@ -32,16 +32,20 @@ import (
"errors"
"fmt"
"os"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/contrib/exporters/autoexport"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
metricnoop "go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
)

// scopeName is the instrumentation-scope name used for all worker metrics.
@@ -70,6 +74,7 @@ type instruments struct {
tasksRejected metric.Int64Counter
tasksCompleted metric.Int64Counter
taskDuration metric.Float64Histogram
taskFailures metric.Int64Counter
wsReconnects metric.Int64Counter
workerInfo metric.Int64Gauge
}
@@ -86,6 +91,74 @@ const (
RejectReasonAtCapacity = "at_capacity"
WSReconnectReasonDialFailed = "dial_failed"
WSReconnectReasonRemoteClose = "remote_close"

TaskFailurePhaseAssignment = "assignment"
TaskFailurePhaseBackend = "backend"
TaskFailurePhaseCleanup = "cleanup"

TaskFailureReasonUnknown = "unknown"
TaskFailureReasonTaskTimeout = "task_timeout"
TaskFailureReasonTaskCancelled = "task_cancelled"
TaskFailureReasonImagePull = "image_pull"
TaskFailureReasonSidecarPrep = "sidecar_prep"
TaskFailureReasonContainerCreate = "container_create"
TaskFailureReasonContainerStart = "container_start"
TaskFailureReasonContainerWait = "container_wait"
TaskFailureReasonContainerExit = "container_exit"
TaskFailureReasonContainerOOM = "container_oom"
TaskFailureReasonWorkspaceSetup = "workspace_setup"
TaskFailureReasonSetupCommand = "setup_command"
TaskFailureReasonAgentInvocation = "agent_invocation"
TaskFailureReasonTeardownCommand = "teardown_command"
TaskFailureReasonJobCreate = "job_create"
TaskFailureReasonJobWatch = "job_watch"
TaskFailureReasonJobFailed = "job_failed"
TaskFailureReasonPodWatch = "pod_watch"
TaskFailureReasonUnschedulable = "unschedulable"
TaskFailureReasonVolumeMount = "volume_mount"
TaskFailureReasonInitContainer = "init_container"
TaskFailureReasonInvalidImage = "invalid_image"
TaskFailureReasonActiveDeadline = "active_deadline"
TaskFailureReasonCleanup = "cleanup"
)

var (
taskResults = []TaskResult{
TaskResultSucceeded,
TaskResultFailed,
TaskResultCancelled,
}
taskFailurePhases = []string{
TaskFailurePhaseAssignment,
TaskFailurePhaseBackend,
TaskFailurePhaseCleanup,
}
taskFailureReasons = []string{
TaskFailureReasonUnknown,
TaskFailureReasonTaskTimeout,
TaskFailureReasonTaskCancelled,
TaskFailureReasonImagePull,
TaskFailureReasonSidecarPrep,
TaskFailureReasonContainerCreate,
TaskFailureReasonContainerStart,
TaskFailureReasonContainerWait,
TaskFailureReasonContainerExit,
TaskFailureReasonContainerOOM,
TaskFailureReasonWorkspaceSetup,
TaskFailureReasonSetupCommand,
TaskFailureReasonAgentInvocation,
TaskFailureReasonTeardownCommand,
TaskFailureReasonJobCreate,
TaskFailureReasonJobWatch,
TaskFailureReasonJobFailed,
TaskFailureReasonPodWatch,
TaskFailureReasonUnschedulable,
TaskFailureReasonVolumeMount,
TaskFailureReasonInitContainer,
TaskFailureReasonInvalidImage,
TaskFailureReasonActiveDeadline,
TaskFailureReasonCleanup,
}
)

func init() {
@@ -116,38 +189,79 @@ func init() {
// stops the exporter; it is safe to call after Init returns an error.
func Init(ctx context.Context, cfg Config) (shutdown func(context.Context) error, err error) {
noop := func(context.Context) error { return nil }

if os.Getenv("OTEL_METRICS_EXPORTER") == "none" {
// Explicit opt-out: keep the no-op instruments installed by init().
return noop, nil
var shutdowns []func(context.Context) error
var initErr error
var res *resource.Resource
getResource := func() (*resource.Resource, error) {
if res != nil {
return res, nil
}
var err error
res, err = newResource(ctx, cfg)
return res, err
}

reader, err := autoexport.NewMetricReader(ctx)
if err != nil {
return noop, err
}
if os.Getenv("OTEL_METRICS_EXPORTER") != "none" {
reader, err := autoexport.NewMetricReader(ctx)
if err != nil {
initErr = errors.Join(initErr, err)
} else {
res, err := getResource()
if err != nil {
initErr = errors.Join(initErr, errors.Join(err, reader.Shutdown(ctx)))
} else {
provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithResource(res),
)

res, err := newResource(ctx, cfg)
if err != nil {
// We have a reader but failed to build a resource; close the reader
// and surface the error.
return noop, errors.Join(err, reader.Shutdown(ctx))
meter := provider.Meter(scopeName)
set, err := buildInstruments(meter)
if err != nil {
initErr = errors.Join(initErr, errors.Join(err, provider.Shutdown(ctx)))
} else {
activeInstruments.Store(set)
primeInstruments(ctx, set)
shutdowns = append(shutdowns, provider.Shutdown)
}
}
}
}

provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithResource(res),
)
if shouldInitTraces() {
exporter, err := autoexport.NewSpanExporter(ctx)
if err != nil {
initErr = errors.Join(initErr, err)
} else {
res, err := getResource()
if err != nil {
initErr = errors.Join(initErr, errors.Join(err, exporter.Shutdown(ctx)))
} else {
provider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
)
otel.SetTracerProvider(provider)
shutdowns = append(shutdowns, provider.Shutdown)
}
}
}

meter := provider.Meter(scopeName)
set, err := buildInstruments(meter)
if err != nil {
return noop, errors.Join(err, provider.Shutdown(ctx))
if len(shutdowns) == 0 {
return noop, initErr
}
activeInstruments.Store(set)
primeInstruments(ctx, set)
return func(ctx context.Context) error {
var err error
for i := len(shutdowns) - 1; i >= 0; i-- {
err = errors.Join(err, shutdowns[i](ctx))
}
return err
}, initErr
}
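
// A minimal caller sketch (illustrative only, not part of this change;
// Config fields depend on the deployment):
//
//	shutdown, err := metrics.Init(ctx, metrics.Config{})
//	if err != nil {
//		log.Printf("telemetry init degraded: %v", err) // metrics/traces may be partial
//	}
//	defer shutdown(context.Background())
//
// Init always returns a callable shutdown func, even alongside a non-nil
// error, so callers can log the error and continue with whatever telemetry
// did come up.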

return provider.Shutdown, nil
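// shouldInitTraces reports whether OTEL_TRACES_EXPORTER is set to anything
// other than "none" (case-insensitive, ignoring surrounding whitespace).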
func shouldInitTraces() bool {
exporter := strings.TrimSpace(os.Getenv("OTEL_TRACES_EXPORTER"))
return exporter != "" && !strings.EqualFold(exporter, "none")
}

// primeInstruments records a zero observation against every known
@@ -165,11 +279,21 @@ func primeInstruments(ctx context.Context, set *instruments) {
set.tasksRejected.Add(ctx, 0,
metric.WithAttributes(attribute.String("reason", RejectReasonAtCapacity)),
)
for _, r := range []TaskResult{TaskResultSucceeded, TaskResultFailed} {
for _, r := range taskResults {
set.tasksCompleted.Add(ctx, 0,
metric.WithAttributes(attribute.String("result", string(r))),
)
}
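	// Pre-register every bounded phase/reason pair (3 phases × 24 reasons,
	// 72 series) so each failure mode starts as an explicit zero.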
for _, phase := range taskFailurePhases {
for _, reason := range taskFailureReasons {
set.taskFailures.Add(ctx, 0,
metric.WithAttributes(
attribute.String("phase", phase),
attribute.String("reason", reason),
),
)
}
}
for _, r := range []string{WSReconnectReasonDialFailed, WSReconnectReasonRemoteClose} {
set.wsReconnects.Add(ctx, 0,
metric.WithAttributes(attribute.String("reason", r)),
@@ -248,6 +372,13 @@ func buildInstruments(m metric.Meter) (*instruments, error) {
if err != nil {
return nil, err
}
taskFailures, err := m.Int64Counter(
"oz_worker_task_failures_total",
metric.WithDescription("Task failures labeled by bounded execution phase and reason."),
)
if err != nil {
return nil, err
}
wsReconnects, err := m.Int64Counter(
"oz_worker_websocket_reconnects_total",
metric.WithDescription("Total WebSocket reconnect attempts since process start."),
@@ -270,6 +401,7 @@ func buildInstruments(m metric.Meter) (*instruments, error) {
tasksRejected: tasksRejected,
tasksCompleted: tasksCompleted,
taskDuration: taskDuration,
taskFailures: taskFailures,
wsReconnects: wsReconnects,
workerInfo: workerInfo,
}, nil
@@ -305,7 +437,7 @@ func SetMaxConcurrent(n int) {
current().tasksMaxConcurrent.Record(context.Background(), int64(n))
}

// RecordTaskClaim records a successful claim (worker has accepted a task).
// RecordTaskClaim records a successful task claim (the worker has accepted a task).
func RecordTaskClaim() {
current().tasksClaimed.Add(context.Background(), 1)
}
@@ -326,6 +458,7 @@ type TaskResult string
const (
TaskResultSucceeded TaskResult = "succeeded"
TaskResultFailed TaskResult = "failed"
TaskResultCancelled TaskResult = "cancelled"
)

// RecordTaskCompleted records a completed task, regardless of outcome, along
Expand All @@ -336,6 +469,16 @@ func RecordTaskCompleted(result TaskResult, duration time.Duration) {
current().taskDuration.Record(context.Background(), duration.Seconds(), attrs)
}

// RecordTaskFailure records a bounded failure mode for a task.
func RecordTaskFailure(phase, reason string) {
current().taskFailures.Add(context.Background(), 1,
metric.WithAttributes(
attribute.String("phase", phase),
attribute.String("reason", reason),
),
)
}
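
// Callers should pass the bounded constants defined above rather than raw
// error text; an illustrative sketch for a Kubernetes backend:
//
//	metrics.RecordTaskFailure(
//		metrics.TaskFailurePhaseBackend,
//		metrics.TaskFailureReasonImagePull,
//	)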

// RecordWebsocketReconnect records a reconnect attempt against warp-server.
// The reason label is intended to be a small bounded enum
// (e.g. "dial_failed", "remote_close").
@@ -356,3 +499,20 @@ func SetWorkerInfo(version, backend, workerID string) {
),
)
}

// StartTaskSpan starts a trace span for a single task execution. It is a no-op
// unless OTEL_TRACES_EXPORTER is configured by the operator.
func StartTaskSpan(ctx context.Context, taskID, title string) (context.Context, trace.Span) {
return otel.Tracer(scopeName).Start(ctx, "oz_worker.task",
trace.WithAttributes(
attribute.String("task.id", taskID),
attribute.String("task.title", title),
),
)
}

// AddTaskEvent records a task lifecycle event on the active span. High-cardinality
// identifiers belong here rather than in metric labels.
func AddTaskEvent(ctx context.Context, name string, attrs ...attribute.KeyValue) {
trace.SpanFromContext(ctx).AddEvent(name, trace.WithAttributes(attrs...))
}
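
// Usage sketch (illustrative; runTask and the task fields are hypothetical):
//
//	ctx, span := metrics.StartTaskSpan(ctx, task.ID, task.Title)
//	defer span.End()
//	if err := runTask(ctx, task); err != nil {
//		metrics.AddTaskEvent(ctx, "task.failed",
//			attribute.String("error.message", err.Error()))
//	}
//
// With no tracer provider installed, the global otel.Tracer is a no-op, so
// both helpers are safe to call unconditionally.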