Skip to content

Commit fe59e6d

Browse files
committed
as per DD_APM_PEER_TAGS_AGGREGATION in agent
1 parent 6aa8d69 commit fe59e6d

File tree

3 files changed

+137
-59
lines changed

3 files changed

+137
-59
lines changed

contrib/valkey-go/option.go

-17
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,12 @@ package valkey
99

1010
import (
1111
"math"
12-
"os"
1312

1413
"gopkg.in/DataDog/dd-trace-go.v1/internal"
1514
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
1615
)
1716

18-
const defaultServiceName = "valkey.client"
19-
2017
type clientConfig struct {
21-
serviceName string
2218
spanName string
2319
analyticsRate float64
2420
skipRaw bool
@@ -28,18 +24,12 @@ type clientConfig struct {
2824
type ClientOption func(*clientConfig)
2925

3026
func defaults(cfg *clientConfig) {
31-
cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
3227
cfg.spanName = namingschema.OpName(namingschema.ValkeyOutbound)
3328
if internal.BoolEnv("DD_TRACE_VALKEY_ANALYTICS_ENABLED", false) {
3429
cfg.analyticsRate = 1.0
3530
} else {
3631
cfg.analyticsRate = math.NaN()
3732
}
38-
if v := os.Getenv("DD_TRACE_VALKEY_SERVICE_NAME"); v == "" {
39-
cfg.serviceName = defaultServiceName
40-
} else {
41-
cfg.serviceName = v
42-
}
4333
cfg.skipRaw = internal.BoolEnv("DD_TRACE_VALKEY_SKIP_RAW_COMMAND", false)
4434
}
4535

@@ -52,13 +42,6 @@ func WithSkipRawCommand(skip bool) ClientOption {
5242
}
5343
}
5444

55-
// WithServiceName sets the given service name for the client.
56-
func WithServiceName(name string) ClientOption {
57-
return func(cfg *clientConfig) {
58-
cfg.serviceName = name
59-
}
60-
}
61-
6245
// WithAnalytics enables Trace Analytics for all started spans.
6346
func WithAnalytics(on bool) ClientOption {
6447
return func(cfg *clientConfig) {

contrib/valkey-go/valkey.go

+32-12
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,10 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client,
6262
for _, fn := range opts {
6363
fn(&cfg)
6464
}
65-
var host, portStr string
65+
var host string
6666
var port int
6767
if len(option.InitAddress) == 1 {
68-
host, portStr, err = net.SplitHostPort(option.InitAddress[0])
69-
if err != nil {
70-
log.Error("valkey.ClientOption.InitAddress contains invalid address: %s", err)
71-
}
72-
port, _ = strconv.Atoi(portStr)
68+
host, port = splitHostPort(option.InitAddress[0])
7369
}
7470
core := coreClient{
7571
Client: valkeyClient,
@@ -83,6 +79,16 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client,
8379
}, nil
8480
}
8581

82+
func splitHostPort(addr string) (string, int) {
83+
host, portStr, err := net.SplitHostPort(addr)
84+
if err != nil {
85+
log.Error("%q cannot be split: %s", addr, err)
86+
return "", 0
87+
}
88+
port, _ := strconv.Atoi(portStr)
89+
return host, port
90+
}
91+
8692
type commander interface {
8793
Commands() []string
8894
}
@@ -154,10 +160,26 @@ type buildStartSpanOptionsInput struct {
154160
skipRawCommand bool
155161
}
156162

163+
func (c *coreClient) peerTags() []tracer.StartSpanOption {
164+
ipAddr := net.ParseIP(c.host)
165+
var peerHostKey string
166+
if ipAddr == nil {
167+
peerHostKey = ext.PeerHostname
168+
} else if ipAddr.To4() != nil {
169+
peerHostKey = ext.PeerHostIPV4
170+
} else {
171+
peerHostKey = ext.PeerHostIPV6
172+
}
173+
return []tracer.StartSpanOption{
174+
tracer.Tag(ext.PeerService, ext.DBSystemValkey),
175+
tracer.Tag(peerHostKey, c.host),
176+
tracer.Tag(ext.PeerPort, c.port),
177+
}
178+
}
179+
157180
func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []tracer.StartSpanOption {
158181
opts := []tracer.StartSpanOption{
159182
tracer.SpanType(ext.SpanTypeValkey),
160-
tracer.ServiceName(c.clientConfig.serviceName),
161183
tracer.Tag(ext.TargetHost, c.host),
162184
tracer.Tag(ext.TargetPort, c.port),
163185
tracer.Tag(ext.ValkeyClientVersion, valkey.LibVer),
@@ -166,13 +188,15 @@ func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []t
166188
tracer.Tag(ext.SpanKind, ext.SpanKindClient),
167189
tracer.Tag(ext.DBType, ext.DBSystemValkey),
168190
tracer.Tag(ext.DBSystem, ext.DBSystemValkey),
191+
tracer.Tag(ext.DBInstance, ext.DBSystemValkey),
169192
tracer.Tag(ext.ValkeyDatabaseIndex, c.option.SelectDB),
170193
tracer.Tag(ext.ValkeyClientCommandWrite, input.isWrite),
171194
tracer.Tag(ext.ValkeyClientCommandBlock, input.isBlock),
172195
tracer.Tag(ext.ValkeyClientCommandMulti, input.isMulti),
173196
tracer.Tag(ext.ValkeyClientCommandStream, input.isStream),
174197
tracer.Tag(ext.ValkeyClientCommandWithPassword, c.option.Password != ""),
175198
}
199+
opts = append(opts, c.peerTags()...)
176200
if input.command != "" {
177201
opts = append(opts, []tracer.StartSpanOption{
178202
// valkeyotel tags
@@ -324,11 +348,7 @@ func (c *client) Dedicate() (client valkey.DedicatedClient, cancel func()) {
324348
func (c *client) Nodes() map[string]valkey.Client {
325349
nodes := c.Client.Nodes()
326350
for addr, valkeyClient := range nodes {
327-
host, portStr, err := net.SplitHostPort(addr)
328-
if err != nil {
329-
log.Error("invalid address is set to valkey client: %s", err)
330-
}
331-
port, _ := strconv.Atoi(portStr)
351+
host, port := splitHostPort(addr)
332352
nodes[addr] = &client{
333353
coreClient: coreClient{
334354
Client: valkeyClient,

contrib/valkey-go/valkey_test.go

+105-30
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// under the Apache License Version 2.0.
33
// This product includes software developed at Datadog (https://www.datadoghq.com/).
44
// Copyright 2016 Datadog, Inc.
5-
package valkey_test
5+
package valkey
66

77
import (
88
"context"
@@ -14,9 +14,10 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
"github.com/valkey-io/valkey-go"
17-
valkeytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/valkey-go"
17+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
1818
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
1919
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
20+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
2021
)
2122

2223
var (
@@ -35,11 +36,89 @@ func TestMain(m *testing.M) {
3536
os.Exit(m.Run())
3637
}
3738

39+
func TestPeerTags(t *testing.T) {
40+
tests := []struct {
41+
initAddress string
42+
expectedTags map[string]interface{}
43+
}{
44+
{
45+
initAddress: "127.0.0.1:6379",
46+
expectedTags: map[string]interface{}{
47+
ext.PeerService: "valkey",
48+
ext.PeerHostIPV4: "127.0.0.1",
49+
ext.PeerPort: 6379,
50+
},
51+
},
52+
{
53+
initAddress: "[::1]:6379",
54+
expectedTags: map[string]interface{}{
55+
ext.PeerService: "valkey",
56+
ext.PeerHostIPV6: "::1",
57+
ext.PeerPort: 6379,
58+
},
59+
},
60+
{
61+
initAddress: "[2001:db8::2]:6379",
62+
expectedTags: map[string]interface{}{
63+
ext.PeerService: "valkey",
64+
ext.PeerHostIPV6: "2001:db8::2",
65+
ext.PeerPort: 6379,
66+
},
67+
},
68+
{
69+
initAddress: "[2001:db8::2%lo]:6379",
70+
expectedTags: map[string]interface{}{
71+
ext.PeerService: "valkey",
72+
ext.PeerHostname: "2001:db8::2%lo",
73+
ext.PeerPort: 6379,
74+
},
75+
},
76+
{
77+
initAddress: "::1:7777",
78+
expectedTags: map[string]interface{}{
79+
ext.PeerService: "valkey",
80+
ext.PeerHostname: "",
81+
ext.PeerPort: 0,
82+
},
83+
},
84+
{
85+
initAddress: ":::7777",
86+
expectedTags: map[string]interface{}{
87+
ext.PeerService: "valkey",
88+
ext.PeerHostname: "",
89+
ext.PeerPort: 0,
90+
},
91+
},
92+
{
93+
initAddress: "localhost:7777",
94+
expectedTags: map[string]interface{}{
95+
ext.PeerService: "valkey",
96+
ext.PeerHostname: "localhost",
97+
ext.PeerPort: 7777,
98+
},
99+
},
100+
}
101+
for _, tt := range tests {
102+
t.Run(tt.initAddress, func(t *testing.T) {
103+
host, port := splitHostPort(tt.initAddress)
104+
client := coreClient{
105+
host: host,
106+
port: port,
107+
}
108+
var startSpanConfig ddtrace.StartSpanConfig
109+
for _, tag := range client.peerTags() {
110+
tag(&startSpanConfig)
111+
}
112+
require.Equal(t, tt.expectedTags, startSpanConfig.Tags)
113+
})
114+
}
115+
}
116+
38117
func TestNewClient(t *testing.T) {
39118
tests := []struct {
40119
name string
41120
valkeyClientOptions valkey.ClientOption
42-
valkeytraceClientOptions []valkeytrace.ClientOption
121+
valkeytraceClientOptions []ClientOption
43122
valkeytraceClientEnvVars map[string]string
44123
createSpans func(*testing.T, context.Context, valkey.Client)
45124
assertNewClientError func(*testing.T, error)
@@ -74,19 +153,15 @@ func TestNewClient(t *testing.T) {
74153
Username: valkeyUsername,
75154
Password: valkeyPassword,
76155
},
77-
valkeytraceClientOptions: []valkeytrace.ClientOption{
78-
valkeytrace.WithServiceName("my-valkey-client"),
79-
valkeytrace.WithAnalytics(true),
80-
valkeytrace.WithSkipRawCommand(true),
156+
valkeytraceClientOptions: []ClientOption{
157+
WithAnalytics(true),
158+
WithSkipRawCommand(true),
81159
},
82160
createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) {
83161
assert.NoError(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error())
84162
},
85163
assertSpans: []func(t *testing.T, span mocktracer.Span){
86164
func(t *testing.T, span mocktracer.Span) {
87-
assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName))
88-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
89-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
90165
assert.Equal(t, "SET", span.Tag(ext.DBStatement))
91166
assert.Equal(t, "SET", span.Tag(ext.ResourceName))
92167
assert.Greater(t, span.Tag("db.stmt_size"), 0)
@@ -119,9 +194,6 @@ func TestNewClient(t *testing.T) {
119194
},
120195
assertSpans: []func(t *testing.T, span mocktracer.Span){
121196
func(t *testing.T, span mocktracer.Span) {
122-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
123-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
124-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
125197
assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.DBStatement))
126198
assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.ResourceName))
127199
assert.Greater(t, span.Tag("db.stmt_size"), 0)
@@ -155,9 +227,6 @@ func TestNewClient(t *testing.T) {
155227
},
156228
assertSpans: []func(t *testing.T, span mocktracer.Span){
157229
func(t *testing.T, span mocktracer.Span) {
158-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
159-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
160-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
161230
assert.Greater(t, span.Tag("db.stmt_size"), 0)
162231
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement))
163232
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName))
@@ -175,9 +244,6 @@ func TestNewClient(t *testing.T) {
175244
assert.Nil(t, span.Tag(ext.Error))
176245
},
177246
func(t *testing.T, span mocktracer.Span) {
178-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
179-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
180-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
181247
assert.Greater(t, span.Tag("db.stmt_size"), 0)
182248
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement))
183249
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName))
@@ -204,7 +270,6 @@ func TestNewClient(t *testing.T) {
204270
Password: valkeyPassword,
205271
},
206272
valkeytraceClientEnvVars: map[string]string{
207-
"DD_TRACE_VALKEY_SERVICE_NAME": "my-valkey-client",
208273
"DD_TRACE_VALKEY_ANALYTICS_ENABLED": "true",
209274
"DD_TRACE_VALKEY_SKIP_RAW_COMMAND": "true",
210275
},
@@ -214,9 +279,6 @@ func TestNewClient(t *testing.T) {
214279
},
215280
assertSpans: []func(t *testing.T, span mocktracer.Span){
216281
func(t *testing.T, span mocktracer.Span) {
217-
assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName))
218-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
219-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
220282
assert.Equal(t, "GET", span.Tag(ext.DBStatement))
221283
assert.Equal(t, "GET", span.Tag(ext.ResourceName))
222284
assert.Greater(t, span.Tag("db.stmt_size"), 0)
@@ -252,9 +314,6 @@ func TestNewClient(t *testing.T) {
252314
},
253315
assertSpans: []func(t *testing.T, span mocktracer.Span){
254316
func(t *testing.T, span mocktracer.Span) {
255-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
256-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
257-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
258317
assert.Greater(t, span.Tag("db.stmt_size"), 0)
259318
assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.DBStatement))
260319
assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.ResourceName))
@@ -276,25 +335,40 @@ func TestNewClient(t *testing.T) {
276335
}
277336
for _, tt := range tests {
278337
t.Run(tt.name, func(t *testing.T) {
279-
ctx := context.Background()
280338
mt := mocktracer.Start()
281339
defer mt.Stop()
282340
for k, v := range tt.valkeytraceClientEnvVars {
283341
t.Setenv(k, v)
284342
}
285-
client, err := valkeytrace.NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...)
343+
span, ctx := tracer.StartSpanFromContext(context.Background(), "test.root", tracer.ServiceName("test-service"))
344+
client, err := NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...)
286345
if tt.assertNewClientError == nil {
287346
require.NoErrorf(t, err, tt.name)
288347
} else {
289348
tt.assertNewClientError(t, err)
349+
span.Finish()
290350
return
291351
}
292352
tt.createSpans(t, ctx, client)
353+
span.Finish() // test.root exists in the last span.
293354
spans := mt.FinishedSpans()
294-
require.Len(t, spans, len(tt.assertSpans))
355+
require.Len(t, spans, len(tt.assertSpans)+1) // +1 for test.root
295356
for i, span := range spans {
357+
if span.OperationName() == "test.root" {
358+
continue
359+
}
296360
tt.assertSpans[i](t, span)
297-
// Following assertions are common to all spans
361+
t.Log("Following assertions are common to all spans")
362+
assert.Equalf(t,
363+
"test-service",
364+
span.Tag(ext.ServiceName),
365+
"service name should not be overwritten as per DD_APM_PEER_TAGS_AGGREGATION in trace-agent",
366+
)
367+
assert.Equal(t, "valkey", span.Tag(ext.PeerService))
368+
assert.Equal(t, "127.0.0.1", span.Tag(ext.PeerHostIPV4))
369+
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
370+
assert.Equal(t, valkeyPort, span.Tag(ext.PeerPort))
371+
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
298372
assert.NotNil(t, span)
299373
assert.True(t, span.Tag(ext.ValkeyClientCommandWithPassword).(bool))
300374
assert.Equal(t, tt.valkeyClientOptions.Username, span.Tag(ext.DBUser))
@@ -304,6 +378,7 @@ func TestNewClient(t *testing.T) {
304378
assert.Equal(t, "valkey-go/valkey", span.Tag(ext.Component))
305379
assert.Equal(t, "valkey", span.Tag(ext.DBType))
306380
assert.Equal(t, "valkey", span.Tag(ext.DBSystem))
381+
assert.Equal(t, "valkey", span.Tag(ext.DBInstance))
307382
}
308383
})
309384
}

0 commit comments

Comments
 (0)