Skip to content

Commit 0668a43

Browse files
committed
Support streaming calls in go gRPC client
1 parent f1f11b2 commit 0668a43

File tree

1 file changed

+91
-19
lines changed
  • src/BenchmarksApps/Grpc/GoClient

1 file changed

+91
-19
lines changed

src/BenchmarksApps/Grpc/GoClient/main.go

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ var (
3030
streams = flag.Int("streams", 1, "Streams per connection")
3131
warmup = flag.Int("warmup", 5, "Warmup in seconds")
3232
duration = flag.Int("duration", 10, "Duration in seconds")
33-
wg sync.WaitGroup
33+
warmupWg sync.WaitGroup
34+
finishedWg sync.WaitGroup
3435
connections []*grpc.ClientConn
3536
connectionLocks []sync.Mutex
3637
requestsPerConnection []int
@@ -84,24 +85,29 @@ func main() {
8485
opts = append(opts, grpc.WithInsecure())
8586
}
8687

87-
opts = append(opts, grpc.WithBlock())
88-
88+
// Create connections and related collections
8989
buildConnections(context.Background(), opts)
9090

91+
// Start background thread to track warmup and duration
9192
go func() {
9293
warmingUp = true
94+
warmupWg.Add(1)
9395
time.Sleep(time.Duration(*warmup) * time.Second)
9496
warmingUp = false
97+
warmupWg.Done()
9598
fmt.Print("Finished warming up\n")
9699
time.Sleep(time.Duration(*duration) * time.Second)
97100
stopped = true
98101
}()
99102

103+
// Start caller threads for each connection + stream
100104
for connectionID, cc := range connections {
101105
runWithConn(connectionID, cc)
102106
}
103-
wg.Wait()
107+
// Wait for caller threads to finish
108+
finishedWg.Wait()
104109

110+
// Output results
105111
calculateRequestStatistics()
106112
calculateLatencyStatistics()
107113
}
@@ -208,44 +214,110 @@ func calculateLatencyStatistics() {
208214
func runWithConn(connectionID int, cc *grpc.ClientConn) {
209215
for i := 0; i < *streams; i++ {
210216
streamID := i
211-
wg.Add(1)
217+
finishedWg.Add(1)
212218
go func() {
213-
defer wg.Done()
214-
caller := makeCaller(cc, connectionID, streamID)
219+
defer finishedWg.Done()
220+
caller := makeCaller(cc, connectionID, streamID, *scenario)
221+
if caller == nil {
222+
log.Fatalf("Unsupported scenario: %s", *scenario)
223+
}
215224
fmt.Printf("Starting %d %d\n", connectionID, streamID)
216225
caller()
217226
fmt.Printf("Finished %d %d\n", connectionID, streamID)
218227
}()
219228
}
220229
}
221230

222-
func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int) func() {
231+
func makeCaller(cc *grpc.ClientConn, connectionID int, streamID int, scenario string) func() {
223232
client := testpb.NewBenchmarkServiceClient(cc)
224-
if *scenario != "unary" {
225-
log.Fatalf("Unsupported scenario: %s", *scenario)
233+
if scenario == "unary" {
234+
return func() {
235+
for {
236+
request := &testpb.SimpleRequest{
237+
Payload: NewPayload(int(*requestSize)),
238+
ResponseSize: int32(*responseSize),
239+
}
240+
241+
start := time.Now()
242+
if _, err := client.UnaryCall(context.Background(), request); err != nil {
243+
handleFailure(connectionID)
244+
} else {
245+
end := time.Now()
246+
handleSuccess(connectionID, start, end)
247+
}
248+
249+
if stopped {
250+
return
251+
}
252+
}
253+
}
226254
}
227-
return func() {
228-
for {
255+
if scenario == "serverstreaming" {
256+
return func() {
229257
request := &testpb.SimpleRequest{
230258
Payload: NewPayload(int(*requestSize)),
231259
ResponseSize: int32(*responseSize),
232260
}
233261

234-
start := time.Now()
235-
_, err := client.UnaryCall(context.Background(), request)
236-
end := time.Now()
237-
262+
stream, err := client.StreamingFromServer(context.Background(), request)
238263
if err != nil {
264+
// Wait for warmup to be finished before reporting the call failed
265+
warmupWg.Wait()
239266
handleFailure(connectionID)
240-
} else {
241-
handleSuccess(connectionID, start, end)
267+
return
242268
}
243269

244-
if stopped {
270+
for {
271+
start := time.Now()
272+
if _, err := stream.Recv(); err != nil {
273+
handleFailure(connectionID)
274+
} else {
275+
end := time.Now()
276+
handleSuccess(connectionID, start, end)
277+
}
278+
279+
if stopped {
280+
return
281+
}
282+
}
283+
}
284+
}
285+
if scenario == "pingpongstreaming" {
286+
return func() {
287+
stream, err := client.StreamingCall(context.Background())
288+
if err != nil {
289+
// Wait for warmup to be finished before reporting the call failed
290+
warmupWg.Wait()
291+
handleFailure(connectionID)
245292
return
246293
}
294+
295+
for {
296+
request := &testpb.SimpleRequest{
297+
Payload: NewPayload(int(*requestSize)),
298+
ResponseSize: int32(*responseSize),
299+
}
300+
301+
start := time.Now()
302+
if err := stream.Send(request); err != nil {
303+
handleFailure(connectionID)
304+
} else {
305+
if _, err := stream.Recv(); err != nil {
306+
handleFailure(connectionID)
307+
} else {
308+
end := time.Now()
309+
handleSuccess(connectionID, start, end)
310+
}
311+
}
312+
313+
if stopped {
314+
return
315+
}
316+
}
247317
}
248318
}
319+
320+
return nil
249321
}
250322

251323
func handleFailure(connectionID int) {

0 commit comments

Comments
 (0)