Skip to content

Commit 11e7c51

Browse files
committed
as per DD_APM_PEER_TAGS_AGGREGATION in agent
1 parent 6aa8d69 commit 11e7c51

File tree

3 files changed

+124
-54
lines changed

3 files changed

+124
-54
lines changed

contrib/valkey-go/option.go

-16
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@ package valkey
99

1010
import (
1111
"math"
12-
"os"
1312

1413
"gopkg.in/DataDog/dd-trace-go.v1/internal"
1514
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
1615
)
1716

18-
const defaultServiceName = "valkey.client"
19-
2017
type clientConfig struct {
2118
serviceName string
2219
spanName string
@@ -28,18 +25,12 @@ type clientConfig struct {
2825
type ClientOption func(*clientConfig)
2926

3027
func defaults(cfg *clientConfig) {
31-
cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
3228
cfg.spanName = namingschema.OpName(namingschema.ValkeyOutbound)
3329
if internal.BoolEnv("DD_TRACE_VALKEY_ANALYTICS_ENABLED", false) {
3430
cfg.analyticsRate = 1.0
3531
} else {
3632
cfg.analyticsRate = math.NaN()
3733
}
38-
if v := os.Getenv("DD_TRACE_VALKEY_SERVICE_NAME"); v == "" {
39-
cfg.serviceName = defaultServiceName
40-
} else {
41-
cfg.serviceName = v
42-
}
4334
cfg.skipRaw = internal.BoolEnv("DD_TRACE_VALKEY_SKIP_RAW_COMMAND", false)
4435
}
4536

@@ -52,13 +43,6 @@ func WithSkipRawCommand(skip bool) ClientOption {
5243
}
5344
}
5445

55-
// WithServiceName sets the given service name for the client.
56-
func WithServiceName(name string) ClientOption {
57-
return func(cfg *clientConfig) {
58-
cfg.serviceName = name
59-
}
60-
}
61-
6246
// WithAnalytics enables Trace Analytics for all started spans.
6347
func WithAnalytics(on bool) ClientOption {
6448
return func(cfg *clientConfig) {

contrib/valkey-go/valkey.go

+32-11
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,10 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client,
6262
for _, fn := range opts {
6363
fn(&cfg)
6464
}
65-
var host, portStr string
65+
var host string
6666
var port int
6767
if len(option.InitAddress) == 1 {
68-
host, portStr, err = net.SplitHostPort(option.InitAddress[0])
69-
if err != nil {
70-
log.Error("valkey.ClientOption.InitAddress contains invalid address: %s", err)
71-
}
72-
port, _ = strconv.Atoi(portStr)
68+
host, port = splitHostPort(option.InitAddress[0])
7369
}
7470
core := coreClient{
7571
Client: valkeyClient,
@@ -83,6 +79,16 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client,
8379
}, nil
8480
}
8581

82+
func splitHostPort(addr string) (string, int) {
83+
host, portStr, err := net.SplitHostPort(addr)
84+
if err != nil {
85+
log.Error("%q cannot be split: %s", addr, err)
86+
return "", 0
87+
}
88+
port, _ := strconv.Atoi(portStr)
89+
return host, port
90+
}
91+
8692
type commander interface {
8793
Commands() []string
8894
}
@@ -154,6 +160,23 @@ type buildStartSpanOptionsInput struct {
154160
skipRawCommand bool
155161
}
156162

163+
func (c *coreClient) peerTags() []tracer.StartSpanOption {
164+
ipAddr := net.ParseIP(c.host)
165+
var peerHostKey string
166+
if ipAddr == nil {
167+
peerHostKey = ext.PeerHostname
168+
} else if ipAddr.To4() != nil {
169+
peerHostKey = ext.PeerHostIPV4
170+
} else {
171+
peerHostKey = ext.PeerHostIPV6
172+
}
173+
return []tracer.StartSpanOption{
174+
tracer.Tag(ext.PeerService, ext.DBSystemValkey),
175+
tracer.Tag(peerHostKey, c.host),
176+
tracer.Tag(ext.PeerPort, c.port),
177+
}
178+
}
179+
157180
func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []tracer.StartSpanOption {
158181
opts := []tracer.StartSpanOption{
159182
tracer.SpanType(ext.SpanTypeValkey),
@@ -166,13 +189,15 @@ func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []t
166189
tracer.Tag(ext.SpanKind, ext.SpanKindClient),
167190
tracer.Tag(ext.DBType, ext.DBSystemValkey),
168191
tracer.Tag(ext.DBSystem, ext.DBSystemValkey),
192+
tracer.Tag(ext.DBInstance, ext.DBSystemValkey),
169193
tracer.Tag(ext.ValkeyDatabaseIndex, c.option.SelectDB),
170194
tracer.Tag(ext.ValkeyClientCommandWrite, input.isWrite),
171195
tracer.Tag(ext.ValkeyClientCommandBlock, input.isBlock),
172196
tracer.Tag(ext.ValkeyClientCommandMulti, input.isMulti),
173197
tracer.Tag(ext.ValkeyClientCommandStream, input.isStream),
174198
tracer.Tag(ext.ValkeyClientCommandWithPassword, c.option.Password != ""),
175199
}
200+
opts = append(opts, c.peerTags()...)
176201
if input.command != "" {
177202
opts = append(opts, []tracer.StartSpanOption{
178203
// valkeyotel tags
@@ -324,11 +349,7 @@ func (c *client) Dedicate() (client valkey.DedicatedClient, cancel func()) {
324349
func (c *client) Nodes() map[string]valkey.Client {
325350
nodes := c.Client.Nodes()
326351
for addr, valkeyClient := range nodes {
327-
host, portStr, err := net.SplitHostPort(addr)
328-
if err != nil {
329-
log.Error("invalid address is set to valkey client: %s", err)
330-
}
331-
port, _ := strconv.Atoi(portStr)
352+
host, port := splitHostPort(addr)
332353
nodes[addr] = &client{
333354
coreClient: coreClient{
334355
Client: valkeyClient,

contrib/valkey-go/valkey_test.go

+92-27
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// under the Apache License Version 2.0.
33
// This product includes software developed at Datadog (https://www.datadoghq.com/).
44
// Copyright 2016 Datadog, Inc.
5-
package valkey_test
5+
package valkey
66

77
import (
88
"context"
@@ -14,7 +14,7 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
"github.com/valkey-io/valkey-go"
17-
valkeytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/valkey-go"
17+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
1818
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
1919
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
2020
)
@@ -35,11 +35,89 @@ func TestMain(m *testing.M) {
3535
os.Exit(m.Run())
3636
}
3737

38+
func TestPeerTags(t *testing.T) {
39+
tests := []struct {
40+
initAddress string
41+
expectedTags map[string]interface{}
42+
}{
43+
{
44+
initAddress: "127.0.0.1:6379",
45+
expectedTags: map[string]interface{}{
46+
ext.PeerService: "valkey",
47+
ext.PeerHostIPV4: "127.0.0.1",
48+
ext.PeerPort: 6379,
49+
},
50+
},
51+
{
52+
initAddress: "[::1]:6379",
53+
expectedTags: map[string]interface{}{
54+
ext.PeerService: "valkey",
55+
ext.PeerHostIPV6: "::1",
56+
ext.PeerPort: 6379,
57+
},
58+
},
59+
{
60+
initAddress: "[2001:db8::2]:6379",
61+
expectedTags: map[string]interface{}{
62+
ext.PeerService: "valkey",
63+
ext.PeerHostIPV6: "2001:db8::2",
64+
ext.PeerPort: 6379,
65+
},
66+
},
67+
{
68+
initAddress: "[2001:db8::2%lo]:6379",
69+
expectedTags: map[string]interface{}{
70+
ext.PeerService: "valkey",
71+
ext.PeerHostname: "2001:db8::2%lo",
72+
ext.PeerPort: 6379,
73+
},
74+
},
75+
{
76+
initAddress: "::1:7777",
77+
expectedTags: map[string]interface{}{
78+
ext.PeerService: "valkey",
79+
ext.PeerHostname: "",
80+
ext.PeerPort: 0,
81+
},
82+
},
83+
{
84+
initAddress: ":::7777",
85+
expectedTags: map[string]interface{}{
86+
ext.PeerService: "valkey",
87+
ext.PeerHostname: "",
88+
ext.PeerPort: 0,
89+
},
90+
},
91+
{
92+
initAddress: "localhost:7777",
93+
expectedTags: map[string]interface{}{
94+
ext.PeerService: "valkey",
95+
ext.PeerHostname: "localhost",
96+
ext.PeerPort: 7777,
97+
},
98+
},
99+
}
100+
for _, tt := range tests {
101+
t.Run(tt.initAddress, func(t *testing.T) {
102+
host, port := splitHostPort(tt.initAddress)
103+
client := coreClient{
104+
host: host,
105+
port: port,
106+
}
107+
var startSpanConfig ddtrace.StartSpanConfig
108+
for _, tag := range client.peerTags() {
109+
tag(&startSpanConfig)
110+
}
111+
require.Equal(t, tt.expectedTags, startSpanConfig.Tags)
112+
})
113+
}
114+
}
115+
38116
func TestNewClient(t *testing.T) {
39117
tests := []struct {
40118
name string
41119
valkeyClientOptions valkey.ClientOption
42-
valkeytraceClientOptions []valkeytrace.ClientOption
120+
valkeytraceClientOptions []ClientOption
43121
valkeytraceClientEnvVars map[string]string
44122
createSpans func(*testing.T, context.Context, valkey.Client)
45123
assertNewClientError func(*testing.T, error)
@@ -74,19 +152,15 @@ func TestNewClient(t *testing.T) {
74152
Username: valkeyUsername,
75153
Password: valkeyPassword,
76154
},
77-
valkeytraceClientOptions: []valkeytrace.ClientOption{
78-
valkeytrace.WithServiceName("my-valkey-client"),
79-
valkeytrace.WithAnalytics(true),
80-
valkeytrace.WithSkipRawCommand(true),
155+
valkeytraceClientOptions: []ClientOption{
156+
WithAnalytics(true),
157+
WithSkipRawCommand(true),
81158
},
82159
createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) {
83160
assert.NoError(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error())
84161
},
85162
assertSpans: []func(t *testing.T, span mocktracer.Span){
86163
func(t *testing.T, span mocktracer.Span) {
87-
assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName))
88-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
89-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
90164
assert.Equal(t, "SET", span.Tag(ext.DBStatement))
91165
assert.Equal(t, "SET", span.Tag(ext.ResourceName))
92166
assert.Greater(t, span.Tag("db.stmt_size"), 0)
@@ -119,9 +193,6 @@ func TestNewClient(t *testing.T) {
119193
},
120194
assertSpans: []func(t *testing.T, span mocktracer.Span){
121195
func(t *testing.T, span mocktracer.Span) {
122-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
123-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
124-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
125196
assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.DBStatement))
126197
assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.ResourceName))
127198
assert.Greater(t, span.Tag("db.stmt_size"), 0)
@@ -155,9 +226,6 @@ func TestNewClient(t *testing.T) {
155226
},
156227
assertSpans: []func(t *testing.T, span mocktracer.Span){
157228
func(t *testing.T, span mocktracer.Span) {
158-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
159-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
160-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
161229
assert.Greater(t, span.Tag("db.stmt_size"), 0)
162230
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement))
163231
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName))
@@ -175,9 +243,6 @@ func TestNewClient(t *testing.T) {
175243
assert.Nil(t, span.Tag(ext.Error))
176244
},
177245
func(t *testing.T, span mocktracer.Span) {
178-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
179-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
180-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
181246
assert.Greater(t, span.Tag("db.stmt_size"), 0)
182247
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement))
183248
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName))
@@ -204,7 +269,6 @@ func TestNewClient(t *testing.T) {
204269
Password: valkeyPassword,
205270
},
206271
valkeytraceClientEnvVars: map[string]string{
207-
"DD_TRACE_VALKEY_SERVICE_NAME": "my-valkey-client",
208272
"DD_TRACE_VALKEY_ANALYTICS_ENABLED": "true",
209273
"DD_TRACE_VALKEY_SKIP_RAW_COMMAND": "true",
210274
},
@@ -214,9 +278,6 @@ func TestNewClient(t *testing.T) {
214278
},
215279
assertSpans: []func(t *testing.T, span mocktracer.Span){
216280
func(t *testing.T, span mocktracer.Span) {
217-
assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName))
218-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
219-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
220281
assert.Equal(t, "GET", span.Tag(ext.DBStatement))
221282
assert.Equal(t, "GET", span.Tag(ext.ResourceName))
222283
assert.Greater(t, span.Tag("db.stmt_size"), 0)
@@ -252,9 +313,6 @@ func TestNewClient(t *testing.T) {
252313
},
253314
assertSpans: []func(t *testing.T, span mocktracer.Span){
254315
func(t *testing.T, span mocktracer.Span) {
255-
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
256-
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
257-
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
258316
assert.Greater(t, span.Tag("db.stmt_size"), 0)
259317
assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.DBStatement))
260318
assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.ResourceName))
@@ -282,7 +340,7 @@ func TestNewClient(t *testing.T) {
282340
for k, v := range tt.valkeytraceClientEnvVars {
283341
t.Setenv(k, v)
284342
}
285-
client, err := valkeytrace.NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...)
343+
client, err := NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...)
286344
if tt.assertNewClientError == nil {
287345
require.NoErrorf(t, err, tt.name)
288346
} else {
@@ -295,6 +353,12 @@ func TestNewClient(t *testing.T) {
295353
for i, span := range spans {
296354
tt.assertSpans[i](t, span)
297355
// Following assertions are common to all spans
356+
assert.Equal(t, "", span.Tag(ext.ServiceName)) // Do not overwrite service name as per DD_APM_PEER_TAGS_AGGREGATION in agent
357+
assert.Equal(t, "valkey", span.Tag(ext.PeerService))
358+
assert.Equal(t, "127.0.0.1", span.Tag(ext.PeerHostIPV4))
359+
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
360+
assert.Equal(t, valkeyPort, span.Tag(ext.PeerPort))
361+
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
298362
assert.NotNil(t, span)
299363
assert.True(t, span.Tag(ext.ValkeyClientCommandWithPassword).(bool))
300364
assert.Equal(t, tt.valkeyClientOptions.Username, span.Tag(ext.DBUser))
@@ -304,6 +368,7 @@ func TestNewClient(t *testing.T) {
304368
assert.Equal(t, "valkey-go/valkey", span.Tag(ext.Component))
305369
assert.Equal(t, "valkey", span.Tag(ext.DBType))
306370
assert.Equal(t, "valkey", span.Tag(ext.DBSystem))
371+
assert.Equal(t, "valkey", span.Tag(ext.DBInstance))
307372
}
308373
})
309374
}

0 commit comments

Comments
 (0)