Skip to content

Commit ce35fd4

Browse files
stats/opentelemetry: add trace event for name resolution delay (#8074)
1 parent 52c643e commit ce35fd4

File tree

9 files changed

+358
-28
lines changed

9 files changed

+358
-28
lines changed

clientconn.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -689,22 +689,31 @@ func (cc *ClientConn) Connect() {
689689
cc.mu.Unlock()
690690
}
691691

692-
// waitForResolvedAddrs blocks until the resolver has provided addresses or the
693-
// context expires. Returns nil unless the context expires first; otherwise
694-
// returns a status error based on the context.
695-
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
692+
// waitForResolvedAddrs blocks until the resolver provides addresses or the
693+
// context expires, whichever happens first.
694+
//
695+
// Error is nil unless the context expires first; otherwise returns a status
696+
// error based on the context.
697+
//
698+
// The returned boolean indicates whether it did block or not. If the
699+
// resolution has already happened once before, it returns false without
700+
// blocking. Otherwise, it waits for the resolution and returns true if
701+
// resolution has succeeded, or returns false along with an error if
702+
// resolution has failed.
703+
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) (bool, error) {
696704
// This is on the RPC path, so we use a fast path to avoid the
697705
// more-expensive "select" below after the resolver has returned once.
698706
if cc.firstResolveEvent.HasFired() {
699-
return nil
707+
return false, nil
700708
}
709+
internal.NewStreamWaitingForResolver()
701710
select {
702711
case <-cc.firstResolveEvent.Done():
703-
return nil
712+
return true, nil
704713
case <-ctx.Done():
705-
return status.FromContextError(ctx.Err()).Err()
714+
return false, status.FromContextError(ctx.Err()).Err()
706715
case <-cc.ctx.Done():
707-
return ErrClientConnClosing
716+
return false, ErrClientConnClosing
708717
}
709718
}
710719

internal/internal.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,13 @@ var (
266266
TimeAfterFunc = func(d time.Duration, f func()) Timer {
267267
return time.AfterFunc(d, f)
268268
}
269+
270+
// NewStreamWaitingForResolver is a test hook that is triggered when a
271+
// new stream blocks while waiting for name resolution. This can be
272+
// used in tests to synchronize resolver updates and avoid race conditions.
273+
// When set, the function will be called before the stream enters
274+
// the blocking state.
275+
NewStreamWaitingForResolver = func() {}
269276
)
270277

271278
// HealthChecker defines the signature of the client-side LB channel health

stats/handlers.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ type RPCTagInfo struct {
3838
// FailFast indicates if this RPC is failfast.
3939
// This field is only valid on client side, it's always false on server side.
4040
FailFast bool
41+
// NameResolutionDelay indicates if the RPC needed to wait for the
42+
// initial name resolver update before it could begin. This should only
43+
// happen if the channel is IDLE when the RPC is started. Note that
44+
// all retry or hedging attempts for an RPC that experienced a delay
45+
// will have it set.
46+
//
47+
// This field is only valid on the client side; it is always false on
48+
// the server side.
49+
NameResolutionDelay bool
4150
}
4251

4352
// Handler defines the interface for the related stats handling (e.g., RPCs, connections).

stats/opentelemetry/client_metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
191191
method: removeLeadingSlash(info.FullMethodName),
192192
}
193193
if h.options.isTracingEnabled() {
194-
ctx, ai = h.traceTagRPC(ctx, ai)
194+
ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay)
195195
}
196196
return setRPCInfo(ctx, &rpcInfo{
197197
ai: ai,

stats/opentelemetry/client_tracing.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,25 @@ import (
2525
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
2626
)
2727

28-
const tracerName = "grpc-go"
28+
const (
29+
delayedResolutionEventName = "Delayed name resolution complete"
30+
tracerName = "grpc-go"
31+
)
2932

3033
// traceTagRPC populates provided context with a new span using the
3134
// TextMapPropagator supplied in trace options and internal itracing.carrier.
3235
// It creates a new outgoing carrier which serializes information about this
3336
// span into gRPC Metadata, if TextMapPropagator is provided in the trace
3437
// options. If TextMapPropagator is not provided, it returns the context as is.
35-
func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo) (context.Context, *attemptInfo) {
38+
func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo, nameResolutionDelayed bool) (context.Context, *attemptInfo) {
39+
// Add a "Delayed name resolution complete" event to the call span
40+
// if there was name resolution delay. In case of multiple retry attempts,
41+
// ensure that the event is added only once.
42+
callSpan := trace.SpanFromContext(ctx)
43+
ci := getCallInfo(ctx)
44+
if nameResolutionDelayed && !ci.nameResolutionEventAdded.Swap(true) && callSpan.SpanContext().IsValid() {
45+
callSpan.AddEvent(delayedResolutionEventName)
46+
}
3647
mn := "Attempt." + strings.Replace(ai.method, "/", ".", -1)
3748
tracer := h.options.TraceOptions.TracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(grpc.Version))
3849
ctx, span := tracer.Start(ctx, mn)

stats/opentelemetry/e2e_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"io"
2323
"slices"
24+
"strconv"
2425
"testing"
2526
"time"
2627

@@ -48,9 +49,11 @@ import (
4849
"go.opentelemetry.io/otel/sdk/trace"
4950
"go.opentelemetry.io/otel/sdk/trace/tracetest"
5051
"google.golang.org/grpc"
52+
"google.golang.org/grpc/codes"
5153
"google.golang.org/grpc/credentials/insecure"
5254
"google.golang.org/grpc/encoding/gzip"
5355
experimental "google.golang.org/grpc/experimental/opentelemetry"
56+
"google.golang.org/grpc/internal"
5457
"google.golang.org/grpc/internal/grpcsync"
5558
"google.golang.org/grpc/internal/grpctest"
5659
"google.golang.org/grpc/internal/stubserver"
@@ -59,9 +62,13 @@ import (
5962
setup "google.golang.org/grpc/internal/testutils/xds/e2e/setup"
6063
testgrpc "google.golang.org/grpc/interop/grpc_testing"
6164
testpb "google.golang.org/grpc/interop/grpc_testing"
65+
"google.golang.org/grpc/metadata"
6266
"google.golang.org/grpc/orca"
67+
"google.golang.org/grpc/resolver"
68+
"google.golang.org/grpc/resolver/manual"
6369
"google.golang.org/grpc/stats/opentelemetry"
6470
"google.golang.org/grpc/stats/opentelemetry/internal/testutils"
71+
"google.golang.org/grpc/status"
6572
)
6673

6774
var defaultTestTimeout = 5 * time.Second
@@ -1560,3 +1567,168 @@ func (s) TestRPCSpanErrorStatus(t *testing.T) {
15601567
t.Fatalf("got rpc error %s, want %s", spans[0].Status.Description, rpcErrorMsg)
15611568
}
15621569
}
1570+
1571+
const delayedResolutionEventName = "Delayed name resolution complete"
1572+
1573+
// TestTraceSpan_WithRetriesAndNameResolutionDelay verifies that
1574+
// "Delayed name resolution complete" event is recorded in the call trace span
1575+
// only once if any of the retry attempts encountered a delay in name resolution.
1576+
func (s) TestTraceSpan_WithRetriesAndNameResolutionDelay(t *testing.T) {
1577+
tests := []struct {
1578+
name string
1579+
setupStub func() *stubserver.StubServer
1580+
doCall func(context.Context, testgrpc.TestServiceClient) error
1581+
spanName string
1582+
}{
1583+
{
1584+
name: "unary",
1585+
setupStub: func() *stubserver.StubServer {
1586+
return &stubserver.StubServer{
1587+
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
1588+
md, _ := metadata.FromIncomingContext(ctx)
1589+
headerAttempts := 0
1590+
if h := md["grpc-previous-rpc-attempts"]; len(h) > 0 {
1591+
headerAttempts, _ = strconv.Atoi(h[0])
1592+
}
1593+
if headerAttempts < 2 {
1594+
return nil, status.Errorf(codes.Unavailable, "retry (%d)", headerAttempts)
1595+
}
1596+
return &testpb.SimpleResponse{}, nil
1597+
},
1598+
}
1599+
},
1600+
doCall: func(ctx context.Context, client testgrpc.TestServiceClient) error {
1601+
_, err := client.UnaryCall(ctx, &testpb.SimpleRequest{})
1602+
return err
1603+
},
1604+
spanName: "grpc.testing.TestService.UnaryCall",
1605+
},
1606+
{
1607+
name: "streaming",
1608+
setupStub: func() *stubserver.StubServer {
1609+
return &stubserver.StubServer{
1610+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
1611+
md, _ := metadata.FromIncomingContext(stream.Context())
1612+
headerAttempts := 0
1613+
if h := md["grpc-previous-rpc-attempts"]; len(h) > 0 {
1614+
headerAttempts, _ = strconv.Atoi(h[0])
1615+
}
1616+
if headerAttempts < 2 {
1617+
return status.Errorf(codes.Unavailable, "retry (%d)", headerAttempts)
1618+
}
1619+
for {
1620+
_, err := stream.Recv()
1621+
if err == io.EOF {
1622+
return nil
1623+
}
1624+
if err != nil {
1625+
return err
1626+
}
1627+
}
1628+
},
1629+
}
1630+
},
1631+
doCall: func(ctx context.Context, client testgrpc.TestServiceClient) error {
1632+
stream, err := client.FullDuplexCall(ctx)
1633+
if err != nil {
1634+
return err
1635+
}
1636+
if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
1637+
return err
1638+
}
1639+
if err := stream.CloseSend(); err != nil {
1640+
return err
1641+
}
1642+
_, err = stream.Recv()
1643+
if err != nil && err != io.EOF {
1644+
return err
1645+
}
1646+
return nil
1647+
},
1648+
spanName: "grpc.testing.TestService.FullDuplexCall",
1649+
},
1650+
}
1651+
1652+
for _, tt := range tests {
1653+
t.Run(tt.name, func(t *testing.T) {
1654+
resolutionWait := grpcsync.NewEvent()
1655+
prevHook := internal.NewStreamWaitingForResolver
1656+
internal.NewStreamWaitingForResolver = func() { resolutionWait.Fire() }
1657+
defer func() { internal.NewStreamWaitingForResolver = prevHook }()
1658+
1659+
mo, _ := defaultMetricsOptions(t, nil)
1660+
to, exporter := defaultTraceOptions(t)
1661+
rb := manual.NewBuilderWithScheme("delayed")
1662+
ss := tt.setupStub()
1663+
opts := opentelemetry.Options{MetricsOptions: *mo, TraceOptions: *to}
1664+
if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opts)}); err != nil {
1665+
t.Fatal(err)
1666+
}
1667+
defer ss.Stop()
1668+
1669+
retryPolicy := `{
1670+
"methodConfig": [{
1671+
"name": [{"service": "grpc.testing.TestService"}],
1672+
"retryPolicy": {
1673+
"maxAttempts": 3,
1674+
"initialBackoff": "0.05s",
1675+
"maxBackoff": "0.2s",
1676+
"backoffMultiplier": 1.0,
1677+
"retryableStatusCodes": ["UNAVAILABLE"]
1678+
}
1679+
}]
1680+
}`
1681+
cc, err := grpc.NewClient(
1682+
rb.Scheme()+":///test.server",
1683+
grpc.WithTransportCredentials(insecure.NewCredentials()),
1684+
grpc.WithResolvers(rb),
1685+
opentelemetry.DialOption(opts),
1686+
grpc.WithDefaultServiceConfig(retryPolicy),
1687+
)
1688+
if err != nil {
1689+
t.Fatal(err)
1690+
}
1691+
defer cc.Close()
1692+
1693+
client := testgrpc.NewTestServiceClient(cc)
1694+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1695+
defer cancel()
1696+
1697+
go func() {
1698+
<-resolutionWait.Done()
1699+
rb.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}})
1700+
}()
1701+
if err := tt.doCall(ctx, client); err != nil {
1702+
t.Fatalf("%s call failed: %v", tt.name, err)
1703+
}
1704+
1705+
wantSpanInfo := traceSpanInfo{
1706+
name: tt.spanName,
1707+
spanKind: oteltrace.SpanKindClient.String(),
1708+
events: []trace.Event{{Name: delayedResolutionEventName}},
1709+
}
1710+
spans, err := waitForTraceSpans(ctx, exporter, []traceSpanInfo{wantSpanInfo})
1711+
if err != nil {
1712+
t.Fatal(err)
1713+
}
1714+
verifyTrace(t, spans, wantSpanInfo)
1715+
})
1716+
}
1717+
}
1718+
1719+
func verifyTrace(t *testing.T, spans tracetest.SpanStubs, want traceSpanInfo) {
1720+
match := false
1721+
for _, span := range spans {
1722+
if span.Name == want.name && span.SpanKind.String() == want.spanKind {
1723+
match = true
1724+
if diff := cmp.Diff(want.events, span.Events, cmpopts.IgnoreFields(trace.Event{}, "Time")); diff != "" {
1725+
t.Errorf("Span event mismatch for %q (kind: %s) (-want +got):\n%s",
1726+
want.name, want.spanKind, diff)
1727+
}
1728+
break
1729+
}
1730+
}
1731+
if !match {
1732+
t.Errorf("Expected span not found: %q (kind: %s)", want.name, want.spanKind)
1733+
}
1734+
}

stats/opentelemetry/opentelemetry.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package opentelemetry
2525
import (
2626
"context"
2727
"strings"
28+
"sync/atomic"
2829
"time"
2930

3031
otelattribute "go.opentelemetry.io/otel/attribute"
@@ -148,6 +149,10 @@ type callInfo struct {
148149
target string
149150

150151
method string
152+
153+
// nameResolutionEventAdded is set when the resolver delay trace event
154+
// is added. Prevents duplicate events, since it is reported per-attempt.
155+
nameResolutionEventAdded atomic.Bool
151156
}
152157

153158
type callInfoKey struct{}

0 commit comments

Comments
 (0)