Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions pkg/adapters/kratos/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package kratos

import (
"context"
"fmt"
"net/http"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"

"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/transport"
khttp "github.com/go-kratos/kratos/v2/transport/http"
)

// SentinelClientMiddleware returns new middleware.Handler for kratos http/grpc client.
// Default resource name pattern is {httpMethod}:{apiPath}, such as "GET:/api/:id".
// Default block fallback is to return 429 (Too Many Requests) response.
//
// You may customize your own resource extractor and block handler by setting options.
func SentinelClientMiddleware(opts ...Option) middleware.Middleware {
options := evaluateOptions(opts)
return func(next middleware.Handler) middleware.Handler {
return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
var resourceName string
var resType base.ResourceType
tr, ok := transport.FromClientContext(ctx)
if !ok {
err = errors.New(http.StatusBadRequest, "Bad Request", "failed to extract request from context")
return nil, err
}
if options.resourceExtract != nil {
resourceName = options.resourceExtract(ctx, req)
} else {
switch tr.Kind() {
case transport.KindGRPC:
resourceName = tr.Operation()
resType = base.ResTypeRPC
case transport.KindHTTP:
http_tr := tr.(khttp.Transporter)
resourceName = fmt.Sprintf("%s:%s", http_tr.Request().Method, http_tr.PathTemplate())
resType = base.ResTypeWeb
default:
err = errors.New(http.StatusBadRequest, "Bad Request", fmt.Sprintf("unsupported transport kind: %s", tr.Kind()))
return nil, err
}
}
// start building sentinel entry
entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(resType),
sentinel.WithTrafficType(base.Outbound),
)
if blockErr != nil {
if options.blockFallback != nil {
reply, err = options.blockFallback(ctx, req)
return
} else {
switch tr.Kind() {
case transport.KindGRPC:
err = blockErr
case transport.KindHTTP:
err = errors.New(http.StatusTooManyRequests, "Too many requests", blockErr.Error())
}
return nil, err
}
}
defer entry.Exit()

reply, err = next(ctx, req)
return
}

}
}
201 changes: 201 additions & 0 deletions pkg/adapters/kratos/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package kratos

import (
"context"
"fmt"
"io"
"net"
"net/http"
"testing"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/flow"

"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/transport"
khttp "github.com/go-kratos/kratos/v2/transport/http"
"github.com/stretchr/testify/assert"
)

func initClientSentinel(t *testing.T) {
err := sentinel.InitDefault()
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}

_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "GET:/client",
Threshold: 1.0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatIntervalInMs: 1000,
},
{
Resource: "/api/123",
Threshold: 0.0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatIntervalInMs: 1000,
},
})
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
return
}
}

func TestSentinelClientMiddleware(t *testing.T) {
type args struct {
opts []Option
method string
path string
reqPath string
handler khttp.HandlerFunc
body io.Reader
}
type want struct {
code int
}
var (
tests = []struct {
name string
args args
want want
}{
{
name: "default get",
args: args{
opts: []Option{},
method: http.MethodGet,
path: "/client",
reqPath: "/client",
handler: khttp.HandlerFunc(func(ctx khttp.Context) error {
khttp.SetOperation(ctx, "/test/client")
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return "client", nil
})
// may use `ctx.BindQuery()` and `ctx.BindVars()` to build `req` for `h`
out, err := h(ctx, nil)
if err != nil {
return err
}
return ctx.Result(200, fmt.Sprintf("%+v", out))
}),
body: nil,
},
want: want{
code: http.StatusOK,
},
},
{
name: "customize resource extract",
args: args{
opts: []Option{
WithResourceExtractor(func(ctx context.Context, req interface{}) string {
tr, ok := transport.FromClientContext(ctx)
if ok {
http_tr := tr.(khttp.Transporter)
return http_tr.Request().URL.Path
} else {
return ""
}
}),
},
method: http.MethodGet,
path: "/api/{uid}",
reqPath: "/api/123",
handler: khttp.HandlerFunc(func(ctx khttp.Context) error {
khttp.SetOperation(ctx, "/test/api")
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return "api", nil
})
out, err := h(ctx, nil)
if err != nil {
return err
}
return ctx.Result(200, fmt.Sprintf("%+v", out))
}),
body: nil,
},
want: want{
code: http.StatusTooManyRequests,
},
},
{
name: "customize block fallback",
args: args{
opts: []Option{
WithBlockFallback(func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, errors.New(http.StatusBadRequest, "Customized Error", "Blocked by Sentinel")
}),
},
method: http.MethodGet,
path: "/client",
reqPath: "/client",
handler: khttp.HandlerFunc(func(ctx khttp.Context) error {
khttp.SetOperation(ctx, "/test/client")
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return "client", nil
})
out, err := h(ctx, nil)
if err != nil {
return err
}
return ctx.Result(200, fmt.Sprintf("%+v", out))
}),
body: nil,
},
want: want{
code: http.StatusBadRequest,
},
},
}
)
initClientSentinel(t)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
port, _ := getAvailablePort(8000)
// create server
var srvOpts = []khttp.ServerOption{
khttp.Address(fmt.Sprintf(":%d", port)),
}
httpSrv := khttp.NewServer(srvOpts...)
router := httpSrv.Route("/")
router.GET(tt.args.path, tt.args.handler)
go httpSrv.Start(context.Background())
defer httpSrv.Stop(context.Background())
// create client
var connOpts = []khttp.ClientOption{
khttp.WithMiddleware(
SentinelClientMiddleware(tt.args.opts...),
),
khttp.WithEndpoint(fmt.Sprintf("127.0.0.1:%d", port)),
}
conn, _ := khttp.NewClient(context.Background(), connOpts...)
defer conn.Close()
// invoke request
var out interface{}
callOpts := []khttp.CallOption{
khttp.Operation(tt.args.path),
khttp.PathTemplate(tt.args.path),
}
err := conn.Invoke(context.Background(), tt.args.method, tt.args.reqPath, nil, &out, callOpts...)
assert.Equal(t, tt.want.code, errors.Code(err))
})
}
}

func getAvailablePort(init int) (int, error) {
for p := init; p < 65536; p++ {
conn, _ := net.DialTimeout("tcp", net.JoinHostPort("", fmt.Sprint(p)), time.Second)
if conn != nil {
conn.Close()
} else {
return p, nil
}
}
return 0, fmt.Errorf("Cannot get an available port")
}
55 changes: 55 additions & 0 deletions pkg/adapters/kratos/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
This package provides Sentinel integration for Kratos.

Kratos provides unified abstraction for grpc and http middlewares.
Here we take the server side as examples.

For kratos/transport/http, user can call `http.Middleware`, for example,

import (
"github.com/go-kratos/kratos/v2/transport/http"
skratos "github.com/sentinel-go/pkg/adapters/kratos"
)
var opts = []http.ServerOption{
http.Middleware(
skratos.SentinelMiddleware(),
),
}
server := http.NewServer(opts...)

For kratos/transport/grpc, user can call `grpc.Middleware`, for example,

import (
"github.com/go-kratos/kratos/v2/transport/grpc"
skratos "github.com/sentinel-go/pkg/adapters/kratos"
)
var opts = []grpc.ServerOption{
grpc.Middleware(
skratos.SentinelMiddleware(),
),
}
server := grpc.NewServer(opts...)

User can also use sentinel grpc interceptors, for example,

import (
"github.com/go-kratos/kratos/v2/transport/grpc"
sgrpc "github.com/sentinel-go/pkg/adapters/grpc"
)
var opts = []grpc.ServerOption{
grpc.UnaryInterceptor(
sgrpc.NewUnaryServerInterceptor(),
),
}
server := grpc.NewServer(opts...)

The plugin extracts `Request().Method:PathTemplate()` (for http) or `Operation()`
(for grpc) as the resource name by default. Users may provide customized
resource name extractor when creating new Sentinel middlewares (via options).

Fallback logic: the plugin will return the BlockError by default
if current request is blocked by Sentinel rules. Users may also
provide customized fallback logic via WithXxxBlockFallback(handler) options.

*/
package kratos
16 changes: 16 additions & 0 deletions pkg/adapters/kratos/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module github.com/alibaba/sentinel-golang/pkg/adapters/kratos

go 1.13

require (
github.com/alibaba/sentinel-golang v1.0.2
github.com/go-kratos/kratos/v2 v2.1.5
github.com/kr/pretty v0.2.0 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20210913180222-943fd674d43e // indirect
golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
Loading