Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,7 @@ GO_TARGETS = [
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs/logstream:logstream",
"//pkg/obs/logstream:logstream_test",
"//pkg/obs/workloadid:workloadid",
"//pkg/obs:obs",
"//pkg/raft/confchange:confchange",
"//pkg/raft/confchange:confchange_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/kv/kvpb",
"//pkg/obs/workloadid",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
kvbulk "github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -359,7 +360,8 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
log.Dev.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "consume events"))
}
}, "proc", fmt.Sprintf("%d", lrw.ProcessorID))
}, workloadid.ProfileTag, workloadid.WORKLOAD_NAME_LDR,
"proc", fmt.Sprintf("%d", lrw.ProcessorID))
return nil
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
Expand Down Expand Up @@ -235,7 +236,8 @@ func (o *offlineInitialScanProcessor) Start(ctx context.Context) {
if err := o.subscription.Err(); err != nil {
o.sendError(errors.Wrap(err, "subscription"))
}
}, "proc", fmt.Sprintf("%d", o.ProcessorID))
}, workloadid.ProfileTag, workloadid.WORKLOAD_NAME_LDR,
"proc", fmt.Sprintf("%d", o.ProcessorID))
return nil
})
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ func (f *RangeFeed) start(
// pprof.Do function does exactly what we do here, but it also results in
// pprof.Do function showing up in the stack traces -- so, just set and reset
// labels manually.
ctx, reset := pprofutil.SetProfilerLabels(ctx, append(f.extraPProfLabels, "rangefeed", f.name)...)
ctx, reset := pprofutil.SetProfilerLabels(
ctx, append(f.extraPProfLabels, "rangefeed", f.name)...,
)
defer reset()

if f.invoker != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ go_library(
"//pkg/multitenant",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer",
"//pkg/multitenant/tenantcostmodel",
"//pkg/obs/workloadid",
"//pkg/raft",
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/node_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand Down Expand Up @@ -931,7 +932,7 @@ func (t *RaftTransport) startProcessNewQueue(
}
go func(ctx context.Context) {
defer hdl.Activate(ctx).Release(ctx)
pprofutil.Do(ctx, worker, "remote_node_id", toNodeID.String())
pprofutil.Do(ctx, worker, workloadid.ProfileTag, workloadid.WORKLOAD_NAME_RAFT, "remote_node_id", toNodeID.String())
}(ctx)
return true
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -139,7 +140,21 @@ func (r *Replica) SendWithWriteBytes(
}
defer reset()
}

if trace.IsEnabled() {
foundLabel := ""
for i, l := range ba.ProfileLabels {
if i%2 == 0 && l == workloadid.ProfileTag && i < len(ba.ProfileLabels)-1 {
// This label is set in conn_executor_exec if tracing is active.
foundLabel = ba.ProfileLabels[i+1]
break
}
}
// This construction avoids calling `defer` in a loop which is
// not permitted by our linter.
if foundLabel != "" {
defer trace.StartRegion(ctx, foundLabel).End()
}
defer trace.StartRegion(ctx, r.rangeStr.String() /* cheap */).End()
}
// Add the range log tag.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storeliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/base",
"//pkg/keys",
"//pkg/kv/kvserver/storeliveness/storelivenesspb",
"//pkg/obs/workloadid",
"//pkg/roachpb",
"//pkg/rpc/nodedialer",
"//pkg/rpc/rpcbase",
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/storeliveness/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
Expand Down Expand Up @@ -566,7 +567,10 @@ func (t *Transport) startProcessNewQueue(
err := t.stopper.RunAsyncTask(
ctx, "storeliveness.Transport: sending messages",
func(ctx context.Context) {
pprofutil.Do(ctx, worker, "remote_node_id", toNodeID.String())
pprofutil.Do(ctx, worker,
workloadid.ProfileTag, workloadid.WORKLOAD_NAME_STORELIVENESS,
"remote_node_id", toNodeID.String(),
)
},
)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/obs/workloadid/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "workloadid",
srcs = ["workloadid.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/obs/workloadid",
visibility = ["//visibility:public"],
)
51 changes: 51 additions & 0 deletions pkg/obs/workloadid/workloadid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package workloadid

// This package defines CRDB-wide workload identifiers that we can
// define and use to attribute observability data.
//
// We will prefer using uint64 workload identifiers because those are
// cheaper to move around the system and can help us avoid allocation
// in hot paths.
//
// String-based identifiers are useful when we want to display the
// identifier to a human, or to label go profiles and execution traces,
// which only accept string values for tags.
//
// Currently, this definition is quite simple consisting only of a
// preset list of static identifiers below that can be used numerically
// or using static strings. Statement Fingerprint IDs are the only
// example here of a dynamic ID that can have high cardinality.
//
// It is expected that future work will expand the structure and
// expressivity of the workloadID. No ID stability is guaranteed.
// While we currently do persist statement fingerprint IDs in the SQL
// Activity tables, we advise users of these values to AVOID PERSISTING
// WORKLOAD IDs until they are stable.

const ProfileTag = "workload.id"

type WorkloadID uint64

// The IDs and Names below are currently not persisted anywhere and
// are subject to change.

const (
WORKLOAD_ID_UNKNOWN WorkloadID = iota
WORKLOAD_ID_LDR
WORKLOAD_ID_RAFT
WORKLOAD_ID_STORELIVENESS
WORKLOAD_ID_RPC_HEARTBEAT
)

const (
WORKLOAD_NAME_UNKNOWN = "UNKNOWN"
WORKLOAD_NAME_LDR = "LDR"
WORKLOAD_NAME_RAFT = "RAFT"
WORKLOAD_NAME_STORELIVENESS = "STORELIVENESS"
WORKLOAD_NAME_RPC_HEARTBEAT = "RPC_HEARTBEAT"
)
1 change: 1 addition & 0 deletions pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/obs/workloadid",
"//pkg/roachpb",
"//pkg/rpc/rpcbase",
"//pkg/security",
Expand Down
5 changes: 4 additions & 1 deletion pkg/rpc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/VividCortex/ewma"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand Down Expand Up @@ -271,7 +272,9 @@ func newPeer[Conn rpcConn](rpcCtx *Context, k peerKey, peerOpts *peerOptions[Con
AsyncProbe: func(report func(error), done func()) {
pprofutil.Do(ctx, func(ctx context.Context) {
p.launch(ctx, report, done)
}, "tags", logtags.FromContext(ctx).String())
},
workloadid.ProfileTag, workloadid.WORKLOAD_NAME_RPC_HEARTBEAT,
"tags", logtags.FromContext(ctx).String())
},
})
p.b = b
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ go_library(
"//pkg/multitenant/multitenantcpu",
"//pkg/multitenant/tenantcapabilities",
"//pkg/multitenant/tenantcapabilitiespb",
"//pkg/obs/workloadid",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
Expand Down
13 changes: 11 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu"
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -4438,20 +4440,27 @@ func (ex *connExecutor) execWithProfiling(
ctx context.Context, ast tree.Statement, prepared *prep.Statement, op func(context.Context) error,
) error {
var err error
if ex.server.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
if ex.server.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels || log.HasSpan(ctx) {
remoteAddr := "internal"
if rAddr := ex.sessionData().RemoteAddr; rAddr != nil {
remoteAddr = rAddr.String()
}
// Compute stmtNoConstants with proper FmtFlags for consistency with
// makeStatement and ih.Setup.
fmtFlags := tree.FmtFlags(tree.QueryFormattingForFingerprintsMask.Get(&ex.server.cfg.Settings.SV))
var stmtNoConstants string
if prepared != nil {
stmtNoConstants = prepared.StatementNoConstants
} else {
stmtNoConstants = tree.FormatStatementHideConstants(ast)
stmtNoConstants = tree.FormatStatementHideConstants(ast, fmtFlags)
}
// Compute fingerprint ID here since ih.Setup hasn't been called yet.
fingerprintID := appstatspb.ConstructStatementFingerprintID(
stmtNoConstants, ex.implicitTxn(), ex.sessionData().Database)
pprofutil.Do(ctx, func(ctx context.Context) {
err = op(ctx)
},
workloadid.ProfileTag, sqlstatsutil.EncodeStmtFingerprintIDToString(fingerprintID),
"appname", ex.sessionData().ApplicationName,
"addr", remoteAddr,
"stmt.tag", ast.StatementTag(),
Expand Down
Loading