Skip to content

Commit

Permalink
Polish gRPC-go adapter (#102)
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 authored Mar 19, 2020
1 parent 92bdc64 commit 6388225
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 48 deletions.
38 changes: 23 additions & 15 deletions adapter/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"google.golang.org/grpc"
)

// SentinelUnaryClientIntercept returns new grpc.UnaryClientInterceptor instance
func SentinelUnaryClientIntercept(opts ...Option) grpc.UnaryClientInterceptor {
// NewUnaryClientInterceptor creates the unary client interceptor wrapped with Sentinel entry.
func NewUnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
options := evaluateOptions(opts)
return func(
ctx context.Context,
Expand All @@ -25,25 +25,29 @@ func SentinelUnaryClientIntercept(opts ...Option) grpc.UnaryClientInterceptor {
resourceName = options.unaryClientResourceExtract(ctx, method, req, cc)
}

entry, err := sentinel.Entry(
entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
)
if err != nil {
if blockErr != nil {
if options.unaryClientBlockFallback != nil {
return options.unaryClientBlockFallback(ctx, method, req, cc, err)
return options.unaryClientBlockFallback(ctx, method, req, cc, blockErr)
}
return err
return blockErr
}
defer entry.Exit()

return invoker(ctx, method, req, reply, cc, opts...)
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
sentinel.TraceErrorToEntry(entry, err)
}
return err
}
}

// SentinelStreamClientIntercept returns new grpc.StreamClientInterceptor instance
func SentinelStreamClientIntercept(opts ...Option) grpc.StreamClientInterceptor {
// NewStreamClientInterceptor creates the stream client interceptor wrapped with Sentinel entry.
func NewStreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
options := evaluateOptions(opts)
return func(
ctx context.Context,
Expand All @@ -59,20 +63,24 @@ func SentinelStreamClientIntercept(opts ...Option) grpc.StreamClientInterceptor
resourceName = options.streamClientResourceExtract(ctx, desc, cc, method)
}

entry, err := sentinel.Entry(
entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
)
if err != nil { // blocked
if blockErr != nil { // blocked
if options.streamClientBlockFallback != nil {
return options.streamClientBlockFallback(ctx, desc, cc, method, err)
return options.streamClientBlockFallback(ctx, desc, cc, method, blockErr)
}
return nil, err
return nil, blockErr
}

defer entry.Exit()

return streamer(ctx, desc, cc, method, opts...)
cs, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
sentinel.TraceErrorToEntry(entry, err)
}

return cs, err
}
}
20 changes: 13 additions & 7 deletions adapter/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import (

func TestUnaryClientIntercept(t *testing.T) {
const errMsgFake = "fake error"
interceptor := SentinelUnaryClientIntercept()
interceptor := NewUnaryClientInterceptor(WithUnaryClientResourceExtractor(
func(ctx context.Context, method string, i interface{}, conn *grpc.ClientConn) string {
return "client:" + method
}))
invoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
opts ...grpc.CallOption) error {
return errors.New(errMsgFake)
}
method := "/grpc.testing.TestService/UnaryCall"
t.Run("success", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/UnaryCall",
Resource: "client:" + method,
MetricType: flow.QPS,
Count: 1,
ControlBehavior: flow.Reject,
Expand All @@ -40,7 +43,7 @@ func TestUnaryClientIntercept(t *testing.T) {
t.Run("fail", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/UnaryCall",
Resource: "client:" + method,
MetricType: flow.QPS,
Count: 0,
ControlBehavior: flow.Reject,
Expand All @@ -54,7 +57,10 @@ func TestUnaryClientIntercept(t *testing.T) {

func TestStreamClientIntercept(t *testing.T) {
const errMsgFake = "fake error"
interceptor := SentinelStreamClientIntercept()
interceptor := NewStreamClientInterceptor(WithStreamClientResourceExtractor(
func(ctx context.Context, desc *grpc.StreamDesc, conn *grpc.ClientConn, method string) string {
return "client:" + method
}))
streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, errors.New(errMsgFake)
Expand All @@ -63,7 +69,7 @@ func TestStreamClientIntercept(t *testing.T) {
t.Run("success", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/StreamingOutputCall",
Resource: "client:/grpc.testing.TestService/StreamingOutputCall",
MetricType: flow.QPS,
Count: 1,
ControlBehavior: flow.Reject,
Expand All @@ -83,7 +89,7 @@ func TestStreamClientIntercept(t *testing.T) {
t.Run("fail", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/StreamingOutputCall",
Resource: "client:/grpc.testing.TestService/StreamingOutputCall",
MetricType: flow.QPS,
Count: 0,
ControlBehavior: flow.Reject,
Expand Down
18 changes: 10 additions & 8 deletions adapter/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,58 @@ type (
}
)

// WithUnaryClientResourceExtractor set unaryClientResourceExtract
// WithUnaryClientResourceExtractor sets the resource extractor of unary client request.
// The second string parameter is the full method name of current invocation.
func WithUnaryClientResourceExtractor(fn func(context.Context, string, interface{}, *grpc.ClientConn) string) Option {
return func(opts *options) {
opts.unaryClientResourceExtract = fn
}
}

// WithUnaryServerResourceExtractor set unaryServerResourceExtract
// WithUnaryServerResourceExtractor sets the resource extractor of unary server request.
func WithUnaryServerResourceExtractor(fn func(context.Context, interface{}, *grpc.UnaryServerInfo) string) Option {
return func(opts *options) {
opts.unaryServerResourceExtract = fn
}
}

// WithStreamClientResourceExtractor set streamClientResourceExtract
// WithStreamClientResourceExtractor sets the resource extractor of stream client request.
func WithStreamClientResourceExtractor(fn func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string) string) Option {
return func(opts *options) {
opts.streamClientResourceExtract = fn
}
}

// WithStreamServerResourceExtractor set streamServerResourceExtract
// WithStreamServerResourceExtractor sets the resource extractor of stream server request.
func WithStreamServerResourceExtractor(fn func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo) string) Option {
return func(opts *options) {
opts.streamServerResourceExtract = fn
}
}

// WithUnaryClientBlockFallback set unaryClientBlockFallback
// WithUnaryClientBlockFallback sets the block fallback handler of unary client request.
// The second string parameter is the full method name of current invocation.
func WithUnaryClientBlockFallback(fn func(context.Context, string, interface{}, *grpc.ClientConn, *base.BlockError) error) Option {
return func(opts *options) {
opts.unaryClientBlockFallback = fn
}
}

// WithUnaryServerBlockFallback set unaryServerBlockFallback
// WithUnaryServerBlockFallback sets the block fallback handler of unary server request.
func WithUnaryServerBlockFallback(fn func(context.Context, interface{}, *grpc.UnaryServerInfo, *base.BlockError) (interface{}, error)) Option {
return func(opts *options) {
opts.unaryServerBlockFallback = fn
}
}

// WithStreamClientBlockFallback set streamClientBlockFallback
// WithStreamClientBlockFallback sets the block fallback handler of stream client request.
func WithStreamClientBlockFallback(fn func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string, *base.BlockError) (grpc.ClientStream, error)) Option {
return func(opts *options) {
opts.streamClientBlockFallback = fn
}
}

// WithStreamServerBlockFallback set streamServerBlockFallback
// WithStreamServerBlockFallback sets the block fallback handler of stream server request.
func WithStreamServerBlockFallback(fn func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, *base.BlockError) error) Option {
return func(opts *options) {
opts.streamServerBlockFallback = fn
Expand Down
38 changes: 24 additions & 14 deletions adapter/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"google.golang.org/grpc"
)

// SentinelUnaryServerIntercept implements gRPC unary server interceptor interface
func SentinelUnaryServerIntercept(opts ...Option) grpc.UnaryServerInterceptor {
// NewUnaryServerInterceptor creates the unary server interceptor wrapped with Sentinel entry.
func NewUnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
options := evaluateOptions(opts)
return func(
ctx context.Context,
Expand All @@ -22,24 +22,29 @@ func SentinelUnaryServerIntercept(opts ...Option) grpc.UnaryServerInterceptor {
if options.unaryServerResourceExtract != nil {
resourceName = options.unaryServerResourceExtract(ctx, req, info)
}
entry, err := sentinel.Entry(
entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
)
if err != nil {
if blockErr != nil {
if options.unaryServerBlockFallback != nil {
return options.unaryServerBlockFallback(ctx, req, info, err)
return options.unaryServerBlockFallback(ctx, req, info, blockErr)
}
return nil, err
return nil, blockErr
}
defer entry.Exit()
return handler(ctx, req)

res, err := handler(ctx, req)
if err != nil {
sentinel.TraceErrorToEntry(entry, err)
}
return res, err
}
}

// SentinelStreamServerIntercept implements gRPC stream server interceptor interface
func SentinelStreamServerIntercept(opts ...Option) grpc.StreamServerInterceptor {
// NewStreamServerInterceptor creates the unary stream interceptor wrapped with Sentinel entry.
func NewStreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
options := evaluateOptions(opts)
return func(
srv interface{},
Expand All @@ -52,18 +57,23 @@ func SentinelStreamServerIntercept(opts ...Option) grpc.StreamServerInterceptor
if options.streamServerResourceExtract != nil {
resourceName = options.streamServerResourceExtract(srv, ss, info)
}
entry, err := sentinel.Entry(
entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
)
if err != nil { // blocked
if blockErr != nil { // blocked
if options.streamServerBlockFallback != nil {
return options.streamServerBlockFallback(srv, ss, info, err)
return options.streamServerBlockFallback(srv, ss, info, blockErr)
}
return err
return blockErr
}
defer entry.Exit()
return handler(srv, ss)

err := handler(srv, ss)
if err != nil {
sentinel.TraceErrorToEntry(entry, err)
}
return err
}
}
16 changes: 12 additions & 4 deletions adapter/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"context"
"errors"
"github.com/alibaba/sentinel-golang/core/stat"
"testing"

sentinel "github.com/alibaba/sentinel-golang/api"
Expand All @@ -19,7 +20,7 @@ func TestMain(m *testing.M) {

func TestStreamServerIntercept(t *testing.T) {
const errMsgFake = "fake error"
interceptor := SentinelStreamServerIntercept()
interceptor := NewStreamServerInterceptor()
handler := func(srv interface{}, stream grpc.ServerStream) error {
return errors.New(errMsgFake)
}
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestStreamServerIntercept(t *testing.T) {

func TestUnaryServerIntercept(t *testing.T) {
const errMsgFake = "fake error"
interceptor := SentinelUnaryServerIntercept()
interceptor := NewUnaryServerInterceptor()
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, errors.New(errMsgFake)
}
Expand All @@ -82,13 +83,21 @@ func TestUnaryServerIntercept(t *testing.T) {
rep, err := interceptor(nil, nil, info, handler)
assert.EqualError(t, err, errMsgFake)
assert.Nil(t, rep)
// Test for recording the biz error.
assert.EqualValues(t, 1, int(stat.GetResourceNode(info.FullMethod).GetQPS(base.MetricEventError)))

t.Run("second fail", func(t *testing.T) {
rep, err := interceptor(nil, nil, info, handler)
assert.IsType(t, &base.BlockError{}, err)
assert.Nil(t, rep)

assert.EqualValues(t, 1, int(stat.GetResourceNode(info.FullMethod).GetQPS(base.MetricEventError)))
})
})

successHandler := func(ctx context.Context, req interface{}) (interface{}, error) {
return "abc", nil
}
t.Run("fail", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Expand All @@ -99,9 +108,8 @@ func TestUnaryServerIntercept(t *testing.T) {
},
})
assert.Nil(t, err)
rep, err := interceptor(nil, nil, info, handler)
rep, err := interceptor(nil, nil, info, successHandler)
assert.IsType(t, &base.BlockError{}, err)
assert.Nil(t, rep)
})
}

0 comments on commit 6388225

Please sign in to comment.