@@ -10,7 +10,7 @@ import (
10
10
11
11
"context"
12
12
13
- "github.com/opentracing/opentracing-go"
13
+ opentracing "github.com/opentracing/opentracing-go"
14
14
"github.com/opentracing/opentracing-go/ext"
15
15
"google.golang.org/grpc"
16
16
"google.golang.org/grpc/metadata"
@@ -31,12 +31,13 @@ func tracingUnaryClientInterceptor(fr FlightRecorder, tracer opentracing.Tracer)
31
31
invoker grpc.UnaryInvoker ,
32
32
opts ... grpc.CallOption ,
33
33
) error {
34
- fs , ctx , done := fr .WithNewSpan (ctx , formatRPCName (method ))
34
+ obsName := formatRPCName (method )
35
+ fs , ctx , done := fr .WithNewSpan (ctx , obsName )
35
36
defer done ()
36
37
span := fs .TraceSpan ()
37
38
ext .SpanKind .Set (span , ext .SpanKindRPCClientEnum )
38
39
39
- md , ok := metadata .FromContext (ctx )
40
+ md , ok := metadata .FromOutgoingContext (ctx )
40
41
if ! ok {
41
42
md = metadata .New (nil )
42
43
} else {
@@ -47,9 +48,11 @@ func tracingUnaryClientInterceptor(fr FlightRecorder, tracer opentracing.Tracer)
47
48
fs .Warn ("tracer_inject" , "error injecting trace metadata" , Vals {}.WithError (err ))
48
49
}
49
50
50
- ctx = metadata .NewContext (ctx , md )
51
+ ctx = metadata .NewOutgoingContext (ctx , md )
51
52
52
- if err := invoker (ctx , method , req , reply , cc , opts ... ); err != nil {
53
+ err := invoker (ctx , method , req , reply , cc , opts ... )
54
+ fs .Incr (fmt .Sprintf ("grpc_client.%s.%s" , obsName , grpc .Code (err ).String ()))
55
+ if err != nil {
53
56
if ctx .Err () == nil {
54
57
fs .Trace (fmt .Sprintf ("error in gRPC %s" , method ), Vals {}.WithError (err ))
55
58
ext .Error .Set (span , true )
@@ -77,7 +80,7 @@ func tracingStreamClientInterceptor(fr FlightRecorder, tracer opentracing.Tracer
77
80
span := fs .TraceSpan ()
78
81
ext .SpanKind .Set (span , ext .SpanKindRPCClientEnum )
79
82
80
- md , ok := metadata .FromContext (ctx )
83
+ md , ok := metadata .FromOutgoingContext (ctx )
81
84
if ! ok {
82
85
md = metadata .New (nil )
83
86
} else {
@@ -88,9 +91,12 @@ func tracingStreamClientInterceptor(fr FlightRecorder, tracer opentracing.Tracer
88
91
fs .Warn ("tracer_inject" , "error injecting trace metadata" , Vals {}.WithError (err ))
89
92
}
90
93
91
- ctx = metadata .NewContext (ctx , md )
94
+ ctx = metadata .NewOutgoingContext (ctx , md )
92
95
93
96
cs , err := streamer (ctx , desc , cc , method , opts ... )
97
+
98
+ fs .Incr (fmt .Sprintf ("grpc_client.%s.%s" , obsName , grpc .Code (err ).String ()))
99
+
94
100
if err != nil {
95
101
if ctx .Err () == nil {
96
102
fs .Trace (fmt .Sprintf ("error in gRPC %s" , method ), Vals {}.WithError (err ))
@@ -100,7 +106,7 @@ func tracingStreamClientInterceptor(fr FlightRecorder, tracer opentracing.Tracer
100
106
}
101
107
}
102
108
103
- return & clientStreamInterceptor {cs , fr . ScopeName ( obsName ). WithSpan ( ctx ), span , done , 0 , 0 }, err
109
+ return & clientStreamInterceptor {cs , span , done , 0 , 0 }, err
104
110
}
105
111
}
106
112
@@ -111,13 +117,15 @@ func tracingUnaryServerInterceptor(fr FlightRecorder, tracer opentracing.Tracer)
111
117
info * grpc.UnaryServerInfo ,
112
118
handler grpc.UnaryHandler ,
113
119
) (resp interface {}, err error ) {
114
- md , ok := metadata .FromContext (ctx )
120
+ obsName := formatRPCName (info .FullMethod )
121
+ md , ok := metadata .FromIncomingContext (ctx )
115
122
if ! ok {
116
123
md = metadata .New (nil )
117
124
}
125
+
118
126
spanCtx , err := tracer .Extract (opentracing .TextMap , grpcTraceMD (md ))
119
127
120
- fs , ctx , done := fr .WithNewSpanContext (ctx , formatRPCName ( info . FullMethod ) , spanCtx )
128
+ fs , ctx , done := fr .WithNewSpanContext (ctx , obsName , spanCtx )
121
129
defer done ()
122
130
span := fs .TraceSpan ()
123
131
ext .SpanKind .Set (span , ext .SpanKindRPCServerEnum )
@@ -129,6 +137,9 @@ func tracingUnaryServerInterceptor(fr FlightRecorder, tracer opentracing.Tracer)
129
137
130
138
ctx = opentracing .ContextWithSpan (ctx , span )
131
139
resp , err = handler (ctx , req )
140
+
141
+ fs .Incr (fmt .Sprintf ("grpc_server.%s.%s" , obsName , grpc .Code (err ).String ()))
142
+
132
143
if err != nil {
133
144
if ctx .Err () == nil {
134
145
fs .Trace (fmt .Sprintf ("error in gRPC %s" , info .FullMethod ), Vals {}.WithError (err ))
@@ -151,7 +162,7 @@ func tracingStreamServerInterceptor(fr FlightRecorder, tracer opentracing.Tracer
151
162
handler grpc.StreamHandler ,
152
163
) error {
153
164
ctx := ss .Context ()
154
- md , ok := metadata .FromContext (ctx )
165
+ md , ok := metadata .FromIncomingContext (ctx )
155
166
if ! ok {
156
167
md = metadata .New (nil )
157
168
}
@@ -168,9 +179,12 @@ func tracingStreamServerInterceptor(fr FlightRecorder, tracer opentracing.Tracer
168
179
}
169
180
170
181
ctx = opentracing .ContextWithSpan (ctx , span )
171
- ssi := & serverStreamInterceptor {ss , fr . ScopeName ( obsName ). WithSpan ( ctx ), span , done , 0 , 0 , ctx }
182
+ ssi := & serverStreamInterceptor {ss , span , done , 0 , 0 , ctx }
172
183
defer ssi .finish ()
173
- if err := handler (srv , ssi ); err != nil {
184
+
185
+ err = handler (srv , ssi )
186
+ fs .Incr (fmt .Sprintf ("grpc_server.%s.%s" , obsName , grpc .Code (err ).String ()))
187
+ if err != nil {
174
188
if ctx .Err () == nil {
175
189
fs .Trace (fmt .Sprintf ("error in gRPC %s" , info .FullMethod ), Vals {}.WithError (err ))
176
190
ext .Error .Set (span , true )
@@ -186,7 +200,6 @@ func tracingStreamServerInterceptor(fr FlightRecorder, tracer opentracing.Tracer
186
200
187
201
type clientStreamInterceptor struct {
188
202
cs grpc.ClientStream
189
- fs FlightSpan
190
203
span opentracing.Span
191
204
done func ()
192
205
inCount , outCount int
@@ -209,7 +222,6 @@ func (csi *clientStreamInterceptor) Context() context.Context {
209
222
}
210
223
211
224
func (csi * clientStreamInterceptor ) SendMsg (m interface {}) error {
212
- csi .fs .Incr ("stream_sent" )
213
225
csi .outCount ++
214
226
return csi .cs .SendMsg (m )
215
227
}
@@ -223,15 +235,13 @@ func (csi *clientStreamInterceptor) RecvMsg(m interface{}) error {
223
235
return err
224
236
}
225
237
226
- csi .fs .Incr ("stream_received" )
227
238
csi .inCount ++
228
239
229
240
return err
230
241
}
231
242
232
243
type serverStreamInterceptor struct {
233
244
ss grpc.ServerStream
234
- fs FlightSpan
235
245
span opentracing.Span
236
246
done func ()
237
247
inCount , outCount int
@@ -255,13 +265,11 @@ func (ssi *serverStreamInterceptor) Context() context.Context {
255
265
}
256
266
257
267
func (ssi * serverStreamInterceptor ) SendMsg (m interface {}) error {
258
- ssi .fs .Incr ("stream_sent" )
259
268
ssi .outCount ++
260
269
return ssi .ss .SendMsg (m )
261
270
}
262
271
263
272
func (ssi * serverStreamInterceptor ) RecvMsg (m interface {}) error {
264
- ssi .fs .Incr ("stream_received" )
265
273
ssi .inCount ++
266
274
return ssi .ss .RecvMsg (m )
267
275
}
0 commit comments