Skip to content

Commit b3fbd87

Browse files
authored
interop: add ORCA test cases and functionality (#6266)
1 parent 5e58734 commit b3fbd87

File tree

6 files changed

+357
-5
lines changed

6 files changed

+357
-5
lines changed

interop/client/client.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
*/
1818

1919
// Binary client is an interop client.
20+
//
21+
// See interop test case descriptions [here].
22+
//
23+
// [here]: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
2024
package main
2125

2226
import (
@@ -94,7 +98,9 @@ var (
9498
custom_metadata: server will echo custom metadata;
9599
unimplemented_method: client attempts to call unimplemented method;
96100
unimplemented_service: client attempts to call unimplemented service;
97-
pick_first_unary: all requests are sent to one server despite multiple servers are resolved.`)
101+
pick_first_unary: all requests are sent to one server despite multiple servers are resolved;
102+
orca_per_rpc: the client verifies ORCA per-RPC metrics are provided;
103+
orca_oob: the client verifies ORCA out-of-band metrics are provided.`)
98104

99105
logger = grpclog.Component("interop")
100106
)
@@ -308,6 +314,12 @@ func main() {
308314
case "channel_soak":
309315
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
310316
logger.Infoln("ChannelSoak done")
317+
case "orca_per_rpc":
318+
interop.DoORCAPerRPCTest(tc)
319+
logger.Infoln("ORCAPerRPC done")
320+
case "orca_oob":
321+
interop.DoORCAOOBTest(tc)
322+
logger.Infoln("ORCAOOB done")
311323
default:
312324
logger.Fatal("Unsupported test case: ", *testCase)
313325
}

interop/observability/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ require (
1818
contrib.go.opencensus.io/exporter/stackdriver v0.13.12 // indirect
1919
github.com/aws/aws-sdk-go v1.44.162 // indirect
2020
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
21+
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect
22+
github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect
2123
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
2224
github.com/golang/protobuf v1.5.3 // indirect
2325
github.com/google/go-cmp v0.5.9 // indirect

interop/observability/go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH
638638
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
639639
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
640640
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
641+
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 h1:58f1tJ1ra+zFINPlwLWvQsR9CzAKt2e+EWV2yX9oXQ4=
641642
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
642643
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
643644
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -651,6 +652,7 @@ github.com/envoyproxy/go-control-plane v0.11.1-0.20230406144219-ba92d50b6596/go.
651652
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
652653
github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo=
653654
github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w=
655+
github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8=
654656
github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss=
655657
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
656658
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=

interop/orcalb.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package interop
20+
21+
import (
22+
"context"
23+
"fmt"
24+
"sync"
25+
"time"
26+
27+
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
28+
"google.golang.org/grpc/balancer"
29+
"google.golang.org/grpc/balancer/base"
30+
"google.golang.org/grpc/connectivity"
31+
"google.golang.org/grpc/orca"
32+
)
33+
34+
func init() {
35+
balancer.Register(orcabb{})
36+
}
37+
38+
// orcabb is the stateless balancer.Builder registered in init for the
// "test_backend_metrics_load_balancer" LB policy; it builds orcab balancers.
type orcabb struct{}
39+
40+
func (orcabb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
41+
return &orcab{cc: cc}
42+
}
43+
44+
func (orcabb) Name() string {
45+
return "test_backend_metrics_load_balancer"
46+
}
47+
48+
type orcab struct {
49+
cc balancer.ClientConn
50+
sc balancer.SubConn
51+
cancelWatch func()
52+
53+
reportMu sync.Mutex
54+
report *v3orcapb.OrcaLoadReport
55+
}
56+
57+
func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
58+
if o.sc != nil {
59+
o.sc.UpdateAddresses(s.ResolverState.Addresses)
60+
return nil
61+
}
62+
63+
if len(s.ResolverState.Addresses) == 0 {
64+
o.ResolverError(fmt.Errorf("produced no addresses"))
65+
return fmt.Errorf("resolver produced no addresses")
66+
}
67+
var err error
68+
o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
69+
if err != nil {
70+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
71+
return nil
72+
}
73+
o.cancelWatch = orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
74+
o.sc.Connect()
75+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
76+
return nil
77+
}
78+
79+
func (o *orcab) ResolverError(err error) {
80+
if o.sc == nil {
81+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
82+
}
83+
}
84+
85+
func (o *orcab) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) {
86+
if o.sc != sc {
87+
logger.Errorf("received subconn update for unknown subconn: %v vs %v", o.sc, sc)
88+
return
89+
}
90+
switch scState.ConnectivityState {
91+
case connectivity.Ready:
92+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &scPicker{sc: sc, o: o}})
93+
case connectivity.TransientFailure:
94+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", scState.ConnectionError))})
95+
case connectivity.Connecting:
96+
// Ignore; picker already set to "connecting".
97+
case connectivity.Idle:
98+
sc.Connect()
99+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
100+
case connectivity.Shutdown:
101+
// Ignore; we are closing but handle that in Close instead.
102+
}
103+
}
104+
105+
func (o *orcab) Close() {
106+
o.cancelWatch()
107+
}
108+
109+
func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
110+
o.reportMu.Lock()
111+
defer o.reportMu.Unlock()
112+
logger.Infof("received OOB load report: %v", r)
113+
o.report = r
114+
}
115+
116+
type scPicker struct {
117+
sc balancer.SubConn
118+
o *orcab
119+
}
120+
121+
func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
122+
doneCB := func(di balancer.DoneInfo) {
123+
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
124+
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
125+
// Since all RPCs will respond with a load report due to the
126+
// presence of the DialOption, we need to inspect every field and
127+
// use the out-of-band report instead if all are unset/zero.
128+
setContextCMR(info.Ctx, lr)
129+
} else {
130+
p.o.reportMu.Lock()
131+
defer p.o.reportMu.Unlock()
132+
if lr := p.o.report; lr != nil {
133+
setContextCMR(info.Ctx, lr)
134+
}
135+
}
136+
}
137+
return balancer.PickResult{SubConn: p.sc, Done: doneCB}, nil
138+
}
139+
140+
func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
141+
if r := orcaResultFromContext(ctx); r != nil {
142+
*r = lr
143+
}
144+
}
145+
146+
// orcaKey is a private type for context keys defined in this file, so they
// cannot collide with keys from other packages.
type orcaKey string

// orcaCtxKey is the context key under which the ORCA result slot is stored.
var orcaCtxKey orcaKey = "orcaResult"
149+
150+
// contextWithORCAResult sets a key in ctx with a pointer to an ORCA load
151+
// report that is to be filled in by the "test_backend_metrics_load_balancer"
152+
// LB policy's Picker's Done callback.
153+
//
154+
// If a per-call load report is provided from the server for the call, result
155+
// will be filled with that, otherwise the most recent OOB load report is used.
156+
// If no OOB report has been received, result is not modified.
157+
func contextWithORCAResult(ctx context.Context, result **v3orcapb.OrcaLoadReport) context.Context {
158+
return context.WithValue(ctx, orcaCtxKey, result)
159+
}
160+
161+
// orcaResultFromContext returns the ORCA load report stored in the context.
162+
// The LB policy uses this to communicate the load report back to the interop
163+
// client application.
164+
func orcaResultFromContext(ctx context.Context) **v3orcapb.OrcaLoadReport {
165+
v := ctx.Value(orcaCtxKey)
166+
if v == nil {
167+
return nil
168+
}
169+
return v.(**v3orcapb.OrcaLoadReport)
170+
}

interop/server/server.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,25 @@
1717
*/
1818

1919
// Binary server is an interop server.
20+
//
21+
// See interop test case descriptions [here].
22+
//
23+
// [here]: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
2024
package main
2125

2226
import (
2327
"flag"
2428
"net"
2529
"strconv"
30+
"time"
2631

2732
"google.golang.org/grpc"
2833
"google.golang.org/grpc/credentials"
2934
"google.golang.org/grpc/credentials/alts"
3035
"google.golang.org/grpc/grpclog"
36+
"google.golang.org/grpc/internal"
3137
"google.golang.org/grpc/interop"
38+
"google.golang.org/grpc/orca"
3239
"google.golang.org/grpc/testdata"
3340

3441
testgrpc "google.golang.org/grpc/interop/grpc_testing"
@@ -56,7 +63,7 @@ func main() {
5663
logger.Fatalf("failed to listen: %v", err)
5764
}
5865
logger.Infof("interop server listening on %v", lis.Addr())
59-
var opts []grpc.ServerOption
66+
opts := []grpc.ServerOption{orca.CallMetricsServerOption(nil)}
6067
if *useTLS {
6168
if *certFile == "" {
6269
*certFile = testdata.Path("server1.pem")
@@ -78,6 +85,13 @@ func main() {
7885
opts = append(opts, grpc.Creds(altsTC))
7986
}
8087
server := grpc.NewServer(opts...)
81-
testgrpc.RegisterTestServiceServer(server, interop.NewTestServer())
88+
metricsRecorder := orca.NewServerMetricsRecorder()
89+
sopts := orca.ServiceOptions{
90+
MinReportingInterval: time.Second,
91+
ServerMetricsProvider: metricsRecorder,
92+
}
93+
internal.ORCAAllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&sopts)
94+
orca.Register(server, sopts)
95+
testgrpc.RegisterTestServiceServer(server, interop.NewTestServer(interop.NewTestServerOptions{MetricsRecorder: metricsRecorder}))
8296
server.Serve(lis)
8397
}

0 commit comments

Comments
 (0)