30
30
streams = flag .Int ("streams" , 1 , "Streams per connection" )
31
31
warmup = flag .Int ("warmup" , 5 , "Warmup in seconds" )
32
32
duration = flag .Int ("duration" , 10 , "Duration in seconds" )
33
+ ctx , cancel = context .WithCancel (context .Background ())
33
34
warmupWg sync.WaitGroup
34
35
finishedWg sync.WaitGroup
35
36
connections []* grpc.ClientConn
@@ -98,6 +99,8 @@ func main() {
98
99
fmt .Print ("Finished warming up\n " )
99
100
time .Sleep (time .Duration (* duration ) * time .Second )
100
101
stopped = true
102
+ cancel ()
103
+ fmt .Print ("Stopping benchmarks\n " )
101
104
}()
102
105
103
106
// Start caller threads for each connection + stream
@@ -107,6 +110,8 @@ func main() {
107
110
// Wait for caller threads to finish
108
111
finishedWg .Wait ()
109
112
113
+ fmt .Print ("Caller threads finished\n " )
114
+
110
115
// Output results
111
116
calculateRequestStatistics ()
112
117
calculateLatencyStatistics ()
@@ -239,8 +244,8 @@ func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int, scenario st
239
244
}
240
245
241
246
start := time .Now ()
242
- if _ , err := client .UnaryCall (context . Background () , request ); err != nil {
243
- handleFailure (connectionID )
247
+ if _ , err := client .UnaryCall (ctx , request ); err != nil {
248
+ handleFailure (connectionID , err )
244
249
} else {
245
250
end := time .Now ()
246
251
handleSuccess (connectionID , start , end )
@@ -259,18 +264,18 @@ func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int, scenario st
259
264
ResponseSize : int32 (* responseSize ),
260
265
}
261
266
262
- stream , err := client .StreamingFromServer (context . Background () , request )
267
+ stream , err := client .StreamingFromServer (ctx , request )
263
268
if err != nil {
264
269
// Wait for warmup to be finished before reporting the call failed
265
270
warmupWg .Wait ()
266
- handleFailure (connectionID )
271
+ handleFailure (connectionID , err )
267
272
return
268
273
}
269
274
270
275
for {
271
276
start := time .Now ()
272
277
if _ , err := stream .Recv (); err != nil {
273
- handleFailure (connectionID )
278
+ handleFailure (connectionID , err )
274
279
} else {
275
280
end := time .Now ()
276
281
handleSuccess (connectionID , start , end )
@@ -284,11 +289,11 @@ func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int, scenario st
284
289
}
285
290
if scenario == "pingpongstreaming" {
286
291
return func () {
287
- stream , err := client .StreamingCall (context . Background () )
292
+ stream , err := client .StreamingCall (ctx )
288
293
if err != nil {
289
294
// Wait for warmup to be finished before reporting the call failed
290
295
warmupWg .Wait ()
291
- handleFailure (connectionID )
296
+ handleFailure (connectionID , err )
292
297
return
293
298
}
294
299
@@ -300,10 +305,10 @@ func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int, scenario st
300
305
301
306
start := time .Now ()
302
307
if err := stream .Send (request ); err != nil {
303
- handleFailure (connectionID )
308
+ handleFailure (connectionID , err )
304
309
} else {
305
310
if _ , err := stream .Recv (); err != nil {
306
- handleFailure (connectionID )
311
+ handleFailure (connectionID , err )
307
312
} else {
308
313
end := time .Now ()
309
314
handleSuccess (connectionID , start , end )
@@ -320,8 +325,12 @@ func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int, scenario st
320
325
return nil
321
326
}
322
327
323
- func handleFailure (connectionID int ) {
324
- if stopped || warmingUp {
328
+ func handleFailure (connectionID int , err error ) {
329
+ if warmingUp {
330
+ return
331
+ }
332
+ if stopped {
333
+ fmt .Printf ("Failure after stop: %v\n " , err )
325
334
return
326
335
}
327
336
connectionLocks [connectionID ].Lock ()
0 commit comments